Skip to content

Commit fda56d8

Browse files
author
wangshaoyi
committed
fix bugs and add TODOS
1 parent d6a50a4 commit fda56d8

File tree

6 files changed

+40
-25
lines changed

6 files changed

+40
-25
lines changed

src/net/src/client_thread.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,6 @@ void ClientThread::ProcessNotifyEvents(const NetFiredEvent* pfe) {
356356
NotifyWrite(ip_port);
357357
}
358358
} else if (ti.notify_type() == kNotiClose) {
359-
LOG(INFO) << "received kNotiClose";
360359
net_multiplexer_->NetDelEvent(fd, 0);
361360
CloseFd(fd, ip_port);
362361
fd_conns_.erase(fd);

src/net/src/holy_thread.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ namespace net {
1919
HolyThread::HolyThread(int port, ConnFactory* conn_factory, int cron_interval, const ServerHandle* handle, bool async)
2020
: ServerThread::ServerThread(port, cron_interval, handle),
2121
conn_factory_(conn_factory),
22-
22+
2323
keepalive_timeout_(kDefaultKeepAliveTime),
2424
async_(async) {}
2525

@@ -290,7 +290,6 @@ void HolyThread::ProcessNotifyEvents(const net::NetFiredEvent* pfe) {
290290
if (ti.notify_type() == net::kNotiWrite) {
291291
net_multiplexer_->NetModEvent(ti.fd(), 0, kReadable | kWritable);
292292
} else if (ti.notify_type() == net::kNotiClose) {
293-
LOG(INFO) << "receive noti close";
294293
std::shared_ptr<net::NetConn> conn = get_conn(fd);
295294
if (!conn) {
296295
continue;

src/net/src/pb_conn.cc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
// LICENSE file in the root directory of this source tree. An additional grant
44
// of patent rights can be found in the PATENTS file in the same directory.
55

6+
#include <sys/socket.h>
7+
#include <netinet/tcp.h>
8+
#include <netinet/in.h>
9+
610
#include "net/include/pb_conn.h"
711

812
#include <arpa/inet.h>
@@ -20,7 +24,7 @@ namespace net {
2024

2125
PbConn::PbConn(const int fd, const std::string& ip_port, Thread* thread, NetMultiplexer* mpx)
2226
: NetConn(fd, ip_port, thread, mpx),
23-
27+
2428
write_buf_(0)
2529
{
2630
rbuf_ = reinterpret_cast<char*>(malloc(sizeof(char) * PB_IOBUF_LEN));
@@ -36,7 +40,9 @@ ReadStatus PbConn::GetRequest() {
3640
while (true) {
3741
switch (connStatus_) {
3842
case kHeader: {
43+
int quickack = 1;
3944
ssize_t nread = read(fd(), rbuf_ + cur_pos_, COMMAND_HEADER_LENGTH - cur_pos_);
45+
setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack));
4046
if (nread == -1) {
4147
if (errno == EAGAIN) {
4248
return kReadHalf;
@@ -75,6 +81,8 @@ ReadStatus PbConn::GetRequest() {
7581
}
7682
// read msg body
7783
ssize_t nread = read(fd(), rbuf_ + cur_pos_, remain_packet_len_);
84+
int quickack = 1;
85+
setsockopt(fd(), IPPROTO_TCP, TCP_QUICKACK, &quickack, sizeof(quickack));
7886
if (nread == -1) {
7987
if (errno == EAGAIN) {
8088
return kReadHalf;
@@ -123,6 +131,7 @@ WriteStatus PbConn::SendReply() {
123131
while (item_len - write_buf_.item_pos_ > 0) {
124132
nwritten = write(fd(), item.data() + write_buf_.item_pos_, item_len - write_buf_.item_pos_);
125133
if (nwritten <= 0) {
134+
LOG(ERROR) << "nwritten less than 0";
126135
break;
127136
}
128137
g_network_statistic->IncrReplOutputBytes(nwritten);
@@ -144,6 +153,7 @@ WriteStatus PbConn::SendReply() {
144153
if (item_len - write_buf_.item_pos_ != 0) {
145154
return kWriteHalf;
146155
}
156+
LOG(ERROR) << "write item success";
147157
}
148158
return kWriteAll;
149159
}

src/pika_command.cc

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -874,20 +874,24 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
874874
db_->DBLockShared();
875875
}
876876
if(g_pika_server->IsConsistency()){
877+
uint64_t before_do_binlog_us = pstd::NowMicros();
877878
DoBinlog();
879+
uint64_t end_us = pstd::NowMicros();
880+
this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
878881
if(res().ok()){
882+
uint64_t before_do_command_us = pstd::NowMicros();
879883
DoCommand(hint_keys);
880-
}
881-
if (g_pika_conf->slowlog_slower_than() >= 0) {
882-
do_duration_ += pstd::NowMicros() - start_us;
884+
this->command_duration_ms = (pstd::NowMicros() - before_do_command_us) / 1000;
883885
}
884886
}else{
887+
uint64_t before_do_command_us = pstd::NowMicros();
885888
DoCommand(hint_keys);
886-
if (g_pika_conf->slowlog_slower_than() >= 0) {
887-
do_duration_ += pstd::NowMicros() - start_us;
888-
}
889+
this->command_duration_ms = (pstd::NowMicros() - before_do_command_us) / 1000;
889890

891+
uint64_t before_do_binlog_us = pstd::NowMicros();
890892
DoBinlog();
893+
uint64_t end_us = pstd::NowMicros();
894+
this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
891895
}
892896

893897
if (!IsSuspend()) {
@@ -896,9 +900,6 @@ void Cmd::InternalProcessCommand(const HintKeys& hint_keys) {
896900
if (is_write()) {
897901
record_lock.Unlock(current_key());
898902
}
899-
900-
uint64_t end_us = pstd::NowMicros();
901-
this->binlog_duration_ms = (end_us - before_do_binlog_us) / 1000;
902903
}
903904

904905
void Cmd::DoCommand(const HintKeys& hint_keys) {
@@ -977,7 +978,6 @@ void Cmd::DoBinlog() {
977978
<< s.ToString();
978979
res().SetRes(CmdRes::kErrOther, s.ToString());
979980
}
980-
981981
return;
982982
}
983983
}

src/pika_consensus.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ Status Context::Init() {
6565
void Context::UpdateAppliedIndex(const LogOffset& offset) {
6666
std::lock_guard l(rwlock_);
6767
LogOffset cur_offset;
68-
applied_win_.Update(SyncWinItem(offset), SyncWinItem(offset), &cur_offset);
68+
// TODO: 暂时注释掉这一行,因为applied_win_没有push调用,只有update,窗口永远对不上
69+
//applied_win_.Update(SyncWinItem(offset), SyncWinItem(offset), &cur_offset);
6970
if (cur_offset > applied_index_) {
7071
applied_index_ = cur_offset;
7172
StableSave();
@@ -811,6 +812,8 @@ bool ConsensusCoordinator::GetISConsistency() {
811812
}
812813

813814
bool ConsensusCoordinator::checkFinished(const LogOffset& offset) {
815+
//TODO: 暂时加了读写锁,后期考虑替换为原子变量
816+
std::lock_guard l(committed_id_rwlock_);
814817
if (offset <= committed_id_) {
815818
return true;
816819
}
@@ -836,6 +839,8 @@ Status ConsensusCoordinator::PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd
836839
return s;
837840
}
838841
// If successful, append the log entry to the logs
842+
// TODO: 这里logs_的appendlog操作和上边的stable_logger_->Logger()->Put不是原子的,可能导致offset大的先被追加到logs_中,
843+
// 多线程写入的时候窗口会对不上,最终主从断开连接。需要加逻辑保证原子性
839844
logs_->AppendLog(Log::LogItem(cur_offset, cmd_ptr, binlog));
840845

841846
SetPreparedId(cur_offset);
@@ -948,7 +953,8 @@ Status ConsensusCoordinator::ApplyBinlog(const std::shared_ptr<Cmd>& cmd_ptr) {
948953
} else {
949954
int32_t wait_ms = 250;
950955
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
951-
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
956+
// TODO: 暂时去掉了sleep的逻辑,考虑使用条件变量唤醒
957+
//std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
952958
wait_ms *= 2;
953959
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
954960
}

src/pika_rm.cc

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -437,9 +437,9 @@ Status SyncMasterDB::CommitAppLog(const LogOffset& master_committed_id){
437437
return coordinator_.CommitAppLog(master_committed_id);
438438
}
439439
Status SyncMasterDB::AppendCandidateBinlog(const std::string& ip, int port, const LogOffset& offset) {
440-
std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode(ip, port);
440+
std::shared_ptr<SlaveNode> slave_ptr = GetSlaveNode(ip, port);
441441
if (!slave_ptr) {
442-
return Status::NotFound("ip " + ip + " port " + std::to_string(port));
442+
return Status::NotFound("ip " + ip + " port " + std::to_string(port));
443443
}
444444

445445
{
@@ -452,23 +452,23 @@ Status SyncMasterDB::AppendCandidateBinlog(const std::string& ip, int port, cons
452452
if(slave_ptr->slave_state == KCandidate){
453453
LOG(INFO)<<"PacificA first binlog slave_state is Candidate";
454454
}
455-
slave_ptr->sent_offset = offset;
455+
slave_ptr->sent_offset = offset;
456456
slave_ptr->acked_offset = offset;
457457
slave_ptr->target_offset =GetPreparedId();
458458
Status s = slave_ptr->InitBinlogFileReader(Logger(), offset.b_offset);
459459
if (!s.ok()) {
460460
return Status::Corruption("Init binlog file reader failed" + s.ToString()); // 如果初始化失败,返回错误状态
461461
}
462-
g_pika_rm->DropItemInOneWriteQueue(ip, port, slave_ptr->DBName());
463-
slave_ptr->b_state = kReadFromFile;
462+
g_pika_rm->DropItemInOneWriteQueue(ip, port, slave_ptr->DBName());
463+
slave_ptr->b_state = kReadFromFile;
464464
}
465465

466466
Status s = coordinator_.SendBinlog(slave_ptr, slave_ptr->DBName());
467467
if (!s.ok()) {
468-
return s;
468+
return s;
469469
}
470470

471-
return Status::OK();
471+
return Status::OK();
472472
}
473473

474474
Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
@@ -489,9 +489,10 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
489489
while (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start).count() < 10) {
490490
// Check if consensus has been achieved for the given log offset
491491
if (checkFinished(offset)) {
492-
return Status::OK();
492+
return Status::OK();
493493
}
494-
std::this_thread::sleep_for(std::chrono::milliseconds(50));
494+
// TODO: 这里暂时注掉了sleep等待,50ms耗时过长,影响写入链路,后期需要改成条件变量唤醒方式
495+
//std::this_thread::sleep_for(std::chrono::milliseconds(50));
495496
}
496497

497498
return Status::Timeout("No consistency achieved within 10 seconds");

0 commit comments

Comments
 (0)