diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 757be3b2..05d0ef0c 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -13,7 +13,10 @@ use libp2p::{ tcp, yamux, }; use multiaddr::Protocol; -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, warn}; @@ -21,6 +24,7 @@ use tracing::{debug, warn}; pub use libp2p::{Multiaddr, StreamProtocol}; const DEFAULT_MAX_PEER_COUNT: u32 = 50; +const DEFAULT_PEER_RETRY_INTERVAL: Duration = Duration::from_secs(60); /// A message that can be sent between peers. pub trait Message: @@ -119,6 +123,10 @@ impl Node { .wrap_err("swarm failed to listen on multiaddr")?; } + let mut known_peers_info = Vec::new(); + let mut retry_interval = tokio::time::interval(DEFAULT_PEER_RETRY_INTERVAL); + retry_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + for mut address in known_peers { let peer_id = match address.pop() { Some(multiaddr::Protocol::P2p(peer_id)) => peer_id, @@ -128,8 +136,9 @@ impl Node { }; swarm.add_peer_address(peer_id, address.clone()); swarm - .dial(address) + .dial(address.clone()) .wrap_err("swarm failed to dial known peer")?; + known_peers_info.push((peer_id, address)); } let handles = incoming_streams_handlers @@ -145,6 +154,19 @@ impl Node { handles.into_iter().for_each(|h| h.abort()); break Ok(()); } + _ = retry_interval.tick() => { + // Check for disconnected known peers and retry connection + let connected_peers: HashSet = swarm.connected_peers().copied().collect(); + for (peer_id, address) in &known_peers_info { + if !connected_peers.contains(peer_id) { + debug!("retrying connection to disconnected known peer {peer_id} at {address}"); + swarm.add_peer_address(*peer_id, address.clone()); + if let Err(e) = swarm.dial(address.clone()) { + warn!("failed to retry dial to known peer {peer_id} at {address}: {e:?}"); + } + } + } + } Some(message) = outgoing_message_rx.recv() => { let protocol = message.protocol(); debug!("received message to broadcast on protocol {protocol}");