Skip to content

Commit 4655394

Browse files
committed
[WIP] feat: D node sends a pull signal to P node while having sufficient resources.
1 parent 2e852c8 commit 4655394

File tree

12 files changed

+263
-47
lines changed

12 files changed

+263
-47
lines changed

xllm/core/common/macros.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,6 @@ namespace xllm {
6262

6363
#define CALLBACK_WITH_ERROR(CODE, MSG) callback(Status{CODE, MSG});
6464

65-
#define DLOG VLOG(1) << "[Local offline] "
65+
#define DVLOG VLOG(1) << "[Offline pull] "
6666

6767
} // namespace xllm

xllm/core/distributed_runtime/pd_ooc_service.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,12 @@ void PDOOCService::FirstGeneration(
6262
pd_ooc_service_impl_->decode_recv_first_generation(request, response);
6363
}
6464

65+
void PDOOCService::SendPullSignal(::google::protobuf::RpcController* controller,
66+
const proto::PullSignal* request,
67+
proto::Status* response,
68+
::google::protobuf::Closure* done) {
69+
brpc::ClosureGuard done_guard(done);
70+
pd_ooc_service_impl_->prefill_recv_pull_signal(request, response);
71+
}
72+
6573
} // namespace xllm

xllm/core/distributed_runtime/pd_ooc_service.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ class PDOOCService : public proto::PDOOCService {
5050
proto::Status* response,
5151
::google::protobuf::Closure* done) override;
5252

53+
void SendPullSignal(::google::protobuf::RpcController* controller,
54+
const proto::PullSignal* request,
55+
proto::Status* response,
56+
::google::protobuf::Closure* done) override;
57+
5358
private:
5459
DISALLOW_COPY_AND_ASSIGN(PDOOCService);
5560
std::unique_ptr<PDOOCServiceImpl> pd_ooc_service_impl_;

xllm/core/distributed_runtime/pd_ooc_service_impl.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,4 +286,15 @@ void PDOOCServiceImpl::prefill_recv_generations(
286286
}
287287
}
288288

289+
void PDOOCServiceImpl::prefill_recv_pull_signal(
290+
const proto::PullSignal* request,
291+
proto::Status* response) {
292+
// Put the pull signal into a queue and response
293+
bool result = scheduler_->write_pull_signal(proto::PullSignal(*request));
294+
295+
if (response) {
296+
response->set_ok(result);
297+
}
298+
}
299+
289300
} // namespace xllm

xllm/core/distributed_runtime/pd_ooc_service_impl.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ class PDOOCServiceImplInterface {
4545
virtual void prefill_recv_generations(
4646
const proto::DisaggStreamGenerations* requests,
4747
proto::StatusSet* responses) {}
48+
49+
virtual void prefill_recv_pull_signal(const proto::PullSignal* request,
50+
proto::Status* response) {}
4851
};
4952

