diff --git a/Cargo.lock b/Cargo.lock index 03a89df2..299fd394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -820,15 +820,6 @@ dependencies = [ "syn 2.0.87", ] -[[package]] -name = "bounded_join_set" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae18fd8f4a623bcf416b5bc8f1e0905534d9911597ed17cc57ab9b6eed65454d" -dependencies = [ - "tokio", -] - [[package]] name = "brotli" version = "7.0.0" @@ -1941,7 +1932,6 @@ dependencies = [ "base64 0.22.1", "bitflags 2.8.0", "boringtun-easytier", - "bounded_join_set", "bytecodec", "byteorder", "bytes", diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index ad1a4926..4f0e11f3 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -213,8 +213,6 @@ humantime-serde = "1.1.1" multimap = "0.10.0" version-compare = "0.2.0" -bounded_join_set = "0.3.0" - jemallocator = { version = "0.5.4", optional = true } jemalloc-ctl = { version = "0.5.4", optional = true } jemalloc-sys = { version = "0.5.4", features = [ diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index e8896f8f..5e1e6c51 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -221,16 +221,18 @@ impl DirectConnectorManagerData { Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn try_connect_to_ip( self: Arc, dst_peer_id: PeerId, addr: String, ) -> Result<(), Error> { let mut rand_gen = rand::rngs::OsRng::default(); - let backoff_ms = vec![1000, 2000]; + let backoff_ms = vec![1000, 2000, 4000]; let mut backoff_idx = 0; + tracing::debug!(?dst_peer_id, ?addr, "try_connect_to_ip start"); + self.dst_listener_blacklist.cleanup(); if self @@ -244,12 +246,21 @@ impl DirectConnectorManagerData { } loop { + if self.peer_manager.has_directly_connected_conn(dst_peer_id) { + return Ok(()); + } + + tracing::debug!(?dst_peer_id, ?addr, "try_connect_to_ip start one round"); let ret = self.do_try_connect_to_ip(dst_peer_id, addr.clone()).await; tracing::debug!(?ret, ?dst_peer_id, ?addr, "try_connect_to_ip return"); if ret.is_ok() { return Ok(()); } + if self.peer_manager.has_directly_connected_conn(dst_peer_id) { + return Ok(()); + } + if backoff_idx < backoff_ms.len() { let delta = backoff_ms[backoff_idx] >> 1; assert!(delta > 0); @@ -273,37 +284,19 @@ impl DirectConnectorManagerData { } } - #[tracing::instrument] - async fn do_try_direct_connect_internal( + fn spawn_direct_connect_task( self: &Arc, dst_peer_id: PeerId, - ip_list: GetIpListResponse, - ) -> Result<(), Error> { - let enable_ipv6 = self.global_ctx.get_flags().enable_ipv6; - let available_listeners = ip_list - .listeners - .into_iter() - .map(Into::::into) - .filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None }) - .filter(|l| l.port().is_some() && l.host().is_some()) - .filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_))) - .collect::>(); - - tracing::debug!(?available_listeners, "got available listeners"); - - if available_listeners.is_empty() { - return Err(anyhow::anyhow!("peer {} have no valid listener", dst_peer_id).into()); - } - - // if have default listener, use it first - let listener = available_listeners - .iter() - .find(|l| l.scheme() == self.global_ctx.get_flags().default_protocol) - .unwrap_or(available_listeners.get(0).unwrap()); - - let mut tasks = bounded_join_set::JoinSet::new(2); - - let listener_host = listener.socket_addrs(|| None)?.pop(); + ip_list: &GetIpListResponse, + listener: &url::Url, + tasks: &mut JoinSet>, + ) { + let Ok(mut addrs) = listener.socket_addrs(|| None) else { + tracing::error!(?listener, "failed to parse socket address from listener"); + return; + }; + let listener_host = addrs.pop(); + tracing::info!(?listener_host, ?listener, "try direct connect to peer"); match listener_host { Some(SocketAddr::V4(s_addr)) => { if s_addr.ip().is_unspecified() { @@ -386,30 +379,90 @@ impl DirectConnectorManagerData { tracing::error!(?p, ?listener, "failed to parse ip version from listener"); } } + } - while let Some(ret) = tasks.join_next().await { - match ret { - Ok(Ok(_)) => { - tracing::info!( - ?dst_peer_id, - ?listener, - "try direct connect to peer success" - ); + #[tracing::instrument(skip(self))] + async fn do_try_direct_connect_internal( + self: &Arc, + dst_peer_id: PeerId, + ip_list: GetIpListResponse, + ) -> Result<(), Error> { + let enable_ipv6 = self.global_ctx.get_flags().enable_ipv6; + let available_listeners = ip_list + .listeners + .clone() + .into_iter() + .map(Into::::into) + .filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None }) + .filter(|l| l.port().is_some() && l.host().is_some()) + .filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_))) + .collect::>(); + + tracing::debug!(?available_listeners, "got available listeners"); + + if available_listeners.is_empty() { + return Err(anyhow::anyhow!("peer {} have no valid listener", dst_peer_id).into()); + } + + let default_protocol = self.global_ctx.get_flags().default_protocol; + // sort available listeners, default protocol has the highest priority, udp is second, others just random + // highest priority is in the last + let mut available_listeners = available_listeners; + available_listeners.sort_by_key(|l| { + let scheme = l.scheme(); + if scheme == default_protocol { + 3 + } else if scheme == "udp" { + 2 + } else { + 1 + } + }); + + while !available_listeners.is_empty() { + let mut tasks = JoinSet::new(); + let mut listener_list = vec![]; + + let cur_scheme = available_listeners.last().unwrap().scheme().to_owned(); + while let Some(listener) = available_listeners.last() { + if listener.scheme() != cur_scheme { break; } - Ok(Err(e)) => { - tracing::info!(?e, "try direct connect to peer failed"); - } - Err(e) => { - tracing::error!(?e, "try direct connect to peer task join failed"); - } + + tracing::debug!("try direct connect to peer with listener: {}", listener); + self.spawn_direct_connect_task( + dst_peer_id.clone(), + &ip_list, + &listener, + &mut tasks, + ); + + listener_list.push(listener.clone().to_string()); + available_listeners.pop(); + } + + let ret = tasks.join_all().await; + tracing::debug!( + ?ret, + ?dst_peer_id, + ?cur_scheme, + ?listener_list, + "all tasks finished for current scheme" + ); + + if self.peer_manager.has_directly_connected_conn(dst_peer_id) { + tracing::info!( + "direct connect to peer {} success, has direct conn", + dst_peer_id + ); + return Ok(()); } } Ok(()) } - #[tracing::instrument] + #[tracing::instrument(skip(self))] async fn do_try_direct_connect( self: Arc, dst_peer_id: PeerId,