Skip to content

Commit a88f40c

Browse files
committed
fix bug with low QPS
1 parent fda56d8 commit a88f40c

11 files changed

+663
-174
lines changed

include/pika_auxiliary_thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ class PikaAuxiliaryThread : public net::Thread {
1616
~PikaAuxiliaryThread() override;
1717
pstd::Mutex mu_;
1818
pstd::CondVar cv_;
19+
20+
// 执行定时任务
21+
void DoTimingTask();
1922

2023
private:
2124
void* ThreadMain() override;

include/pika_consensus.h

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#define PIKA_CONSENSUS_H_
77

88
#include <utility>
9+
#include <condition_variable>
10+
#include <unordered_map>
911

1012
#include "include/pika_binlog_transverter.h"
1113
#include "include/pika_client_conn.h"
@@ -170,7 +172,7 @@ class ConsensusCoordinator {
170172

171173
SyncProgress& SyncPros() { return sync_pros_; }
172174
std::shared_ptr<StableLog> StableLogger() { return stable_logger_; }
173-
std::shared_ptr<MemLog> MemLogger() { return mem_logger_; }
175+
std::shared_ptr<MemLog> MemLogger() { return mem_logger_; } void SetStableLogger(std::shared_ptr<StableLog> logger) { stable_logger_ = logger; }
174176

175177
LogOffset committed_index() {
176178
std::lock_guard lock(index_mu_);
@@ -186,28 +188,9 @@ class ConsensusCoordinator {
186188
};
187189
static int InitCmd(net::RedisParser* parser, const net::RedisCmdArgsType& argv);
188190

189-
std::string ToStringStatus() {
190-
std::stringstream tmp_stream;
191-
{
192-
std::lock_guard lock(index_mu_);
193-
tmp_stream << " Committed_index: " << committed_index_.ToString() << "\r\n";
194-
}
195-
tmp_stream << " Context: "
196-
<< "\r\n"
197-
<< context_->ToString();
198-
{
199-
std::shared_lock lock(term_rwlock_);
200-
tmp_stream << " Term: " << term_ << "\r\n";
201-
}
202-
tmp_stream << " Mem_logger size: " << mem_logger_->Size() << " last offset "
203-
<< mem_logger_->last_offset().ToString() << "\r\n";
204-
tmp_stream << " Stable_logger first offset " << stable_logger_->first_offset().ToString() << "\r\n";
205-
LogOffset log_status;
206-
stable_logger_->Logger()->GetProducerStatus(&(log_status.b_offset.filenum), &(log_status.b_offset.offset),
207-
&(log_status.l_offset.term), &(log_status.l_offset.index));
208-
tmp_stream << " Physical Binlog Status: " << log_status.ToString() << "\r\n";
209-
return tmp_stream.str();
210-
}
191+
std::string ToStringStatus();
192+
void NotifyLogCommitted(const LogOffset& offset);
193+
bool WaitLogCommitted(const LogOffset& offset, std::chrono::milliseconds timeout);
211194

212195
private:
213196
pstd::Status TruncateTo(const LogOffset& offset);
@@ -274,21 +257,28 @@ class ConsensusCoordinator {
274257
}
275258
void SetCommittedId(const LogOffset& offset) {
276259
std::lock_guard l(committed_id_rwlock_);
277-
committed_id_ = offset;
278-
context_->UpdateAppliedIndex(committed_id_);
260+
if (offset > committed_id_) {
261+
committed_id_ = offset;
262+
context_->UpdateAppliedIndex(committed_id_);
263+
// 通知所有等待的线程
264+
log_commit_cv_.notify_all();
265+
}
279266
}
280267

281268
private:
282269
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);
283270

284-
private:
271+
std::shared_ptr<Log> logs_;
285272
std::shared_mutex is_consistency_rwlock_;
286273
bool is_consistency_ = false;
287274
std::shared_mutex committed_id_rwlock_;
288275
LogOffset committed_id_ = LogOffset();
289276
std::shared_mutex prepared_id__rwlock_;
290277
LogOffset prepared_id_ = LogOffset();
291-
std::shared_ptr<Log> logs_;
278+
// 条件变量,用于通知日志已提交
279+
std::condition_variable_any log_commit_cv_;
280+
// 用于跟踪等待特定日志提交的线程
281+
std::mutex waiting_threads_mu_;
292282
};
293283

294284
#endif // INCLUDE_PIKA_CONSENSUS_H_

include/pika_repl_client.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,18 @@ class PikaReplClient {
8686
int32_t db_index = db_name.back() - '0';
8787
assert(db_index >= 0 && db_index <= 7);
8888
async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order::memory_order_seq_cst);
89+
90+
NotifyAsyncWriteComplete(db_name);
8991
}
9092

9193
int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
9294
int32_t db_index = db_name.back() - '0';
9395
assert(db_index >= 0 && db_index <= 7);
9496
return async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst);
9597
}
98+
99+
// 声明通知方法,实现在cpp文件中
100+
void NotifyAsyncWriteComplete(const std::string& db_name);
96101

97102
private:
98103
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);

include/pika_rm.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <string>
1313
#include <unordered_map>
1414
#include <vector>
15+
#include <condition_variable>
1516

