diff --git a/include/fastdds/rtps/transport/test_UDPv4TransportDescriptor.h b/include/fastdds/rtps/transport/test_UDPv4TransportDescriptor.h index dafa7798ae8..4a3bf31f7c7 100644 --- a/include/fastdds/rtps/transport/test_UDPv4TransportDescriptor.h +++ b/include/fastdds/rtps/transport/test_UDPv4TransportDescriptor.h @@ -41,6 +41,8 @@ struct test_UDPv4TransportDescriptor : public SocketTransportDescriptor mutable std::atomic dropDataMessagesPercentage; //! Filtering function for dropping data messages filter drop_data_messages_filter_; + //! Filtering function for dropping builtin data messages + filter drop_builtin_data_messages_filter_; //! Flag to enable dropping of discovery Participant DATA(P) messages bool dropParticipantBuiltinTopicData; //! Flag to enable dropping of discovery Writer DATA(W) messages diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp index c6fa1c574a0..a1308cd4a62 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp @@ -36,6 +36,8 @@ namespace fastdds { namespace rtps { namespace ddb { +using ParticipantState = DiscoveryParticipantsAckStatus::ParticipantState; + DiscoveryDataBase::DiscoveryDataBase( fastrtps::rtps::GuidPrefix_t server_guid_prefix, std::set servers) @@ -267,8 +269,8 @@ void DiscoveryDataBase::update_change_and_unmatch_( changes_to_release_.push_back(entity.update_and_unmatch(new_change)); // Manually set relevant participants ACK status of this server, and of the participant that sent the // change, to 1. This way, we avoid backprogation of the data. - entity.add_or_update_ack_participant(server_guid_prefix_, true); - entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, true); + entity.add_or_update_ack_participant(server_guid_prefix_, ParticipantState::ACKED); + entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, ParticipantState::ACKED); } void DiscoveryDataBase::add_ack_( @@ -292,7 +294,7 @@ void DiscoveryDataBase::add_ack_( // database has been updated, so this ACK is not relevant anymore if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity()) { - it->second.add_or_update_ack_participant(acked_entity, true); + it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED); } } } @@ -307,7 +309,7 @@ void DiscoveryDataBase::add_ack_( // database has been updated, so this ACK is not relevant anymore if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity()) { - it->second.add_or_update_ack_participant(acked_entity, true); + it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED); } } } @@ -322,7 +324,7 @@ void DiscoveryDataBase::add_ack_( // database has been updated, so this ACK is not relevant anymore if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity()) { - it->second.add_or_update_ack_participant(acked_entity, true); + it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED); } } } @@ -694,7 +696,7 @@ void DiscoveryDataBase::create_new_participant_from_change_( // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way, // we avoid backprogation of the data. - ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); // If the DATA(p) it's from this server, it is already in history and we do nothing here if (change_guid.guidPrefix != server_guid_prefix_) @@ -796,7 +798,7 @@ void DiscoveryDataBase::update_participant_from_change_( if (ch->write_params.sample_identity().sequence_number() == participant_info.change()->write_params.sample_identity().sequence_number()) { - participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); } // we release it if it's the same or if it is lower @@ -846,7 +848,7 @@ void DiscoveryDataBase::create_writers_from_change_( if (ch->write_params.sample_identity().sequence_number() == writer_it->second.change()->write_params.sample_identity().sequence_number()) { - writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); } // we release it if it's the same or if it is lower @@ -894,7 +896,7 @@ void DiscoveryDataBase::create_writers_from_change_( // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way, // we avoid backprogation of the data. - writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); // if topic is virtual, it must iterate over all readers if (topic_name == virtual_topic_) @@ -964,7 +966,7 @@ void DiscoveryDataBase::create_readers_from_change_( if (ch->write_params.sample_identity().sequence_number() == reader_it->second.change()->write_params.sample_identity().sequence_number()) { - reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); } // we release it if it's the same or if it is lower @@ -1012,7 +1014,7 @@ void DiscoveryDataBase::create_readers_from_change_( // Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way, // we avoid backprogation of the data. - reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true); + reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED); // if topic is virtual, it must iterate over all readers if (topic_name == virtual_topic_) @@ -1407,10 +1409,6 @@ bool DiscoveryDataBase::process_dirty_topics() // Find participants with writer info and participant with reader info in participants_ parts_reader_it = participants_.find(reader.guidPrefix); parts_writer_it = participants_.find(writer.guidPrefix); - // Find reader info in readers_ - readers_it = readers_.find(reader); - // Find writer info in writers_ - writers_it = writers_.find(writer); // Check in `participants_` whether the client with the reader has acknowledge the PDP of the client // with the writer. @@ -1418,26 +1416,35 @@ bool DiscoveryDataBase::process_dirty_topics() { if (parts_reader_it->second.is_matched(writer.guidPrefix)) { + // Find reader info in readers_ + readers_it = readers_.find(reader); // Check the status of the writer in `readers_[reader]::relevant_participants_builtin_ack_status`. if (readers_it != readers_.end() && readers_it->second.is_relevant_participant(writer.guidPrefix) && - !readers_it->second.is_matched(writer.guidPrefix)) + !readers_it->second.is_waiting_ack(writer.guidPrefix)) { // If the status is 0, add DATA(r) to a `edp_publications_to_send_` (if it's not there). if (add_edp_subscriptions_to_send_(readers_it->second.change())) { EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind DATA(r) to send: " << readers_it->second.change()->instanceHandle); + readers_it->second.add_or_update_ack_participant(writer.guidPrefix, + ParticipantState::WAITING_ACK); } } } else if (parts_reader_it->second.is_relevant_participant(writer.guidPrefix)) { - // Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there). - if (add_pdp_to_send_(parts_reader_it->second.change())) + if (!parts_reader_it->second.is_waiting_ack(writer.guidPrefix)) { - EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind readers' DATA(p) to send: " - << parts_reader_it->second.change()->instanceHandle); + // Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there). + if (add_pdp_to_send_(parts_reader_it->second.change())) + { + EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding readers' DATA(p) to send: " + << parts_reader_it->second.change()->instanceHandle); + parts_reader_it->second.add_or_update_ack_participant(writer.guidPrefix, + ParticipantState::WAITING_ACK); + } } // Set topic as not-clearable. is_clearable = false; @@ -1450,26 +1457,35 @@ bool DiscoveryDataBase::process_dirty_topics() { if (parts_writer_it->second.is_matched(reader.guidPrefix)) { + // Find writer info in writers_ + writers_it = writers_.find(writer); // Check the status of the reader in `writers_[writer]::relevant_participants_builtin_ack_status`. if (writers_it != writers_.end() && writers_it->second.is_relevant_participant(reader.guidPrefix) && - !writers_it->second.is_matched(reader.guidPrefix)) + !writers_it->second.is_waiting_ack(reader.guidPrefix)) { // If the status is 0, add DATA(w) to a `edp_subscriptions_to_send_` (if it's not there). if (add_edp_publications_to_send_(writers_it->second.change())) { EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind DATA(w) to send: " << writers_it->second.change()->instanceHandle); + writers_it->second.add_or_update_ack_participant(reader.guidPrefix, + ParticipantState::WAITING_ACK); } } } else if (parts_writer_it->second.is_relevant_participant(reader.guidPrefix)) { - // Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there). - if (add_pdp_to_send_(parts_writer_it->second.change())) + if (!parts_writer_it->second.is_waiting_ack(reader.guidPrefix)) { - EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind writers' DATA(p) to send: " - << parts_writer_it->second.change()->instanceHandle); + // Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there). + if (add_pdp_to_send_(parts_writer_it->second.change())) + { + EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding writers' DATA(p) to send: " + << parts_writer_it->second.change()->instanceHandle); + parts_writer_it->second.add_or_update_ack_participant(reader.guidPrefix, + ParticipantState::WAITING_ACK); + } } // Set topic as not-clearable. is_clearable = false; @@ -2463,7 +2479,7 @@ bool DiscoveryDataBase::from_json( // Populate GuidPrefix_t std::istringstream(it_ack.key()) >> prefix_aux_ack; - dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); + dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); } // Add Participant @@ -2501,7 +2517,7 @@ bool DiscoveryDataBase::from_json( // Populate GuidPrefix_t std::istringstream(it_ack.key()) >> prefix_aux_ack; - dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); + dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); } // Add Participant @@ -2561,7 +2577,7 @@ bool DiscoveryDataBase::from_json( // Populate GuidPrefix_t std::istringstream(it_ack.key()) >> prefix_aux_ack; - dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); + dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get()); } // Add Participant diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp index 6232854ed7b..58ffab2d141 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp @@ -34,7 +34,7 @@ namespace ddb { void DiscoveryParticipantsAckStatus::add_or_update_participant( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p, - bool status = false) + ParticipantState status = ParticipantState::PENDING_SEND) { relevant_participants_map_[guid_p] = status; } @@ -45,13 +45,24 @@ void DiscoveryParticipantsAckStatus::remove_participant( relevant_participants_map_.erase(guid_p); } +bool DiscoveryParticipantsAckStatus::is_waiting_ack( + const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const +{ + auto it = relevant_participants_map_.find(guid_p); + if (it != relevant_participants_map_.end()) + { + return it->second >= ParticipantState::WAITING_ACK; + } + return false; +} + bool DiscoveryParticipantsAckStatus::is_matched( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const { auto it = relevant_participants_map_.find(guid_p); if (it != relevant_participants_map_.end()) { - return it->second; + return it->second == ParticipantState::ACKED; } return false; } @@ -60,7 +71,7 @@ void DiscoveryParticipantsAckStatus::unmatch_all() { for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it) { - it->second = false; + it->second = ParticipantState::PENDING_SEND; } } @@ -89,7 +100,7 @@ bool DiscoveryParticipantsAckStatus::is_acked_by_all() const { for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it) { - if (!it->second) + if (it->second != ParticipantState::ACKED) { return false; } diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp index 9eeb99ac0d3..ee8f9fd1d8e 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp @@ -45,15 +45,25 @@ class DiscoveryParticipantsAckStatus ~DiscoveryParticipantsAckStatus() = default; + enum class ParticipantState : uint8_t + { + PENDING_SEND, // Data(p) has not been sent yet + WAITING_ACK, // Data(p) has already been sent but ACK has not been received + ACKED // Data(p) has been acked + }; + void add_or_update_participant( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p, - bool status); + ParticipantState status); void remove_participant( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p); void unmatch_all(); + bool is_waiting_ack( + const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const; + bool is_matched( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const; @@ -69,9 +79,31 @@ class DiscoveryParticipantsAckStatus private: - std::map relevant_participants_map_; + std::map relevant_participants_map_; }; +inline std::ostream& operator <<( + std::ostream& os, + DiscoveryParticipantsAckStatus::ParticipantState child) +{ + switch (child) + { + case DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND: + os << "PENDING_SEND"; + break; + case DiscoveryParticipantsAckStatus::ParticipantState::WAITING_ACK: + os << "WAITING_ACK"; + break; + case DiscoveryParticipantsAckStatus::ParticipantState::ACKED: + os << "ACKED"; + break; + default: + os << "UNKNOWN"; + break; + } + return os; +} + } /* namespace ddb */ } /* namespace rtps */ } /* namespace fastdds */ diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp index 805241a9468..2727d8223c9 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp @@ -36,7 +36,7 @@ DiscoverySharedInfo::DiscoverySharedInfo( : change_(change) { // the server already knows every message - add_or_update_ack_participant(known_participant, true); + add_or_update_ack_participant(known_participant, DiscoveryParticipantsAckStatus::ParticipantState::ACKED); } eprosima::fastrtps::rtps::CacheChange_t* DiscoverySharedInfo::update_and_unmatch( diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp index 530a814333c..7d82f979995 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp @@ -56,7 +56,7 @@ class DiscoverySharedInfo void add_or_update_ack_participant( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p, - bool status = false) + DiscoveryParticipantsAckStatus::ParticipantState status = DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND) { EPROSIMA_LOG_INFO( DISCOVERY_DATABASE, @@ -72,6 +72,12 @@ class DiscoverySharedInfo relevant_participants_builtin_ack_status_.remove_participant(guid_p); } + bool is_waiting_ack( + const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const + { + return relevant_participants_builtin_ack_status_.is_waiting_ack(guid_p); + } + bool is_matched( const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const { diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index df7f793a007..0ca34b1b29c 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -1661,7 +1661,6 @@ void PDPServer::send_announcement( EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "Error sending announcement from server to clients"); } } - } bool PDPServer::read_backup( diff --git a/src/cpp/rtps/transport/test_UDPv4Transport.cpp b/src/cpp/rtps/transport/test_UDPv4Transport.cpp index 396836dff42..83ea2c95402 100644 --- a/src/cpp/rtps/transport/test_UDPv4Transport.cpp +++ b/src/cpp/rtps/transport/test_UDPv4Transport.cpp @@ -46,6 +46,7 @@ test_UDPv4Transport::test_UDPv4Transport( const test_UDPv4TransportDescriptor& descriptor) : drop_data_messages_percentage_(descriptor.dropDataMessagesPercentage) , drop_data_messages_filter_(descriptor.drop_data_messages_filter_) + , drop_builtin_data_messages_filter_(descriptor.drop_builtin_data_messages_filter_) , drop_participant_builtin_topic_data_(descriptor.dropParticipantBuiltinTopicData) , drop_publication_builtin_topic_data_(descriptor.dropPublicationBuiltinTopicData) , drop_subscription_builtin_topic_data_(descriptor.dropSubscriptionBuiltinTopicData) @@ -83,6 +84,10 @@ test_UDPv4TransportDescriptor::test_UDPv4TransportDescriptor() { return false; }) + , drop_builtin_data_messages_filter_([](CDRMessage_t&) + { + return false; + }) , dropParticipantBuiltinTopicData(false) , dropPublicationBuiltinTopicData(false) , dropSubscriptionBuiltinTopicData(false) @@ -377,6 +382,10 @@ bool test_UDPv4Transport::packet_should_drop( { return true; } + else if (drop_builtin_data_messages_filter_(cdrMessage)) + { + return true; + } } else if (writer_id == fastrtps::rtps::c_EntityId_SEDPPubWriter) { @@ -384,6 +393,10 @@ bool test_UDPv4Transport::packet_should_drop( { return true; } + else if (drop_builtin_data_messages_filter_(cdrMessage)) + { + return true; + } } else if (writer_id == fastrtps::rtps::c_EntityId_SEDPSubWriter) { @@ -391,6 +404,10 @@ bool test_UDPv4Transport::packet_should_drop( { return true; } + else if (drop_builtin_data_messages_filter_(cdrMessage)) + { + return true; + } } else { diff --git a/src/cpp/rtps/transport/test_UDPv4Transport.h b/src/cpp/rtps/transport/test_UDPv4Transport.h index 9ef1df6398d..8668c4424c6 100644 --- a/src/cpp/rtps/transport/test_UDPv4Transport.h +++ b/src/cpp/rtps/transport/test_UDPv4Transport.h @@ -86,6 +86,7 @@ class test_UDPv4Transport : public UDPv4Transport PercentageData drop_data_messages_percentage_; test_UDPv4TransportDescriptor::filter drop_data_messages_filter_; + test_UDPv4TransportDescriptor::filter drop_builtin_data_messages_filter_; bool drop_participant_builtin_topic_data_; bool drop_publication_builtin_topic_data_; bool drop_subscription_builtin_topic_data_; diff --git a/test/blackbox/api/dds-pim/PubSubParticipant.hpp b/test/blackbox/api/dds-pim/PubSubParticipant.hpp index a4e7f4dbf84..2e374bf3158 100644 --- a/test/blackbox/api/dds-pim/PubSubParticipant.hpp +++ b/test/blackbox/api/dds-pim/PubSubParticipant.hpp @@ -648,6 +648,13 @@ class PubSubParticipant return *this; } + PubSubParticipant& setup_transports( + eprosima::fastdds::rtps::BuiltinTransports transports) + { + participant_qos_.setup_transports(transports); + return *this; + } + PubSubParticipant& user_data( const std::vector& user_data) { diff --git a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp index 7defcecf58e..399a87454b1 100644 --- a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp @@ -44,6 +44,7 @@ #include #include #include +#include "../utils/filter_helpers.hpp" // Regression test for redmine issue 11857 TEST(DDSDiscovery, IgnoreParticipantFlags) @@ -2001,3 +2002,341 @@ TEST(DDSDiscovery, DataracePDP) settings.intraprocess_delivery = prev_intraprocess_delivery; fastrtps::xmlparser::XMLProfileManager::library_settings(settings); } + +// This test checks that a Discover Server does not send duplicated PDP messages of itself when new clients +// are discovered +TEST(Discovery, discovery_server_pdp_messages_sent) +{ + using namespace eprosima; + using namespace eprosima::fastdds::dds; + using namespace eprosima::fastrtps::rtps; + + // One discovery server will be created, with multiple direct clients connected to it. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(p) sent. + + // Look for the PID_PARTICIPANT_LEASE_DURATION in the message as it is only present in Data(p) messages + auto builtin_msg_is_data_p = [](fastrtps::rtps::CDRMessage_t& msg, std::atomic& num_data_p) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + // If inline_qos submessage is found we will have an additional Sentinel + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_PARTICIPANT_LEASE_DURATION) + { + std::cout << "Data(p) sent by the server" << std::endl; + inline_qos_msg = false; + num_data_p.fetch_add(1u, std::memory_order_seq_cst); + break; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(p) messages sent + std::atomic num_data_p_sends{ 0 }; + auto test_transport = std::make_shared(); + test_transport->drop_builtin_data_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t& msg) + { + return builtin_msg_is_data_p(msg, num_data_p_sends); + }; + + // Create the main participant + auto server = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server; // UDPv4 locator by default + IPLocator::setIPv4(locator_server, 127, 0, 0, 1); + IPLocator::setPhysicalPort(locator_server, global_port); + + WireProtocolConfigQos server_wp_qos; + server_wp_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + std::istringstream("44.53.00.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos.prefix; + server_wp_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server); + + server_wp_qos.builtin.discovery_config.leaseDuration = fastrtps::c_TimeInfinite; + server_wp_qos.builtin.discovery_config.leaseDuration_announcementperiod = fastrtps::c_TimeInfinite; + server_wp_qos.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server->disable_builtin_transport().add_user_transport_to_pparams(test_transport) + .wire_protocol(server_wp_qos); + + // Start the main participant + ASSERT_TRUE(server->init_participant()); + + // Create a client that connects to the first server + PubSubParticipant client_1(0u, 0u, 0u, 0u); + PubSubParticipant client_2(0u, 0u, 0u, 0u); + PubSubParticipant client_3(0u, 0u, 0u, 0u); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + RemoteServerAttributes remote_server_att; + remote_server_att.ReadguidPrefix("44.53.00.5f.45.50.52.4f.53.49.4d.41"); + remote_server_att.metatrafficUnicastLocatorList.push_back(locator_server); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att); + client_qos.builtin.discovery_config.leaseDuration = fastrtps::c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = fastrtps::c_TimeInfinite; + client_qos.builtin.discovery_config.initial_announcements.count = 1; + // Init client 1 + ASSERT_TRUE(client_1.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 1u); + + // Init client 2 + ASSERT_TRUE(client_2.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 2, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 2u); + + // Init client 3 + ASSERT_TRUE(client_3.wire_protocol(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init_participant()); + + + // Wait for the initial announcements to be sent + server->wait_discovery(std::chrono::seconds(5), 3, true); + // Let some time for the server to run the internal routine and check if it sent Data(p) + std::this_thread::sleep_for(std::chrono::seconds(3)); + EXPECT_EQ(num_data_p_sends.load(std::memory_order::memory_order_seq_cst), 3u); +} + +TEST(Discovery, discovery_server_edp_messages_sent) +{ + using namespace eprosima; + using namespace eprosima::fastdds::dds; + using namespace eprosima::fastrtps::rtps; + + // Two discovery servers will be created, each with a direct client connected to them. + // Initial announcements will be disabled and lease announcements will be configured to control discovery sequence. + // The main participant will use the test transport to count the number of Data(r/w) sent. + + // Look for the PID_ENDPOINT_GUID in the message as it is only present in Data(r/w) messages + auto builtin_msg_is_data_r_w = [](fastrtps::rtps::CDRMessage_t& msg, std::atomic& num_data_r_w) + { + uint32_t qos_size = 0; + uint32_t original_pos = msg.pos; + bool is_sentinel = false; + bool inline_qos_msg = false; + + while (!is_sentinel) + { + msg.pos = original_pos + qos_size; + + uint16_t pid = fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + uint16_t plength = fastdds::helpers::cdr_parse_u16( + (char*)&msg.buffer[msg.pos]); + msg.pos += 2; + bool valid = true; + + if (pid == eprosima::fastdds::dds::PID_RELATED_SAMPLE_IDENTITY) + { + inline_qos_msg = true; + } + else if (pid == eprosima::fastdds::dds::PID_SENTINEL) + { + // PID_SENTINEL is always considered of length 0 + plength = 0; + if (!inline_qos_msg) + { + // If the PID is not inline qos, then we need to set the sentinel + // to true, as it is the last PID + is_sentinel = true; + } + } + + qos_size += (4 + plength); + + // Align to 4 byte boundary and prepare for next iteration + qos_size = (qos_size + 3) & ~3; + + if (!valid || ((msg.pos + plength) > msg.length)) + { + return false; + } + else if (!is_sentinel) + { + if (pid == eprosima::fastdds::dds::PID_ENDPOINT_GUID) + { + std::cout << "Data (r/w) sent by the server" << std::endl; + num_data_r_w.fetch_add(1u, std::memory_order_seq_cst); + break; + } + else if (pid == eprosima::fastdds::dds::PID_VENDORID) + { + // Vendor ID is present in both Data(p) and Data(r/w) messages + inline_qos_msg = false; + } + } + } + + // Do not drop the packet in any case + return false; + }; + + // Declare a test transport that will count the number of Data(r/w) messages sent + std::atomic num_data_r_w_sends_s1{ 0 }; + std::atomic num_data_r_w_sends_s2{ 0 }; + auto test_transport_s1 = std::make_shared(); + test_transport_s1->drop_builtin_data_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s1); + }; + + auto test_transport_s2 = std::make_shared(); + test_transport_s2->drop_builtin_data_messages_filter_ = [&](fastrtps::rtps::CDRMessage_t& msg) + { + return builtin_msg_is_data_r_w(msg, num_data_r_w_sends_s2); + }; + + // Create server 1 + auto server_1 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_1; // UDPv4 locator by default + IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1); + IPLocator::setPhysicalPort(locator_server_1, global_port); + + WireProtocolConfigQos server_wp_qos_1; + server_wp_qos_1.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SERVER; + std::istringstream("44.53.01.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos_1.prefix; + server_wp_qos_1.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1); + + server_wp_qos_1.builtin.discovery_config.leaseDuration = fastrtps::c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.leaseDuration_announcementperiod = fastrtps::c_TimeInfinite; + server_wp_qos_1.builtin.discovery_config.initial_announcements.count = 0; + + // The main participant will use the test transport and a specific announcements configuration + server_1->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s1) + .wire_protocol(server_wp_qos_1); + + // Start the main participant + ASSERT_TRUE(server_1->init_participant()); + + // Create server 2 + auto server_2 = std::make_shared>(0, 0, 0, 0); + + Locator_t locator_server_2 = locator_server_1; // UDPv4 locator by default + IPLocator::setPhysicalPort(locator_server_2, global_port + 1); + + WireProtocolConfigQos server_wp_qos_2 = server_wp_qos_1; + std::istringstream("44.53.02.5f.45.50.52.4f.53.49.4d.41") >> server_wp_qos_2.prefix; + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.clear(); + server_wp_qos_2.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2); + // Configure 1 initial announcement as this Server will connect to the first one + server_wp_qos_2.builtin.discovery_config.initial_announcements.count = 1; + RemoteServerAttributes remote_server_att_1; + remote_server_att_1.ReadguidPrefix("44.53.01.5f.45.50.52.4f.53.49.4d.41"); + remote_server_att_1.metatrafficUnicastLocatorList.push_back(locator_server_1); + server_wp_qos_2.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_1); + + // The main participant will use the test transport and a specific announcements configuration + server_2->disable_builtin_transport().add_user_transport_to_pparams(test_transport_s2) + .wire_protocol(server_wp_qos_2); + + // Start the main participant + ASSERT_TRUE(server_2->init_participant()); + + // Both servers match + server_1->wait_discovery(std::chrono::seconds(5), 1, true); + server_2->wait_discovery(std::chrono::seconds(5), 1, true); + // Let some time for the server to run the internal routine and match virtual endpoints + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Create a client that connects to their corresponding server + PubSubWriter client_1(TEST_TOPIC_NAME); + PubSubReader client_2(TEST_TOPIC_NAME); + // Set participant as client + WireProtocolConfigQos client_qos; + client_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT; + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_1); + client_qos.builtin.discovery_config.leaseDuration = fastrtps::c_TimeInfinite; + client_qos.builtin.discovery_config.leaseDuration_announcementperiod = { 15, 0 }; + client_qos.builtin.discovery_config.initial_announcements.count = 0; + + // Init client 1 + client_1.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + // Init client 2 + client_qos.builtin.discovery_config.m_DiscoveryServers.clear(); + RemoteServerAttributes remote_server_att_2; + remote_server_att_2.ReadguidPrefix("44.53.02.5f.45.50.52.4f.53.49.4d.41"); + remote_server_att_2.metatrafficUnicastLocatorList.push_back(locator_server_2); + client_qos.builtin.discovery_config.m_DiscoveryServers.push_back(remote_server_att_2); + client_2.set_wire_protocol_qos(client_qos) + .setup_transports(eprosima::fastdds::rtps::BuiltinTransports::UDPv4) + .init(); + + ASSERT_TRUE(client_1.isInitialized()); + ASSERT_TRUE(client_2.isInitialized()); + + // Wait the lease announcement period to discover endpoints + server_1->wait_discovery(std::chrono::seconds(5), 2, true); + server_2->wait_discovery(std::chrono::seconds(5), 2, true); + + // Ensure that no additional Data(r/w) messages are sent by DS routine + std::this_thread::sleep_for(std::chrono::seconds(15)); + + EXPECT_EQ(num_data_r_w_sends_s1.load(std::memory_order::memory_order_seq_cst), 2u); + EXPECT_EQ(num_data_r_w_sends_s2.load(std::memory_order::memory_order_seq_cst), 2u); +} diff --git a/test/blackbox/utils/filter_helpers.hpp b/test/blackbox/utils/filter_helpers.hpp new file mode 100644 index 00000000000..c903623aff6 --- /dev/null +++ b/test/blackbox/utils/filter_helpers.hpp @@ -0,0 +1,53 @@ +// Copyright 2019, 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include + +// #include "../types/core/core_typesPubSubTypes.hpp" + +namespace eprosima { +namespace fastdds { +namespace helpers { + +inline uint16_t cdr_parse_u16( + char* serialized_buffer) +{ + uint16_t u16; + eprosima::fastcdr::FastBuffer buffer(serialized_buffer, 2); + eprosima::fastcdr::Cdr cdr(buffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::XCDRv1); + cdr >> u16; + return u16; +} + +inline uint32_t cdr_parse_u32( + char* serialized_buffer) +{ + uint32_t u32; + eprosima::fastcdr::FastBuffer buffer(serialized_buffer, 4); + eprosima::fastcdr::Cdr cdr(buffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::XCDRv1); + cdr >> u32; + return u32; +} + +} // namespace helpers +} // namespace fastdds +} // namespace eprosima