feat: add initial version of Online-Offline Co-location (OOC) scheduler. #131
base: main
Pull Request Overview
This PR adds an initial version of the Online-Offline Co-location (OOC) scheduler, which enables handling both online and offline decoding requests on prefill nodes (P nodes) locally rather than always sending them to decode nodes (D nodes). The implementation is based on the existing DisaggPDScheduler but copied instead of inherited to allow for significant architectural divergence.
Key changes:
- Added new PDOOCScheduler class with offline request handling capabilities
- Extended configuration options to support the new OOC mode
- Added debug logging for offline request processing
Reviewed Changes
Copilot reviewed 26 out of 27 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| xllm/core/scheduler/pd_ooc_scheduler.h | New header defining PDOOCScheduler class with offline request handling |
| xllm/core/scheduler/pd_ooc_scheduler.cpp | Implementation of PDOOCScheduler with local offline processing logic |
| xllm/core/scheduler/scheduler_factory.cpp | Factory method updated to create PDOOCScheduler when OOC is enabled |
| xllm/core/scheduler/continuous_scheduler.h | Added enable_pd_ooc option to scheduler configuration |
| xllm/core/scheduler/continuous_scheduler.cpp | Added debug logging for offline request tracking |
| xllm/core/distributed_runtime/* | New PDOOCService RPC service implementation |
| xllm/proto/disagg_pd.proto | Added PDOOCService protocol definition |
| xllm/core/common/* | Added enable_pd_ooc flag and option throughout configuration |
| xllm/pybind/* | Extended Python bindings to expose OOC configuration |
| xllm/server/* | Added server support for PDOOCService |
Comments suppressed due to low confidence (3)
xllm/core/scheduler/pd_ooc_scheduler.cpp:1
- The log message incorrectly states that an online request is being put into the offline queue. This should be 'waiting_priority_queue_' based on the context (line 630).
/* Copyright 2025 The xLLM Authors. All Rights Reserved.
xllm/core/scheduler/pd_ooc_scheduler.cpp:1
- This debug message suggests an 'unknown' priority strategy, but the else branch handles all non-FCFS strategies normally. The message should be more accurate, such as 'Using non-FCFS priority_strategy' or remove it entirely if this is expected behavior.
/* Copyright 2025 The xLLM Authors. All Rights Reserved.
xllm/core/scheduler/pd_ooc_scheduler.cpp:1
- The StepStatus enum is defined but never meaningfully used in the implementation. The step_status member is initialized to IDLE but not updated to reflect actual processing states, making this enum effectively dead code.
/* Copyright 2025 The xLLM Authors. All Rights Reserved.
static constexpr size_t kOutputTheadNum_ = 128; // magic num
size_t next_thread_idx = 0;
ThreadPool output_threadpools_[kOutputTheadNum_];
The variable name has a typo: 'kOutputTheadNum_' should be 'kOutputThreadNum_'.
- static constexpr size_t kOutputTheadNum_ = 128; // magic num
- size_t next_thread_idx = 0;
- ThreadPool output_threadpools_[kOutputTheadNum_];
+ static constexpr size_t kOutputThreadNum_ = 128; // magic num
+ size_t next_thread_idx = 0;
+ ThreadPool output_threadpools_[kOutputThreadNum_];
virtual ~PDOOCService() = default;

// for prefill recv decode response
void Generation(::google::protobuf::RpcController* controller,
Function names should be lowercase, consistent with the rest of the code.
> Function names should be lowercase, consistent with the rest of the code.
This function overrides the method declared in proto::PDOOCService, so its name cannot be changed.
PDOOCServiceImpl::PDOOCServiceImpl(PDOOCScheduler* scheduler, Engine* engine)
    : PDOOCServiceImplInterface(), scheduler_(scheduler), engine_(engine) {}

std::shared_ptr<Request> PDOOCServiceImpl::generate_request(
Wouldn't this function be better placed in the request builder?
std::shared_ptr<Request> request = *it;
request->update_connection_status();
if (request->finished() || request->cancelled()) {
  // DVLOG << "Found a finished request in running_requests_";
Delete the useless comments.
std::shared_ptr<Request> request = nullptr;
int request_thread_idx = -1;
{
  std::lock_guard<std::mutex> lock(remote_requests_map_mutex_);
It is necessary to create a concurrent map/thread_local map to encapsulate the mutex and two maps.
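One way to read this suggestion is a small wrapper that owns both the mutex and the data, so that callers can never touch the map without holding the lock. The sketch below is only an illustration: `ConcurrentMap` and `RemoteRequestEntry` are hypothetical names, not existing xLLM types, and the entry struct assumes the two maps share the same key and can be folded into one value.

```cpp
#include <cstddef>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical helper (not part of xLLM): the mutex and the map live together,
// and every accessor takes the lock itself.
template <typename K, typename V>
class ConcurrentMap {
 public:
  // Inserts the value, or overwrites the one already stored under `key`.
  void put(const K& key, V value) {
    std::lock_guard<std::mutex> lock(mutex_);
    map_.insert_or_assign(key, std::move(value));
  }

  // Returns a copy of the stored value, or std::nullopt if the key is absent.
  std::optional<V> get(const K& key) const {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = map_.find(key);
    if (it == map_.end()) return std::nullopt;
    return it->second;
  }

  // Removes the entry and hands it to the caller in one locked step.
  std::optional<V> take(const K& key) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = map_.find(key);
    if (it == map_.end()) return std::nullopt;
    std::optional<V> value(std::move(it->second));
    map_.erase(it);
    return value;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return map_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<K, V> map_;
};

// Hypothetical value type: folding the per-request data of both maps into one
// struct lets a single ConcurrentMap replace "one mutex plus two maps".
struct RemoteRequestEntry {
  std::string request_id;  // stands in for std::shared_ptr<Request>
  int thread_idx = -1;     // index of the output thread serving the request
};
```

With this shape, the lookup-then-erase sequence that currently sits inside a hand-written lock scope would become a single `take(request_id)` call.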
…uler and DisaggPDService.
…g sufficient resources
PDOOCServiceImplInterface() = default;
virtual ~PDOOCServiceImplInterface() = default;

virtual void decode_recv_new_requests(const proto::DisaggRequests* request,
nit: it would be better to declare pure virtual functions in the Interface class. The DisaggPDServiceImplInterface class also needs the same change.
Example: virtual void decode_recv_new_requests(...) = 0;
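A minimal sketch of the difference, using stand-in types: `FakeRequests`, `FakeResponses`, `ServiceImplInterface`, and `ServiceImpl` are hypothetical and only mirror the shape of the real proto and service classes. Marking the method `= 0` makes the interface abstract, so it cannot be instantiated and every concrete subclass has to provide an implementation.

```cpp
#include <type_traits>

// Stand-ins for proto::DisaggRequests / proto::DisaggResponses (hypothetical).
struct FakeRequests {
  int count = 0;
};
struct FakeResponses {
  int accepted = 0;
};

class ServiceImplInterface {
 public:
  virtual ~ServiceImplInterface() = default;

  // Pure virtual: there is no default body, so a subclass that does not
  // override this method is itself abstract and cannot be instantiated.
  virtual void decode_recv_new_requests(const FakeRequests* request,
                                        FakeResponses* response) = 0;
};

class ServiceImpl final : public ServiceImplInterface {
 public:
  void decode_recv_new_requests(const FakeRequests* request,
                                FakeResponses* response) override {
    // Toy behavior for the sketch: accept every incoming request.
    response->accepted = request->count;
  }
};

// The interface is abstract; the implementation is not.
static_assert(std::is_abstract_v<ServiceImplInterface>);
static_assert(!std::is_abstract_v<ServiceImpl>);
```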
    const proto::DisaggRequest& req) {
  // create a new request
  // TODO: Should to support best_of > 1 case, now we only consider
  // to allocate blocks for the first sequence in the request.
This function is the same as DisaggPDServiceImpl::generate_request. Maybe we should make it a common utility function.
void PDOOCServiceImpl::decode_recv_new_requests(
    const proto::DisaggRequests* request,
    proto::DisaggResponses* response) {
  // link prefill cluster
This function is also the same as DisaggPDServiceImpl::decode_recv_new_requests.
OK, maybe we need to create a base class such as PDServiceImpl that implements the common functions, like generate_request, decode_recv_new_requests, and so on. DisaggPDServiceImpl and PDOOCServiceImpl should both inherit from PDServiceImpl.
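The proposed refactoring could look roughly like the sketch below. It is an assumption-heavy illustration: the class names ending in `Sketch`, the string-based signatures, and the `schedule` hook are all hypothetical and stand in for the real request and proto types. The point is only that the shared logic is written once in the base class and each service overrides the part that actually differs.

```cpp
#include <string>

// Hypothetical base class holding the code that is currently duplicated.
class PDServiceImplSketch {
 public:
  virtual ~PDServiceImplSketch() = default;

  // Shared helper, written once (stands in for generate_request).
  std::string generate_request(const std::string& prompt) const {
    return "request(" + prompt + ")";
  }

  // Shared flow (stands in for decode_recv_new_requests): build the request,
  // then delegate the scheduler-specific step to the subclass.
  std::string decode_recv_new_requests(const std::string& prompt) {
    return schedule(generate_request(prompt));
  }

 protected:
  // The only part that differs between the two services in this sketch.
  virtual std::string schedule(const std::string& request) = 0;
};

class DisaggPDServiceImplSketch : public PDServiceImplSketch {
 protected:
  std::string schedule(const std::string& request) override {
    return "disagg:" + request;
  }
};

class PDOOCServiceImplSketch : public PDServiceImplSketch {
 protected:
  std::string schedule(const std::string& request) override {
    return "ooc:" + request;
  }
};
```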
// TODO: support embedding later, now we only support tokens
void PDOOCServiceImpl::decode_recv_first_generation(
    const proto::DisaggGenerations* request,
same as above.
}

bool PDOOCServiceImpl::prefill_recv_generation(
    const proto::DisaggStreamGeneration* request,
same as above.
}

void PDOOCServiceImpl::prefill_recv_generations(
    const proto::DisaggStreamGenerations* requests,
same as above
std::vector<Token> Sequence::get_generated_tokens() const {
  std::vector<Token> generated_tokens;
We can return a Slice instead of a vector, like Slice<int32_t> tokens() const { return {tokens_, num_tokens_}; }, or return const std::vector<Token>&. Both avoid copying the tokens.
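To illustrate the copy-free alternative, here is a minimal sketch. `SliceView` is a hypothetical stand-in written for this example (it is not xLLM's `Slice`), and `SequenceSketch` assumes the token ids are stored contiguously with the prompt tokens first. The view is only valid while the owning sequence is alive and its storage is not reallocated.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical non-owning view over a contiguous range: pointer plus length.
template <typename T>
class SliceView {
 public:
  SliceView(const T* data, std::size_t size) : data_(data), size_(size) {}

  const T* data() const { return data_; }
  std::size_t size() const { return size_; }
  const T& operator[](std::size_t i) const { return data_[i]; }
  const T* begin() const { return data_; }
  const T* end() const { return data_ + size_; }

 private:
  const T* data_;
  std::size_t size_;
};

// Hypothetical sequence that stores prompt tokens followed by generated ones.
class SequenceSketch {
 public:
  SequenceSketch(std::vector<int32_t> tokens, std::size_t num_prompt_tokens)
      : tokens_(std::move(tokens)),
        num_prompt_tokens_(std::min(num_prompt_tokens, tokens_.size())) {}

  // Returning a const reference avoids copying the whole vector.
  const std::vector<int32_t>& all_tokens() const { return tokens_; }

  // Returning a view avoids copying the generated suffix: no allocation,
  // just a pointer into tokens_ and a length.
  SliceView<int32_t> generated_tokens() const {
    return {tokens_.data() + num_prompt_tokens_,
            tokens_.size() - num_prompt_tokens_};
  }

 private:
  std::vector<int32_t> tokens_;
  std::size_t num_prompt_tokens_;
};
```

If the generated tokens have to be assembled from several fields on each call, a view does not apply directly and returning by value may remain the simpler choice.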
// if (request->offline()) {
//   DVLOG << "Read an offline request from request_queue_";
// }
Delete the useless comments.
GAUGE_SET(num_free_blocks, util::max(block_manager_pool_->num_free_blocks()));
GAUGE_SET(num_used_blocks, util::min(block_manager_pool_->num_used_blocks()));
if (!batches[0].empty()) {
  DVLOG << "Built a batch";
It seems that it's not needed either.
enum class StepStatus { ONLINE_PREFILL, OFFLINE_PREFILL, OFFLINE_DECODE, IDLE };

class PDOOCScheduler : public ContinuousScheduler {
nit: maybe we can add a descriptive comment here to introduce the OOC scheduler. :)
virtual ~PDOOCScheduler();

virtual uint32_t get_waiting_requests_num() const override {
  return waiting_priority_queue_.size();
Note that we currently have two waiting queues:
RequestPriorityQueue waiting_priority_queue_;
RequestPriorityQueue waiting_priority_queue_offline_;
    std::vector<std::shared_ptr<Request>>,
    std::function<bool(const std::shared_ptr<Request>&,
                       const std::shared_ptr<Request>&)>>;
RequestPriorityQueue waiting_priority_queue_;
waiting_priority_queue_ and waiting_priority_queue_offline_ have already been defined in class ContinuousScheduler; the protected members can be used here.
// thread.
std::unordered_map<proto::PDOOCService_Stub*, size_t>
    remote_prefill_thread_map_;
size_t next_prefill_thread_idx = 0;
nit: next_prefill_thread_idx -> next_prefill_thread_idx_
// TODO: maybe we should consider update info case even if info already exists
// in local.
bool PDOOCScheduler::check_remote_instance_info(
    const std::string& instance_name) {
As with the service impl, maybe we need to create a base class for PDOOCScheduler and DisaggPDScheduler.
This function is the same as DisaggPDScheduler::check_remote_instance_info(...).
proto::PDOOCService_Stub* PDOOCScheduler::create_rpc_channel(
    const std::string& instance_name) {
  std::lock_guard<std::mutex> lock(instance_channel_map_mutex_);
Same as above, except for this line of code:
proto::PDOOCService_Stub* stub = new proto::PDOOCService_Stub(channel);
# local files
/local

CLAUDE.md
I have no idea how these files were created.
The initial version of the OOC scheduler is mainly based on DisaggPDScheduler, except that it handles offline decoding requests locally on P nodes rather than sending them to D nodes.
I chose to copy DisaggPDScheduler rather than inherit from it, considering the potentially large gap between the OOC scheduler and DisaggPDScheduler.