diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index ba7ec006..4eabed8a 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -24,6 +24,7 @@ use common::config::{ ConsoleLoggerConfig, FileLoggerConfig, NetworkIdentity, PeerConfig, VpnPortalConfig, }; use instance::instance::Instance; +use tokio::net::TcpSocket; use crate::{ common::{ @@ -70,7 +71,7 @@ struct Cli { )] ipv4: Option, - #[arg(short, long, help = "peers to connect initially")] + #[arg(short, long, help = "peers to connect initially", num_args = 0..)] peers: Vec, #[arg(short, long, help = "use a public shared node to discover peers")] @@ -86,17 +87,29 @@ struct Cli { #[arg( short, long, - default_value = "127.0.0.1:15888", - help = "rpc portal address to listen for management" + default_value = "0", + help = "rpc portal address to listen for management. 0 means random +port, 12345 means listen on 12345 of localhost, 0.0.0.0:12345 means +listen on 12345 of all interfaces. default is 0 and will try 15888 first" )] - rpc_portal: SocketAddr, + rpc_portal: String, - #[arg(short, long, help = "listeners to accept connections, pass '' to avoid listening.", - default_values_t = ["tcp://0.0.0.0:11010".to_string(), - "udp://0.0.0.0:11010".to_string(), - "wg://0.0.0.0:11011".to_string()])] + #[arg(short, long, help = "listeners to accept connections, allow format: +a port number: 11010, means tcp/udp will listen on 11010, ws/wss will listen on 11010 and 11011, wg will listen on 11011 +url: tcp://0.0.0.0:11010, tcp can be tcp, udp, ring, wg, ws, wss, +proto:port: wg:11011, means listen on 11011 with wireguard protocol +url and proto:port can occur multiple times. + ", default_values_t = ["11010".to_string()], + num_args = 0..)] listeners: Vec, + #[arg( + long, + help = "do not listen on any port, only connect to peers", + default_value = "false" + )] + no_listener: bool, + #[arg(long, help = "console log level", value_parser = clap::builder::PossibleValuesParser::new(["trace", "debug", "info", "warn", "error", "off"]))] console_log_level: Option, @@ -161,6 +174,80 @@ and the vpn client is in network of 10.14.14.0/24" latency_first: bool, } +impl Cli { + fn parse_listeners(&self) -> Vec { + println!("parsing listeners: {:?}", self.listeners); + let proto_port_offset = vec![("tcp", 0), ("udp", 0), ("wg", 1), ("ws", 1), ("wss", 2)]; + + if self.no_listener || self.listeners.is_empty() { + return vec![]; + } + + let origin_listners = self.listeners.clone(); + let mut listeners: Vec = Vec::new(); + if origin_listners.len() == 1 { + if let Ok(port) = origin_listners[0].parse::() { + for (proto, offset) in proto_port_offset { + listeners.push(format!("{}://0.0.0.0:{}", proto, port + offset)); + } + return listeners; + } + } + + for l in &origin_listners { + let proto_port: Vec<&str> = l.split(':').collect(); + if proto_port.len() > 2 { + if let Ok(url) = l.parse::() { + listeners.push(url.to_string()); + } else { + panic!("failed to parse listener: {}", l); + } + } else { + let Some((proto, offset)) = proto_port_offset + .iter() + .find(|(proto, _)| *proto == proto_port[0]) + else { + panic!("unknown protocol: {}", proto_port[0]); + }; + + let port = if proto_port.len() == 2 { + proto_port[1].parse::().unwrap() + } else { + 11010 + offset + }; + + listeners.push(format!("{}://0.0.0.0:{}", proto, port)); + } + } + + println!("parsed listeners: {:?}", listeners); + + listeners + } + + fn check_tcp_available(port: u16) -> Option { + let s = format!("127.0.0.1:{}", port).parse::().unwrap(); + TcpSocket::new_v4().unwrap().bind(s).map(|_| s).ok() + } + + fn parse_rpc_portal(&self) -> SocketAddr { + if let Ok(port) = self.rpc_portal.parse::() { + if port == 0 { + // check tcp 15888 first + for i in 15888..15900 { + if let Some(s) = Cli::check_tcp_available(i) { + return s; + } + } + return "127.0.0.1:0".parse().unwrap(); + } + return format!("127.0.0.1:{}", port).parse().unwrap(); + } + + self.rpc_portal.parse().unwrap() + } +} + impl From for TomlConfigLoader { fn from(cli: Cli) -> Self { if let Some(config_file) = &cli.config_file { @@ -205,19 +292,9 @@ impl From for TomlConfigLoader { ); cfg.set_listeners( - cli.listeners - .iter() - .filter_map(|s| { - if s.is_empty() { - return None; - } - - Some( - s.parse() - .with_context(|| format!("failed to parse listener uri: {}", s)) - .unwrap(), - ) - }) + cli.parse_listeners() + .into_iter() + .map(|s| s.parse().unwrap()) .collect(), ); @@ -229,7 +306,7 @@ impl From for TomlConfigLoader { ); } - cfg.set_rpc_portal(cli.rpc_portal); + cfg.set_rpc_portal(cli.parse_rpc_portal()); if cli.external_node.is_some() { let mut old_peers = cfg.get_peers(); diff --git a/easytier/src/gateway/icmp_proxy.rs b/easytier/src/gateway/icmp_proxy.rs index 0f4a2ca7..7768d236 100644 --- a/easytier/src/gateway/icmp_proxy.rs +++ b/easytier/src/gateway/icmp_proxy.rs @@ -63,7 +63,7 @@ pub struct IcmpProxy { peer_manager: Arc, cidr_set: CidrSet, - socket: socket2::Socket, + socket: std::sync::Mutex>, nat_table: IcmpNatTable, @@ -158,23 +158,11 @@ impl IcmpProxy { peer_manager: Arc, ) -> Result, Error> { let cidr_set = CidrSet::new(global_ctx.clone()); - - let _g = global_ctx.net_ns.guard(); - let socket = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::RAW, - Some(socket2::Protocol::ICMPV4), - )?; - socket.bind(&socket2::SockAddr::from(SocketAddrV4::new( - std::net::Ipv4Addr::UNSPECIFIED, - 0, - )))?; - let ret = Self { global_ctx, peer_manager, cidr_set, - socket, + socket: std::sync::Mutex::new(None), nat_table: Arc::new(dashmap::DashMap::new()), tasks: Mutex::new(JoinSet::new()), @@ -184,6 +172,18 @@ impl IcmpProxy { } pub async fn start(self: &Arc) -> Result<(), Error> { + let _g = self.global_ctx.net_ns.guard(); + let socket = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::RAW, + Some(socket2::Protocol::ICMPV4), + )?; + socket.bind(&socket2::SockAddr::from(SocketAddrV4::new( + std::net::Ipv4Addr::UNSPECIFIED, + 0, + )))?; + self.socket.lock().unwrap().replace(socket); + self.start_icmp_proxy().await?; self.start_nat_table_cleaner().await?; Ok(()) @@ -204,7 +204,7 @@ impl IcmpProxy { } async fn start_icmp_proxy(self: &Arc) -> Result<(), Error> { - let socket = self.socket.try_clone()?; + let socket = self.socket.lock().unwrap().as_ref().unwrap().try_clone()?; let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel(); let nat_table = self.nat_table.clone(); thread::spawn(|| { @@ -237,7 +237,7 @@ impl IcmpProxy { dst_ip: Ipv4Addr, icmp_packet: &icmp::echo_request::EchoRequestPacket, ) -> Result<(), Error> { - self.socket.send_to( + self.socket.lock().unwrap().as_ref().unwrap().send_to( icmp_packet.packet(), &SocketAddrV4::new(dst_ip.into(), 0).into(), )?; diff --git a/easytier/src/instance/virtual_nic.rs b/easytier/src/instance/virtual_nic.rs index 5a98710f..402b9a39 100644 --- a/easytier/src/instance/virtual_nic.rs +++ b/easytier/src/instance/virtual_nic.rs @@ -58,7 +58,7 @@ impl Stream for TunStream { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut self_mut = self.project(); let mut g = ready!(self_mut.l.poll_lock(cx)); - reserve_buf(&mut self_mut.cur_buf, 2500, 128 * 1024); + reserve_buf(&mut self_mut.cur_buf, 2500, 32 * 1024); if self_mut.cur_buf.len() == 0 { unsafe { self_mut.cur_buf.set_len(*self_mut.payload_offset);