Skip to content

Commit 8a2c88e

Browse files
authored
Merge pull request #182 from firstbatchxyz/erhant/steps-log
feat: add steps info to diagnostics
2 parents f7d545e + 3c14541 commit 8a2c88e

File tree

15 files changed

+151
-306
lines changed

15 files changed

+151
-306
lines changed

Cargo.lock

Lines changed: 15 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@
22
resolver = "2"
33
members = ["compute", "p2p", "workflows", "utils"]
44

5-
# FIXME: removing this breaks the workflows
6-
# compute node is the default member, until Oracle comes in
7-
# then, a Launcher will be the default member
5+
# NOTE: removing `default-members` breaks the workflows
86
default-members = ["compute"]
97

108
[workspace.package]
119
edition = "2021"
12-
version = "0.3.7"
10+
version = "0.3.8"
1311
license = "Apache-2.0"
1412
readme = "README.md"
1513

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ Compute nodes can technically do any arbitrary task, from computing the square r
3939

4040
### Running a Node
4141

42-
Refer to [node guide](./docs/NODE_GUIDE.md) to quickly get started and run your own node!
42+
Use the [Dria Compute Launcher](https://github.com/firstbatchxyz/dkn-compute-launcher/) to run a compute node with many more features!
4343

4444
## Releases
4545

compute/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ rand.workspace = true
3434
env_logger.workspace = true
3535
log.workspace = true
3636
eyre.workspace = true
37+
colored = "3.0.0"
3738

3839
# encryption (ecies) & signatures (ecdsa) & hashing & bloom-filters
3940
ecies = { version = "0.2", default-features = false, features = ["pure"] }

compute/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct DriaComputeNodeConfig {
1818
pub secret_key: SecretKey,
1919
/// Wallet public key, derived from the secret key.
2020
pub public_key: PublicKey,
21-
/// Wallet address in hex, derived from the public key.
21+
/// Wallet address in hex without `0x` prefix, derived from the public key.
2222
pub address: String,
2323
/// Peer ID of the node.
2424
pub peer_id: PeerId,

compute/src/gossipsub/pingpong.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use colored::Colorize;
12
use dkn_p2p::libp2p::gossipsub::MessageAcceptance;
23
use dkn_utils::get_current_time_nanos;
34
use dkn_workflows::{Model, ModelProvider};
@@ -62,7 +63,7 @@ impl PingpongHandler {
6263
return Ok(MessageAcceptance::Ignore);
6364
}
6465

65-
log::info!("Received a ping for: {}", pingpong.uuid);
66+
log::info!("Received a {} for: {}", "ping".blue(), pingpong.uuid);
6667

6768
// record ping moment
6869
node.last_pinged_at = Instant::now();

compute/src/node/diagnostic.rs

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
use colored::Colorize;
12
use dkn_p2p::libp2p::multiaddr::Protocol;
23
use std::time::Duration;
34
use tokio::time::Instant;
45

5-
use crate::{refresh_dria_nodes, DriaComputeNode, DRIA_COMPUTE_NODE_VERSION};
6+
use crate::{refresh_dria_nodes, utils::get_steps, DriaComputeNode, DRIA_COMPUTE_NODE_VERSION};
67

78
/// Number of seconds such that if the last ping is older than this, the node is considered unreachable.
89
const PING_LIVENESS_SECS: u64 = 150;
@@ -21,20 +22,32 @@ impl DriaComputeNode {
2122
pub(crate) async fn handle_diagnostic_refresh(&self) {
2223
let mut diagnostics = vec![format!("Diagnostics (v{}):", DRIA_COMPUTE_NODE_VERSION)];
2324

25+
// if we have not received pings for a while, we are considered offline
26+
let is_offline =
27+
self.last_pinged_at < Instant::now() - Duration::from_secs(PING_LIVENESS_SECS);
28+
2429
// print peer counts
2530
match self.p2p.peer_counts().await {
26-
Ok((mesh, all)) => {
27-
diagnostics.push(format!("Peer Count (mesh/all): {} / {}", mesh, all))
28-
}
31+
Ok((mesh, all)) => diagnostics.push(format!(
32+
"Peer Count (mesh/all): {} / {}",
33+
if mesh == 0 {
34+
"0".red()
35+
} else {
36+
mesh.to_string().white()
37+
},
38+
all
39+
)),
2940
Err(e) => log::error!("Error getting peer counts: {:?}", e),
3041
}
3142

32-
// print tasks count
33-
let [single, batch] = self.get_pending_task_count();
34-
diagnostics.push(format!(
35-
"Pending Tasks (single/batch): {} / {}",
36-
single, batch
37-
));
43+
// print steps
44+
if let Ok(steps) = get_steps(&self.config.address).await {
45+
let earned = steps.score - self.initial_steps;
46+
diagnostics.push(format!(
47+
"Steps: {} total, {} earned in this run, within top {}%",
48+
steps.score, earned, steps.percentile
49+
));
50+
}
3851

3952
// completed tasks count is printed as well in debug
4053
if log::log_enabled!(log::Level::Debug) {
@@ -60,10 +73,20 @@ impl DriaComputeNode {
6073
.join(", ")
6174
));
6275

76+
// add network status as well
77+
diagnostics.push(format!(
78+
"Node Status: {}",
79+
if is_offline {
80+
"OFFLINE".bold().red()
81+
} else {
82+
"ONLINE".bold().green()
83+
}
84+
));
85+
6386
log::info!("{}", diagnostics.join("\n "));
6487

65-
// check liveness of the node w.r.t last ping-pong time
66-
if self.last_pinged_at < Instant::now() - Duration::from_secs(PING_LIVENESS_SECS) {
88+
// if offline, print this error message as well
89+
if is_offline {
6790
log::error!(
6891
"Node has not received any pings for at least {} seconds & it may be unreachable!\nPlease restart your node!",
6992
PING_LIVENESS_SECS
@@ -72,7 +95,7 @@ impl DriaComputeNode {
7295

7396
// check that RPC nodes are available; the list can be empty when the API is down or misbehaving
7497
if self.dria_nodes.rpc_peerids.is_empty() {
75-
log::error!("No RPC peerids were found to be available, please restart your node!",);
98+
log::error!("No RPC peer IDs were found to be available, please restart your node!",);
7699
}
77100
}
78101

compute/src/node/gossipsub.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use colored::Colorize;
12
use dkn_p2p::libp2p::gossipsub::{Message, MessageAcceptance, MessageId};
23
use dkn_p2p::libp2p::PeerId;
34
use eyre::Result;
@@ -40,7 +41,11 @@ impl DriaComputeNode {
4041
pub async fn publish(&mut self, message: DriaMessage) -> Result<()> {
4142
let message_bytes = serde_json::to_vec(&message)?;
4243
let message_id = self.p2p.publish(&message.topic, message_bytes).await?;
43-
log::info!("Published {} message ({})", message.topic, message_id);
44+
log::info!(
45+
"Published {} message ({})",
46+
message.topic.blue(),
47+
message_id
48+
);
4449
Ok(())
4550
}
4651

@@ -80,11 +85,10 @@ impl DriaComputeNode {
8085

8186
// parse the raw gossipsub message to a prepared DKN message
8287
// the received message is expected to use IdentHash for the topic, so we can see the name of the topic immediately.
83-
log::debug!("Parsing {} message.", gossipsub_message.topic.as_str());
8488
let message: DriaMessage = match serde_json::from_slice(&gossipsub_message.data) {
8589
Ok(message) => message,
8690
Err(e) => {
87-
log::error!("Error parsing message: {:?}", e);
91+
log::error!("Error parsing {} message: {:?}", gossipsub_message.topic, e);
8892
log::debug!(
8993
"Message: {}",
9094
String::from_utf8_lossy(&gossipsub_message.data)

compute/src/node/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tokio::{sync::mpsc, time::Instant};
1313
use crate::{
1414
config::*,
1515
gossipsub::*,
16-
utils::{crypto::secret_to_keypair, refresh_dria_nodes, SpecCollector},
16+
utils::{crypto::secret_to_keypair, get_steps, refresh_dria_nodes, SpecCollector},
1717
workers::task::{TaskWorker, TaskWorkerInput, TaskWorkerMetadata, TaskWorkerOutput},
1818
};
1919

@@ -56,6 +56,8 @@ pub struct DriaComputeNode {
5656
completed_tasks_batch: usize,
5757
/// Specifications collector.
5858
spec_collector: SpecCollector,
59+
/// Steps score fetched at node startup (defaults to 0 if the fetch fails); used to compute the steps earned in this run.
60+
initial_steps: u64,
5961
}
6062

6163
impl DriaComputeNode {
@@ -114,6 +116,12 @@ impl DriaComputeNode {
114116
};
115117

116118
let model_names = config.workflows.get_model_names();
119+
120+
let initial_steps = get_steps(&config.address)
121+
.await
122+
.map(|s| s.score)
123+
.unwrap_or_default();
124+
117125
Ok((
118126
DriaComputeNode {
119127
config,
@@ -132,6 +140,7 @@ impl DriaComputeNode {
132140
completed_tasks_single: 0,
133141
completed_tasks_batch: 0,
134142
// others
143+
initial_steps,
135144
spec_collector: SpecCollector::new(model_names),
136145
last_pinged_at: Instant::now(),
137146
},

compute/src/node/reqres.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use colored::Colorize;
12
use dkn_p2p::libp2p::{request_response::ResponseChannel, PeerId};
23
use eyre::{eyre, Result};
34

@@ -46,7 +47,8 @@ impl DriaComputeNode {
4647
spec_request: <SpecResponder as IsResponder>::Request,
4748
) -> Result<()> {
4849
log::info!(
49-
"Got a spec request from peer {} with id {}",
50+
"Got a {} request from peer {} with id {}",
51+
"spec".green(),
5052
peer_id,
5153
spec_request.request_id
5254
);
@@ -55,7 +57,8 @@ impl DriaComputeNode {
5557
let response_data = serde_json::to_vec(&response)?;
5658

5759
log::info!(
58-
"Responding to spec request from peer {} with id {}",
60+
"Responding to {} request from peer {} with id {}",
61+
"spec".green(),
5962
peer_id,
6063
response.request_id
6164
);
@@ -75,7 +78,7 @@ impl DriaComputeNode {
7578
channel: ResponseChannel<Vec<u8>>,
7679
task_request: <TaskResponder as IsResponder>::Request,
7780
) -> Result<()> {
78-
log::info!("Received a task request from {}", peer_id);
81+
log::info!("Received a {} request from {}", "task".yellow(), peer_id);
7982

8083
let (task_input, task_metadata) =
8184
TaskResponder::prepare_worker_input(self, &task_request, channel).await?;

0 commit comments

Comments
 (0)