Skip to content

[23432] Pass value of TransportPriorityQosPolicy to transport layer #5933

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fde4a23
Refs #23432. Add `transport_priority` to `SenderResource::send`
MiguelCompany Jul 2, 2025
9500517
Refs #23432. Add new lambda that receives the priority to `SenderReso…
MiguelCompany Jul 2, 2025
62443cc
Refs #23432. Leave old method to avoid API break.
MiguelCompany Jul 2, 2025
0669899
Refs #23432. Update chaining transport interfaces.
MiguelCompany Jul 2, 2025
34c48c3
Refs #23432. Deprecate old version.
MiguelCompany Jul 2, 2025
9f5036b
Refs #23432. Add `transport_priority` to `RTPSParticipantImpl::sendSy…
MiguelCompany Jul 2, 2025
773ec5d
Refs #23432. Add `transport_priority` to `WriterAttributes`.
MiguelCompany Jul 2, 2025
7616d65
Refs #23432. Fix sign of value in TransportPriorityQosPolicy.
MiguelCompany Jul 2, 2025
6603b28
Refs #23432. Set field in WriterAttributes when enabling DataWriter.
MiguelCompany Jul 2, 2025
3666b01
Refs #23432. Update field when updating QoS.
MiguelCompany Jul 2, 2025
9be7521
Refs #23432. Pass transport_priority to `UDPTransportInterface::send()`.
MiguelCompany Jul 11, 2025
e4b8a35
Refs #23432. Pass transport_priority on locator filter in test_UDPv4T…
MiguelCompany Jul 11, 2025
763a8f5
Refs #23432. Add blackbox tests.
MiguelCompany Jul 11, 2025
883728f
Refs #23432. Add XML support for `transport_priority`.
MiguelCompany Jul 14, 2025
1733dd8
Refs #23432. Uncrustify.
MiguelCompany Jul 14, 2025
4bb6cfa
Refs #23432. Fix comments.
MiguelCompany Jul 15, 2025
e4e3341
Refs #23432. Add const qualifier to priority argument.
MiguelCompany Jul 15, 2025
a6bbd18
Refs #23432. Add transport_priority to TCP and SHM.
MiguelCompany Jul 15, 2025
d27454f
Refs #23432. Remove unnecessary lambda in `ChainingSenderResource`.
MiguelCompany Jul 15, 2025
7bed167
Refs #23432. Update versions.md.
MiguelCompany Jul 18, 2025
ceb844d
Refs #23432. Move transport_priority from PublisherAttributes to Writ…
MiguelCompany Jul 21, 2025
e9829af
Refs #23432. Move XML for transport_priority inside qos tag.
MiguelCompany Jul 21, 2025
5607bcd
Refs #23432. Improve `TRP-FUN-03` test.
MiguelCompany Jul 21, 2025
a3e1cce
Refs #23432. Make `transport_priority` updates be transmitted.
MiguelCompany Jul 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct TopicBuiltinTopicData
//!Reliability Qos, implemented in the library.
ReliabilityQosPolicy reliability;

//!Transport Priority Qos, NOT implemented in the library.
//!Transport Priority Qos, implemented in the library.
TransportPriorityQosPolicy transport_priority;