1617
#include "pstd/include/pstd_status.h"
1718

@@ -93,6 +94,8 @@ class SyncMasterDB : public SyncDB {
9394
pstd::Mutex session_mu_;
9495
int32_t session_id_ = 0;
9596
ConsensusCoordinator coordinator_;
97+
std::mutex cv_mu_;
98+
std::condition_variable cv_;
9699

97100
//pacificA public:
98101
public:
@@ -233,7 +236,14 @@ class PikaReplicaManager {
233236
return pika_repl_client_->GetUnfinishedAsyncWriteDBTaskCount(db_name);
234237
}
235238

236-
private:
239+
// 添加条件变量相关方法
240+
void NotifyAsyncWriteComplete(const std::string& db_name);
241+
void WaitAsyncWriteComplete(const std::string& db_name);
242+
243+
// 在PikaReplicaManager类中添加以下函数声明
244+
void UpdateAllMasterDBCommittedID();
245+
246+
private:
237247
void InitDB();
238248
pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip);
239249

@@ -247,6 +257,10 @@ class PikaReplicaManager {
247257
std::unordered_map<std::string, std::unordered_map<std::string, std::queue<WriteTask>>> write_queues_;
248258
std::unique_ptr<PikaReplClient> pika_repl_client_;
249259
std::unique_ptr<PikaReplServer> pika_repl_server_;
260+
261+
std::mutex async_write_mu_;
262+
std::condition_variable async_write_cv_;
263+
std::unordered_map<std::string, int> unfinished_async_write_count_;
250264
};
251265

252266
#endif // PIKA_RM_H

src/pika_auxiliary_thread.cc

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,34 +19,74 @@ PikaAuxiliaryThread::~PikaAuxiliaryThread() {
1919
LOG(INFO) << "PikaAuxiliary thread " << thread_id() << " exit!!!";
2020
}
2121

22+
void PikaAuxiliaryThread::DoTimingTask() {
23+
uint64_t now = pstd::NowMicros();
24+
25+
if (g_pika_server->role() & PIKA_ROLE_MASTER) {
26+
g_pika_rm->CheckSyncTimeout(now);
27+
}
28+
29+
// 定期更新主库的CommittedID
30+
g_pika_rm->UpdateAllMasterDBCommittedID();
31+
32+
// wake up binlog sender
33+
g_pika_rm->WakeUpBinlogSync();
34+
}
35+
2236
void* PikaAuxiliaryThread::ThreadMain() {
37+
// 记录上次同步的时间
38+
uint64_t last_sync_time = 0;
39+
uint64_t last_update_time = 0;
40+
uint64_t last_timing_task_time = 0;
41+
const uint64_t sync_interval = 5000; // 5ms,大幅提高同步频率
42+
const uint64_t update_interval = 5000; // 5ms,大幅提高更新频率
43+
const uint64_t timing_task_interval = 500000; // 500ms,减少定时任务执行间隔
44+
2345
while (!should_stop()) {
46+
uint64_t now = pstd::NowMicros();
47+
2448
if (g_pika_server->ShouldMetaSync()) {
2549
g_pika_rm->SendMetaSyncRequest();
2650
} else if (g_pika_server->MetaSyncDone()) {
2751
g_pika_rm->RunSyncSlaveDBStateMachine();
2852
}
2953

30-
pstd::Status s = g_pika_rm->CheckSyncTimeout(pstd::NowMicros());
54+
pstd::Status s = g_pika_rm->CheckSyncTimeout(now);
3155
if (!s.ok()) {
3256
LOG(WARNING) << s.ToString();
3357
}
3458

3559
g_pika_server->CheckLeaderProtectedMode();
3660

37-
// TODO(whoiami) timeout
38-
s = g_pika_server->TriggerSendBinlogSync();
39-
if (!s.ok()) {
40-
LOG(WARNING) << s.ToString();
61+
// 增加同步频率,减少同步间隔
62+
if (now - last_sync_time >= sync_interval) {
63+
s = g_pika_server->TriggerSendBinlogSync();
64+
if (!s.ok()) {
65+
LOG(WARNING) << s.ToString();
66+
}
67+
last_sync_time = now;
68+
}
69+
70+
// 单独处理更新committed_id,提高更新频率
71+
if (now - last_update_time >= update_interval) {
72+
g_pika_rm->UpdateAllMasterDBCommittedID();
73+
last_update_time = now;
74+
}
75+
76+
// 定期执行定时任务
77+
if (now - last_timing_task_time >= timing_task_interval) {
78+
DoTimingTask();
79+
last_timing_task_time = now;
4180
}
81+
4282
// send to peer
4383
int res = g_pika_server->SendToPeer();
4484
if (res == 0) {
45-
// sleep 100 ms
4685
std::unique_lock lock(mu_);
47-
cv_.wait_for(lock, 100ms);
86+
cv_.wait_for(lock, 1ms);
4887
} else {
49-
// LOG_EVERY_N(INFO, 1000) << "Consume binlog number " << res;
88+
// 处理完成后立即继续下一轮,不等待
89+
continue;
5090
}
5191
}
5292
return nullptr;

0 commit comments

Comments
 (0)