Skip to content

Commit 0fa7b62

Browse files
committed
fix bug with low QPS
1 parent fda56d8 commit 0fa7b62

15 files changed

+891
-193
lines changed

include/pika_auxiliary_thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ class PikaAuxiliaryThread : public net::Thread {
1616
~PikaAuxiliaryThread() override;
1717
pstd::Mutex mu_;
1818
pstd::CondVar cv_;
19+
20+
//Execute scheduled tasks
21+
void DoTimingTask();
1922

2023
private:
2124
void* ThreadMain() override;

include/pika_consensus.h

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#define PIKA_CONSENSUS_H_
77

88
#include <utility>
9+
#include <condition_variable>
10+
#include <unordered_map>
911

1012
#include "include/pika_binlog_transverter.h"
1113
#include "include/pika_client_conn.h"
@@ -170,7 +172,10 @@ class ConsensusCoordinator {
170172

171173
SyncProgress& SyncPros() { return sync_pros_; }
172174
std::shared_ptr<StableLog> StableLogger() { return stable_logger_; }
173-
std::shared_ptr<MemLog> MemLogger() { return mem_logger_; }
175+
// Expose the internally maintained mem_logger_ object to the outside world.
176+
std::shared_ptr<MemLog> MemLogger() { return mem_logger_; }
177+
// Allows an external user to inject a new StableLog instance into the current object and replace the original one
178+
void SetStableLogger(std::shared_ptr<StableLog> logger) { stable_logger_ = logger; }
174179

175180
LogOffset committed_index() {
176181
std::lock_guard lock(index_mu_);
@@ -186,35 +191,20 @@ class ConsensusCoordinator {
186191
};
187192
static int InitCmd(net::RedisParser* parser, const net::RedisCmdArgsType& argv);
188193

189-
std::string ToStringStatus() {
190-
std::stringstream tmp_stream;
191-
{
192-
std::lock_guard lock(index_mu_);
193-
tmp_stream << " Committed_index: " << committed_index_.ToString() << "\r\n";
194-
}
195-
tmp_stream << " Context: "
196-
<< "\r\n"
197-
<< context_->ToString();
198-
{
199-
std::shared_lock lock(term_rwlock_);
200-
tmp_stream << " Term: " << term_ << "\r\n";
201-
}
202-
tmp_stream << " Mem_logger size: " << mem_logger_->Size() << " last offset "
203-
<< mem_logger_->last_offset().ToString() << "\r\n";
204-
tmp_stream << " Stable_logger first offset " << stable_logger_->first_offset().ToString() << "\r\n";
205-
LogOffset log_status;
206-
stable_logger_->Logger()->GetProducerStatus(&(log_status.b_offset.filenum), &(log_status.b_offset.offset),
207-
&(log_status.l_offset.term), &(log_status.l_offset.index));
208-
tmp_stream << " Physical Binlog Status: " << log_status.ToString() << "\r\n";
209-
return tmp_stream.str();
210-
}
194+
std::string ToStringStatus();
195+
// Called after committed_id_ is updated, waking up all threads waiting for the offset
196+
void NotifyLogCommitted(const LogOffset& offset);
197+
// Block until the specified offset ≤ the current committed_id_, or return false if the timeout expires.
198+
bool WaitLogCommitted(const LogOffset& offset, std::chrono::milliseconds timeout);
211199

212200
private:
213201
pstd::Status TruncateTo(const LogOffset& offset);
214202

215203
pstd::Status InternalAppendLog(const std::shared_ptr<Cmd>& cmd_ptr);
216204
pstd::Status InternalAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
217205
void InternalApply(const MemLog::LogItem& log);
206+
// Batch processing version
207+
void InternalApplyFollower(const std::vector<std::shared_ptr<Cmd>>& cmds);
218208
void InternalApplyFollower(const std::shared_ptr<Cmd>& cmd_ptr);
219209

220210
pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
@@ -274,21 +264,27 @@ class ConsensusCoordinator {
274264
}
275265
void SetCommittedId(const LogOffset& offset) {
276266
std::lock_guard l(committed_id_rwlock_);
277-
committed_id_ = offset;
278-
context_->UpdateAppliedIndex(committed_id_);
267+
if (offset > committed_id_) {
268+
committed_id_ = offset; // Update in-memory committed_id_
269+
context_->UpdateAppliedIndex(committed_id_); // Persist to Context so it survives restart
270+
log_commit_cv_.notify_all(); // Wake up any threads waiting in WaitLogCommitted()
271+
}
279272
}
280273

281274
private:
282275
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);
283276

284-
private:
277+
std::shared_ptr<Log> logs_;
285278
std::shared_mutex is_consistency_rwlock_;
286279
bool is_consistency_ = false;
287280
std::shared_mutex committed_id_rwlock_;
288281
LogOffset committed_id_ = LogOffset();
289282
std::shared_mutex prepared_id__rwlock_;
290283
LogOffset prepared_id_ = LogOffset();
291-
std::shared_ptr<Log> logs_;
284+
// used to notify that the log has been submitted
285+
std::condition_variable_any log_commit_cv_;
286+
// Used to track threads waiting for a specific log to be submitted
287+
std::mutex waiting_threads_mu_;
292288
};
293289

294290
#endif // INCLUDE_PIKA_CONSENSUS_H_

include/pika_db.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
148148
bool IsBgSaving();
149149
BgSaveInfo bgsave_info();
150150
pstd::Status GetKeyNum(std::vector<storage::KeyInfo>* key_info);
151+
// Execute a command synchronously inside DB context (used by consensus batch execution)
152+
void ExecuteCmd(const std::shared_ptr<Cmd>& cmd_ptr, int dummy_flag = 0);
151153

152154
private:
153155
bool opened_ = false;

include/pika_repl_client.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,16 @@ class PikaReplClient {
8686
int32_t db_index = db_name.back() - '0';
8787
assert(db_index >= 0 && db_index <= 7);
8888
async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order::memory_order_seq_cst);
89+
NotifyAsyncWriteComplete(db_name);
8990
}
9091

