Skip to content

[22814] Improve DS routines (backport #5764) #5784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ struct test_UDPv4TransportDescriptor : public SocketTransportDescriptor
mutable std::atomic<uint8_t> dropDataMessagesPercentage;
//! Filtering function for dropping data messages
filter drop_data_messages_filter_;
//! Filtering function for dropping builtin data messages
filter drop_builtin_data_messages_filter_;
//! Flag to enable dropping of discovery Participant DATA(P) messages
bool dropParticipantBuiltinTopicData;
//! Flag to enable dropping of discovery Writer DATA(W) messages
Expand Down
72 changes: 44 additions & 28 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace fastdds {
namespace rtps {
namespace ddb {

using ParticipantState = DiscoveryParticipantsAckStatus::ParticipantState;

DiscoveryDataBase::DiscoveryDataBase(
fastrtps::rtps::GuidPrefix_t server_guid_prefix,
std::set<fastrtps::rtps::GuidPrefix_t> servers)
Expand Down Expand Up @@ -267,8 +269,8 @@ void DiscoveryDataBase::update_change_and_unmatch_(
changes_to_release_.push_back(entity.update_and_unmatch(new_change));
// Manually set relevant participants ACK status of this server, and of the participant that sent the
// change, to 1. This way, we avoid backprogation of the data.
entity.add_or_update_ack_participant(server_guid_prefix_, true);
entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, true);
entity.add_or_update_ack_participant(server_guid_prefix_, ParticipantState::ACKED);
entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, ParticipantState::ACKED);
}

void DiscoveryDataBase::add_ack_(
Expand All @@ -292,7 +294,7 @@ void DiscoveryDataBase::add_ack_(
// database has been updated, so this ACK is not relevant anymore
if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity())
{
it->second.add_or_update_ack_participant(acked_entity, true);
it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED);
}
}
}
Expand All @@ -307,7 +309,7 @@ void DiscoveryDataBase::add_ack_(
// database has been updated, so this ACK is not relevant anymore
if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity())
{
it->second.add_or_update_ack_participant(acked_entity, true);
it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED);
}
}
}
Expand All @@ -322,7 +324,7 @@ void DiscoveryDataBase::add_ack_(
// database has been updated, so this ACK is not relevant anymore
if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity())
{
it->second.add_or_update_ack_participant(acked_entity, true);
it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED);
}
}
}
Expand Down Expand Up @@ -694,7 +696,7 @@ void DiscoveryDataBase::create_new_participant_from_change_(

// Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
// we avoid backprogation of the data.
ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);

