Skip to content

Commit c4a6960

Browse files
committed
added "connecting" log and pings per sec
1 parent 778d169 commit c4a6960

File tree

9 files changed

+54
-27
lines changed

9 files changed

+54
-27
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ default-members = ["compute"]
77

88
[workspace.package]
99
edition = "2021"
10-
version = "0.3.8"
10+
version = "0.3.9"
1111
license = "Apache-2.0"
1212
readme = "README.md"
1313

Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ debug:
1414
build:
1515
cargo build --workspace
1616

17+
.PHONY: trace # | Run with TRACE logs
18+
trace:
19+
RUST_LOG=warn,dkn_compute=trace,libp2p=debug \
20+
cargo run --bin dkn-compute
21+
1722
.PHONY: profile-cpu # | Profile CPU usage with flamegraph
1823
profile-cpu:
1924
DKN_EXIT_TIMEOUT=120 cargo flamegraph --root --profile=profiling --bin dkn-compute

compute/src/gossipsub/pingpong.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ impl PingpongHandler {
6565

6666
log::info!("Received a {} for: {}", "ping".blue(), pingpong.uuid);
6767

68-
// record ping moment
68+
// record ping
6969
node.last_pinged_at = Instant::now();
70+
node.num_pings += 1;
7071

7172
// respond
7273
let response_body = PingpongResponse {

compute/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use workers::task::TaskWorker;
77

88
#[tokio::main]
99
async fn main() -> Result<()> {
10-
// load a particular env, or `.env` by default
10+
// load a particular environment file specified by DKN_COMPUTE_ENV, or `.env` by default
1111
let env_path = env::var("DKN_COMPUTE_ENV").unwrap_or_else(|_| ".env".to_string());
1212
let dotenv_result = dotenvy::from_path(&env_path);
1313

compute/src/node/diagnostic.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@ impl DriaComputeNode {
2222
pub(crate) async fn handle_diagnostic_refresh(&self) {
2323
let mut diagnostics = vec![format!("Diagnostics (v{}):", DRIA_COMPUTE_NODE_VERSION)];
2424

25-
// if we have not received pings for a while, we are considered offline
26-
let is_offline =
27-
self.last_pinged_at < Instant::now() - Duration::from_secs(PING_LIVENESS_SECS);
28-
2925
// print peer counts
3026
match self.p2p.peer_counts().await {
3127
Ok((mesh, all)) => diagnostics.push(format!(
@@ -74,14 +70,27 @@ impl DriaComputeNode {
7470
));
7571

7672
// add network status as well
77-
diagnostics.push(format!(
78-
"Node Status: {}",
79-
if is_offline {
80-
"OFFLINE".bold().red()
81-
} else {
82-
"ONLINE".bold().green()
83-
}
84-
));
73+
// if we have not received pings for a while, we are considered offline
74+
let is_offline = Instant::now().duration_since(self.last_pinged_at)
75+
> Duration::from_secs(PING_LIVENESS_SECS);
76+
if self.num_pings == 0 {
77+
// if we didnt have any pings, we might still be connecting
78+
diagnostics.push(format!("Node Status: {}", "CONNECTING".yellow()));
79+
} else {
80+
diagnostics.push(format!(
81+
"Node Status: {}",
82+
if is_offline {
83+
"OFFLINE".red()
84+
} else {
85+
"ONLINE".green()
86+
}
87+
));
88+
}
89+
90+
// add pings per second
91+
let elapsed = Instant::now().duration_since(self.started_at).as_secs_f64();
92+
let pings_per_second = self.num_pings as f64 / elapsed; // elapsed is always > 0
93+
diagnostics.push(format!("Pings/sec: {:.3}", pings_per_second));
8594

8695
log::info!("{}", diagnostics.join("\n "));
8796

compute/src/node/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ pub struct DriaComputeNode {
3333
pub p2p: DriaP2PCommander,
3434
/// The last time the node was pinged by the network.
3535
/// If this is too much, we can say that the node is not reachable by RPC.
36-
pub last_pinged_at: Instant,
36+
pub(crate) last_pinged_at: Instant,
37+
/// Number of pings received.
38+
pub(crate) num_pings: u64,
39+
/// The time the node was started.
40+
pub(crate) started_at: Instant,
3741
/// Gossipsub message receiver, used by peer-to-peer client in a separate thread.
3842
///
3943
/// It will publish messages sent to this channel to the network.
@@ -143,6 +147,8 @@ impl DriaComputeNode {
143147
initial_steps,
144148
spec_collector: SpecCollector::new(model_names),
145149
last_pinged_at: Instant::now(),
150+
num_pings: 0,
151+
started_at: Instant::now(),
146152
},
147153
p2p_client,
148154
task_batch_worker,

p2p/src/behaviour.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ fn create_gossipsub_behaviour(author: PeerId) -> Result<gossipsub::Behaviour> {
183183
MessageId::from(digest.to_be_bytes())
184184
};
185185

186+
// TODO: make `flood_publish false?`
186187
Behaviour::new(
187188
MessageAuthenticity::Author(author),
188189
ConfigBuilder::default()

p2p/src/client.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ impl DriaP2PClient {
4848
///
4949
/// The `version` is used to create the protocol strings for the client, and its very important that
5050
/// they match with the clients existing within the network.
51+
///
52+
/// If for any reason the given `listen_addr` is not available, it will try to listen on a random port on `localhost`.
5153
#[allow(clippy::type_complexity)]
5254
pub fn new(
5355
keypair: Keypair,
@@ -62,7 +64,6 @@ impl DriaP2PClient {
6264
)> {
6365
// this is our peerId
6466
let peer_id = keypair.public().to_peer_id();
65-
log::info!("Compute node peer address: {}", peer_id);
6667

6768
let mut swarm = SwarmBuilder::with_existing_identity(keypair)
6869
.with_tokio()
@@ -123,19 +124,23 @@ impl DriaP2PClient {
123124

124125
// listen on all interfaces for incoming connections
125126
log::info!("Listening p2p network on: {}", listen_addr);
126-
swarm.listen_on(listen_addr)?;
127+
if let Err(e) = swarm.listen_on(listen_addr) {
128+
log::error!("Could not listen on address: {:?}", e);
129+
log::warn!("Trying fallback address with localhost random port");
130+
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())?;
131+
}
127132

128133
// listen on relay addresses with p2p circuit
129-
for addr in &nodes.relay_nodes {
134+
for addr in nodes.relay_nodes.iter().cloned() {
130135
log::info!("Listening to relay: {}", addr);
131-
swarm.listen_on(addr.clone().with(Protocol::P2pCircuit))?;
136+
swarm.listen_on(addr.with(Protocol::P2pCircuit))?;
132137
}
133138

134139
// dial rpc nodes
135-
for rpc_addr in &nodes.rpc_nodes {
140+
for rpc_addr in nodes.rpc_nodes.iter().cloned() {
136141
log::info!("Dialing RPC node: {}", rpc_addr);
137-
if let Err(e) = swarm.dial(rpc_addr.clone()) {
138-
log::error!("Error dialing RPC node: {:?}", e);
142+
if let Err(e) = swarm.dial(rpc_addr) {
143+
log::error!("Could not dial RPC node: {:?}", e);
139144
};
140145
}
141146

0 commit comments

Comments
 (0)