//!Lifespan Qos, implemented in the library.
Expand Down
4 changes: 1 addition & 3 deletions include/fastdds/dds/core/policy/QosPolicies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1984,16 +1984,14 @@ class OwnershipStrengthQosPolicy : public Parameter_t, public QosPolicy
/**
* This policy is a hint to the infrastructure as to how to set the priority of the underlying transport used to send the data.
*
* @warning This QosPolicy can be defined and is transmitted to the rest of the network but is not implemented in this version.
*
* @note Mutable Qos Policy
*/
class TransportPriorityQosPolicy : public Parameter_t, public QosPolicy
{
public:

//!Priority <br> By default, 0.
uint32_t value;
int32_t value;

/**
* @brief Constructor
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/dds/publisher/qos/DataWriterQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ class DataWriterQos
//!Resource Limits Qos, implemented in the library.
ResourceLimitsQosPolicy resource_limits_;

//!Transport Priority Qos, NOT implemented in the library.
//!Transport Priority Qos, implemented in the library.
TransportPriorityQosPolicy transport_priority_;

//!Lifespan Qos, implemented in the library.
Expand Down
6 changes: 5 additions & 1 deletion include/fastdds/dds/publisher/qos/WriterQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class WriterQos
(this->m_publishMode == b.m_publishMode) &&
(this->m_disablePositiveACKs == b.m_disablePositiveACKs) &&
(this->representation == b.representation) &&
(this->data_sharing == b.data_sharing);
(this->data_sharing == b.data_sharing) &&
(this->transport_priority == b.transport_priority);
}

//!Durability Qos, implemented in the library.
Expand Down Expand Up @@ -114,6 +115,9 @@ class WriterQos
//!Group Data Qos, NOT implemented in the library.
GroupDataQosPolicy m_groupData;

//! Transport priority Qos, implemented in the library.
TransportPriorityQosPolicy transport_priority;

//!Publication Mode Qos, implemented in the library.
PublishModeQosPolicy m_publishMode;

Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/dds/topic/qos/TopicQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class TopicQos
//!Resource Limits Qos, implemented in the library.
ResourceLimitsQosPolicy resource_limits_;

//!Transport Priority Qos, NOT implemented in the library.
//!Transport Priority Qos, implemented in the library.
TransportPriorityQosPolicy transport_priority_;

//!Lifespan Qos, implemented in the library.
Expand Down
6 changes: 6 additions & 0 deletions include/fastdds/rtps/attributes/WriterAttributes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
#ifndef FASTDDS_RTPS_ATTRIBUTES__WRITERATTRIBUTES_HPP
#define FASTDDS_RTPS_ATTRIBUTES__WRITERATTRIBUTES_HPP

#include <cstdint>
#include <functional>
#include <string>

#include <fastdds/dds/core/policy/QosPolicies.hpp>
#include <fastdds/rtps/attributes/EndpointAttributes.hpp>
#include <fastdds/rtps/common/Guid.hpp>
#include <fastdds/rtps/common/Time_t.hpp>
#include <fastdds/rtps/common/Types.hpp>
#include <fastdds/rtps/flowcontrol/FlowControllerConsts.hpp>
#include <fastdds/utils/collections/ResourceLimitedContainerConfig.hpp>

Expand Down Expand Up @@ -126,6 +129,9 @@ class WriterAttributes

//! Whether to send data to each matched reader separately.
bool separate_sending = false;

//! Transport priority for this Writer.
int32_t transport_priority = 0;
};

} // namespace rtps
Expand Down
33 changes: 33 additions & 0 deletions include/fastdds/rtps/transport/ChainingTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,39 @@ class ChainingTransport : public TransportInterface
fastdds::rtps::LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point& timeout) = 0;

/**
* Blocking Send through the specified channel. It may perform operations on the output buffer.
* At the end the function must call to the low-level transport's `send()` function.
* @code{.cpp}
// Example of calling the low-level transport `send()` function.
return low_sender_resource->send(buffers, total_bytes, destination_locators_begin,
destination_locators_end, timeout, transport_priority);
@endcode
* @param low_sender_resource SenderResource generated by the lower transport.
* @param buffers Vector of buffers to send.
* @param total_bytes Length of all buffers to be sent. Will be used as a boundary for the previous parameter.
* It must not exceed the \c sendBufferSize fed to this class during construction.
* @param destination_locators_begin First iterator of the list of Locators describing the remote destinations
* we're sending to.
* @param destination_locators_end End iterator of the list of Locators describing the remote destinations
* we're sending to.
* @param timeout Maximum blocking time.
* @param transport_priority Transport priority to be used for the send operation.
*/
FASTDDS_EXPORTED_API virtual bool send_w_priority(
fastdds::rtps::SenderResource* low_sender_resource,
const std::vector<NetworkBuffer>& buffers,
uint32_t total_bytes,
fastdds::rtps::LocatorsIterator* destination_locators_begin,
fastdds::rtps::LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point& timeout,
int32_t transport_priority)
{
static_cast<void>(transport_priority);
return send(low_sender_resource, buffers, total_bytes,
destination_locators_begin, destination_locators_end, timeout);
}

/*!
* Blocking Receive from the specified channel. It may perform operations on the input buffer.
* At the end the function must call to the `next_receiver`'s `OnDataReceived` function.
Expand Down
35 changes: 29 additions & 6 deletions include/fastdds/rtps/transport/SenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ class SenderResource

using NetworkBuffer = eprosima::fastdds::rtps::NetworkBuffer;

FASTDDS_DEPRECATED_UNTIL(4, send, "Use send with transport_priority instead")
bool send(
const std::vector<NetworkBuffer>& buffers,
const uint32_t& total_bytes,
LocatorsIterator* destination_locators_begin,
LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point& max_blocking_time_point)
{
return send(buffers, total_bytes, destination_locators_begin, destination_locators_end,
max_blocking_time_point, 0);
}

/**
* Sends to a destination locator, through the channel managed by this resource.
* @param buffers Vector of buffers to send.
Expand All @@ -59,15 +71,22 @@ class SenderResource
* @param destination_locators_begin destination endpoint Locators iterator begin.
* @param destination_locators_end destination endpoint Locators iterator end.
* @param max_blocking_time_point If transport supports it then it will use it as maximum blocking time.
* @param transport_priority Transport priority to be used for the send operation.
* @return Success of the send operation.
*/
bool send(
const std::vector<NetworkBuffer>& buffers,
const uint32_t& total_bytes,
LocatorsIterator* destination_locators_begin,
LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point& max_blocking_time_point)
const std::chrono::steady_clock::time_point& max_blocking_time_point,
int32_t transport_priority)
{
if (send_lambda_)
{
return send_lambda_(buffers, total_bytes, destination_locators_begin, destination_locators_end,
max_blocking_time_point, transport_priority);
}
return send_buffers_lambda_(buffers, total_bytes, destination_locators_begin, destination_locators_end,
max_blocking_time_point);
}
Expand All @@ -77,11 +96,7 @@ class SenderResource
* construction outside of the factory are forbidden.
*/
SenderResource(
SenderResource&& rValueResource)
{
clean_up.swap(rValueResource.clean_up);
send_buffers_lambda_.swap(rValueResource.send_buffers_lambda_);
}
SenderResource&& rValueResource) = default;