// If the DATA(p) it's from this server, it is already in history and we do nothing here
if (change_guid.guidPrefix != server_guid_prefix_)
Expand Down Expand Up @@ -796,7 +798,7 @@ void DiscoveryDataBase::update_participant_from_change_(
if (ch->write_params.sample_identity().sequence_number() ==
participant_info.change()->write_params.sample_identity().sequence_number())
{
participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
}

// we release it if it's the same or if it is lower
Expand Down Expand Up @@ -846,7 +848,7 @@ void DiscoveryDataBase::create_writers_from_change_(
if (ch->write_params.sample_identity().sequence_number() ==
writer_it->second.change()->write_params.sample_identity().sequence_number())
{
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
}

// we release it if it's the same or if it is lower
Expand Down Expand Up @@ -894,7 +896,7 @@ void DiscoveryDataBase::create_writers_from_change_(

// Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
// we avoid backprogation of the data.
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);

// if topic is virtual, it must iterate over all readers
if (topic_name == virtual_topic_)
Expand Down Expand Up @@ -964,7 +966,7 @@ void DiscoveryDataBase::create_readers_from_change_(
if (ch->write_params.sample_identity().sequence_number() ==
reader_it->second.change()->write_params.sample_identity().sequence_number())
{
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
}

// we release it if it's the same or if it is lower
Expand Down Expand Up @@ -1012,7 +1014,7 @@ void DiscoveryDataBase::create_readers_from_change_(

// Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
// we avoid backprogation of the data.
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);

// if topic is virtual, it must iterate over all readers
if (topic_name == virtual_topic_)
Expand Down Expand Up @@ -1407,37 +1409,42 @@ bool DiscoveryDataBase::process_dirty_topics()
// Find participants with writer info and participant with reader info in participants_
parts_reader_it = participants_.find(reader.guidPrefix);
parts_writer_it = participants_.find(writer.guidPrefix);
// Find reader info in readers_
readers_it = readers_.find(reader);
// Find writer info in writers_
writers_it = writers_.find(writer);

// Check in `participants_` whether the client with the reader has acknowledge the PDP of the client
// with the writer.
if (parts_reader_it != participants_.end())
{
if (parts_reader_it->second.is_matched(writer.guidPrefix))
{
// Find reader info in readers_
readers_it = readers_.find(reader);
// Check the status of the writer in `readers_[reader]::relevant_participants_builtin_ack_status`.
if (readers_it != readers_.end() &&
readers_it->second.is_relevant_participant(writer.guidPrefix) &&
!readers_it->second.is_matched(writer.guidPrefix))
!readers_it->second.is_waiting_ack(writer.guidPrefix))
{
// If the status is 0, add DATA(r) to a `edp_publications_to_send_` (if it's not there).
if (add_edp_subscriptions_to_send_(readers_it->second.change()))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind DATA(r) to send: "
<< readers_it->second.change()->instanceHandle);
readers_it->second.add_or_update_ack_participant(writer.guidPrefix,
ParticipantState::WAITING_ACK);
}
}
}
else if (parts_reader_it->second.is_relevant_participant(writer.guidPrefix))
{
// Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there).
if (add_pdp_to_send_(parts_reader_it->second.change()))
if (!parts_reader_it->second.is_waiting_ack(writer.guidPrefix))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind readers' DATA(p) to send: "
<< parts_reader_it->second.change()->instanceHandle);
// Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there).
if (add_pdp_to_send_(parts_reader_it->second.change()))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding readers' DATA(p) to send: "
<< parts_reader_it->second.change()->instanceHandle);
parts_reader_it->second.add_or_update_ack_participant(writer.guidPrefix,
ParticipantState::WAITING_ACK);
}
}
// Set topic as not-clearable.
is_clearable = false;
Expand All @@ -1450,26 +1457,35 @@ bool DiscoveryDataBase::process_dirty_topics()
{
if (parts_writer_it->second.is_matched(reader.guidPrefix))
{
// Find writer info in writers_
writers_it = writers_.find(writer);
// Check the status of the reader in `writers_[writer]::relevant_participants_builtin_ack_status`.
if (writers_it != writers_.end() &&
writers_it->second.is_relevant_participant(reader.guidPrefix) &&
!writers_it->second.is_matched(reader.guidPrefix))
!writers_it->second.is_waiting_ack(reader.guidPrefix))
{
// If the status is 0, add DATA(w) to a `edp_subscriptions_to_send_` (if it's not there).
if (add_edp_publications_to_send_(writers_it->second.change()))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind DATA(w) to send: "
<< writers_it->second.change()->instanceHandle);
writers_it->second.add_or_update_ack_participant(reader.guidPrefix,
ParticipantState::WAITING_ACK);
}
}
}
else if (parts_writer_it->second.is_relevant_participant(reader.guidPrefix))
{
// Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there).
if (add_pdp_to_send_(parts_writer_it->second.change()))
if (!parts_writer_it->second.is_waiting_ack(reader.guidPrefix))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind writers' DATA(p) to send: "
<< parts_writer_it->second.change()->instanceHandle);
// Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there).
if (add_pdp_to_send_(parts_writer_it->second.change()))
{
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding writers' DATA(p) to send: "
<< parts_writer_it->second.change()->instanceHandle);
parts_writer_it->second.add_or_update_ack_participant(reader.guidPrefix,
ParticipantState::WAITING_ACK);
}
}
// Set topic as not-clearable.
is_clearable = false;
Expand Down Expand Up @@ -2463,7 +2479,7 @@ bool DiscoveryDataBase::from_json(
// Populate GuidPrefix_t
std::istringstream(it_ack.key()) >> prefix_aux_ack;

dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<bool>());
dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<ParticipantState>());
}

// Add Participant
Expand Down Expand Up @@ -2501,7 +2517,7 @@ bool DiscoveryDataBase::from_json(
// Populate GuidPrefix_t
std::istringstream(it_ack.key()) >> prefix_aux_ack;

dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<bool>());
dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<ParticipantState>());
}

