Skip to content

Commit d197727

Browse files
committed
always dial mode
1 parent 28d24bc commit d197727

File tree

6 files changed

+51
-28
lines changed

6 files changed

+51
-28
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ default-members = ["compute"]
99

1010
[workspace.package]
1111
edition = "2021"
12-
version = "0.3.3"
12+
version = "0.3.4"
1313
license = "Apache-2.0"
1414
readme = "README.md"
1515

compute/src/node/core.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ impl DriaComputeNode {
1111
/// Number of seconds between refreshing for diagnostic prints.
1212
const DIAGNOSTIC_REFRESH_INTERVAL_SECS: u64 = 30;
1313
/// Number of seconds between refreshing the available nodes.
14-
const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 10 * 60; // 30 minutes
14+
const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 10 * 60;
1515

1616
// prepare durations for sleeps
1717
let mut diagnostic_refresh_interval =

compute/src/node/diagnostic.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use dkn_p2p::libp2p::multiaddr::Protocol;
12
use std::time::Duration;
23
use tokio::time::Instant;
34

@@ -86,23 +87,31 @@ impl DriaComputeNode {
8687
};
8788

8889
// dial all rpc nodes
89-
for rpc_addr in self.dria_nodes.rpc_nodes.iter() {
90-
log::info!("Dialling RPC node: {}", rpc_addr);
91-
92-
let fut = self.p2p.dial(rpc_addr.clone());
93-
match tokio::time::timeout(Duration::from_secs(10), fut).await {
94-
Err(timeout) => {
95-
log::error!("Timeout dialling RPC node: {:?}", timeout);
96-
}
97-
Ok(res) => match res {
98-
Err(e) => {
99-
log::warn!("Error dialling RPC node: {:?}", e);
90+
for addr in self.dria_nodes.rpc_nodes.iter() {
91+
log::info!("Dialling RPC node: {}", addr);
92+
93+
// get peer id from rpc address
94+
if let Some(peer_id) = addr.iter().find_map(|p| match p {
95+
Protocol::P2p(peer_id) => Some(peer_id),
96+
_ => None,
97+
}) {
98+
let fut = self.p2p.dial(peer_id, addr.clone());
99+
match tokio::time::timeout(Duration::from_secs(10), fut).await {
100+
Err(timeout) => {
101+
log::error!("Timeout dialling RPC node: {:?}", timeout);
100102
}
101-
Ok(_) => {
102-
log::info!("Successfully dialled RPC node: {}", rpc_addr);
103-
}
104-
},
105-
};
103+
Ok(res) => match res {
104+
Err(e) => {
105+
log::warn!("Error dialling RPC node: {:?}", e);
106+
}
107+
Ok(_) => {
108+
log::info!("Successfully dialled RPC node: {}", addr);
109+
}
110+
},
111+
};
112+
} else {
113+
log::warn!("Missing peerID in address: {}", addr);
114+
}
106115
}
107116

108117
log::info!("Finished refreshing!");

p2p/src/client.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use libp2p::futures::StreamExt;
33
use libp2p::gossipsub::{Message, MessageId};
44
use libp2p::kad::{GetClosestPeersError, GetClosestPeersOk, QueryResult};
55
use libp2p::request_response::{self, ResponseChannel};
6+
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
67
use libp2p::swarm::SwarmEvent;
78
use libp2p::{autonat, gossipsub, identify, kad, multiaddr::Protocol, noise, tcp, yamux};
89
use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder};
@@ -182,8 +183,16 @@ impl DriaP2PClient {
182183
/// Handles a single command, which originates from `DriaP2PCommander`.
183184
pub async fn handle_command(&mut self, command: DriaP2PCommand) {
184185
match command {
185-
DriaP2PCommand::Dial { peer_id, sender } => {
186-
let _ = sender.send(self.swarm.dial(peer_id));
186+
DriaP2PCommand::Dial {
187+
peer_id,
188+
address,
189+
sender,
190+
} => {
191+
let opts = DialOpts::peer_id(peer_id)
192+
.addresses(vec![address])
193+
.condition(PeerCondition::Always)
194+
.build();
195+
let _ = sender.send(self.swarm.dial(opts));
187196
}
188197
DriaP2PCommand::NetworkInfo { sender } => {
189198
let _ = sender.send(self.swarm.network_info());

p2p/src/commands.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ pub enum DriaP2PCommand {
2222
PeerCounts {
2323
sender: oneshot::Sender<(usize, usize)>,
2424
},
25-
/// Dial a peer.
25+
/// Dial a known peer.
2626
Dial {
27-
peer_id: Multiaddr,
27+
peer_id: PeerId,
28+
address: Multiaddr,
2829
sender: oneshot::Sender<Result<(), swarm::DialError>>,
2930
},
3031
/// Subscribe to a topic.
@@ -206,11 +207,15 @@ impl DriaP2PCommander {
206207
}
207208

208209
/// Dials a given peer.
209-
pub async fn dial(&mut self, peer_id: Multiaddr) -> Result<()> {
210+
pub async fn dial(&mut self, peer_id: PeerId, address: Multiaddr) -> Result<()> {
210211
let (sender, receiver) = oneshot::channel();
211212

212213
self.sender
213-
.send(DriaP2PCommand::Dial { peer_id, sender })
214+
.send(DriaP2PCommand::Dial {
215+
peer_id,
216+
address,
217+
sender,
218+
})
214219
.await
215220
.wrap_err("could not send")?;
216221

0 commit comments

Comments
 (0)