diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index d347026a..3fbc225d 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -23,7 +23,7 @@ use crate::peer_center::instance::PeerCenterInstance; use crate::peers::peer_conn::PeerConnId; use crate::peers::peer_manager::{PeerManager, RouteAlgoType}; use crate::peers::rpc_service::PeerManagerRpcService; -use crate::peers::PacketRecvChanReceiver; +use crate::peers::{create_packet_recv_chan, recv_packet_from_chan, PacketRecvChanReceiver}; use crate::proto::cli::VpnPortalRpc; use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo}; use crate::proto::peer_rpc::PeerCenterRpcServer; @@ -137,7 +137,7 @@ impl Instance { global_ctx.config.dump() ); - let (peer_packet_sender, peer_packet_receiver) = tokio::sync::mpsc::channel(100); + let (peer_packet_sender, peer_packet_receiver) = create_packet_recv_chan(); let id = global_ctx.get_id(); @@ -230,7 +230,7 @@ impl Instance { let mut tasks = JoinSet::new(); tasks.spawn(async move { let mut packet_recv = packet_recv.lock().await; - while let Some(packet) = packet_recv.recv().await { + while let Ok(packet) = recv_packet_from_chan(&mut packet_recv).await { tracing::trace!("packet consumed by mock nic ctx: {:?}", packet); } }); diff --git a/easytier/src/instance/virtual_nic.rs b/easytier/src/instance/virtual_nic.rs index c45108ba..d9e1c6c4 100644 --- a/easytier/src/instance/virtual_nic.rs +++ b/easytier/src/instance/virtual_nic.rs @@ -12,7 +12,7 @@ use crate::{ global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, ifcfg::{IfConfiger, IfConfiguerTrait}, }, - peers::{peer_manager::PeerManager, PacketRecvChanReceiver}, + peers::{peer_manager::PeerManager, recv_packet_from_chan, PacketRecvChanReceiver}, tunnel::{ common::{reserve_buf, FramedWriter, TunnelWrapper, ZCPacketToBytes}, packet_def::{ZCPacket, ZCPacketType, TAIL_RESERVED_SIZE}, @@ -610,7 +610,7 @@ impl NicCtx { self.tasks.spawn(async move { // unlock until coroutine finished let mut channel = channel.lock().await; - while let Some(packet) = channel.recv().await { + while let Ok(packet) = recv_packet_from_chan(&mut channel).await { tracing::trace!( "[USER_PACKET] forward packet from peers to nic. packet: {:?}", packet diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 47c30563..c249e7d9 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -37,11 +37,13 @@ use crate::{ }; use super::{ + create_packet_recv_chan, peer_conn::PeerConn, peer_map::PeerMap, peer_ospf_route::PeerRoute, peer_rpc::{PeerRpcManager, PeerRpcManagerTransport}, peer_rpc_service::DirectConnectorManagerRpcServer, + recv_packet_from_chan, route_trait::NextHopPolicy, PacketRecvChan, PacketRecvChanReceiver, }; @@ -79,7 +81,7 @@ impl ForeignNetworkEntry { ) -> Self { let foreign_global_ctx = Self::build_foreign_global_ctx(&network, global_ctx.clone()); - let (packet_sender, packet_recv) = mpsc::channel(64); + let (packet_sender, packet_recv) = create_packet_recv_chan(); let peer_map = Arc::new(PeerMap::new( packet_sender, @@ -251,7 +253,7 @@ impl ForeignNetworkEntry { let network_name = self.network.network_name.clone(); self.tasks.lock().await.spawn(async move { - while let Some(zc_packet) = recv.recv().await { + while let Ok(zc_packet) = recv_packet_from_chan(&mut recv).await { let Some(hdr) = zc_packet.peer_manager_header() else { tracing::warn!("invalid packet, skip"); continue; @@ -622,7 +624,7 @@ mod tests { network: &str, secret: &str, ) -> Arc { - let (s, _r) = tokio::sync::mpsc::channel(1000); + let (s, _r) = create_packet_recv_chan(); let peer_mgr = Arc::new(PeerManager::new( RouteAlgoType::Ospf, get_mock_global_ctx_with_network(Some(NetworkIdentity::new( diff --git a/easytier/src/peers/mod.rs b/easytier/src/peers/mod.rs index 6056c583..ff11ef23 100644 --- a/easytier/src/peers/mod.rs +++ b/easytier/src/peers/mod.rs @@ -39,5 +39,21 @@ pub trait NicPacketFilter { type BoxPeerPacketFilter = Box; type BoxNicPacketFilter = Box; +// pub type PacketRecvChan = tachyonix::Sender; +// pub type PacketRecvChanReceiver = tachyonix::Receiver; +// pub fn create_packet_recv_chan() -> (PacketRecvChan, PacketRecvChanReceiver) { +// tachyonix::channel(128) +// } pub type PacketRecvChan = tokio::sync::mpsc::Sender; pub type PacketRecvChanReceiver = tokio::sync::mpsc::Receiver; +pub fn create_packet_recv_chan() -> (PacketRecvChan, PacketRecvChanReceiver) { + tokio::sync::mpsc::channel(128) +} +pub async fn recv_packet_from_chan( + packet_recv_chan_receiver: &mut PacketRecvChanReceiver, +) -> Result { + packet_recv_chan_receiver + .recv() + .await + .ok_or(anyhow::anyhow!("recv_packet_from_chan failed")) +} diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index 7e0f728b..248bfc27 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -171,11 +171,11 @@ impl Drop for Peer { #[cfg(test)] mod tests { - use tokio::{sync::mpsc, time::timeout}; + use tokio::time::timeout; use crate::{ common::{global_ctx::tests::get_mock_global_ctx, new_peer_id}, - peers::peer_conn::PeerConn, + peers::{create_packet_recv_chan, peer_conn::PeerConn}, tunnel::ring::create_ring_tunnel_pair, }; @@ -183,8 +183,8 @@ mod tests { #[tokio::test] async fn close_peer() { - let (local_packet_send, _local_packet_recv) = mpsc::channel(10); - let (remote_packet_send, _remote_packet_recv) = mpsc::channel(10); + let (local_packet_send, _local_packet_recv) = create_packet_recv_chan(); + let (remote_packet_send, _remote_packet_recv) = create_packet_recv_chan(); let global_ctx = get_mock_global_ctx(); let local_peer = Peer::new(new_peer_id(), local_packet_send, global_ctx.clone()); let remote_peer = Peer::new(new_peer_id(), remote_packet_send, global_ctx.clone()); diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 5650fbca..9c5a78c2 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -413,6 +413,7 @@ mod tests { use crate::common::global_ctx::tests::get_mock_global_ctx; use crate::common::new_peer_id; use crate::common::scoped_task::ScopedTask; + use crate::peers::create_packet_recv_chan; use crate::tunnel::filter::tests::DropSendTunnelFilter; use crate::tunnel::filter::PacketRecorderTunnelFilter; use crate::tunnel::ring::create_ring_tunnel_pair; @@ -496,9 +497,7 @@ mod tests { ); s_peer.set_close_event_sender(tokio::sync::mpsc::channel(1).0); - s_peer - .start_recv_loop(tokio::sync::mpsc::channel(200).0) - .await; + s_peer.start_recv_loop(create_packet_recv_chan().0).await; // do not start ping for s, s only reponde to ping from c assert!(c_ret.is_ok()); @@ -507,9 +506,7 @@ mod tests { let (close_send, mut close_recv) = tokio::sync::mpsc::channel(1); c_peer.set_close_event_sender(close_send); c_peer.start_pingpong(); - c_peer - .start_recv_loop(tokio::sync::mpsc::channel(200).0) - .await; + c_peer.start_recv_loop(create_packet_recv_chan().0).await; let throughput = c_peer.throughput.clone(); let _t = ScopedTask::from(tokio::spawn(async move { diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 905fc720..1be8d404 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -30,6 +30,7 @@ use crate::{ peers::{ peer_conn::PeerConn, peer_rpc::PeerRpcManagerTransport, + recv_packet_from_chan, route_trait::{ForeignNetworkRouteInfoMap, NextHopPolicy, RouteInterface}, PeerPacketFilter, }, @@ -43,11 +44,12 @@ use crate::{ tunnel::{ self, packet_def::{CompressorAlgo, PacketType, ZCPacket}, - SinkItem, Tunnel, TunnelConnector, + Tunnel, TunnelConnector, }, }; use super::{ + create_packet_recv_chan, encrypt::{Encryptor, NullCipher}, foreign_network_client::ForeignNetworkClient, foreign_network_manager::{ForeignNetworkManager, GlobalForeignNetworkAccessor}, @@ -56,7 +58,7 @@ use super::{ peer_ospf_route::PeerRoute, peer_rpc::PeerRpcManager, route_trait::{ArcRoute, Route}, - BoxNicPacketFilter, BoxPeerPacketFilter, PacketRecvChanReceiver, + BoxNicPacketFilter, BoxPeerPacketFilter, PacketRecvChan, PacketRecvChanReceiver, }; struct RpcTransport { @@ -116,7 +118,7 @@ pub struct PeerManager { my_peer_id: PeerId, global_ctx: ArcGlobalCtx, - nic_channel: mpsc::Sender, + nic_channel: PacketRecvChan, tasks: Arc>>, @@ -155,11 +157,11 @@ impl PeerManager { pub fn new( route_algo: RouteAlgoType, global_ctx: ArcGlobalCtx, - nic_channel: mpsc::Sender, + nic_channel: PacketRecvChan, ) -> Self { let my_peer_id = rand::random(); - let (packet_send, packet_recv) = mpsc::channel(128); + let (packet_send, packet_recv) = create_packet_recv_chan(); let peers = Arc::new(PeerMap::new( packet_send.clone(), global_ctx.clone(), @@ -417,7 +419,7 @@ impl PeerManager { let encryptor = self.encryptor.clone(); self.tasks.lock().await.spawn(async move { tracing::trace!("start_peer_recv"); - while let Some(ret) = recv.recv().await { + while let Ok(ret) = recv_packet_from_chan(&mut recv).await { let Err(mut ret) = Self::try_handle_foreign_network_packet(ret, my_peer_id, &peers, &foreign_mgr) .await @@ -505,7 +507,7 @@ impl PeerManager { async fn init_packet_process_pipeline(&self) { // for tun/tap ip/eth packet. struct NicPacketProcessor { - nic_channel: mpsc::Sender, + nic_channel: PacketRecvChan, } #[async_trait::async_trait] impl PeerPacketFilter for NicPacketProcessor { @@ -875,7 +877,7 @@ impl PeerManager { self.global_ctx.clone() } - pub fn get_nic_channel(&self) -> mpsc::Sender { + pub fn get_nic_channel(&self) -> PacketRecvChan { self.nic_channel.clone() } @@ -935,6 +937,7 @@ mod tests { }, instance::listeners::get_listener_by_url, peers::{ + create_packet_recv_chan, peer_manager::RouteAlgoType, peer_rpc::tests::register_service, route_trait::NextHopPolicy, @@ -1078,7 +1081,7 @@ mod tests { #[tokio::test] async fn communicate_between_enc_and_non_enc() { let create_mgr = |enable_encryption| async move { - let (s, _r) = tokio::sync::mpsc::channel(1000); + let (s, _r) = create_packet_recv_chan(); let mock_global_ctx = get_mock_global_ctx(); mock_global_ctx.config.set_flags(Flags { enable_encryption, diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index b5ec5d45..4be5bbf2 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -70,11 +70,17 @@ impl PeerMap { pub async fn send_msg_directly(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> { if dst_peer_id == self.my_peer_id { - return Ok(self - .packet_send - .send(msg) - .await - .with_context(|| "send msg to self failed")?); + let packet_send = self.packet_send.clone(); + tokio::spawn(async move { + let ret = packet_send + .send(msg) + .await + .with_context(|| "send msg to self failed"); + if ret.is_err() { + tracing::error!("send msg to self failed: {:?}", ret); + } + }); + return Ok(()); } match self.get_peer_by_id(dst_peer_id) { diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 06f3083c..d966b263 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -2117,6 +2117,7 @@ mod tests { common::{global_ctx::tests::get_mock_global_ctx, PeerId}, connector::udp_hole_punch::tests::replace_stun_info_collector, peers::{ + create_packet_recv_chan, peer_manager::{PeerManager, RouteAlgoType}, route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface}, tests::connect_peer_manager, @@ -2154,7 +2155,7 @@ mod tests { } async fn create_mock_pmgr() -> Arc { - let (s, _r) = tokio::sync::mpsc::channel(1000); + let (s, _r) = create_packet_recv_chan(); let peer_mgr = Arc::new(PeerManager::new( RouteAlgoType::None, get_mock_global_ctx(), diff --git a/easytier/src/peers/tests.rs b/easytier/src/peers/tests.rs index a2764e2f..0b1d9713 100644 --- a/easytier/src/peers/tests.rs +++ b/easytier/src/peers/tests.rs @@ -1,14 +1,24 @@ use std::sync::Arc; use crate::{ - common::{error::Error, global_ctx::tests::get_mock_global_ctx, PeerId}, + common::{ + error::Error, + global_ctx::{ + tests::{get_mock_global_ctx, get_mock_global_ctx_with_network}, + NetworkIdentity, + }, + PeerId, + }, tunnel::ring::create_ring_tunnel_pair, }; -use super::peer_manager::{PeerManager, RouteAlgoType}; +use super::{ + create_packet_recv_chan, + peer_manager::{PeerManager, RouteAlgoType}, +}; pub async fn create_mock_peer_manager() -> Arc { - let (s, _r) = tokio::sync::mpsc::channel(1000); + let (s, _r) = create_packet_recv_chan(); let peer_mgr = Arc::new(PeerManager::new( RouteAlgoType::Ospf, get_mock_global_ctx(), @@ -18,6 +28,15 @@ pub async fn create_mock_peer_manager() -> Arc { peer_mgr } +pub async fn create_mock_peer_manager_with_name(network_name: String) -> Arc { + let (s, _r) = create_packet_recv_chan(); + let g = + get_mock_global_ctx_with_network(Some(NetworkIdentity::new(network_name, "".to_string()))); + let peer_mgr = Arc::new(PeerManager::new(RouteAlgoType::Ospf, g, s)); + peer_mgr.run().await.unwrap(); + peer_mgr +} + pub async fn connect_peer_manager(client: Arc, server: Arc) { let (a_ring, b_ring) = create_ring_tunnel_pair(); let a_mgr_copy = client.clone(); @@ -56,3 +75,55 @@ pub async fn wait_route_appear( wait_route_appear_with_cost(peer_mgr.clone(), target_peer.my_peer_id(), None).await?; wait_route_appear_with_cost(target_peer, peer_mgr.my_peer_id(), None).await } + +#[tokio::test] +async fn foreign_mgr_stress_test() { + const FOREIGN_NETWORK_COUNT: i32 = 20; + const PEER_PER_NETWORK: i32 = 3; + const PUBLIC_PEER_COUNT: i32 = 3; + + let mut public_peers = Vec::new(); + for _ in 0..PUBLIC_PEER_COUNT { + public_peers.push(create_mock_peer_manager().await); + } + connect_peer_manager(public_peers[0].clone(), public_peers[1].clone()).await; + connect_peer_manager(public_peers[0].clone(), public_peers[2].clone()).await; + connect_peer_manager(public_peers[1].clone(), public_peers[2].clone()).await; + + let mut foreigns = Vec::new(); + + for i in 0..FOREIGN_NETWORK_COUNT { + let mut peers = Vec::new(); + + let name = format!("foreign-network-test-{}", i); + + for _ in 0..PEER_PER_NETWORK { + let mgr = create_mock_peer_manager_with_name(name.clone()).await; + let public_peer_idx = rand::random::() % public_peers.len(); + connect_peer_manager(mgr.clone(), public_peers[public_peer_idx].clone()).await; + peers.push(mgr); + } + + foreigns.push(peers); + } + + for _ in 0..5 { + for i in 0..PUBLIC_PEER_COUNT { + let p = public_peers[i as usize].clone(); + println!( + "public peer {} routes: {:?}, global_foreign_network: {:?}, peers: {:?}", + i, + p.list_routes().await, + p.list_global_foreign_network().await.foreign_networks.len(), + p.get_peer_map().list_peers().await + ); + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + let new_peer = create_mock_peer_manager().await; + connect_peer_manager(new_peer.clone(), public_peers[0].clone()).await; + while let Err(e) = wait_route_appear(public_peers[1].clone(), new_peer.clone()).await { + println!("wait route ret: {:?}", e); + } + } +}