Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions lib/base/tlsstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,31 @@ class SeenStream : public ARS
{
public:
template<class... Args>
SeenStream(Args&&... args) : ARS(std::forward<Args>(args)...)
explicit SeenStream(Args&&... args) : ARS(std::forward<Args>(args)...), m_Seen(nullptr)
{
m_Seen.store(nullptr);
}

template<class... Args>
auto async_read_some(Args&&... args) -> decltype(((ARS*)nullptr)->async_read_some(std::forward<Args>(args)...))
{
{
auto seen (m_Seen.load());
auto* seen (m_Seen.load());

if (seen) {
*seen = Utility::GetTime();
*seen = std::chrono::steady_clock::now();
}
}

return ((ARS*)this)->async_read_some(std::forward<Args>(args)...);
}

inline void SetSeen(double* seen)
void SetSeen(std::chrono::steady_clock::time_point* seen)
{
m_Seen.store(seen);
}

private:
std::atomic<double*> m_Seen;
std::atomic<std::chrono::steady_clock::time_point*> m_Seen;
};

struct UnbufferedAsioTlsStreamParams
Expand Down
7 changes: 6 additions & 1 deletion lib/remote/jsonrpcconnection-heartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler);
* cluster connection alive when there isn't much going on.
*/

void JsonRpcConnection::SetHeartbeatInterval(std::chrono::milliseconds interval)
{
m_HeartbeatInterval = interval;
}

void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc)
{
boost::system::error_code ec;

for (;;) {
m_HeartbeatTimer.expires_from_now(boost::posix_time::seconds(20));
m_HeartbeatTimer.expires_from_now(boost::posix_time::milliseconds(m_HeartbeatInterval.count()));
m_HeartbeatTimer.async_wait(yc[ec]);

if (m_ShuttingDown) {
Expand Down
18 changes: 12 additions & 6 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const Stri
JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated,
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io),
m_Timestamp(Utility::GetTime()), m_Seen(std::chrono::steady_clock::now()), m_IoStrand(io),
m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), m_WaitGroup(waitGroup),
m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
{
Expand Down Expand Up @@ -81,7 +81,7 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
break;
}

m_Seen = Utility::GetTime();
m_Seen = std::chrono::steady_clock::now();
if (m_Endpoint) {
m_Endpoint->AddMessageReceived(jsonString.GetLength());
}
Expand Down Expand Up @@ -236,6 +236,11 @@ void JsonRpcConnection::SendRawMessage(const String& message)
});
}

void JsonRpcConnection::SetLivenessTimeout(std::chrono::milliseconds timeout)
{
m_LivenessTimeout = timeout;
}

void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
{
if (m_ShuttingDown) {
Expand Down Expand Up @@ -411,7 +416,7 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
* leaking the connection. Therefore close it after a timeout.
*/

m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10));
m_CheckLivenessTimer.expires_from_now(boost::posix_time::milliseconds(m_LivenessTimeout.count() / 6));
m_CheckLivenessTimer.async_wait(yc[ec]);

if (m_ShuttingDown) {
Expand All @@ -426,16 +431,17 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
Disconnect();
} else {
for (;;) {
m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30));
m_CheckLivenessTimer.expires_from_now(boost::posix_time::milliseconds((m_LivenessTimeout / 2).count()));
m_CheckLivenessTimer.async_wait(yc[ec]);

if (m_ShuttingDown) {
break;
}

if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
if (m_Seen + m_LivenessTimeout < std::chrono::steady_clock::now() &&
(!m_Endpoint || !m_Endpoint->GetSyncing())) {
Log(LogInformation, "JsonRpcConnection")
<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";

Disconnect();
break;
Expand Down
11 changes: 8 additions & 3 deletions lib/remote/jsonrpcconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
namespace icinga
{

using namespace std::chrono_literals;

enum ClientRole
{
ClientInbound,
Expand Down Expand Up @@ -61,6 +63,9 @@ class JsonRpcConnection final : public Object
void SendMessage(const Dictionary::Ptr& request);
void SendRawMessage(const String& request);

void SetLivenessTimeout(std::chrono::milliseconds timeout);
void SetHeartbeatInterval(std::chrono::milliseconds interval);

static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params);

static double GetWorkQueueRate();
Expand All @@ -74,13 +79,15 @@ class JsonRpcConnection final : public Object
Shared<AsioTlsStream>::Ptr m_Stream;
ConnectionRole m_Role;
double m_Timestamp;
double m_Seen;
std::chrono::steady_clock::time_point m_Seen;
boost::asio::io_context::strand m_IoStrand;
std::vector<String> m_OutgoingMessagesQueue;
AsioEvent m_OutgoingMessagesQueued;
AsioEvent m_WriterDone;
Atomic<bool> m_ShuttingDown;
WaitGroup::Ptr m_WaitGroup;
std::chrono::milliseconds m_LivenessTimeout{60s};
std::chrono::milliseconds m_HeartbeatInterval{20s};
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;

JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated,
Expand All @@ -95,8 +102,6 @@ class JsonRpcConnection final : public Object

void MessageHandler(const Dictionary::Ptr& message);

void CertificateRequestResponseHandler(const Dictionary::Ptr& message);

void SendMessageInternal(const Dictionary::Ptr& request);
};

Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ set(base_test_SOURCES
remote-configpackageutility.cpp
remote-httpserverconnection.cpp
remote-httpmessage.cpp
remote-jsonrpcconnection.cpp
remote-url.cpp
${base_OBJS}
$<TARGET_OBJECTS:config>
Expand Down
15 changes: 14 additions & 1 deletion test/base-testloggerfixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
#include <boost/test/test_tools.hpp>
#include <future>

#define CHECK_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(ExpectLogPattern(pattern, timeout))
#define REQUIRE_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(ExpectLogPattern(pattern, timeout))

#define CHECK_NO_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(!ExpectLogPattern(pattern, timeout))
#define REQUIRE_NO_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(!ExpectLogPattern(pattern, timeout))

namespace icinga {

class TestLogger : public Logger
Expand Down Expand Up @@ -52,6 +58,13 @@ class TestLogger : public Logger
return ret;
}

void Clear()
{
std::unique_lock lock(m_Mutex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use std::lock_guard or std::scoped_lock here isntead?

m_Expects.clear();
m_LogEntries.clear();
}

private:
void ProcessLogEntry(const LogEntry& entry) override
{
Expand Down Expand Up @@ -87,9 +100,9 @@ struct TestLoggerFixture
{
TestLoggerFixture()
{
testLogger->SetSeverity(testLogger->SeverityToString(LogDebug));
testLogger->Activate(true);
testLogger->SetActive(true);
testLogger->SetSeverity(TestLogger::SeverityToString(LogDebug));
Comment on lines -90 to +105
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should call SetActive before Activate instead of this change.

}

~TestLoggerFixture()
Expand Down
Loading
Loading