virtual ~SenderResource() = default;

Expand Down Expand Up @@ -120,6 +135,14 @@ class SenderResource
LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point&)> send_buffers_lambda_;

std::function<bool(
const std::vector<NetworkBuffer>&,
uint32_t,
LocatorsIterator* destination_locators_begin,
LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point&,
int32_t transport_priority)> send_lambda_;

private:

SenderResource() = delete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct test_UDPv4TransportDescriptor : public SocketTransportDescriptor
//! Custom message filtering functions
typedef std::function<bool (eprosima::fastdds::rtps::CDRMessage_t& msg)> filter;
//! Locator filtering function
typedef std::function<bool (const Locator& destination)> DestinationLocatorFilter;
typedef std::function<bool (const Locator& destination, int32_t priority)> DestinationLocatorFilter;

//! Test transport options
std::shared_ptr<TestUDPv4TransportOptions> test_transport_options = std::make_shared<TestUDPv4TransportOptions>();
Expand Down Expand Up @@ -115,7 +115,7 @@ struct test_UDPv4TransportDescriptor : public SocketTransportDescriptor
};

//! Filtering function for dropping messages to specific destinations
DestinationLocatorFilter locator_filter_ = [](const Locator&)
DestinationLocatorFilter locator_filter_ = [](const Locator&, int32_t)
{
return false;
};
Expand Down Expand Up @@ -168,7 +168,7 @@ struct TestUDPv4TransportOptions
std::atomic<uint32_t> test_UDPv4Transport_DropLogLength{0};
std::atomic<bool> always_drop_participant_builtin_topic_data{false};
std::atomic<bool> simulate_no_interfaces{false};
test_UDPv4TransportDescriptor::DestinationLocatorFilter locator_filter = [](const Locator&)
test_UDPv4TransportDescriptor::DestinationLocatorFilter locator_filter = [](const Locator&, int32_t)
{
return false;
};
Expand Down
7 changes: 7 additions & 0 deletions include/fastdds/rtps/writer/RTPSWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ class RTPSWriter : public Endpoint
*/
FASTDDS_EXPORTED_API virtual bool get_disable_positive_acks() const = 0;

