From 08546925cceb86e8fd0550f7621748d88fdbd64e Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Mon, 27 Jan 2025 15:17:47 +0800 Subject: [PATCH] fix tests (#588) fix proxy_three_node_disconnect_test and hole_punching_symmetric_only_random --- .../connector/udp_hole_punch/sym_to_cone.rs | 1 + easytier/src/gateway/tcp_proxy.rs | 10 +- easytier/src/peers/peer_conn_ping.rs | 140 ++++++++++-------- easytier/src/tests/mod.rs | 3 +- easytier/src/tests/three_node.rs | 10 +- easytier/src/tunnel/stats.rs | 12 +- 6 files changed, 110 insertions(+), 66 deletions(-) diff --git a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs index dbba9e42..a913a586 100644 --- a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs +++ b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs @@ -529,6 +529,7 @@ pub mod tests { }; #[tokio::test] + #[serial_test::serial] #[serial_test::serial(hole_punch)] async fn hole_punching_symmetric_only_random() { RUN_TESTING.store(true, std::sync::atomic::Ordering::Relaxed); diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index 90ce931a..a95fc894 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -549,7 +549,13 @@ impl TcpProxy { let connector = self.connector.clone(); let accept_task = async move { let conn_map = conn_map.clone(); - while let Ok((tcp_stream, mut socket_addr)) = tcp_listener.accept().await { + loop { + let accept_ret = tcp_listener.accept().await; + let Ok((tcp_stream, mut socket_addr)) = accept_ret else { + tracing::error!("nat tcp listener accept failed: {:?}", accept_ret.err()); + continue; + }; + let my_ip = global_ctx .get_ipv4() .as_ref() @@ -594,8 +600,6 @@ impl TcpProxy { entry_clone, )); } - tracing::error!("nat tcp listener exited"); - panic!("nat tcp listener exited"); }; self.tasks .lock() diff --git a/easytier/src/peers/peer_conn_ping.rs b/easytier/src/peers/peer_conn_ping.rs index 47ed2496..c0c2eadb 100644 --- a/easytier/src/peers/peer_conn_ping.rs +++ b/easytier/src/peers/peer_conn_ping.rs @@ -1,4 +1,5 @@ use std::{ + fmt::Debug, sync::{ atomic::{AtomicU32, Ordering}, Arc, @@ -12,6 +13,7 @@ use tokio::{ task::JoinSet, time::{timeout, Interval}, }; +use tracing::Instrument; use crate::{ common::{error::Error, PeerId}, @@ -25,7 +27,7 @@ use crate::{ struct PingIntervalController { throughput: Arc, - loss_rate_20: Arc, + loss_counter: Arc, interval: Interval, @@ -38,13 +40,27 @@ struct PingIntervalController { last_throughput: Throughput, } +impl std::fmt::Debug for PingIntervalController { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PingIntervalController") + .field("throughput", &self.throughput) + .field("loss_counter", &self.loss_counter) + .field("logic_time", &self.logic_time) + .field("last_send_logic_time", &self.last_send_logic_time) + .field("backoff_idx", &self.backoff_idx) + .field("max_backoff_idx", &self.max_backoff_idx) + .field("last_throughput", &self.last_throughput) + .finish() + } +} + impl PingIntervalController { - fn new(throughput: Arc, loss_rate_20: Arc) -> Self { + fn new(throughput: Arc, loss_counter: Arc) -> Self { let last_throughput = *throughput; Self { throughput, - loss_rate_20, + loss_counter, interval: tokio::time::interval(Duration::from_secs(1)), logic_time: 0, last_send_logic_time: 0, @@ -69,13 +85,12 @@ impl PingIntervalController { self.throughput.rx_packets() > self.last_throughput.rx_packets() } + #[tracing::instrument] fn should_send_ping(&mut self) -> bool { - if self.loss_rate_20.get_latency_us::() > 0.0 { + tracing::trace!(?self, "check should_send_ping"); + if self.loss_counter.load(Ordering::Relaxed) > 0 { self.backoff_idx = 0; - } else if self.tx_increase() - && !self.rx_increase() - && self.logic_time - self.last_send_logic_time > 2 - { + } else if self.tx_increase() && !self.rx_increase() { // if tx increase but rx not increase, we should do pingpong more frequently self.backoff_idx = 0; } @@ -210,8 +225,8 @@ impl PeerConnPinger { // one with 1% precision let loss_rate_stats_1 = WindowLatency::new(100); - // one with 20% precision, so we can fast fail this conn. - let loss_rate_stats_20 = Arc::new(WindowLatency::new(5)); + // disconnect the connection if lost 5 pingpong consecutively + let loss_counter = Arc::new(AtomicU32::new(0)); let stopped = Arc::new(AtomicU32::new(0)); @@ -220,72 +235,73 @@ impl PeerConnPinger { let ctrl_resp_sender = self.ctrl_sender.clone(); let stopped_clone = stopped.clone(); let mut controller = - PingIntervalController::new(self.throughput_stats.clone(), loss_rate_stats_20.clone()); - self.tasks.spawn(async move { - let mut req_seq = 0; - loop { - controller.tick().await; + PingIntervalController::new(self.throughput_stats.clone(), loss_counter.clone()); + self.tasks.spawn( + async move { + let mut req_seq = 0; + loop { + controller.tick().await; - if stopped_clone.load(Ordering::Relaxed) != 0 { - return Ok(()); + if stopped_clone.load(Ordering::Relaxed) != 0 { + return Ok(()); + } + + while pingpong_tasks.len() > 5 { + pingpong_tasks.join_next().await; + } + + if !controller.should_send_ping() { + continue; + } + + let mut sink = sink.clone(); + let receiver = ctrl_resp_sender.subscribe(); + let ping_res_sender = ping_res_sender.clone(); + pingpong_tasks.spawn(async move { + let mut receiver = receiver.resubscribe(); + let pingpong_once_ret = Self::do_pingpong_once( + my_node_id, + peer_id, + &mut sink, + &mut receiver, + req_seq, + ) + .await; + + if let Err(e) = ping_res_sender.send(pingpong_once_ret).await { + tracing::info!(?e, "pingpong task send result error, exit.."); + }; + }); + + req_seq = req_seq.wrapping_add(1); } - - while pingpong_tasks.len() > 5 { - pingpong_tasks.join_next().await; - } - - if !controller.should_send_ping() { - continue; - } - - let mut sink = sink.clone(); - let receiver = ctrl_resp_sender.subscribe(); - let ping_res_sender = ping_res_sender.clone(); - pingpong_tasks.spawn(async move { - let mut receiver = receiver.resubscribe(); - let pingpong_once_ret = Self::do_pingpong_once( - my_node_id, - peer_id, - &mut sink, - &mut receiver, - req_seq, - ) - .await; - - if let Err(e) = ping_res_sender.send(pingpong_once_ret).await { - tracing::info!(?e, "pingpong task send result error, exit.."); - }; - }); - - req_seq = req_seq.wrapping_add(1); } - }); + .instrument(tracing::info_span!( + "pingpong_controller", + ?my_node_id, + ?peer_id + )), + ); - let mut counter: u64 = 0; let throughput = self.throughput_stats.clone(); let mut last_rx_packets = throughput.rx_packets(); while let Some(ret) = ping_res_receiver.recv().await { - counter += 1; - if let Ok(lat) = ret { latency_stats.record_latency(lat as u32); loss_rate_stats_1.record_latency(0); - loss_rate_stats_20.record_latency(0); } else { loss_rate_stats_1.record_latency(1); - loss_rate_stats_20.record_latency(1); + loss_counter.fetch_add(1, Ordering::Relaxed); } - let loss_rate_20: f64 = loss_rate_stats_20.get_latency_us(); let loss_rate_1: f64 = loss_rate_stats_1.get_latency_us(); tracing::trace!( ?ret, ?self, ?loss_rate_1, - ?loss_rate_20, "pingpong task recv pingpong_once result" ); @@ -293,23 +309,27 @@ impl PeerConnPinger { if last_rx_packets != current_rx_packets { // if we receive some packet from peers, reset the counter to avoid conn close. // conn will close only if we have 5 continous round pingpong loss after no packet received. - counter = 0; + loss_counter.store(0, Ordering::Relaxed); } tracing::debug!( - "counter: {}, loss_rate_1: {}, loss_rate_20: {}, cur_rx_packets: {}, last_rx: {}, node_id: {}", - counter, loss_rate_1, loss_rate_20, current_rx_packets, last_rx_packets, my_node_id + "loss_counter: {:?}, loss_rate_1: {}, cur_rx_packets: {}, last_rx: {}, node_id: {}", + loss_counter, + loss_rate_1, + current_rx_packets, + last_rx_packets, + my_node_id ); - if (counter > 5 && loss_rate_20 > 0.74) || (counter > 100 && loss_rate_1 > 0.35) { + if loss_counter.load(Ordering::Relaxed) >= 5 { tracing::warn!( ?ret, ?self, ?loss_rate_1, - ?loss_rate_20, + ?loss_counter, ?last_rx_packets, ?current_rx_packets, - "pingpong loss rate too high, closing" + "pingpong loss too much pingpong packet and no other ingress packets, closing the connection", ); break; } diff --git a/easytier/src/tests/mod.rs b/easytier/src/tests/mod.rs index c51d61d1..271dfed7 100644 --- a/easytier/src/tests/mod.rs +++ b/easytier/src/tests/mod.rs @@ -167,7 +167,7 @@ async fn wait_proxy_route_appear( } fn set_link_status(net_ns: &str, up: bool) { - let _ = std::process::Command::new("ip") + let ret = std::process::Command::new("ip") .args([ "netns", "exec", @@ -180,4 +180,5 @@ fn set_link_status(net_ns: &str, up: bool) { ]) .output() .unwrap(); + tracing::info!("set link status: {:?}, net_ns: {}, up: {}", ret, net_ns, up); } diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index 2e8bdeeb..03994ab8 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -490,6 +490,11 @@ pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str } inst4.run().await.unwrap(); + tracing::info!("inst1 peer id: {:?}", insts[0].peer_id()); + tracing::info!("inst2 peer id: {:?}", insts[1].peer_id()); + tracing::info!("inst3 peer id: {:?}", insts[2].peer_id()); + tracing::info!("inst4 peer id: {:?}", inst4.peer_id()); + let task = tokio::spawn(async move { for _ in 1..=2 { tokio::time::sleep(tokio::time::Duration::from_secs(8)).await; @@ -521,7 +526,10 @@ pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str .find(|r| r.peer_id == inst4.peer_id()) .is_none() }, - Duration::from_secs(15), + // 0 down + // [1, 6) send ping + // [3, 8) ping fail and close connection + Duration::from_millis(8300), ) .await; set_link_status("net_d", true); diff --git a/easytier/src/tunnel/stats.rs b/easytier/src/tunnel/stats.rs index 7639d038..1446def3 100644 --- a/easytier/src/tunnel/stats.rs +++ b/easytier/src/tunnel/stats.rs @@ -9,6 +9,16 @@ pub struct WindowLatency { count: AtomicU32, } +impl std::fmt::Debug for WindowLatency { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WindowLatency") + .field("count", &self.count) + .field("window_size", &self.latency_us_window_size) + .field("window_latency", &self.get_latency_us::()) + .finish() + } +} + impl WindowLatency { pub fn new(window_size: u32) -> Self { Self { @@ -48,7 +58,7 @@ impl WindowLatency { } } -#[derive(Default, Copy, Clone)] +#[derive(Default, Copy, Clone, Debug)] pub struct Throughput { tx_bytes: u64, rx_bytes: u64,