// Add Participant
Expand Down Expand Up @@ -2561,7 +2577,7 @@ bool DiscoveryDataBase::from_json(
// Populate GuidPrefix_t
std::istringstream(it_ack.key()) >> prefix_aux_ack;

dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<bool>());
dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<ParticipantState>());
}

// Add Participant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace ddb {

void DiscoveryParticipantsAckStatus::add_or_update_participant(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p,
bool status = false)
ParticipantState status = ParticipantState::PENDING_SEND)
{
relevant_participants_map_[guid_p] = status;
}
Expand All @@ -45,13 +45,24 @@ void DiscoveryParticipantsAckStatus::remove_participant(
relevant_participants_map_.erase(guid_p);
}

bool DiscoveryParticipantsAckStatus::is_waiting_ack(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const
{
auto it = relevant_participants_map_.find(guid_p);
if (it != relevant_participants_map_.end())
{
return it->second >= ParticipantState::WAITING_ACK;
}
return false;
}

bool DiscoveryParticipantsAckStatus::is_matched(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const
{
auto it = relevant_participants_map_.find(guid_p);
if (it != relevant_participants_map_.end())
{
return it->second;
return it->second == ParticipantState::ACKED;
}
return false;
}
Expand All @@ -60,7 +71,7 @@ void DiscoveryParticipantsAckStatus::unmatch_all()
{
for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it)
{
it->second = false;
it->second = ParticipantState::PENDING_SEND;
}
}

Expand Down Expand Up @@ -89,7 +100,7 @@ bool DiscoveryParticipantsAckStatus::is_acked_by_all() const
{
for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it)
{
if (!it->second)
if (it->second != ParticipantState::ACKED)
{
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,25 @@ class DiscoveryParticipantsAckStatus

~DiscoveryParticipantsAckStatus() = default;

enum class ParticipantState : uint8_t
{
PENDING_SEND, // Data(p) has not been sent yet
WAITING_ACK, // Data(p) has already been sent but ACK has not been received
ACKED // Data(p) has been acked
};

void add_or_update_participant(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p,
bool status);
ParticipantState status);

void remove_participant(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p);

void unmatch_all();

bool is_waiting_ack(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const;

bool is_matched(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const;

Expand All @@ -69,9 +79,31 @@ class DiscoveryParticipantsAckStatus

private:

std::map<eprosima::fastrtps::rtps::GuidPrefix_t, bool> relevant_participants_map_;
std::map<eprosima::fastrtps::rtps::GuidPrefix_t, ParticipantState> relevant_participants_map_;
};

inline std::ostream& operator <<(
std::ostream& os,
DiscoveryParticipantsAckStatus::ParticipantState child)
{
switch (child)
{
case DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND:
os << "PENDING_SEND";
break;
case DiscoveryParticipantsAckStatus::ParticipantState::WAITING_ACK:
os << "WAITING_ACK";
break;
case DiscoveryParticipantsAckStatus::ParticipantState::ACKED:
os << "ACKED";
break;
default:
os << "UNKNOWN";
break;
}
return os;
}

} /* namespace ddb */
} /* namespace rtps */
} /* namespace fastdds */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ DiscoverySharedInfo::DiscoverySharedInfo(
: change_(change)
{
// the server already knows every message
add_or_update_ack_participant(known_participant, true);
add_or_update_ack_participant(known_participant, DiscoveryParticipantsAckStatus::ParticipantState::ACKED);
}

eprosima::fastrtps::rtps::CacheChange_t* DiscoverySharedInfo::update_and_unmatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DiscoverySharedInfo

void add_or_update_ack_participant(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p,
bool status = false)
DiscoveryParticipantsAckStatus::ParticipantState status = DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND)
{
EPROSIMA_LOG_INFO(
DISCOVERY_DATABASE,
Expand All @@ -72,6 +72,12 @@ class DiscoverySharedInfo
relevant_participants_builtin_ack_status_.remove_participant(guid_p);
}

bool is_waiting_ack(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const
{
return relevant_participants_builtin_ack_status_.is_waiting_ack(guid_p);
}

bool is_matched(
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const
{
Expand Down
1 change: 0 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1661,7 +1661,6 @@ void PDPServer::send_announcement(
EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "Error sending announcement from server to clients");
}
}

}

bool PDPServer::read_backup(
Expand Down
Loading
Loading