/**
* @brief Get the transport priority of this writer.
*
* @return Transport priority of this writer.
*/
FASTDDS_EXPORTED_API virtual int32_t get_transport_priority() const = 0;

/**
* @brief Fills the provided vector with the GUIDs of the matched readers.
*
Expand Down
6 changes: 4 additions & 2 deletions resources/xsd/fastdds_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@
├ propertiesPolicy [0~1],
├ userDefinedID [int16],
├ entityID [int16],
└ matchedSubscribersAllocation [0~1]-->
└ matchedSubscribersAllocation [0~1] -->
<xs:complexType name="publisherProfileType">
<xs:complexContent>
<xs:extension base="publisherProfileNoAttributesType">
Expand Down Expand Up @@ -1288,7 +1288,8 @@
├ reliability [0~1],
├ timeBasedFilter [0~1],
├ topicData [0~1],
└ userData [0~1] -->
├ userData [0~1],
└ transport_priority [0~1] -->
<xs:complexType name="dataWriterQosPoliciesType">
<xs:all>
<xs:element name="data_sharing" type="dataSharingQosPolicyType" minOccurs="0" maxOccurs="1"/>
Expand All @@ -1311,6 +1312,7 @@
<xs:element name="timeBasedFilter" type="timeBasedFilterQosPolicyType" minOccurs="0" maxOccurs="1"/>
<xs:element name="topicData" type="octectVectorQosPolicyType" minOccurs="0" maxOccurs="1"/>
<xs:element name="userData" type="octectVectorQosPolicyType" minOccurs="0" maxOccurs="1"/>
<xs:element name="transport_priority" type="int32" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>

Expand Down
4 changes: 2 additions & 2 deletions src/cpp/fastdds/core/policy/QosPoliciesSerializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ inline bool QosPoliciesSerializer<TransportPriorityQosPolicy>::add_content_to_cd
const TransportPriorityQosPolicy& qos_policy,
rtps::CDRMessage_t* cdr_message)
{
bool valid = rtps::CDRMessage::addUInt32(cdr_message, qos_policy.value);
bool valid = rtps::CDRMessage::addInt32(cdr_message, qos_policy.value);
return valid;
}

Expand All @@ -797,7 +797,7 @@ inline bool QosPoliciesSerializer<TransportPriorityQosPolicy>::read_content_from
return false;
}
qos_policy.length = parameter_length;
return rtps::CDRMessage::readUInt32(cdr_message, &qos_policy.value);
return rtps::CDRMessage::readInt32(cdr_message, &qos_policy.value);
}

template<>
Expand Down
9 changes: 7 additions & 2 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ ReturnCode_t DataWriterImpl::enable()
w_att.liveliness_announcement_period = qos_.liveliness().announcement_period;
w_att.matched_readers_allocation = qos_.writer_resource_limits().matched_subscriber_allocation;
w_att.disable_heartbeat_piggyback = qos_.reliable_writer_qos().disable_heartbeat_piggyback;
w_att.transport_priority = qos_.transport_priority().value;

// TODO(Ricardo) Remove in future
// Insert topic_name and partitions
Expand Down Expand Up @@ -1221,14 +1222,18 @@ ReturnCode_t DataWriterImpl::set_qos(

if (enabled)
{
if (qos_.reliability().kind == ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS &&
qos_.reliable_writer_qos() == qos_to_set.reliable_writer_qos())
int32_t transport_priority = writer_->get_transport_priority();

if ((qos_.reliability().kind == ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS &&
qos_.reliable_writer_qos() == qos_to_set.reliable_writer_qos()) ||
(transport_priority != qos_to_set.transport_priority().value))
{
// Update times and positive_acks attributes on RTPS Layer
WriterAttributes w_att;
w_att.times = qos_.reliable_writer_qos().times;
w_att.disable_positive_acks = qos_.reliable_writer_qos().disable_positive_acks.enabled;
w_att.keep_duration = qos_.reliable_writer_qos().disable_positive_acks.duration;
w_att.transport_priority = qos_to_set.transport_priority().value;
writer_->update_attributes(w_att);
}

Expand Down
1 change: 1 addition & 0 deletions src/cpp/fastdds/publisher/qos/DataWriterQos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ WriterQos DataWriterQos::get_writerqos(
qos.m_userData = user_data();
qos.representation = representation();
qos.data_sharing = data_sharing();
qos.transport_priority = transport_priority();

if (qos.data_sharing.kind() != OFF &&
qos.data_sharing.domain_ids().empty())
Expand Down
1 change: 1 addition & 0 deletions src/cpp/fastdds/utils/QosConverters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ void set_qos_from_attributes(
qos.resource_limits() = attr.topic.resourceLimitsQos;
qos.data_sharing() = attr.qos.data_sharing;
qos.reliable_writer_qos().disable_heartbeat_piggyback = attr.qos.disable_heartbeat_piggyback;
qos.transport_priority().value = attr.qos.transport_priority.value;

if (attr.qos.m_partition.size() > 0 )
{
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/builtin/data/WriterProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1606,7 +1606,7 @@ void WriterProxyData::set_qos(
{
resource_limits = qos.resource_limits;
}
if (first_time && qos.transport_priority.has_value())
if (qos.transport_priority.has_value())
{
transport_priority = qos.transport_priority;
}
Expand Down Expand Up @@ -1705,6 +1705,7 @@ void WriterProxyData::set_qos(
{
ownership_strength = qos.m_ownershipStrength;
}
transport_priority = qos.transport_priority;
if (first_time)
{
disable_positive_acks = qos.m_disablePositiveACKs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ bool DirectMessageSender::send(
std::chrono::steady_clock::time_point max_blocking_time_point) const
{
return participant_->sendSync(buffers, total_bytes, participant_->getGuid(),
Locators(locators_->begin()), Locators(locators_->end()), max_blocking_time_point);
Locators(locators_->begin()), Locators(locators_->end()), max_blocking_time_point, 0);
}

} /* namespace rtps */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ bool PDPStatelessWriter::send_to_fixed_locators(
ret = initial_peers_.empty() ||
mp_RTPSParticipant->sendSync(buffers, total_bytes, m_guid,
Locators(initial_peers_.begin()), Locators(initial_peers_.end()),
max_blocking_time_point);
max_blocking_time_point, transport_priority_);

if (ret)
{
Expand Down
6 changes: 4 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ class RTPSParticipantImpl
* @param destination_locators_begin Iterator at the first destination locator.
* @param destination_locators_end Iterator at the end destination locator.
* @param max_blocking_time_point execution time limit timepoint.
* @param transport_priority Transport priority of the message.
* @return true if at least one locator has been sent.
*/
template<class LocatorIteratorT>
Expand All @@ -305,7 +306,8 @@ class RTPSParticipantImpl
const GUID_t& sender_guid,
const LocatorIteratorT& destination_locators_begin,
const LocatorIteratorT& destination_locators_end,
std::chrono::steady_clock::time_point& max_blocking_time_point)
std::chrono::steady_clock::time_point& max_blocking_time_point,
int32_t transport_priority)
{
bool ret_code = false;
#if HAVE_STRICT_REALTIME
Expand All @@ -322,7 +324,7 @@ class RTPSParticipantImpl
LocatorIteratorT locators_begin = destination_locators_begin;
LocatorIteratorT locators_end = destination_locators_end;
send_resource->send(buffers, total_bytes, &locators_begin, &locators_end,
max_blocking_time_point);
max_blocking_time_point, transport_priority);
}

lock.unlock();
Expand Down
Loading
Loading