From ba3da97ad40cf45042598d612d1cb9baa7349357 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Mon, 30 Sep 2024 15:56:24 +0800 Subject: [PATCH] fix ipv6 direct connector not work --- easytier/locales/app.yml | 5 +- easytier/src/common/config.rs | 6 +- easytier/src/connector/direct.rs | 125 +++++++++++++++++++++-------- easytier/src/easytier-core.rs | 12 +++ easytier/src/instance/listeners.rs | 3 +- 5 files changed, 113 insertions(+), 38 deletions(-) diff --git a/easytier/locales/app.yml b/easytier/locales/app.yml index 20cb6ab9..73cde18b 100644 --- a/easytier/locales/app.yml +++ b/easytier/locales/app.yml @@ -116,4 +116,7 @@ core_clap: zh-CN: "转发所有对等节点的RPC数据包,即使对等节点不在转发网络白名单中。这可以帮助白名单外网络中的对等节点建立P2P连接。" socks5: en: "enable socks5 server, allow socks5 client to access virtual network. format: , e.g.: 1080" - zh-CN: "启用 socks5 服务器,允许 socks5 客户端访问虚拟网络. 格式: <端口>,例如:1080" \ No newline at end of file + zh-CN: "启用 socks5 服务器,允许 socks5 客户端访问虚拟网络. 格式: <端口>,例如:1080" + ipv6_listener: + en: "the url of the ipv6 listener, e.g.: tcp://[::]:11010, if not set, will listen on random udp port" + zh-CN: "IPv6 监听器的URL,例如:tcp://[::]:11010,如果未设置,将在随机UDP端口上监听" \ No newline at end of file diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index 709c6efc..48879047 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -180,6 +180,8 @@ pub struct Flags { pub relay_all_peer_rpc: bool, #[derivative(Default(value = "false"))] pub disable_udp_hole_punching: bool, + #[derivative(Default(value = "\"udp://[::]:0\".to_string()"))] + pub ipv6_listener: String, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] @@ -260,8 +262,6 @@ impl TomlConfigLoader { serde_json::from_str::>(&default_flags_json) .unwrap(); - tracing::debug!("default_flags_hashmap: {:?}", default_flags_hashmap); - let mut merged_hashmap = serde_json::Map::new(); for (key, value) in default_flags_hashmap { if let Some(v) = flags_hashmap.remove(&key) { @@ -271,8 +271,6 @@ impl TomlConfigLoader { } } - tracing::debug!("merged_hashmap: {:?}", merged_hashmap); - serde_json::from_value(serde_json::Value::Object(merged_hashmap)).unwrap() } } diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index bf7b71b0..cda5e582 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -1,6 +1,6 @@ // try connect peers directly, with either its public ip or lan ip -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use crate::{ common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, @@ -19,6 +19,7 @@ use crate::{ use crate::proto::cli::PeerConnInfo; use anyhow::Context; +use rand::Rng; use tokio::{task::JoinSet, time::timeout}; use tracing::Instrument; use url::Host; @@ -64,13 +65,13 @@ impl PeerManagerForDirectConnector for PeerManager { struct DstBlackListItem(PeerId, String); #[derive(Hash, Eq, PartialEq, Clone)] -struct DstSchemeBlackListItem(PeerId, String); +struct DstListenerUrlBlackListItem(PeerId, url::Url); struct DirectConnectorManagerData { global_ctx: ArcGlobalCtx, peer_manager: Arc, dst_blacklist: timedmap::TimedMap, - dst_sceme_blacklist: timedmap::TimedMap, + dst_listener_blacklist: timedmap::TimedMap, } impl DirectConnectorManagerData { @@ -79,7 +80,7 @@ impl DirectConnectorManagerData { global_ctx, peer_manager, dst_blacklist: timedmap::TimedMap::new(), - dst_sceme_blacklist: timedmap::TimedMap::new(), + dst_listener_blacklist: timedmap::TimedMap::new(), } } } @@ -147,7 +148,7 @@ impl DirectConnectorManager { } while let Some(task_ret) = tasks.join_next().await { - tracing::trace!(?task_ret, "direct connect task ret"); + tracing::debug!(?task_ret, ?my_peer_id, "direct connect task ret"); } tokio::time::sleep(std::time::Duration::from_secs(5)).await; } @@ -168,7 +169,7 @@ impl DirectConnectorManager { .dst_blacklist .contains(&DstBlackListItem(dst_peer_id.clone(), addr.clone())) { - tracing::trace!("try_connect_to_ip failed, addr in blacklist: {}", addr); + tracing::debug!("try_connect_to_ip failed, addr in blacklist: {}", addr); return Err(Error::UrlInBlacklist); } @@ -203,24 +204,38 @@ impl DirectConnectorManager { dst_peer_id: PeerId, addr: String, ) -> Result<(), Error> { - let ret = Self::do_try_connect_to_ip(data.clone(), dst_peer_id, addr.clone()).await; - if let Err(e) = ret { - if !matches!(e, Error::UrlInBlacklist) { - tracing::info!( - "try_connect_to_ip failed: {:?}, peer_id: {}", - e, - dst_peer_id - ); + let mut rand_gen = rand::rngs::OsRng::default(); + let backoff_ms = vec![1000, 2000, 4000]; + let mut backoff_idx = 0; + + loop { + let ret = Self::do_try_connect_to_ip(data.clone(), dst_peer_id, addr.clone()).await; + tracing::debug!(?ret, ?dst_peer_id, ?addr, "try_connect_to_ip return"); + if matches!(ret, Err(Error::UrlInBlacklist) | Ok(_)) { + return ret; + } + + if backoff_idx < backoff_ms.len() { + let delta = backoff_ms[backoff_idx] >> 1; + assert!(delta > 0); + assert!(delta < backoff_ms[backoff_idx]); + + tokio::time::sleep(Duration::from_millis( + (backoff_ms[backoff_idx] + rand_gen.gen_range(-delta..delta)) as u64, + )) + .await; + + backoff_idx += 1; + continue; + } else { data.dst_blacklist.insert( DstBlackListItem(dst_peer_id.clone(), addr.clone()), (), std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC), ); + + return ret; } - return Err(e); - } else { - tracing::info!("try_connect_to_ip success, peer_id: {}", dst_peer_id); - return Ok(()); } } @@ -230,6 +245,8 @@ impl DirectConnectorManager { dst_peer_id: PeerId, ip_list: GetIpListResponse, ) -> Result<(), Error> { + data.dst_listener_blacklist.cleanup(); + let enable_ipv6 = data.global_ctx.get_flags().enable_ipv6; let available_listeners = ip_list .listeners @@ -238,14 +255,15 @@ impl DirectConnectorManager { .filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None }) .filter(|l| l.port().is_some() && l.host().is_some()) .filter(|l| { - !data.dst_sceme_blacklist.contains(&DstSchemeBlackListItem( - dst_peer_id.clone(), - l.scheme().to_string(), - )) + !data + .dst_listener_blacklist + .contains(&DstListenerUrlBlackListItem(dst_peer_id.clone(), l.clone())) }) .filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_))) .collect::>(); + tracing::debug!(?available_listeners, "got available listeners"); + let mut listener = available_listeners.get(0).ok_or(anyhow::anyhow!( "peer {} have no valid listener", dst_peer_id @@ -270,6 +288,13 @@ impl DirectConnectorManager { dst_peer_id.clone(), addr.to_string(), )); + } else { + tracing::error!( + ?ip, + ?listener, + ?dst_peer_id, + "failed to set host for interface ipv4" + ); } }); @@ -284,6 +309,13 @@ impl DirectConnectorManager { dst_peer_id.clone(), addr.to_string(), )); + } else { + tracing::error!( + ?public_ipv4, + ?listener, + ?dst_peer_id, + "failed to set host for public ipv4" + ); } } } @@ -299,6 +331,13 @@ impl DirectConnectorManager { dst_peer_id.clone(), addr.to_string(), )); + } else { + tracing::error!( + ?ip, + ?listener, + ?dst_peer_id, + "failed to set host for interface ipv6" + ); } }); @@ -313,6 +352,13 @@ impl DirectConnectorManager { dst_peer_id.clone(), addr.to_string(), )); + } else { + tracing::error!( + ?public_ipv6, + ?listener, + ?dst_peer_id, + "failed to set host for public ipv6" + ); } } } @@ -323,16 +369,28 @@ impl DirectConnectorManager { let mut has_succ = false; while let Some(ret) = tasks.join_next().await { - if let Err(e) = ret { - tracing::error!("join direct connect task failed: {:?}", e); - } else if let Ok(Ok(_)) = ret { - has_succ = true; + match ret { + Ok(Ok(_)) => { + has_succ = true; + tracing::info!( + ?dst_peer_id, + ?listener, + "try direct connect to peer success" + ); + break; + } + Ok(Err(e)) => { + tracing::info!(?e, "try direct connect to peer failed"); + } + Err(e) => { + tracing::error!(?e, "try direct connect to peer task join failed"); + } } } if !has_succ { - data.dst_sceme_blacklist.insert( - DstSchemeBlackListItem(dst_peer_id.clone(), listener.scheme().to_string()), + data.dst_listener_blacklist.insert( + DstListenerUrlBlackListItem(dst_peer_id.clone(), listener.clone()), (), std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC), ); @@ -355,7 +413,7 @@ impl DirectConnectorManager { } } - tracing::trace!("try direct connect to peer: {}", dst_peer_id); + tracing::debug!("try direct connect to peer: {}", dst_peer_id); let rpc_stub = peer_manager .get_peer_rpc_mgr() @@ -384,7 +442,7 @@ mod tests { use crate::{ connector::direct::{ DirectConnectorManager, DirectConnectorManagerData, DstBlackListItem, - DstSchemeBlackListItem, + DstListenerUrlBlackListItem, }, instance::listeners::ListenerManager, peers::tests::{ @@ -461,8 +519,11 @@ mod tests { .unwrap(); assert!(data - .dst_sceme_blacklist - .contains(&DstSchemeBlackListItem(1, "tcp".into()))); + .dst_listener_blacklist + .contains(&DstListenerUrlBlackListItem( + 1, + "tcp://127.0.0.1:10222".parse().unwrap() + ))); assert!(data .dst_blacklist diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 8454829b..4c80e1c3 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -286,6 +286,12 @@ struct Cli { help = t!("core_clap.socks5").to_string() )] socks5: Option, + + #[arg( + long, + help = t!("core_clap.ipv6_listener").to_string() + )] + ipv6_listener: Option, } rust_i18n::i18n!("locales", fallback = "en"); @@ -512,6 +518,12 @@ impl From for TomlConfigLoader { } f.disable_p2p = cli.disable_p2p; f.relay_all_peer_rpc = cli.relay_all_peer_rpc; + if let Some(ipv6_listener) = cli.ipv6_listener { + f.ipv6_listener = ipv6_listener + .parse() + .with_context(|| format!("failed to parse ipv6 listener: {}", ipv6_listener)) + .unwrap(); + } cfg.set_flags(f); cfg.set_exit_nodes(cli.exit_nodes.clone()); diff --git a/easytier/src/instance/listeners.rs b/easytier/src/instance/listeners.rs index b34c28fd..1bdfbd53 100644 --- a/easytier/src/instance/listeners.rs +++ b/easytier/src/instance/listeners.rs @@ -111,9 +111,10 @@ impl ListenerManage } if self.global_ctx.config.get_flags().enable_ipv6 { + let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone(); let _ = self .add_listener( - UdpTunnelListener::new("udp://[::]:0".parse().unwrap()), + UdpTunnelListener::new(ipv6_listener.parse().unwrap()), false, ) .await?;