diff --git a/easytier-core/Cargo.toml b/easytier-core/Cargo.toml index 601cadc3..5408d009 100644 --- a/easytier-core/Cargo.toml +++ b/easytier-core/Cargo.toml @@ -13,7 +13,7 @@ path = "src/rpc/lib.rs" [dependencies] tracing = { version = "0.1", features = ["log"] } -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "time"] } tracing-appender = "0.2.3" log = "0.4" thiserror = "1.0" diff --git a/easytier-core/src/main.rs b/easytier-core/src/main.rs index 0bde9c98..11329529 100644 --- a/easytier-core/src/main.rs +++ b/easytier-core/src/main.rs @@ -50,6 +50,7 @@ fn init_logger() { file_layer.set_ansi(false); let file_layer = file_layer .with_writer(file_appender) + .with_timer(tracing_subscriber::fmt::time::OffsetTime::local_rfc_3339().unwrap()) .with_filter(file_filter); // logger to console @@ -59,6 +60,7 @@ fn init_logger() { .unwrap(); let console_layer = tracing_subscriber::fmt::layer() .pretty() + .with_timer(tracing_subscriber::fmt::time::OffsetTime::local_rfc_3339().unwrap()) .with_writer(std::io::stderr) .with_filter(console_filter); diff --git a/easytier-core/src/peers/peer.rs b/easytier-core/src/peers/peer.rs index e56a1b7a..1edbd2ba 100644 --- a/easytier-core/src/peers/peer.rs +++ b/easytier-core/src/peers/peer.rs @@ -100,6 +100,7 @@ impl Peer { pub async fn add_peer_conn(&self, mut conn: PeerConn) { conn.set_close_event_sender(self.close_event_sender.clone()); conn.start_recv_loop(self.packet_recv_chan.clone()); + conn.start_pingpong(); self.global_ctx .issue_event(GlobalCtxEvent::PeerConnAdded(conn.get_conn_info())); self.conns diff --git a/easytier-core/src/peers/peer_conn.rs b/easytier-core/src/peers/peer_conn.rs index ef981db8..cfbab4eb 100644 --- a/easytier-core/src/peers/peer_conn.rs +++ b/easytier-core/src/peers/peer_conn.rs @@ -211,9 +211,12 @@ impl PeerConnPinger { let (ping_res_sender, mut ping_res_receiver) = tokio::sync::mpsc::channel(100); + let stopped = Arc::new(AtomicU32::new(0)); + // generate a pingpong task every 200ms let mut pingpong_tasks = JoinSet::new(); let ctrl_resp_sender = self.ctrl_sender.clone(); + let stopped_clone = stopped.clone(); self.tasks.spawn(async move { let mut req_seq = 0; loop { @@ -221,6 +224,10 @@ impl PeerConnPinger { let ping_res_sender = ping_res_sender.clone(); let sink = sink.clone(); + if stopped_clone.load(Ordering::Relaxed) != 0 { + return Ok(()); + } + while pingpong_tasks.len() > 5 { pingpong_tasks.join_next().await; } @@ -236,7 +243,9 @@ impl PeerConnPinger { ) .await; - let _ = ping_res_sender.send(pingpong_once_ret).await; + if let Err(e) = ping_res_sender.send(pingpong_once_ret).await { + tracing::info!(?e, "pingpong task send result error, exit.."); + }; }); req_seq += 1; @@ -267,7 +276,7 @@ impl PeerConnPinger { let loss_rate_20: f64 = loss_rate_stats_20.get_latency_us(); let loss_rate_1: f64 = loss_rate_stats_1.get_latency_us(); - tracing::warn!( + tracing::trace!( ?ret, ?self, ?loss_rate_1, @@ -276,19 +285,22 @@ impl PeerConnPinger { ); if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) { - log::warn!( - "pingpong loss rate too high, my_node_id: {}, peer_id: {}, loss_rate_20: {}, loss_rate_1: {}", - my_node_id, - peer_id, - loss_rate_20, - loss_rate_1, - ); + tracing::warn!( + ?ret, + ?self, + ?loss_rate_1, + ?loss_rate_20, + "pingpong loss rate too high, closing" + ); break; } self.loss_rate_stats .store((loss_rate_20 * 100.0) as u32, Ordering::Relaxed); } + + stopped.store(1, Ordering::Relaxed); + ping_res_receiver.close(); } } @@ -430,7 +442,7 @@ impl PeerConn { } } - fn start_pingpong(&mut self) { + pub fn start_pingpong(&mut self) { let mut pingpong = PeerConnPinger::new( self.my_node_id, self.get_peer_id(),