
Conversation

tzh21 (Collaborator) commented Sep 14, 2025

The initial version of the OOC scheduler is mainly based on DisaggPDScheduler, except that offline decoding requests are handled locally on P nodes rather than being sent to D nodes.
I chose to copy DisaggPDScheduler rather than inherit from it, considering the potentially large gap that may open up between the OOC scheduler and DisaggPDScheduler.
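
Roughly, the idea is the following (a sketch only; the method name and enqueue logic here are illustrative, not the actual implementation):

  // Sketch: offline requests stay on the P node and are decoded locally,
  // while online requests follow the DisaggPDScheduler path (prefill here,
  // then forward to a D node for decode).
  void PDOOCScheduler::enqueue_request(std::shared_ptr<Request> request) {
    if (request->offline()) {
      waiting_priority_queue_offline_.push(request);
    } else {
      waiting_priority_queue_.push(request);
    }
  }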

Copilot AI left a comment

Pull Request Overview

This PR adds an initial version of the Online-Offline Co-location (OOC) scheduler, which enables handling both online and offline decoding requests on prefill nodes (P nodes) locally rather than always sending them to decode nodes (D nodes). The implementation is based on the existing DisaggPDScheduler but copied instead of inherited to allow for significant architectural divergence.

Key changes:

  • Added new PDOOCScheduler class with offline request handling capabilities
  • Extended configuration options to support the new OOC mode
  • Added debug logging for offline request processing

Reviewed Changes

Copilot reviewed 26 out of 27 changed files in this pull request and generated 1 comment.

File | Description
xllm/core/scheduler/pd_ooc_scheduler.h | New header defining PDOOCScheduler class with offline request handling
xllm/core/scheduler/pd_ooc_scheduler.cpp | Implementation of PDOOCScheduler with local offline processing logic
xllm/core/scheduler/scheduler_factory.cpp | Factory method updated to create PDOOCScheduler when OOC is enabled
xllm/core/scheduler/continuous_scheduler.h | Added enable_pd_ooc option to scheduler configuration
xllm/core/scheduler/continuous_scheduler.cpp | Added debug logging for offline request tracking
xllm/core/distributed_runtime/* | New PDOOCService RPC service implementation
xllm/proto/disagg_pd.proto | Added PDOOCService protocol definition
xllm/core/common/* | Added enable_pd_ooc flag and option throughout configuration
xllm/pybind/* | Extended Python bindings to expose OOC configuration
xllm/server/* | Added server support for PDOOCService
Comments suppressed due to low confidence (3)

xllm/core/scheduler/pd_ooc_scheduler.cpp:1

  • The log message incorrectly states that an online request is being put into the offline queue. This should be 'waiting_priority_queue_' based on the context (line 630).

xllm/core/scheduler/pd_ooc_scheduler.cpp:1

  • This debug message suggests an 'unknown' priority strategy, but the else branch handles all non-FCFS strategies normally. The message should be more accurate, such as 'Using non-FCFS priority_strategy', or be removed entirely if this is expected behavior.

xllm/core/scheduler/pd_ooc_scheduler.cpp:1

  • The StepStatus enum is defined but never meaningfully used in the implementation. The step_status member is initialized to IDLE but not updated to reflect actual processing states, making this enum effectively dead code.


Comment on lines +158 to +197
static constexpr size_t kOutputTheadNum_ = 128; // magic num
size_t next_thread_idx = 0;
ThreadPool output_threadpools_[kOutputTheadNum_];
Copilot AI Sep 14, 2025
The variable name has a typo: 'kOutputTheadNum_' should be 'kOutputThreadNum_'.

Suggested change
-  static constexpr size_t kOutputTheadNum_ = 128; // magic num
-  size_t next_thread_idx = 0;
-  ThreadPool output_threadpools_[kOutputTheadNum_];
+  static constexpr size_t kOutputThreadNum_ = 128; // magic num
+  size_t next_thread_idx = 0;
+  ThreadPool output_threadpools_[kOutputThreadNum_];


virtual ~PDOOCService() = default;

// for prefill recv decode response
void Generation(::google::protobuf::RpcController* controller,
Collaborator

Function names should be lowercase, to stay consistent with the rest of the code.

Collaborator

Function names should be lowercase, to stay consistent with the rest of the code.

This function is inherited from proto::PDOOCService, so its name cannot be modified.

PDOOCServiceImpl::PDOOCServiceImpl(PDOOCScheduler* scheduler, Engine* engine)
: PDOOCServiceImplInterface(), scheduler_(scheduler), engine_(engine) {}

std::shared_ptr<Request> PDOOCServiceImpl::generate_request(
Collaborator

Shouldn't this function be more appropriately placed in the request builder?

std::shared_ptr<Request> request = *it;
request->update_connection_status();
if (request->finished() || request->cancelled()) {
// DVLOG << "Found a finished request in running_requests_";
Collaborator

delete useless comments.

std::shared_ptr<Request> request = nullptr;
int request_thread_idx = -1;
{
std::lock_guard<std::mutex> lock(remote_requests_map_mutex_);
liutongxuan (Collaborator) Sep 15, 2025

It is necessary to create a concurrent map/thread_local map to encapsulate the mutex and two maps.
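
A minimal sketch of what such a wrapper could look like (the type and method names here are just placeholders, not a final design):

  #include <mutex>
  #include <optional>
  #include <unordered_map>
  #include <utility>

  // Hypothetical wrapper that owns the mutex, so callers never lock it directly.
  template <typename K, typename V>
  class ConcurrentMap {
   public:
    void insert_or_assign(const K& key, V value) {
      std::lock_guard<std::mutex> lock(mutex_);
      map_.insert_or_assign(key, std::move(value));
    }

    // Returns a copy of the value if present.
    std::optional<V> find(const K& key) const {
      std::lock_guard<std::mutex> lock(mutex_);
      auto it = map_.find(key);
      if (it == map_.end()) return std::nullopt;
      return it->second;
    }

    bool erase(const K& key) {
      std::lock_guard<std::mutex> lock(mutex_);
      return map_.erase(key) > 0;
    }

   private:
    mutable std::mutex mutex_;
    std::unordered_map<K, V> map_;
  };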

PDOOCServiceImplInterface() = default;
virtual ~PDOOCServiceImplInterface() = default;

virtual void decode_recv_new_requests(const proto::DisaggRequests* request,
Collaborator

nit: we'd best use pure virtual (abstract) functions in the Interface class. And class DisaggPDServiceImplInterface also needs to be modified.

example: virtual void decode_recv_new_requests(...) = 0;
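
For example, the interface could look roughly like this (a sketch; the parameter types are taken from the snippet above):

  // Sketch: pure virtual interface, so every implementation must override it.
  class PDOOCServiceImplInterface {
   public:
    virtual ~PDOOCServiceImplInterface() = default;
    virtual void decode_recv_new_requests(const proto::DisaggRequests* request,
                                          proto::DisaggResponses* response) = 0;
  };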

const proto::DisaggRequest& req) {
// create a new request
// TODO: Should to support best_of > 1 case, now we only consider
// to allocate blocks for the first sequence in the request.
Collaborator

This function is the same as DisaggPDServiceImpl::generate_request. Maybe we need to make it a common util function.

void PDOOCServiceImpl::decode_recv_new_requests(
const proto::DisaggRequests* request,
proto::DisaggResponses* response) {
// link prefill cluster
Collaborator

This function is also the same as DisaggPDServiceImpl::decode_recv_new_requests.

OK, maybe we need to create a base class like PDServiceImpl that implements the common functions such as generate_request, decode_recv_new_requests, ...

Classes DisaggPDServiceImpl and PDOOCServiceImpl should then inherit from PDServiceImpl.
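
Something along these lines, as a rough sketch (the class layout is only a suggestion; signatures are copied from the existing code):

  // Hypothetical shared base class holding the common helpers.
  class PDServiceImpl {
   public:
    virtual ~PDServiceImpl() = default;

   protected:
    std::shared_ptr<Request> generate_request(const proto::DisaggRequest& req);
    void decode_recv_new_requests(const proto::DisaggRequests* request,
                                  proto::DisaggResponses* response);
  };

  class DisaggPDServiceImpl : public PDServiceImpl { /* ... */ };
  class PDOOCServiceImpl : public PDServiceImpl { /* ... */ };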


// TODO: support embedding later, now we only support tokens
void PDOOCServiceImpl::decode_recv_first_generation(
const proto::DisaggGenerations* request,
Collaborator

same as above.

}

bool PDOOCServiceImpl::prefill_recv_generation(
const proto::DisaggStreamGeneration* request,
Collaborator

same as above.

}

void PDOOCServiceImpl::prefill_recv_generations(
const proto::DisaggStreamGenerations* requests,
Collaborator

same as above


std::vector<Token> Sequence::get_generated_tokens() const {
std::vector<Token> generated_tokens;

Collaborator

We can use the Slice type instead of vector, like Slice<int32_t> tokens() const { return {tokens_, num_tokens_}; },
or use const std::vector<Token>&; both will avoid copying the tokens.
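
For example (a sketch; generated_tokens_ is a placeholder name for whatever member actually stores the generated tokens):

  // Option 1: return a lightweight view instead of a copy.
  Slice<Token> Sequence::get_generated_tokens() const {
    return {generated_tokens_.data(), generated_tokens_.size()};
  }

  // Option 2: return a const reference, which also avoids the copy.
  const std::vector<Token>& Sequence::get_generated_tokens() const {
    return generated_tokens_;
  }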


// if (request->offline()) {
// DVLOG << "Read an offline request from request_queue_";
// }
Collaborator

delete useless comments.

GAUGE_SET(num_free_blocks, util::max(block_manager_pool_->num_free_blocks()));
GAUGE_SET(num_used_blocks, util::min(block_manager_pool_->num_used_blocks()));
if (!batches[0].empty()) {
DVLOG << "Built a batch";
Collaborator

It seems that it's not needed either.


enum class StepStatus { ONLINE_PREFILL, OFFLINE_PREFILL, OFFLINE_DECODE, IDLE };

class PDOOCScheduler : public ContinuousScheduler {
Collaborator

nit: maybe we can add some descriptive comments here to introduce the OOC scheduler. :)
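
For example, something like this (wording only a suggestion, based on the PR description):

  // PDOOCScheduler: scheduler for Online-Offline Co-location (OOC) mode.
  // Based on DisaggPDScheduler, but offline decoding requests are kept and
  // decoded locally on the prefill (P) node instead of being forwarded to
  // decode (D) nodes.
  class PDOOCScheduler : public ContinuousScheduler {
    // ...
  };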

virtual ~PDOOCScheduler();

virtual uint32_t get_waiting_requests_num() const override {
return waiting_priority_queue_.size();
Collaborator

Note that currently we have two waiting queues:

  RequestPriorityQueue waiting_priority_queue_;
  RequestPriorityQueue waiting_priority_queue_offline_;
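
So this probably needs to count both queues, e.g. (a sketch, assuming that is the intended semantics of this metric):

  virtual uint32_t get_waiting_requests_num() const override {
    // Count online and offline waiting requests together.
    return waiting_priority_queue_.size() +
           waiting_priority_queue_offline_.size();
  }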

std::vector<std::shared_ptr<Request>>,
std::function<bool(const std::shared_ptr<Request>&,
const std::shared_ptr<Request>&)>>;
RequestPriorityQueue waiting_priority_queue_;
Collaborator

waiting_priority_queue_ and waiting_priority_queue_offline_ have already been defined in class ContinuousScheduler; the protected members can be used here.

// thread.
std::unordered_map<proto::PDOOCService_Stub*, size_t>
remote_prefill_thread_map_;
size_t next_prefill_thread_idx = 0;
Collaborator

nit: next_prefill_thread_idx -> next_prefill_thread_idx_

// TODO: maybe we should consider update info case even if info already exists
// in local.
bool PDOOCScheduler::check_remote_instance_info(
const std::string& instance_name) {
Collaborator

Like the service impl, maybe we need to create a base class for PDOOCScheduler and DisaggPDScheduler.

This is the same function as DisaggPDScheduler::check_remote_instance_info(...).


proto::PDOOCService_Stub* PDOOCScheduler::create_rpc_channel(
const std::string& instance_name) {
std::lock_guard<std::mutex> lock(instance_channel_map_mutex_);
Collaborator

Same as above, except for this line of code:
proto::PDOOCService_Stub* stub = new proto::PDOOCService_Stub(channel);
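
If we do factor out a shared base class, the differing stub type could be handled by a small templated helper, roughly like this (a sketch; the helper name and template parameters are assumptions):

  // Hypothetical helper in the shared base; StubT would be the
  // DisaggPDService or PDOOCService stub type, depending on the scheduler.
  template <typename StubT, typename ChannelT>
  StubT* create_rpc_stub(ChannelT* channel) {
    return new StubT(channel);
  }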

# local files
/local

CLAUDE.md
Collaborator

I have no idea how these files were created.