9192
int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
9293
int32_t db_index = db_name.back() - '0';
9394
assert(db_index >= 0 && db_index <= 7);
9495
return async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst);
9596
}
97+
// Each asynchronous DB write task is triggered by the worker thread after completion.
98+
void NotifyAsyncWriteComplete(const std::string& db_name);
9699

97100
private:
98101
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);

include/pika_rm.h

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <string>
1313
#include <unordered_map>
1414
#include <vector>
15+
#include <condition_variable>
1516

1617
#include "pstd/include/pstd_status.h"
1718

@@ -23,12 +24,13 @@
2324
#include "include/pika_stable_log.h"
2425
#include "include/rsync_client.h"
2526

26-
#define kBinlogSendPacketNum 40
27-
#define kBinlogSendBatchNum 100
27+
// Batch processing parameters
28+
#define kBinlogSendPacketNum 500
29+
#define kBinlogSendBatchNum 1000
2830

29-
// unit seconds
30-
#define kSendKeepAliveTimeout (2 * 1000000)
31-
#define kRecvKeepAliveTimeout (20 * 1000000)
31+
// unit microseconds
32+
#define kSendKeepAliveTimeout (500 * 1000) // 500ms发送一次心跳
33+
#define kRecvKeepAliveTimeout (3 * 1000000) // 3 seconds receive timeout
3234

3335

3436
class SyncDB {
@@ -93,6 +95,8 @@ class SyncMasterDB : public SyncDB {
9395
pstd::Mutex session_mu_;
9496
int32_t session_id_ = 0;
9597
ConsensusCoordinator coordinator_;
98+
std::mutex cv_mu_;
99+
std::condition_variable cv_;
96100

97101
//pacificA public:
98102
public:
@@ -199,6 +203,9 @@ class PikaReplicaManager {
199203
void DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name);
200204
void DropItemInWriteQueue(const std::string& ip, int port);
201205
int ConsumeWriteQueue();
206+
207+
// Static callback functions
208+
static void ConsumeWriteQueueTask(void* arg);
202209

203210
// Schedule Task
204211
void ScheduleReplServerBGTask(net::TaskFunc func, void* arg);
@@ -233,7 +240,11 @@ class PikaReplicaManager {
233240
return pika_repl_client_->GetUnfinishedAsyncWriteDBTaskCount(db_name);
234241
}
235242

236-
private:
243+
void NotifyAsyncWriteComplete(const std::string& db_name);
244+
void WaitAsyncWriteComplete(const std::string& db_name);
245+
void UpdateAllMasterDBCommittedID();
246+
247+
private:
237248
void InitDB();
238249
pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip);
239250

@@ -247,6 +258,10 @@ class PikaReplicaManager {
247258
std::unordered_map<std::string, std::unordered_map<std::string, std::queue<WriteTask>>> write_queues_;
248259
std::unique_ptr<PikaReplClient> pika_repl_client_;
249260
std::unique_ptr<PikaReplServer> pika_repl_server_;
261+
262+
std::mutex async_write_mu_;
263+
std::condition_variable async_write_cv_;
264+
std::unordered_map<std::string, int> unfinished_async_write_count_;
250265
};
251266

252267
#endif // PIKA_RM_H

include/pika_server.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,9 @@ class PikaServer : public pstd::noncopyable {
524524
}
525525
}
526526

