From c376cc11edad4f0d47578b0d3b1010017d46d358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ferreira=20Gonz=C3=A1lez?= Date: Fri, 25 Apr 2025 08:08:23 +0200 Subject: [PATCH 1/5] Improve DS routines (#5764) * Refs #22814: Data(p) test Signed-off-by: cferreiragonz * Refs #22814: Data(r/w) test Signed-off-by: cferreiragonz * Refs #22814: Tristate for ParticipantsAckStatus Signed-off-by: cferreiragonz * Refs #22814: Send direct messages to new clients Signed-off-by: cferreiragonz * Refs #22814: Review - Changes Signed-off-by: cferreiragonz * Refs #22814: Uncrustify Signed-off-by: cferreiragonz --------- Signed-off-by: cferreiragonz Signed-off-by: Eugenio Collado --- .../discovery/database/DiscoveryDataBase.cpp | 72 ++-- .../DiscoveryParticipantsAckStatus.cpp | 19 +- .../DiscoveryParticipantsAckStatus.hpp | 36 +- .../database/DiscoverySharedInfo.cpp | 2 +- .../database/DiscoverySharedInfo.hpp | 8 +- .../discovery/participant/PDPServer.cpp | 1 - .../api/dds-pim/PubSubParticipant.hpp | 7 + .../common/BlackboxTestsDiscovery.cpp | 336 ++++++++++++++++++ 8 files changed, 444 insertions(+), 37 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp index c6fa1c574a0..a1308cd4a62 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp @@ -36,6 +36,8 @@ namespace fastdds { namespace rtps { namespace ddb { +using ParticipantState = DiscoveryParticipantsAckStatus::ParticipantState; + DiscoveryDataBase::DiscoveryDataBase( fastrtps::rtps::GuidPrefix_t server_guid_prefix, std::set servers) @@ -267,8 +269,8 @@ void DiscoveryDataBase::update_change_and_unmatch_( changes_to_release_.push_back(entity.update_and_unmatch(new_change)); // Manually set relevant participants ACK status of this server, and of the participant that sent the // change, to 1. This way, we avoid backprogation of the data. - entity.add_or_update_ack_participant(server_guid_prefix_, true); - entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, true); + entity.add_or_update_ack_participant(server_guid_prefix_, ParticipantState::ACKED); + entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, ParticipantState::ACKED); } void DiscoveryDataBase::add_ack_( @@ -292,7 +294,7 @@ void DiscoveryDataBase::add_ack_( // database has been updated, so this ACK is not relevant anymore if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity()) { - it->second.add_or_update_ack_participant(acked_entity, true); + it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED); } } } @@ -307,7 +309,7 @@ void DiscoveryDataBase::add_ack_( // database has been updated, so this ACK is not relevant anymore if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity()) { - it->second.add_or_update_ack_participant(acked_entity, true); + it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED); } } } @@ -322,7 +324,7 @@ void DiscoveryDataBase::add_ack_( // database has been updated, so this ACK is not relevant anymore if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity()) { - it->second.add_or_update_ack_participant(acked_entity, true); + it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED); } } } @@ -694,7 +696,7 @@ void DiscoveryDataBase::create_new_participant_from_change_( // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way, // we avoid backprogation of the data. - ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); // If the DATA(p) it's from this server, it is already in history and we do nothing here if (change_guid.guidPrefix != server_guid_prefix_) @@ -796,7 +798,7 @@ void DiscoveryDataBase::update_participant_from_change_( if (ch->write_params.sample_identity().sequence_number() == participant_info.change()->write_params.sample_identity().sequence_number()) { - participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); } // we release it if it's the same or if it is lower @@ -846,7 +848,7 @@ void DiscoveryDataBase::create_writers_from_change_( if (ch->write_params.sample_identity().sequence_number() == writer_it->second.change()->write_params.sample_identity().sequence_number()) { - writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); } // we release it if it's the same or if it is lower @@ -894,7 +896,7 @@ void DiscoveryDataBase::create_writers_from_change_( // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way, // we avoid backprogation of the data. - writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); // if topic is virtual, it must iterate over all readers if (topic_name == virtual_topic_) @@ -964,7 +966,7 @@ void DiscoveryDataBase::create_readers_from_change_( if (ch->write_params.sample_identity().sequence_number() == reader_it->second.change()->write_params.sample_identity().sequence_number()) { - reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); } // we release it if it's the same or if it is lower @@ -1012,7 +1014,7 @@ void DiscoveryDataBase::create_readers_from_change_( // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way, // we avoid backprogation of the data. - reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); // if topic is virtual, it must iterate over all readers if (topic_name == virtual_topic_) @@ -1407,10 +1409,6 @@ bool DiscoveryDataBase::process_dirty_topics() // Find participants with writer info and participant with reader info in participants_ parts_reader_it = participants_.find(reader.guidPrefix); parts_writer_it = participants_.find(writer.guidPrefix); - // Find reader info in readers_ - readers_it = readers_.find(reader); - // Find writer info in writers_ - writers_it = writers_.find(writer); // Check in `participants_` whether the client with the reader has acknowledge the PDP of the client // with the writer. @@ -1418,26 +1416,35 @@ bool DiscoveryDataBase::process_dirty_topics() { if (parts_reader_it->second.is_matched(writer.guidPrefix)) { + // Find reader info in readers_ + readers_it = readers_.find(reader); // Check the status of the writer in `readers_[reader]::relevant_participants_builtin_ack_status`. if (readers_it != readers_.end() && readers_it->second.is_relevant_participant(writer.guidPrefix) && - !readers_it->second.is_matched(writer.guidPrefix)) + !readers_it->second.is_waiting_ack(writer.guidPrefix)) { // If the status is 0, add DATA(r) to a `edp_publications_to_send_` (if it's not there). if (add_edp_subscriptions_to_send_(readers_it->second.change())) { EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind DATA(r) to send: " << readers_it->second.change()->instanceHandle); + readers_it->second.add_or_update_ack_participant(writer.guidPrefix, + ParticipantState::WAITING_ACK); } } } else if (parts_reader_it->second.is_relevant_participant(writer.guidPrefix)) { - // Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there). - if (add_pdp_to_send_(parts_reader_it->second.change())) + if (!parts_reader_it->second.is_waiting_ack(writer.guidPrefix)) { - EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind readers' DATA(p) to send: " - << parts_reader_it->second.change()->instanceHandle); + // Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there). + if (add_pdp_to_send_(parts_reader_it->second.change())) + { + EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding readers' DATA(p) to send: " + << parts_reader_it->second.change()->instanceHandle); + parts_reader_it->second.add_or_update_ack_participant(writer.guidPrefix, + ParticipantState::WAITING_ACK); + } } // Set topic as not-clearable. is_clearable = false; @@ -1450,26 +1457,35 @@ bool DiscoveryDataBase::process_dirty_topics() { if (parts_writer_it->second.is_matched(reader.guidPrefix)) { + // Find writer info in writers_ + writers_it = writers_.find(writer); // Check the status of the reader in `writers_[writer]::relevant_participants_builtin_ack_status`. if (writers_it != writers_.end() && writers_it->second.is_relevant_participant(reader.guidPrefix) && - !writers_it->second.is_matched(reader.guidPrefix)) + !writers_it->second.is_waiting_ack(reader.guidPrefix)) { // If the status is 0, add DATA(w) to a `edp_subscriptions_to_send_` (if it's not there). if (add_edp_publications_to_send_(writers_it->second.change())) { EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind DATA(w) to send: " << writers_it->second.change()->instanceHandle); + writers_it->second.add_or_update_ack_participant(reader.guidPrefix, + ParticipantState::WAITING_ACK); } } } else if (parts_writer_it->second.is_relevant_participant(reader.guidPrefix)) { - // Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there). - if (add_pdp_to_send_(parts_writer_it->second.change())) + if (!parts_writer_it->second.is_waiting_ack(reader.guidPrefix)) { - EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind writers' DATA(p) to send: " - << parts_writer_it->second.change()->instanceHandle); + // Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there). + if (add_pdp_to_send_(parts_writer_it->second.change())) + { + EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding writers' DATA(p) to send: " + << parts_writer_it->second.change()->instanceHandle); + parts_writer_it->second.add_or_update_ack_participant(reader.guidPrefix, + ParticipantState::WAITING_ACK); + } } // Set topic as not-clearable. is_clearable = false; @@ -2463,7 +2479,7 @@ bool DiscoveryDataBase::from_json( // Populate GuidPrefix_t std::istringstream(it_ack.key()) >> prefix_aux_ack; - dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); + dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); } // Add Participant @@ -2501,7 +2517,7 @@ bool DiscoveryDataBase::from_json( // Populate GuidPrefix_t std::istringstream(it_ack.key()) >> prefix_aux_ack; - dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); + dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); } // Add Participant @@ -2561,7 +2577,7 @@ bool DiscoveryDataBase::from_json( // Populate GuidPrefix_t std::istringstream(it_ack.key()) >> prefix_aux_ack; - dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); + dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); } // Add Participant diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp index 6232854ed7b..58ffab2d141 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp @@ -34,7 +34,7 @@ namespace ddb { void DiscoveryParticipantsAckStatus::add_or_update_participant( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p, - bool status = false) + ParticipantState status = ParticipantState::PENDING_SEND) { relevant_participants_map_[guid_p] = status; } @@ -45,13 +45,24 @@ void DiscoveryParticipantsAckStatus::remove_participant( relevant_participants_map_.erase(guid_p); } +bool DiscoveryParticipantsAckStatus::is_waiting_ack( + const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const +{ + auto it = relevant_participants_map_.find(guid_p); + if (it != relevant_participants_map_.end()) + { + return it->second >= ParticipantState::WAITING_ACK; + } + return false; +} + bool DiscoveryParticipantsAckStatus::is_matched( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const { auto it = relevant_participants_map_.find(guid_p); if (it != relevant_participants_map_.end()) { - return it->second; + return it->second == ParticipantState::ACKED; } return false; } @@ -60,7 +71,7 @@ void DiscoveryParticipantsAckStatus::unmatch_all() { for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it) { - it->second = false; + it->second = ParticipantState::PENDING_SEND; } } @@ -89,7 +100,7 @@ bool DiscoveryParticipantsAckStatus::is_acked_by_all() const { for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it) { - if (!it->second) + if (it->second != ParticipantState::ACKED) { return false; } diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp index 9eeb99ac0d3..ee8f9fd1d8e 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp @@ -45,15 +45,25 @@ class DiscoveryParticipantsAckStatus ~DiscoveryParticipantsAckStatus() = default; + enum class ParticipantState : uint8_t + { + PENDING_SEND, // Data(p) has not been sent yet + WAITING_ACK, // Data(p) has already been sent but ACK has not been received + ACKED // Data(p) has been acked + }; + void add_or_update_participant( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p, - bool status); + ParticipantState status); void remove_participant( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p); void unmatch_all(); + bool is_waiting_ack( + const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const; + bool is_matched( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const; @@ -69,9 +79,31 @@ class DiscoveryParticipantsAckStatus private: - std::map relevant_participants_map_; + std::map relevant_participants_map_; }; +inline std::ostream& operator <<( + std::ostream& os, + DiscoveryParticipantsAckStatus::ParticipantState child) +{ + switch (child) + { + case DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND: + os << "PENDING_SEND"; + break; + case DiscoveryParticipantsAckStatus::ParticipantState::WAITING_ACK: + os << "WAITING_ACK"; + break; + case DiscoveryParticipantsAckStatus::ParticipantState::ACKED: + os << "ACKED"; + break; + default: + os << "UNKNOWN"; + break; + } + return os; +} + } /* namespace ddb */ } /* namespace rtps */ } /* namespace fastdds */ diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp index 805241a9468..2727d8223c9 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp @@ -36,7 +36,7 @@ DiscoverySharedInfo::DiscoverySharedInfo( : change_(change) { // the server already knows every message - add_or_update_ack_participant(known_participant, true); + add_or_update_ack_participant(known_participant, DiscoveryParticipantsAckStatus::ParticipantState::ACKED); } eprosima::fastrtps::rtps::CacheChange_t* DiscoverySharedInfo::update_and_unmatch( diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp index 530a814333c..7d82f979995 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp @@ -56,7 +56,7 @@ class DiscoverySharedInfo void add_or_update_ack_participant( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p, - bool status = false) + DiscoveryParticipantsAckStatus::ParticipantState status = DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND) { EPROSIMA_LOG_INFO( DISCOVERY_DATABASE, @@ -72,6 +72,12 @@ class DiscoverySharedInfo relevant_participants_builtin_ack_status_.remove_participant(guid_p); } + bool is_waiting_ack( + const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const + { + return relevant_participants_builtin_ack_status_.is_waiting_ack(guid_p); + } + bool is_matched( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const { diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index df7f793a007..0ca34b1b29c 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -1661,7 +1661,6 @@ void PDPServer::send_announcement( EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "Error sending announcement from server to clients"); } } - } bool PDPServer::read_backup( diff --git a/test/blackbox/api/dds-pim/PubSubParticipant.hpp b/test/blackbox/api/dds-pim/PubSubParticipant.hpp index a4e7f4dbf84..2e374bf3158 100644 --- a/test/blackbox/api/dds-pim/PubSubParticipant.hpp +++ b/test/blackbox/api/dds-pim/PubSubParticipant.hpp @@ -648,6 +648,13 @@ class PubSubParticipant return *this; } + PubSubParticipant& setup_transports( + eprosima::fastdds::rtps::BuiltinTransports transports) + { + participant_qos_.setup_transports(transports); + return *this; + } + PubSubParticipant& user_data( const std::vector& user_data) { diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index a11191f494e..40cd5fe0e6e 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -2318,3 +2318,339 @@ TEST(Discovery, discovery_cyclone_participant_with_custom_pid) /* Clean up */ factory->delete_participant(participant); } + +// This test checks that a Discover Server does not send duplicated PDP messages of itself when new clients +// are discovered +TEST_P(Discovery, discovery_server_pdp_messages_sent) +{ + // Skip test in intraprocess and datasharing mode + if (TRANSPORT != GetParam()) + { + GTEST_SKIP() << "Only makes sense on TRANSPORT"; + return; + } + + using namespace eprosima::fastdds::dds; + + // One discovery server will be created, with multiple direct clients connected to it. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(p) sent. + + // Look for the PID_DOMAIN_ID in the message as it is only present in Data(p) messages + auto builtin_msg_is_data_p = [](CDRMessage_t& msg, std::atomic& num_data_p) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + // If inline_qos submessage is found we will have an additional Sentinel + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_DOMAIN_ID) + { + std::cout << "Data(p) sent by the server" << std::endl; + inline_qos_msg = false; + num_data_p.fetch_add(1u, std::memory_order_seq_cst); + break; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(p) messages sent + std::atomic num_data_p_sends{ 0 }; + auto test_transport = std::make_shared(); + test_transport->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + { + return builtin_msg_is_data_p(msg, num_data_p_sends); + }; + + // Create the main participant + auto server = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server, 127, 0, 0, 1); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server, global_port); + + WireProtocolConfigQos server_wp_qos; + server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + server_wp_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server); + + server_wp_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; + server_wp_qos.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; + server_wp_qos.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server->disable_builtin_transport().add_user_transport_to_pparams(test_transport) + .wire_protocol(server_wp_qos); + + // Start the main participant + ASSERT_TRUE(server->init_participant()); + + // Create a client that connects to the first server + PubSubParticipant client_1(0u, 0u, 0u, 0u); + PubSubParticipant client_2(0u, 0u, 0u, 0u); + PubSubParticipant client_3(0u, 0u, 0u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server); + client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; + client_qos.builtin.discovery_config.initial_announcements.count = 1; + // Init client 1 + ASSERT_TRUE(client_1.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 1u); + + // Init client 2 + ASSERT_TRUE(client_2.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 2, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 2u); + + // Init client 3 + ASSERT_TRUE(client_3.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 3, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 3u); +} + +TEST_P(Discovery, discovery_server_edp_messages_sent) +{ + // Skip test in intraprocess and datasharing mode + if (TRANSPORT != GetParam()) + { + GTEST_SKIP() << "Only makes sense on TRANSPORT"; + return; + } + + using namespace eprosima::fastdds::dds; + + // Two discovery servers will be created, each with a direct client connected to them. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(r/w) sent. + + // Look for the PID_ENDPOINT_GUID in the message as it is only present in Data(r/w) messages + auto builtin_msg_is_data_r_w = [](CDRMessage_t& msg, std::atomic& num_data_r_w) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = eprosima::fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_ENDPOINT_GUID) + { + std::cout << "Data (r/w) sent by the server" << std::endl; + num_data_r_w.fetch_add(1u, std::memory_order_seq_cst); + break; + } + else if (pid == eprosima::fastdds::dds::PID_VENDORID) + { + // Vendor ID is present in both Data(p) and Data(r/w) messages + inline_qos_msg = false; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(r/w) messages sent + std::atomic num_data_r_w_sends_s1{ 0 }; + std::atomic num_data_r_w_sends_s2{ 0 }; + auto test_transport_s1 = std::make_shared(); + test_transport_s1->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s1); + }; + + auto test_transport_s2 = std::make_shared(); + test_transport_s2->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s2); + }; + + // Create server 1 + auto server_1 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_1; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server_1, global_port); + + WireProtocolConfigQos server_wp_qos_1; + server_wp_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + server_wp_qos_1.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1); + + server_wp_qos_1.builtin.discovery_config.leaseDuration = c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server_1->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s1) + .wire_protocol(server_wp_qos_1); + + // Start the main participant + ASSERT_TRUE(server_1->init_participant()); + + // Create server 2 + auto server_2 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_2 = locator_server_1; // UDPv4 locator by default + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server_2, global_port + 1); + + WireProtocolConfigQos server_wp_qos_2 = server_wp_qos_1; + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.clear(); + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); + // Configure 1 initial announcement as this Server will connect to the first one + server_wp_qos_2.builtin.discovery_config.initial_announcements.count = 1; + server_wp_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_1); + + // The main participant will use the test transport and a specific announcements configuration + server_2->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s2) + .wire_protocol(server_wp_qos_2); + + // Start the main participant + ASSERT_TRUE(server_2->init_participant()); + + // Both servers match + server_1->wait_discovery(std::chrono::seconds(5), 1, true); + server_2->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and match virtual endpoints + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Create a client that connects to their corresponding server + PubSubWriter client_1(TEST_TOPIC_NAME); + PubSubReader client_2(TEST_TOPIC_NAME); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_1); + client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 15, 0 }; + client_qos.builtin.discovery_config.initial_announcements.count = 0; + + // Init client 1 + client_1.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + // Init client 2 + client_qos.builtin.discovery_config.m_DiscoveryServers.clear(); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_2); + client_2.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + ASSERT_TRUE(client_1.isInitialized()); + ASSERT_TRUE(client_2.isInitialized()); + + // Wait the lease announcement period to discover endpoints + server_1->wait_discovery(std::chrono::seconds(5), 2, true); + server_2->wait_discovery(std::chrono::seconds(5), 2, true); + + // Ensure that no additional Data(r/w) messages are sent by DS routine + std::this_thread::sleep_for(std::chrono::seconds(15)); + + EXPECT_EQ(num_data_r_w_sends_s1.load(std::memory_order::memory_order_seq_cst), 2u); + EXPECT_EQ(num_data_r_w_sends_s2.load(std::memory_order::memory_order_seq_cst), 2u); +} From dde86be15ca3a87ffa8e49d0167ae7576148e172 Mon Sep 17 00:00:00 2001 From: Eugenio Collado Date: Mon, 30 Jun 2025 15:16:37 +0200 Subject: [PATCH 2/5] Fix tests compilation Signed-off-by: Eugenio Collado --- .../common/BlackboxTestsDiscovery.cpp | 31 ++++++----- test/blackbox/utils/filter_helpers.hpp | 53 +++++++++++++++++++ 2 files changed, 72 insertions(+), 12 deletions(-) create mode 100644 test/blackbox/utils/filter_helpers.hpp diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 40cd5fe0e6e..3735b04a90f 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -40,6 +40,7 @@ #include "PubSubWriter.hpp" #include "PubSubWriterReader.hpp" #include "PubSubParticipant.hpp" +#include "../utils/filter_helpers.hpp" using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -2401,7 +2402,7 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) // Declare a test transport that will count the number of Data(p) messages sent std::atomic num_data_p_sends{ 0 }; auto test_transport = std::make_shared(); - test_transport->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + test_transport->drop_data_messages_filter_ = [&](CDRMessage_t& msg) { return builtin_msg_is_data_p(msg, num_data_p_sends); }; @@ -2410,8 +2411,8 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) auto server = std::make_shared>(0, 0, 0, 0); Locator_t locator_server; // UDPv4 locator by default - eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server, 127, 0, 0, 1); - eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server, global_port); + IPLocator::setIPv4(locator_server, 127, 0, 0, 1); + IPLocator::setPhysicalPort(locator_server, global_port); WireProtocolConfigQos server_wp_qos; server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; @@ -2435,7 +2436,9 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) // Set participant as client WireProtocolConfigQos client_qos; client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; - client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server); + RemoteServerAttributes remote_server_att; + remote_server_att.metatrafficUnicastLocatorList.push_back(locator_server); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att); client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; client_qos.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; client_qos.builtin.discovery_config.initial_announcements.count = 1; @@ -2559,13 +2562,13 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) std::atomic num_data_r_w_sends_s1{ 0 }; std::atomic num_data_r_w_sends_s2{ 0 }; auto test_transport_s1 = std::make_shared(); - test_transport_s1->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + test_transport_s1->drop_data_messages_filter_ = [&](CDRMessage_t& msg) { return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s1); }; auto test_transport_s2 = std::make_shared(); - test_transport_s2->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) + test_transport_s2->drop_data_messages_filter_ = [&](CDRMessage_t& msg) { return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s2); }; @@ -2574,8 +2577,8 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) auto server_1 = std::make_shared>(0, 0, 0, 0); Locator_t locator_server_1; // UDPv4 locator by default - eprosima::fastdds::rtps::IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); - eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server_1, global_port); + IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); + IPLocator::setPhysicalPort(locator_server_1, global_port); WireProtocolConfigQos server_wp_qos_1; server_wp_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; @@ -2596,14 +2599,16 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) auto server_2 = std::make_shared>(0, 0, 0, 0); Locator_t locator_server_2 = locator_server_1; // UDPv4 locator by default - eprosima::fastdds::rtps::IPLocator::setPhysicalPort(locator_server_2, global_port + 1); + IPLocator::setPhysicalPort(locator_server_2, global_port + 1); WireProtocolConfigQos server_wp_qos_2 = server_wp_qos_1; server_wp_qos_2.builtin.metatrafficUnicastLocatorList.clear(); server_wp_qos_2.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); // Configure 1 initial announcement as this Server will connect to the first one server_wp_qos_2.builtin.discovery_config.initial_announcements.count = 1; - server_wp_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_1); + RemoteServerAttributes remote_server_att_1; + remote_server_att_1.metatrafficUnicastLocatorList.push_back(locator_server_1); + server_wp_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_1); // The main participant will use the test transport and a specific announcements configuration server_2->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s2) @@ -2624,7 +2629,7 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) // Set participant as client WireProtocolConfigQos client_qos; client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; - client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_1); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_1); client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; client_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 15, 0 }; client_qos.builtin.discovery_config.initial_announcements.count = 0; @@ -2636,7 +2641,9 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) // Init client 2 client_qos.builtin.discovery_config.m_DiscoveryServers.clear(); - client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(locator_server_2); + RemoteServerAttributes remote_server_att_2; + remote_server_att_2.metatrafficUnicastLocatorList.push_back(locator_server_2); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_2); client_2.set_wire_protocol_qos(client_qos) .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) .init(); diff --git a/test/blackbox/utils/filter_helpers.hpp b/test/blackbox/utils/filter_helpers.hpp new file mode 100644 index 00000000000..c903623aff6 --- /dev/null +++ b/test/blackbox/utils/filter_helpers.hpp @@ -0,0 +1,53 @@ +// Copyright 2019, 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include + +// #include "../types/core/core_typesPubSubTypes.hpp" + +namespace eprosima { +namespace fastdds { +namespace helpers { + +inline uint16_t cdr_parse_u16( + char* serialized_buffer) +{ + uint16_t u16; + eprosima::fastcdr::FastBuffer buffer(serialized_buffer, 2); + eprosima::fastcdr::Cdr cdr(buffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::XCDRv1); + cdr >> u16; + return u16; +} + +inline uint32_t cdr_parse_u32( + char* serialized_buffer) +{ + uint32_t u32; + eprosima::fastcdr::FastBuffer buffer(serialized_buffer, 4); + eprosima::fastcdr::Cdr cdr(buffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::XCDRv1); + cdr >> u32; + return u32; +} + +} // namespace helpers +} // namespace fastdds +} // namespace eprosima From 5d1082762c7d31caecfd3b334900a7c2478082bd Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 22 Jul 2025 10:37:20 +0200 Subject: [PATCH 3/5] Add GUID prefix and builtin data filter Signed-off-by: cferreiragonz --- .../transport/test_UDPv4TransportDescriptor.h | 2 ++ src/cpp/rtps/transport/test_UDPv4Transport.cpp | 17 +++++++++++++++++ src/cpp/rtps/transport/test_UDPv4Transport.h | 1 + test/blackbox/common/BlackboxTestsDiscovery.cpp | 16 +++++++++++----- 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/include/fastdds/rtps/transport/test_UDPv4TransportDescriptor.h b/include/fastdds/rtps/transport/test_UDPv4TransportDescriptor.h index dafa7798ae8..4a3bf31f7c7 100644 --- a/include/fastdds/rtps/transport/test_UDPv4TransportDescriptor.h +++ b/include/fastdds/rtps/transport/test_UDPv4TransportDescriptor.h @@ -41,6 +41,8 @@ struct test_UDPv4TransportDescriptor : public SocketTransportDescriptor mutable std::atomic dropDataMessagesPercentage; //! Filtering function for dropping data messages filter drop_data_messages_filter_; + //! Filtering function for dropping builtin data messages + filter drop_builtin_data_messages_filter_; //! Flag to enable dropping of discovery Participant DATA(P) messages bool dropParticipantBuiltinTopicData; //! Flag to enable dropping of discovery Writer DATA(W) messages diff --git a/src/cpp/rtps/transport/test_UDPv4Transport.cpp b/src/cpp/rtps/transport/test_UDPv4Transport.cpp index 396836dff42..83ea2c95402 100644 --- a/src/cpp/rtps/transport/test_UDPv4Transport.cpp +++ b/src/cpp/rtps/transport/test_UDPv4Transport.cpp @@ -46,6 +46,7 @@ test_UDPv4Transport::test_UDPv4Transport( const test_UDPv4TransportDescriptor& descriptor) : drop_data_messages_percentage_(descriptor.dropDataMessagesPercentage) , drop_data_messages_filter_(descriptor.drop_data_messages_filter_) + , drop_builtin_data_messages_filter_(descriptor.drop_builtin_data_messages_filter_) , drop_participant_builtin_topic_data_(descriptor.dropParticipantBuiltinTopicData) , drop_publication_builtin_topic_data_(descriptor.dropPublicationBuiltinTopicData) , drop_subscription_builtin_topic_data_(descriptor.dropSubscriptionBuiltinTopicData) @@ -83,6 +84,10 @@ test_UDPv4TransportDescriptor::test_UDPv4TransportDescriptor() { return false; }) + , drop_builtin_data_messages_filter_([](CDRMessage_t&) + { + return false; + }) , dropParticipantBuiltinTopicData(false) , dropPublicationBuiltinTopicData(false) , dropSubscriptionBuiltinTopicData(false) @@ -377,6 +382,10 @@ bool test_UDPv4Transport::packet_should_drop( { return true; } + else if (drop_builtin_data_messages_filter_(cdrMessage)) + { + return true; + } } else if (writer_id == fastrtps::rtps::c_EntityId_SEDPPubWriter) { @@ -384,6 +393,10 @@ bool test_UDPv4Transport::packet_should_drop( { return true; } + else if (drop_builtin_data_messages_filter_(cdrMessage)) + { + return true; + } } else if (writer_id == fastrtps::rtps::c_EntityId_SEDPSubWriter) { @@ -391,6 +404,10 @@ bool test_UDPv4Transport::packet_should_drop( { return true; } + else if (drop_builtin_data_messages_filter_(cdrMessage)) + { + return true; + } } else { diff --git a/src/cpp/rtps/transport/test_UDPv4Transport.h b/src/cpp/rtps/transport/test_UDPv4Transport.h index 9ef1df6398d..8668c4424c6 100644 --- a/src/cpp/rtps/transport/test_UDPv4Transport.h +++ b/src/cpp/rtps/transport/test_UDPv4Transport.h @@ -86,6 +86,7 @@ class test_UDPv4Transport : public UDPv4Transport PercentageData drop_data_messages_percentage_; test_UDPv4TransportDescriptor::filter drop_data_messages_filter_; + test_UDPv4TransportDescriptor::filter drop_builtin_data_messages_filter_; bool drop_participant_builtin_topic_data_; bool drop_publication_builtin_topic_data_; bool drop_subscription_builtin_topic_data_; diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 3735b04a90f..db586f35d24 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -2337,7 +2337,7 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. // The main participant will use the test transport to count the number of Data(p) sent. - // Look for the PID_DOMAIN_ID in the message as it is only present in Data(p) messages + // Look for the PID_PARTICIPANT_LEASE_DURATION in the message as it is only present in Data(p) messages auto builtin_msg_is_data_p = [](CDRMessage_t& msg, std::atomic& num_data_p) { uint32_t qos_size = 0; @@ -2385,7 +2385,7 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) } else if (!is_sentinel) { - if (pid == eprosima::fastdds::dds::PID_DOMAIN_ID) + if (pid == eprosima::fastdds::dds::PID_PARTICIPANT_LEASE_DURATION) { std::cout << "Data(p) sent by the server" << std::endl; inline_qos_msg = false; @@ -2402,7 +2402,7 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) // Declare a test transport that will count the number of Data(p) messages sent std::atomic num_data_p_sends{ 0 }; auto test_transport = std::make_shared(); - test_transport->drop_data_messages_filter_ = [&](CDRMessage_t& msg) + test_transport->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) { return builtin_msg_is_data_p(msg, num_data_p_sends); }; @@ -2416,6 +2416,7 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) WireProtocolConfigQos server_wp_qos; server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + std::istringstream("44.53.00.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos.prefix; server_wp_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server); server_wp_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; @@ -2437,6 +2438,7 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) WireProtocolConfigQos client_qos; client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; RemoteServerAttributes remote_server_att; + remote_server_att.ReadguidPrefix("44.53.00.5f.45.50.52.4f.53.49.4d.41"); remote_server_att.metatrafficUnicastLocatorList.push_back(locator_server); client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att); client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; @@ -2562,13 +2564,13 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) std::atomic num_data_r_w_sends_s1{ 0 }; std::atomic num_data_r_w_sends_s2{ 0 }; auto test_transport_s1 = std::make_shared(); - test_transport_s1->drop_data_messages_filter_ = [&](CDRMessage_t& msg) + test_transport_s1->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) { return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s1); }; auto test_transport_s2 = std::make_shared(); - test_transport_s2->drop_data_messages_filter_ = [&](CDRMessage_t& msg) + test_transport_s2->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) { return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s2); }; @@ -2582,6 +2584,7 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) WireProtocolConfigQos server_wp_qos_1; server_wp_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + std::istringstream("44.53.01.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos_1.prefix; server_wp_qos_1.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1); server_wp_qos_1.builtin.discovery_config.leaseDuration = c_TimeInfinite; @@ -2602,11 +2605,13 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) IPLocator::setPhysicalPort(locator_server_2, global_port + 1); WireProtocolConfigQos server_wp_qos_2 = server_wp_qos_1; + std::istringstream("44.53.02.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos_2.prefix; server_wp_qos_2.builtin.metatrafficUnicastLocatorList.clear(); server_wp_qos_2.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); // Configure 1 initial announcement as this Server will connect to the first one server_wp_qos_2.builtin.discovery_config.initial_announcements.count = 1; RemoteServerAttributes remote_server_att_1; + remote_server_att_1.ReadguidPrefix("44.53.01.5f.45.50.52.4f.53.49.4d.41"); remote_server_att_1.metatrafficUnicastLocatorList.push_back(locator_server_1); server_wp_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_1); @@ -2642,6 +2647,7 @@ TEST_P(Discovery, discovery_server_edp_messages_sent) // Init client 2 client_qos.builtin.discovery_config.m_DiscoveryServers.clear(); RemoteServerAttributes remote_server_att_2; + remote_server_att_2.ReadguidPrefix("44.53.02.5f.45.50.52.4f.53.49.4d.41"); remote_server_att_2.metatrafficUnicastLocatorList.push_back(locator_server_2); client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_2); client_2.set_wire_protocol_qos(client_qos) From 699c1ba9d6ecce61a6bbca28bc873000b3c4242d Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 22 Jul 2025 11:11:53 +0200 Subject: [PATCH 4/5] Avoid using parametrized test struct Signed-off-by: cferreiragonz --- .../blackbox/common/BlackboxTestsDiscovery.cpp | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index db586f35d24..1c2a2e911b0 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -2322,15 +2322,8 @@ TEST(Discovery, discovery_cyclone_participant_with_custom_pid) // This test checks that a Discover Server does not send duplicated PDP messages of itself when new clients // are discovered -TEST_P(Discovery, discovery_server_pdp_messages_sent) +TEST(Discovery, discovery_server_pdp_messages_sent) { - // Skip test in intraprocess and datasharing mode - if (TRANSPORT != GetParam()) - { - GTEST_SKIP() << "Only makes sense on TRANSPORT"; - return; - } - using namespace eprosima::fastdds::dds; // One discovery server will be created, with multiple direct clients connected to it. @@ -2480,15 +2473,8 @@ TEST_P(Discovery, discovery_server_pdp_messages_sent) EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 3u); } -TEST_P(Discovery, discovery_server_edp_messages_sent) +TEST(Discovery, discovery_server_edp_messages_sent) { - // Skip test in intraprocess and datasharing mode - if (TRANSPORT != GetParam()) - { - GTEST_SKIP() << "Only makes sense on TRANSPORT"; - return; - } - using namespace eprosima::fastdds::dds; // Two discovery servers will be created, each with a direct client connected to them. From 33d32806481e0033580f9ddcb47f2beea349f68e Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 22 Jul 2025 14:31:47 +0200 Subject: [PATCH 5/5] Move test to DDS suite Signed-off-by: cferreiragonz --- .../common/BlackboxTestsDiscovery.cpp | 335 ----------------- .../common/DDSBlackboxTestsDiscovery.cpp | 339 ++++++++++++++++++ 2 files changed, 339 insertions(+), 335 deletions(-) diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 1c2a2e911b0..a11191f494e 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -40,7 +40,6 @@ #include "PubSubWriter.hpp" #include "PubSubWriterReader.hpp" #include "PubSubParticipant.hpp" -#include "../utils/filter_helpers.hpp" using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -2319,337 +2318,3 @@ TEST(Discovery, discovery_cyclone_participant_with_custom_pid) /* Clean up */ factory->delete_participant(participant); } - -// This test checks that a Discover Server does not send duplicated PDP messages of itself when new clients -// are discovered -TEST(Discovery, discovery_server_pdp_messages_sent) -{ - using namespace eprosima::fastdds::dds; - - // One discovery server will be created, with multiple direct clients connected to it. - // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. - // The main participant will use the test transport to count the number of Data(p) sent. - - // Look for the PID_PARTICIPANT_LEASE_DURATION in the message as it is only present in Data(p) messages - auto builtin_msg_is_data_p = [](CDRMessage_t& msg, std::atomic& num_data_p) - { - uint32_t qos_size = 0; - uint32_t original_pos = msg.pos; - bool is_sentinel = false; - bool inline_qos_msg = false; - - while (!is_sentinel) - { - msg.pos = original_pos + qos_size; - - uint16_t pid = eprosima::fastdds::helpers::cdr_parse_u16( - (char*)&msg.buffer[msg.pos]); - msg.pos += 2; - uint16_t plength = eprosima::fastdds::helpers::cdr_parse_u16( - (char*)&msg.buffer[msg.pos]); - msg.pos += 2; - bool valid = true; - - // If inline_qos submessage is found we will have an additional Sentinel - if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) - { - inline_qos_msg = true; - } - else if (pid == eprosima::fastdds::dds::PID_SENTINEL) - { - // PID_SENTINEL is always considered of length 0 - plength = 0; - if (!inline_qos_msg) - { - // If the PID is not inline qos, then we need to set the sentinel - // to true, as it is the last PID - is_sentinel = true; - } - } - - qos_size += (4 + plength); - - // Align to 4 byte boundary and prepare for next iteration - qos_size = (qos_size + 3) & ~3; - - if (!valid || ((msg.pos + plength) > msg.length)) - { - return false; - } - else if (!is_sentinel) - { - if (pid == eprosima::fastdds::dds::PID_PARTICIPANT_LEASE_DURATION) - { - std::cout << "Data(p) sent by the server" << std::endl; - inline_qos_msg = false; - num_data_p.fetch_add(1u, std::memory_order_seq_cst); - break; - } - } - } - - // Do not drop the packet in any case - return false; - }; - - // Declare a test transport that will count the number of Data(p) messages sent - std::atomic num_data_p_sends{ 0 }; - auto test_transport = std::make_shared(); - test_transport->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) - { - return builtin_msg_is_data_p(msg, num_data_p_sends); - }; - - // Create the main participant - auto server = std::make_shared>(0, 0, 0, 0); - - Locator_t locator_server; // UDPv4 locator by default - IPLocator::setIPv4(locator_server, 127, 0, 0, 1); - IPLocator::setPhysicalPort(locator_server, global_port); - - WireProtocolConfigQos server_wp_qos; - server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; - std::istringstream("44.53.00.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos.prefix; - server_wp_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server); - - server_wp_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; - server_wp_qos.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; - server_wp_qos.builtin.discovery_config.initial_announcements.count = 0; - - // The main participant will use the test transport and a specific announcements configuration - server->disable_builtin_transport().add_user_transport_to_pparams(test_transport) - .wire_protocol(server_wp_qos); - - // Start the main participant - ASSERT_TRUE(server->init_participant()); - - // Create a client that connects to the first server - PubSubParticipant client_1(0u, 0u, 0u, 0u); - PubSubParticipant client_2(0u, 0u, 0u, 0u); - PubSubParticipant client_3(0u, 0u, 0u, 0u); - // Set participant as client - WireProtocolConfigQos client_qos; - client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; - RemoteServerAttributes remote_server_att; - remote_server_att.ReadguidPrefix("44.53.00.5f.45.50.52.4f.53.49.4d.41"); - remote_server_att.metatrafficUnicastLocatorList.push_back(locator_server); - client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att); - client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; - client_qos.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; - client_qos.builtin.discovery_config.initial_announcements.count = 1; - // Init client 1 - ASSERT_TRUE(client_1.wire_protocol(client_qos) - .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) - .init_participant()); - - // Wait for the initial announcements to be sent - server->wait_discovery(std::chrono::seconds(5), 1, true); - // Let some time for the server to run the internal routine and check if it sent Data(p) - std::this_thread::sleep_for(std::chrono::seconds(3)); - EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 1u); - - // Init client 2 - ASSERT_TRUE(client_2.wire_protocol(client_qos) - .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) - .init_participant()); - - - // Wait for the initial announcements to be sent - server->wait_discovery(std::chrono::seconds(5), 2, true); - // Let some time for the server to run the internal routine and check if it sent Data(p) - std::this_thread::sleep_for(std::chrono::seconds(3)); - EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 2u); - - // Init client 3 - ASSERT_TRUE(client_3.wire_protocol(client_qos) - .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) - .init_participant()); - - - // Wait for the initial announcements to be sent - server->wait_discovery(std::chrono::seconds(5), 3, true); - // Let some time for the server to run the internal routine and check if it sent Data(p) - std::this_thread::sleep_for(std::chrono::seconds(3)); - EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 3u); -} - -TEST(Discovery, discovery_server_edp_messages_sent) -{ - using namespace eprosima::fastdds::dds; - - // Two discovery servers will be created, each with a direct client connected to them. - // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. - // The main participant will use the test transport to count the number of Data(r/w) sent. - - // Look for the PID_ENDPOINT_GUID in the message as it is only present in Data(r/w) messages - auto builtin_msg_is_data_r_w = [](CDRMessage_t& msg, std::atomic& num_data_r_w) - { - uint32_t qos_size = 0; - uint32_t original_pos = msg.pos; - bool is_sentinel = false; - bool inline_qos_msg = false; - - while (!is_sentinel) - { - msg.pos = original_pos + qos_size; - - uint16_t pid = eprosima::fastdds::helpers::cdr_parse_u16( - (char*)&msg.buffer[msg.pos]); - msg.pos += 2; - uint16_t plength = eprosima::fastdds::helpers::cdr_parse_u16( - (char*)&msg.buffer[msg.pos]); - msg.pos += 2; - bool valid = true; - - if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) - { - inline_qos_msg = true; - } - else if (pid == eprosima::fastdds::dds::PID_SENTINEL) - { - // PID_SENTINEL is always considered of length 0 - plength = 0; - if (!inline_qos_msg) - { - // If the PID is not inline qos, then we need to set the sentinel - // to true, as it is the last PID - is_sentinel = true; - } - } - - qos_size += (4 + plength); - - // Align to 4 byte boundary and prepare for next iteration - qos_size = (qos_size + 3) & ~3; - - if (!valid || ((msg.pos + plength) > msg.length)) - { - return false; - } - else if (!is_sentinel) - { - if (pid == eprosima::fastdds::dds::PID_ENDPOINT_GUID) - { - std::cout << "Data (r/w) sent by the server" << std::endl; - num_data_r_w.fetch_add(1u, std::memory_order_seq_cst); - break; - } - else if (pid == eprosima::fastdds::dds::PID_VENDORID) - { - // Vendor ID is present in both Data(p) and Data(r/w) messages - inline_qos_msg = false; - } - } - } - - // Do not drop the packet in any case - return false; - }; - - // Declare a test transport that will count the number of Data(r/w) messages sent - std::atomic num_data_r_w_sends_s1{ 0 }; - std::atomic num_data_r_w_sends_s2{ 0 }; - auto test_transport_s1 = std::make_shared(); - test_transport_s1->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) - { - return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s1); - }; - - auto test_transport_s2 = std::make_shared(); - test_transport_s2->drop_builtin_data_messages_filter_ = [&](CDRMessage_t& msg) - { - return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s2); - }; - - // Create server 1 - auto server_1 = std::make_shared>(0, 0, 0, 0); - - Locator_t locator_server_1; // UDPv4 locator by default - IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); - IPLocator::setPhysicalPort(locator_server_1, global_port); - - WireProtocolConfigQos server_wp_qos_1; - server_wp_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; - std::istringstream("44.53.01.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos_1.prefix; - server_wp_qos_1.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1); - - server_wp_qos_1.builtin.discovery_config.leaseDuration = c_TimeInfinite; - server_wp_qos_1.builtin.discovery_config.leaseDuration_announcementperiod = c_TimeInfinite; - server_wp_qos_1.builtin.discovery_config.initial_announcements.count = 0; - - // The main participant will use the test transport and a specific announcements configuration - server_1->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s1) - .wire_protocol(server_wp_qos_1); - - // Start the main participant - ASSERT_TRUE(server_1->init_participant()); - - // Create server 2 - auto server_2 = std::make_shared>(0, 0, 0, 0); - - Locator_t locator_server_2 = locator_server_1; // UDPv4 locator by default - IPLocator::setPhysicalPort(locator_server_2, global_port + 1); - - WireProtocolConfigQos server_wp_qos_2 = server_wp_qos_1; - std::istringstream("44.53.02.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos_2.prefix; - server_wp_qos_2.builtin.metatrafficUnicastLocatorList.clear(); - server_wp_qos_2.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); - // Configure 1 initial announcement as this Server will connect to the first one - server_wp_qos_2.builtin.discovery_config.initial_announcements.count = 1; - RemoteServerAttributes remote_server_att_1; - remote_server_att_1.ReadguidPrefix("44.53.01.5f.45.50.52.4f.53.49.4d.41"); - remote_server_att_1.metatrafficUnicastLocatorList.push_back(locator_server_1); - server_wp_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_1); - - // The main participant will use the test transport and a specific announcements configuration - server_2->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s2) - .wire_protocol(server_wp_qos_2); - - // Start the main participant - ASSERT_TRUE(server_2->init_participant()); - - // Both servers match - server_1->wait_discovery(std::chrono::seconds(5), 1, true); - server_2->wait_discovery(std::chrono::seconds(5), 1, true); - // Let some time for the server to run the internal routine and match virtual endpoints - std::this_thread::sleep_for(std::chrono::seconds(2)); - - // Create a client that connects to their corresponding server - PubSubWriter client_1(TEST_TOPIC_NAME); - PubSubReader client_2(TEST_TOPIC_NAME); - // Set participant as client - WireProtocolConfigQos client_qos; - client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; - client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_1); - client_qos.builtin.discovery_config.leaseDuration = c_TimeInfinite; - client_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 15, 0 }; - client_qos.builtin.discovery_config.initial_announcements.count = 0; - - // Init client 1 - client_1.set_wire_protocol_qos(client_qos) - .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) - .init(); - - // Init client 2 - client_qos.builtin.discovery_config.m_DiscoveryServers.clear(); - RemoteServerAttributes remote_server_att_2; - remote_server_att_2.ReadguidPrefix("44.53.02.5f.45.50.52.4f.53.49.4d.41"); - remote_server_att_2.metatrafficUnicastLocatorList.push_back(locator_server_2); - client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_2); - client_2.set_wire_protocol_qos(client_qos) - .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) - .init(); - - ASSERT_TRUE(client_1.isInitialized()); - ASSERT_TRUE(client_2.isInitialized()); - - // Wait the lease announcement period to discover endpoints - server_1->wait_discovery(std::chrono::seconds(5), 2, true); - server_2->wait_discovery(std::chrono::seconds(5), 2, true); - - // Ensure that no additional Data(r/w) messages are sent by DS routine - std::this_thread::sleep_for(std::chrono::seconds(15)); - - EXPECT_EQ(num_data_r_w_sends_s1.load(std::memory_order::memory_order_seq_cst), 2u); - EXPECT_EQ(num_data_r_w_sends_s2.load(std::memory_order::memory_order_seq_cst), 2u); -} diff --git a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp index 7defcecf58e..399a87454b1 100644 --- a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp @@ -44,6 +44,7 @@ #include #include #include +#include "../utils/filter_helpers.hpp" // Regression test for redmine issue 11857 TEST(DDSDiscovery, IgnoreParticipantFlags) @@ -2001,3 +2002,341 @@ TEST(DDSDiscovery, DataracePDP) settings.intraprocess_delivery = prev_intraprocess_delivery; fastrtps::xmlparser::XMLProfileManager::library_settings(settings); } + +// This test checks that a Discover Server does not send duplicated PDP messages of itself when new clients +// are discovered +TEST(Discovery, discovery_server_pdp_messages_sent) +{ + using namespace eprosima; + using namespace eprosima::fastdds::dds; + using namespace eprosima::fastrtps::rtps; + + // One discovery server will be created, with multiple direct clients connected to it. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(p) sent. + + // Look for the PID_PARTICIPANT_LEASE_DURATION in the message as it is only present in Data(p) messages + auto builtin_msg_is_data_p = [](fastrtps::rtps::CDRMessage_t& msg, std::atomic& num_data_p) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + // If inline_qos submessage is found we will have an additional Sentinel + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_PARTICIPANT_LEASE_DURATION) + { + std::cout << "Data(p) sent by the server" << std::endl; + inline_qos_msg = false; + num_data_p.fetch_add(1u, std::memory_order_seq_cst); + break; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(p) messages sent + std::atomic num_data_p_sends{ 0 }; + auto test_transport = std::make_shared(); + test_transport->drop_builtin_data_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t& msg) + { + return builtin_msg_is_data_p(msg, num_data_p_sends); + }; + + // Create the main participant + auto server = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server; // UDPv4 locator by default + IPLocator::setIPv4(locator_server, 127, 0, 0, 1); + IPLocator::setPhysicalPort(locator_server, global_port); + + WireProtocolConfigQos server_wp_qos; + server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + std::istringstream("44.53.00.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos.prefix; + server_wp_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server); + + server_wp_qos.builtin.discovery_config.leaseDuration = fastrtps::c_TimeInfinite; + server_wp_qos.builtin.discovery_config.leaseDuration_announcementperiod = fastrtps::c_TimeInfinite; + server_wp_qos.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server->disable_builtin_transport().add_user_transport_to_pparams(test_transport) + .wire_protocol(server_wp_qos); + + // Start the main participant + ASSERT_TRUE(server->init_participant()); + + // Create a client that connects to the first server + PubSubParticipant client_1(0u, 0u, 0u, 0u); + PubSubParticipant client_2(0u, 0u, 0u, 0u); + PubSubParticipant client_3(0u, 0u, 0u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + RemoteServerAttributes remote_server_att; + remote_server_att.ReadguidPrefix("44.53.00.5f.45.50.52.4f.53.49.4d.41"); + remote_server_att.metatrafficUnicastLocatorList.push_back(locator_server); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att); + client_qos.builtin.discovery_config.leaseDuration = fastrtps::c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = fastrtps::c_TimeInfinite; + client_qos.builtin.discovery_config.initial_announcements.count = 1; + // Init client 1 + ASSERT_TRUE(client_1.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 1u); + + // Init client 2 + ASSERT_TRUE(client_2.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 2, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 2u); + + // Init client 3 + ASSERT_TRUE(client_3.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 3, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 3u); +} + +TEST(Discovery, discovery_server_edp_messages_sent) +{ + using namespace eprosima; + using namespace eprosima::fastdds::dds; + using namespace eprosima::fastrtps::rtps; + + // Two discovery servers will be created, each with a direct client connected to them. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(r/w) sent. + + // Look for the PID_ENDPOINT_GUID in the message as it is only present in Data(r/w) messages + auto builtin_msg_is_data_r_w = [](fastrtps::rtps::CDRMessage_t& msg, std::atomic& num_data_r_w) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_ENDPOINT_GUID) + { + std::cout << "Data (r/w) sent by the server" << std::endl; + num_data_r_w.fetch_add(1u, std::memory_order_seq_cst); + break; + } + else if (pid == eprosima::fastdds::dds::PID_VENDORID) + { + // Vendor ID is present in both Data(p) and Data(r/w) messages + inline_qos_msg = false; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(r/w) messages sent + std::atomic num_data_r_w_sends_s1{ 0 }; + std::atomic num_data_r_w_sends_s2{ 0 }; + auto test_transport_s1 = std::make_shared(); + test_transport_s1->drop_builtin_data_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s1); + }; + + auto test_transport_s2 = std::make_shared(); + test_transport_s2->drop_builtin_data_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s2); + }; + + // Create server 1 + auto server_1 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_1; // UDPv4 locator by default + IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); + IPLocator::setPhysicalPort(locator_server_1, global_port); + + WireProtocolConfigQos server_wp_qos_1; + server_wp_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + std::istringstream("44.53.01.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos_1.prefix; + server_wp_qos_1.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1); + + server_wp_qos_1.builtin.discovery_config.leaseDuration = fastrtps::c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.leaseDuration_announcementperiod = fastrtps::c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server_1->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s1) + .wire_protocol(server_wp_qos_1); + + // Start the main participant + ASSERT_TRUE(server_1->init_participant()); + + // Create server 2 + auto server_2 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_2 = locator_server_1; // UDPv4 locator by default + IPLocator::setPhysicalPort(locator_server_2, global_port + 1); + + WireProtocolConfigQos server_wp_qos_2 = server_wp_qos_1; + std::istringstream("44.53.02.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos_2.prefix; + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.clear(); + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); + // Configure 1 initial announcement as this Server will connect to the first one + server_wp_qos_2.builtin.discovery_config.initial_announcements.count = 1; + RemoteServerAttributes remote_server_att_1; + remote_server_att_1.ReadguidPrefix("44.53.01.5f.45.50.52.4f.53.49.4d.41"); + remote_server_att_1.metatrafficUnicastLocatorList.push_back(locator_server_1); + server_wp_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_1); + + // The main participant will use the test transport and a specific announcements configuration + server_2->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s2) + .wire_protocol(server_wp_qos_2); + + // Start the main participant + ASSERT_TRUE(server_2->init_participant()); + + // Both servers match + server_1->wait_discovery(std::chrono::seconds(5), 1, true); + server_2->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and match virtual endpoints + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Create a client that connects to their corresponding server + PubSubWriter client_1(TEST_TOPIC_NAME); + PubSubReader client_2(TEST_TOPIC_NAME); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_1); + client_qos.builtin.discovery_config.leaseDuration = fastrtps::c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 15, 0 }; + client_qos.builtin.discovery_config.initial_announcements.count = 0; + + // Init client 1 + client_1.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + // Init client 2 + client_qos.builtin.discovery_config.m_DiscoveryServers.clear(); + RemoteServerAttributes remote_server_att_2; + remote_server_att_2.ReadguidPrefix("44.53.02.5f.45.50.52.4f.53.49.4d.41"); + remote_server_att_2.metatrafficUnicastLocatorList.push_back(locator_server_2); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_2); + client_2.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + ASSERT_TRUE(client_1.isInitialized()); + ASSERT_TRUE(client_2.isInitialized()); + + // Wait the lease announcement period to discover endpoints + server_1->wait_discovery(std::chrono::seconds(5), 2, true); + server_2->wait_discovery(std::chrono::seconds(5), 2, true); + + // Ensure that no additional Data(r/w) messages are sent by DS routine + std::this_thread::sleep_for(std::chrono::seconds(15)); + + EXPECT_EQ(num_data_r_w_sends_s1.load(std::memory_order::memory_order_seq_cst), 2u); + EXPECT_EQ(num_data_r_w_sends_s2.load(std::memory_order::memory_order_seq_cst), 2u); +}