Skip to content

Commit 68da651

Browse files
committed
add two-step parser
1 parent a4d779e commit 68da651

File tree

1 file changed

+42
-6
lines changed

1 file changed

+42
-6
lines changed

compute/src/reqres/task.rs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,35 +21,71 @@ impl TaskResponder {
2121
compute_message: &DriaMessage,
2222
channel: ResponseChannel<Vec<u8>>,
2323
) -> Result<(TaskWorkerInput, TaskWorkerMetadata)> {
24+
// parse this in two-steps so that if something goes wrong we know the task id
2425
let task = compute_message
25-
.parse_payload::<TaskRequestPayload<TaskBody>>()
26-
.wrap_err("could not parse task payload")?;
26+
.parse_payload::<TaskRequestPayload<serde_json::Value>>()
27+
.wrap_err("could not parse task request payload")?;
28+
let task_body = match serde_json::from_value::<TaskBody>(task.input)
29+
.wrap_err("could not parse task body")
30+
{
31+
Ok(task_body) => task_body,
32+
Err(err) => {
33+
let err_string = format!("{:#}", err);
34+
log::error!(
35+
"Task {}/{} failed due to parsing error: {}",
36+
task.file_id,
37+
task.task_id,
38+
err_string
39+
);
40+
41+
// prepare error payload
42+
let error_payload = TaskResponsePayload {
43+
result: None,
44+
error: Some(err_string),
45+
row_id: task.row_id,
46+
file_id: task.file_id,
47+
task_id: task.task_id,
48+
model: Default::default(),
49+
stats: TaskStats::new(),
50+
};
51+
52+
let error_payload_str = serde_json::to_string(&error_payload)
53+
.wrap_err("could not serialize payload")?;
54+
55+
// respond through the channel to notify about the parsing error
56+
let response = node.new_message(error_payload_str, TASK_RESULT_TOPIC);
57+
node.p2p.respond(response.into(), channel).await?;
58+
59+
return Err(err);
60+
}
61+
};
62+
2763
let stats = TaskStats::new().record_received_at();
2864
log::info!(
2965
"Handling {} {} with model {}",
3066
"task".yellow(),
3167
task.row_id,
32-
task.input.model.to_string().yellow()
68+
task_body.model.to_string().yellow()
3369
);
3470

3571
// check if the model is available in this node, if so
3672
// it will return an executor that can run this model
3773
let executor = node
3874
.config
3975
.executors
40-
.get_executor(&task.input.model)
76+
.get_executor(&task_body.model)
4177
.await
4278
.wrap_err("could not get an executor")?;
4379

4480
let task_metadata = TaskWorkerMetadata {
4581
task_id: task.task_id,
4682
file_id: task.file_id,
47-
model_name: task.input.model.to_string(),
83+
model_name: task_body.model.to_string(),
4884
channel,
4985
};
5086
let task_input = TaskWorkerInput {
5187
executor,
52-
task: task.input,
88+
task: task_body,
5389
row_id: task.row_id,
5490
stats,
5591
};

0 commit comments

Comments
 (0)