527+
// 添加批量处理命令的函数声明
528+
void ScheduleWriteDBMulti(const std::vector<std::shared_ptr<Cmd>>& cmds, const std::string& db_name);
529+
527530
private:
528531
/*
529532
* TimingTask use

src/pika_auxiliary_thread.cc

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,34 +19,93 @@ PikaAuxiliaryThread::~PikaAuxiliaryThread() {
1919
LOG(INFO) << "PikaAuxiliary thread " << thread_id() << " exit!!!";
2020
}
2121

22+
void PikaAuxiliaryThread::DoTimingTask() {
23+
uint64_t now = pstd::NowMicros();
24+
25+
static uint64_t last_check_sync_timeout = 0;
26+
static uint64_t last_update_committed_id = 0;
27+
static uint64_t last_wake_up_binlog_sync = 0;
28+
29+
// 降低CheckSyncTimeout频率为每5秒一次
30+
if (now - last_check_sync_timeout > 5000000) { // 5秒
31+
if (g_pika_server->role() & PIKA_ROLE_MASTER) {
32+
g_pika_rm->CheckSyncTimeout(now);
33+
}
34+
last_check_sync_timeout = now;
35+
}
36+
37+
// 提高UpdateAllMasterDBCommittedID频率为每2ms一次
38+
if (now - last_update_committed_id > 2000) { // 2毫秒
39+
g_pika_rm->UpdateAllMasterDBCommittedID();
40+
last_update_committed_id = now;
41+
}
42+
43+
// 控制WakeUpBinlogSync频率为每100ms一次
44+
if (now - last_wake_up_binlog_sync > 100000) { // 100毫秒
45+
g_pika_rm->WakeUpBinlogSync();
46+
last_wake_up_binlog_sync = now;
47+
}
48+
}
49+
2250
void* PikaAuxiliaryThread::ThreadMain() {
51+
// 记录上次同步的时间
52+
uint64_t last_sync_time = 0;
53+
uint64_t last_update_time = 0;
54+
uint64_t last_timing_task_time = 0;
55+
uint64_t last_check_time = 0;
56+
const uint64_t sync_interval = 3000; // 3ms,进一步提高同步频率
57+
const uint64_t update_interval = 2000; // 2ms,进一步提高更新频率
58+
const uint64_t timing_task_interval = 200000; // 200ms,进一步减少定时任务执行间隔
59+
const uint64_t check_interval = 1000000; // 1s,减少检查频率
60+
2361
while (!should_stop()) {
62+
uint64_t now = pstd::NowMicros();
63+
2464
if (g_pika_server->ShouldMetaSync()) {
2565
g_pika_rm->SendMetaSyncRequest();
2666
} else if (g_pika_server->MetaSyncDone()) {
2767
g_pika_rm->RunSyncSlaveDBStateMachine();
2868
}
2969

30-
pstd::Status s = g_pika_rm->CheckSyncTimeout(pstd::NowMicros());
31-
if (!s.ok()) {
70+
// 降低CheckSyncTimeout和CheckLeaderProtectedMode调用频率
71+
if (now - last_check_time >= check_interval) {
72+
pstd::Status s = g_pika_rm->CheckSyncTimeout(now);
73+
if (!s.ok() && !s.IsTimeout()) { // 忽略超时错误的日志记录
3274
LOG(WARNING) << s.ToString();
75+
}
76+
g_pika_server->CheckLeaderProtectedMode();
77+
last_check_time = now;
3378
}
3479

35-
g_pika_server->CheckLeaderProtectedMode();
36-
37-
// TODO(whoiami) timeout
38-
s = g_pika_server->TriggerSendBinlogSync();
39-
if (!s.ok()) {
40-
LOG(WARNING) << s.ToString();
80+
// 增加同步频率,减少同步间隔
81+
if (now - last_sync_time >= sync_interval) {
82+
pstd::Status s = g_pika_server->TriggerSendBinlogSync();
83+
if (!s.ok() && !s.IsTimeout()) { // 忽略超时错误的日志记录
84+
LOG(WARNING) << s.ToString();
85+
}
86+
last_sync_time = now;
87+
}
88+
89+
// 单独处理更新committed_id,提高更新频率
90+
if (now - last_update_time >= update_interval) {
91+
g_pika_rm->UpdateAllMasterDBCommittedID();
92+
last_update_time = now;
93+
}
94+
95+
// 定期执行定时任务
96+
if (now - last_timing_task_time >= timing_task_interval) {
97+
DoTimingTask();
98+
last_timing_task_time = now;
4199
}
100+
42101
// send to peer
43102
int res = g_pika_server->SendToPeer();
44103
if (res == 0) {
45-
// sleep 100 ms
46104
std::unique_lock lock(mu_);
47-
cv_.wait_for(lock, 100ms);
105+
cv_.wait_for(lock, 500us); // 减少等待时间到500微秒
48106
} else {
49-
// LOG_EVERY_N(INFO, 1000) << "Consume binlog number " << res;
107+
// 处理完成后立即继续下一轮,不等待
108+
continue;
50109
}
51110
}
52111
return nullptr;

0 commit comments

Comments
 (0)