5053
class PDOOCServiceImpl final : public PDOOCServiceImplInterface {
@@ -64,6 +67,9 @@ class PDOOCServiceImpl final : public PDOOCServiceImplInterface {
6467
void decode_recv_first_generation(const proto::DisaggGenerations* request,
6568
proto::Status* response) override;
6669

70+
void prefill_recv_pull_signal(const proto::PullSignal* request,
71+
proto::Status* response) override;
72+
6773
private:
6874
std::shared_ptr<Request> generate_request(const proto::DisaggRequest& req);
6975

xllm/core/runtime/xservice_client.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,23 @@ std::vector<std::string> XServiceClient::get_static_decode_list() {
403403
return std::vector<std::string>(resp.names().begin(), resp.names().end());
404404
}
405405

406+
std::vector<std::string> XServiceClient::get_static_prefill_list() {
407+
brpc::Controller cntl;
408+
xllm_service::proto::InstanceID req;
409+
xllm_service::proto::InstanceIDs resp;
410+
req.set_name(instance_name_);
411+
{
412+
std::shared_lock<std::shared_mutex> lock(mutex_);
413+
xservice_stub_->GetStaticPrefillList(&cntl, &req, &resp, nullptr);
414+
}
415+
if (cntl.Failed()) {
416+
LOG(ERROR) << "Fail to get static prefill list from xservice server "
417+
<< xservice_addr_ << ", error text: " << cntl.ErrorText();
418+
return {};
419+
}
420+
return std::vector<std::string>(resp.names().begin(), resp.names().end());
421+
}
422+
406423
ServiceConfig XServiceClient::get_config() {
407424
brpc::Controller cntl;
408425
xllm_service::proto::Empty req;

xllm/core/runtime/xservice_client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class XServiceClient {
5656
void heartbeat();
5757
InstanceInfo get_instance_info(const std::string& instance_name);
5858
std::vector<std::string> get_static_decode_list();
59+
std::vector<std::string> get_static_prefill_list();
5960
ServiceConfig get_config();
6061

6162
// response generation tokens to xllm service

xllm/core/scheduler/continuous_scheduler.cpp

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -612,9 +612,9 @@ std::vector<Batch> ContinuousScheduler::prepare_batch() {
612612
while (request_queue_.read(request)) {
613613
CHECK(request);
614614

615-
if (request->offline()) {
616-
DLOG << "Read an offline request from request_queue_";
617-
}
615+
// if (request->offline()) {
616+
// DVLOG << "Read an offline request from request_queue_";
617+
// }
618618

619619
// expand sequences to the target number if prefix cache is disabled.
620620
if (!enable_prefix_cache_) {
@@ -625,10 +625,12 @@ std::vector<Batch> ContinuousScheduler::prepare_batch() {
625625
if (request->sequences()[0]->kv_state().kv_cache_tokens_num() == 0) {
626626
if (request->offline()) {
627627
waiting_priority_queue_offline_.push(request);
628-
DLOG << "Put an offline request into waiting_priority_queue_offline_";
628+
// DVLOG << "Put an offline request into
629+
// waiting_priority_queue_offline_";
629630
} else {
630631
waiting_priority_queue_.push(request);
631-
DLOG << "Put an online request into waiting_priority_queue_offline_";
632+
// DVLOG << "Put an online request into
633+
// waiting_priority_queue_offline_";
632634
}
633635
} else {
634636
// request from prefill instance in disagge pd mode.
@@ -646,7 +648,7 @@ std::vector<Batch> ContinuousScheduler::prepare_batch() {
646648
std::shared_ptr<Request> request = *it;
647649
request->update_connection_status();
648650
if (request->finished() || request->cancelled()) {
649-
DLOG << "Found a finished request in running_requests_";
651+
// DVLOG << "Found a finished request in running_requests_";
650652
block_manager_pool_->deallocate(request.get());
651653
// release the ownership of the request
652654
finished_requests.emplace_back(request);
@@ -671,10 +673,10 @@ std::vector<Batch> ContinuousScheduler::prepare_batch() {
671673
handle_running_requests(*it);
672674
if ((*it)->offline()) {
673675
running_queue_offline_->push(*it, last_step_prefill_);
674-
DLOG << "Put an offline request into running_queue_offline_";
676+
// DVLOG << "Put an offline request into running_queue_offline_";
675677
} else {
676678
running_queue_->push(*it, last_step_prefill_);
677-
DLOG << "Put an online request into running_queue_";
679+
// DVLOG << "Put an online request into running_queue_";
678680
}
679681
}
680682
} else {
@@ -697,16 +699,17 @@ std::vector<Batch> ContinuousScheduler::prepare_batch() {
697699
handle_running_requests(*it);
698700
if ((*it)->offline()) {
699701
running_queue_offline_->push(*it, last_step_prefill_);
700-
DLOG << "Pushed an offline request into running_queue_offline_";
702+
// DVLOG << "Pushed an offline request into running_queue_offline_";
701703
} else {
702704
running_queue_->push(*it, last_step_prefill_);
703-
DLOG << "Pushed an online request into running_queue_";
705+
// DVLOG << "Pushed an online request into running_queue_";
704706
}
705707
}
706708
}
707709
} else {
708-
DLOG << "Using unknown priority_strategy: " << options_.priority_strategy();
709-
// directly push running requests to the priority queue
710+
// DVLOG << "Using unknown priority_strategy: " <<
711+
// options_.priority_strategy(); directly push running requests to the
712+
// priority queue
710713
for (auto it = running_requests_.begin(); it != running_requests_.end();
711714
++it) {
712715
if (*it == nullptr) {
@@ -715,10 +718,10 @@ std::vector<Batch> ContinuousScheduler::prepare_batch() {
715718
handle_running_requests(*it);
716719
if ((*it)->offline()) {
717720
running_queue_offline_->push(*it);
718-
DLOG << "Pushed an offline request into running_queue_offline_";
721+
// DVLOG << "Pushed an offline request into running_queue_offline_";
719722
} else {
720723
running_queue_->push(*it);
721-
DLOG << "Pushed an online request into running_queue_";
724+
// DVLOG << "Pushed an online request into running_queue_";
722725
}
723726
}
724727
}
@@ -827,7 +830,7 @@ std::vector<Batch> ContinuousScheduler::prepare_batch() {
827830
GAUGE_SET(num_free_blocks, util::max(block_manager_pool_->num_free_blocks()));
828831
GAUGE_SET(num_used_blocks, util::min(block_manager_pool_->num_used_blocks()));
829832
if (!batches[0].empty()) {
830-
DLOG << "Built a batch";
833+
DVLOG << "Built a batch";
831834
}
832835
return batches;
833836
}

0 commit comments

Comments
 (0)