diff --git a/easytier-core/src/common/error.rs b/easytier-core/src/common/error.rs index 82e89c5a..c48f780f 100644 --- a/easytier-core/src/common/error.rs +++ b/easytier-core/src/common/error.rs @@ -16,8 +16,8 @@ pub enum Error { TunnelError(#[from] tunnels::TunnelError), #[error("Peer has no conn, PeerId: {0}")] PeerNoConnectionError(PeerId), - #[error("RouteError: {0}")] - RouteError(String), + #[error("RouteError: {0:?}")] + RouteError(Option), #[error("Not found")] NotFound, #[error("Invalid Url: {0}")] diff --git a/easytier-core/src/peers/foreign_network_client.rs b/easytier-core/src/peers/foreign_network_client.rs index 434d8204..1137ef32 100644 --- a/easytier-core/src/peers/foreign_network_client.rs +++ b/easytier-core/src/peers/foreign_network_client.rs @@ -150,9 +150,18 @@ impl ForeignNetworkClient { pub async fn send_msg(&self, msg: Bytes, peer_id: PeerId) -> Result<(), Error> { if let Some(next_hop) = self.get_next_hop(peer_id) { - return self.peer_map.send_msg_directly(msg, next_hop).await; + let ret = self.peer_map.send_msg_directly(msg, next_hop).await; + if ret.is_err() { + tracing::error!( + ?ret, + ?peer_id, + ?next_hop, + "foreign network client send msg failed" + ); + } + return ret; } - Err(Error::RouteError("no next hop".to_string())) + Err(Error::RouteError(Some("no next hop".to_string()))) } pub fn list_foreign_peers(&self) -> Vec { diff --git a/easytier-core/src/peers/foreign_network_manager.rs b/easytier-core/src/peers/foreign_network_manager.rs index ef2a42f6..04771930 100644 --- a/easytier-core/src/peers/foreign_network_manager.rs +++ b/easytier-core/src/peers/foreign_network_manager.rs @@ -57,12 +57,12 @@ impl ForeignNetworkManagerData { let network_name = self .peer_network_map .get(&dst_peer_id) - .ok_or_else(|| Error::RouteError("network not found".to_string()))? + .ok_or_else(|| Error::RouteError(Some("network not found".to_string())))? .clone(); let entry = self .network_peer_maps .get(&network_name) - .ok_or_else(|| Error::RouteError("no peer in network".to_string()))? + .ok_or_else(|| Error::RouteError(Some("no peer in network".to_string())))? .clone(); entry.peer_map.send_msg(msg, dst_peer_id).await } @@ -287,6 +287,28 @@ impl ForeignNetworkManager { self.start_packet_recv().await; self.register_peer_rpc_service().await; } + + pub async fn list_foreign_networks(&self) -> DashMap> { + let ret = DashMap::new(); + for item in self.data.network_peer_maps.iter() { + let network_name = item.key().clone(); + ret.insert(network_name, vec![]); + } + + for mut n in ret.iter_mut() { + let network_name = n.key().clone(); + let Some(item) = self + .data + .network_peer_maps + .get(&network_name) + .map(|v| v.clone()) + else { + continue; + }; + n.value_mut().extend(item.peer_map.list_peers().await); + } + ret + } } #[cfg(test)] diff --git a/easytier-core/src/peers/peer_conn.rs b/easytier-core/src/peers/peer_conn.rs index 78bcd24b..23ea9970 100644 --- a/easytier-core/src/peers/peer_conn.rs +++ b/easytier-core/src/peers/peer_conn.rs @@ -56,7 +56,7 @@ macro_rules! wait_response { match &resp_payload { $pattern => $out_var = $value, _ => { - log::error!( + tracing::error!( "unexpected packet: {:?}, pattern: {:?}", rsp_bytes, stringify!($pattern) @@ -67,6 +67,7 @@ macro_rules! wait_response { }; } +#[derive(Debug)] pub struct PeerInfo { magic: u32, pub my_peer_id: PeerId, @@ -348,13 +349,15 @@ impl PeerConn { self.conn_id } + #[tracing::instrument] pub async fn do_handshake_as_server(&mut self) -> Result<(), TunnelError> { let mut stream = self.tunnel.pin_stream(); let mut sink = self.tunnel.pin_sink(); + tracing::info!("waiting for handshake request from client"); wait_response!(stream, hs_req, CtrlPacketPayload::HandShake(x) => x); self.info = Some(PeerInfo::from(hs_req)); - log::info!("handshake request: {:?}", hs_req); + tracing::info!("handshake request: {:?}", hs_req); let hs_req = self .global_ctx @@ -365,6 +368,7 @@ impl PeerConn { Ok(()) } + #[tracing::instrument] pub async fn do_handshake_as_client(&mut self) -> Result<(), TunnelError> { let mut stream = self.tunnel.pin_stream(); let mut sink = self.tunnel.pin_sink(); @@ -375,9 +379,10 @@ impl PeerConn { .run(|| packet::Packet::new_handshake(self.my_peer_id, &self.global_ctx.network)); sink.send(hs_req.into()).await?; + tracing::info!("waiting for handshake request from server"); wait_response!(stream, hs_rsp, CtrlPacketPayload::HandShake(x) => x); self.info = Some(PeerInfo::from(hs_rsp)); - log::info!("handshake response: {:?}", hs_rsp); + tracing::info!("handshake response: {:?}", hs_rsp); Ok(()) } @@ -535,6 +540,16 @@ impl Drop for PeerConn { } } +impl Debug for PeerConn { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PeerConn") + .field("conn_id", &self.conn_id) + .field("my_peer_id", &self.my_peer_id) + .field("info", &self.info) + .finish() + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/easytier-core/src/peers/peer_manager.rs b/easytier-core/src/peers/peer_manager.rs index 9f25f377..c2f30fe6 100644 --- a/easytier-core/src/peers/peer_manager.rs +++ b/easytier-core/src/peers/peer_manager.rs @@ -5,7 +5,7 @@ use std::{ }; use async_trait::async_trait; -use futures::{StreamExt, TryFutureExt}; +use futures::StreamExt; use tokio::{ sync::{ @@ -67,11 +67,18 @@ impl PeerRpcManagerTransport for RpcTransport { .ok_or(Error::Unknown)?; let peers = self.peers.upgrade().ok_or(Error::Unknown)?; - if foreign_peers.has_next_hop(dst_peer_id) { + let ret = peers.send_msg(msg.clone(), dst_peer_id).await; + + if matches!(ret, Err(Error::RouteError(..))) && foreign_peers.has_next_hop(dst_peer_id) { + tracing::info!( + ?dst_peer_id, + ?self.my_peer_id, + "failed to send msg to peer, try foreign network", + ); return foreign_peers.send_msg(msg, dst_peer_id).await; } - peers.send_msg(msg, dst_peer_id).map_err(|e| e.into()).await + ret } async fn recv(&self) -> Result { @@ -484,13 +491,18 @@ impl PeerManager { let mut errs: Vec = vec![]; for peer_id in dst_peers.iter() { - let send_ret = self - .peers - .send_msg( - packet::Packet::new_data_packet(self.my_peer_id, peer_id.clone(), &msg).into(), - *peer_id, - ) - .await; + let msg: Bytes = + packet::Packet::new_data_packet(self.my_peer_id, peer_id.clone(), &msg).into(); + let send_ret = self.peers.send_msg(msg.clone(), *peer_id).await; + + if matches!(send_ret, Err(Error::RouteError(..))) + && self.foreign_network_client.has_next_hop(*peer_id) + { + let foreign_send_ret = self.foreign_network_client.send_msg(msg, *peer_id).await; + if foreign_send_ret.is_ok() { + continue; + } + } if let Err(send_ret) = send_ret { errs.push(send_ret); diff --git a/easytier-core/src/peers/peer_map.rs b/easytier-core/src/peers/peer_map.rs index 8189a09b..cda479ba 100644 --- a/easytier-core/src/peers/peer_map.rs +++ b/easytier-core/src/peers/peer_map.rs @@ -87,7 +87,7 @@ impl PeerMap { } None => { log::error!("no peer for dst_peer_id: {}", dst_peer_id); - return Ok(()); + return Err(Error::RouteError(None)); } } @@ -119,13 +119,13 @@ impl PeerMap { } let Some(gateway_peer_id) = gateway_peer_id else { - log::error!( + tracing::trace!( "no gateway for dst_peer_id: {}, peers: {:?}, my_peer_id: {}", dst_peer_id, self.peer_map.iter().map(|v| *v.key()).collect::>(), self.my_peer_id ); - return Ok(()); + return Err(Error::RouteError(None)); }; self.send_msg_directly(msg.clone(), gateway_peer_id).await?; diff --git a/easytier-core/src/tests/three_node.rs b/easytier-core/src/tests/three_node.rs index bcfa36b8..47fdc993 100644 --- a/easytier-core/src/tests/three_node.rs +++ b/easytier-core/src/tests/three_node.rs @@ -1,4 +1,7 @@ -use std::sync::{atomic::AtomicU32, Arc}; +use std::{ + sync::{atomic::AtomicU32, Arc}, + time::Duration, +}; use tokio::{net::UdpSocket, task::JoinSet}; @@ -6,10 +9,11 @@ use super::*; use crate::{ common::{ - config::{ConfigLoader, TomlConfigLoader}, + config::{ConfigLoader, NetworkIdentity, TomlConfigLoader}, netns::{NetNS, ROOT_NETNS_NAME}, }, instance::instance::Instance, + peers::tests::wait_for_condition, tunnels::{ common::tests::_tunnel_pingpong_netns, ring_tunnel::RingTunnelConnector, @@ -307,3 +311,65 @@ pub async fn udp_broadcast_test() { tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; assert_eq!(counter.load(std::sync::atomic::Ordering::Relaxed), 2); } + +#[tokio::test] +#[serial_test::serial] +pub async fn foreign_network_forward_nic_data() { + prepare_linux_namespaces(); + + let center_node_config = get_inst_config("inst1", Some("net_a"), "10.144.144.1"); + center_node_config.set_network_identity(NetworkIdentity { + network_name: "center".to_string(), + network_secret: "".to_string(), + }); + let mut center_inst = Instance::new(center_node_config); + + let mut inst1 = Instance::new(get_inst_config("inst1", Some("net_b"), "10.144.145.1")); + let mut inst2 = Instance::new(get_inst_config("inst2", Some("net_c"), "10.144.145.2")); + + center_inst.run().await.unwrap(); + inst1.run().await.unwrap(); + inst2.run().await.unwrap(); + + assert_ne!(inst1.id(), center_inst.id()); + assert_ne!(inst2.id(), center_inst.id()); + + inst1 + .get_conn_manager() + .add_connector(RingTunnelConnector::new( + format!("ring://{}", center_inst.id()).parse().unwrap(), + )); + + inst2 + .get_conn_manager() + .add_connector(RingTunnelConnector::new( + format!("ring://{}", center_inst.id()).parse().unwrap(), + )); + + wait_for_condition( + || async { + inst1.get_peer_manager().list_routes().await.len() == 1 + && inst2.get_peer_manager().list_routes().await.len() == 1 + }, + Duration::from_secs(5), + ) + .await; + + let _g = NetNS::new(Some(ROOT_NETNS_NAME.to_owned())).guard(); + let code = tokio::process::Command::new("ip") + .args(&[ + "netns", + "exec", + "net_b", + "ping", + "-c", + "1", + "-W", + "1", + "10.144.145.2", + ]) + .status() + .await + .unwrap(); + assert_eq!(code.code().unwrap(), 0); +} diff --git a/easytier-core/src/tunnels/ring_tunnel.rs b/easytier-core/src/tunnels/ring_tunnel.rs index b26f1b36..2d037bbb 100644 --- a/easytier-core/src/tunnels/ring_tunnel.rs +++ b/easytier-core/src/tunnels/ring_tunnel.rs @@ -10,7 +10,10 @@ use crossbeam_queue::ArrayQueue; use async_trait::async_trait; use futures::Sink; use once_cell::sync::Lazy; -use tokio::sync::{Mutex, Notify}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + Mutex, Notify, +}; use futures::FutureExt; use tokio_util::bytes::BytesMut; @@ -197,7 +200,6 @@ impl RingTunnel { struct Connection { client: RingTunnel, server: RingTunnel, - connect_notify: Arc, } impl Tunnel for RingTunnel { @@ -219,18 +221,23 @@ impl Tunnel for RingTunnel { } } -static CONNECTION_MAP: Lazy>>>> = +static CONNECTION_MAP: Lazy>>>>> = Lazy::new(|| Arc::new(Mutex::new(HashMap::new()))); #[derive(Debug)] pub struct RingTunnelListener { listerner_addr: url::Url, + conn_sender: UnboundedSender>, + conn_receiver: UnboundedReceiver>, } impl RingTunnelListener { pub fn new(key: url::Url) -> Self { + let (conn_sender, conn_receiver) = tokio::sync::mpsc::unbounded_channel(); RingTunnelListener { listerner_addr: key, + conn_sender, + conn_receiver, } } } @@ -279,17 +286,6 @@ impl Tunnel for ConnectionForClient { } impl RingTunnelListener { - async fn add_connection(listener_addr: uuid::Uuid) { - CONNECTION_MAP.lock().await.insert( - listener_addr.clone(), - Arc::new(Connection { - client: RingTunnel::new(RING_TUNNEL_CAP), - server: RingTunnel::new_with_id(listener_addr.clone(), RING_TUNNEL_CAP), - connect_notify: Arc::new(Notify::new()), - }), - ); - } - fn get_addr(&self) -> Result { check_scheme_and_get_socket_addr::(&self.listerner_addr, "ring") } @@ -299,23 +295,29 @@ impl RingTunnelListener { impl TunnelListener for RingTunnelListener { async fn listen(&mut self) -> Result<(), TunnelError> { log::info!("listen new conn of key: {}", self.listerner_addr); - Self::add_connection(self.get_addr()?).await; + CONNECTION_MAP + .lock() + .await + .insert(self.get_addr()?, self.conn_sender.clone()); Ok(()) } async fn accept(&mut self) -> Result, TunnelError> { log::info!("waiting accept new conn of key: {}", self.listerner_addr); - let val = CONNECTION_MAP - .lock() - .await - .get(&self.get_addr()?) - .unwrap() - .clone(); - val.connect_notify.notified().await; - CONNECTION_MAP.lock().await.remove(&self.get_addr()?); - Self::add_connection(self.get_addr()?).await; - log::info!("accept new conn of key: {}", self.listerner_addr); - Ok(Box::new(ConnectionForServer { conn: val })) + let my_addr = self.get_addr()?; + if let Some(conn) = self.conn_receiver.recv().await { + if conn.server.id == my_addr { + log::info!("accept new conn of key: {}", self.listerner_addr); + return Ok(Box::new(ConnectionForServer { conn })); + } else { + tracing::error!(?conn.server.id, ?my_addr, "got new conn with wrong id"); + return Err(TunnelError::CommonError( + "accept got wrong ring server id".to_owned(), + )); + } + } + + return Err(TunnelError::CommonError("conn receiver stopped".to_owned())); } fn local_url(&self) -> url::Url { @@ -336,18 +338,22 @@ impl RingTunnelConnector { #[async_trait] impl TunnelConnector for RingTunnelConnector { async fn connect(&mut self) -> Result, super::TunnelError> { - let val = CONNECTION_MAP + let remote_addr = check_scheme_and_get_socket_addr::(&self.remote_addr, "ring")?; + let entry = CONNECTION_MAP .lock() .await - .get(&check_scheme_and_get_socket_addr::( - &self.remote_addr, - "ring", - )?) + .get(&remote_addr) .unwrap() .clone(); - val.connect_notify.notify_one(); log::info!("connecting"); - Ok(Box::new(ConnectionForClient { conn: val })) + let conn = Arc::new(Connection { + client: RingTunnel::new(RING_TUNNEL_CAP), + server: RingTunnel::new_with_id(remote_addr.clone(), RING_TUNNEL_CAP), + }); + entry + .send(conn.clone()) + .map_err(|_| TunnelError::CommonError("send conn to listner failed".to_owned()))?; + Ok(Box::new(ConnectionForClient { conn })) } fn remote_url(&self) -> url::Url { @@ -359,11 +365,10 @@ pub fn create_ring_tunnel_pair() -> (Box, Box) { let conn = Arc::new(Connection { client: RingTunnel::new(RING_TUNNEL_CAP), server: RingTunnel::new(RING_TUNNEL_CAP), - connect_notify: Arc::new(Notify::new()), }); ( Box::new(ConnectionForServer { conn: conn.clone() }), - Box::new(ConnectionForClient { conn: conn }), + Box::new(ConnectionForClient { conn }), ) }