diff --git a/easytier-cli/src/main.rs b/easytier-cli/src/main.rs index 5e826099..2beb20e8 100644 --- a/easytier-cli/src/main.rs +++ b/easytier-cli/src/main.rs @@ -5,7 +5,8 @@ use easytier_core::{ common::stun::{StunInfoCollector, UdpNatTypeDetector}, rpc::{ connector_manage_rpc_client::ConnectorManageRpcClient, - peer_manage_rpc_client::PeerManageRpcClient, *, + peer_center_rpc_client::PeerCenterRpcClient, peer_manage_rpc_client::PeerManageRpcClient, + *, }, }; use humansize::format_size; @@ -30,6 +31,7 @@ enum SubCommand { Connector(ConnectorArgs), Stun, Route, + PeerCenter, } #[derive(Args, Debug)] @@ -202,6 +204,12 @@ impl CommandHandler { Ok(ConnectorManageRpcClient::connect(self.addr.clone()).await?) } + async fn get_peer_center_client( + &self, + ) -> Result, Error> { + Ok(PeerCenterRpcClient::connect(self.addr.clone()).await?) + } + async fn list_peers(&self) -> Result { let mut client = self.get_peer_manager_client().await?; let request = tonic::Request::new(ListPeerRequest::default()); @@ -424,6 +432,46 @@ async fn main() -> Result<(), Error> { let stun = UdpNatTypeDetector::new(StunInfoCollector::get_default_servers()); println!("udp type: {:?}", stun.get_udp_nat_type(0).await); } + SubCommand::PeerCenter => { + let mut peer_center_client = handler.get_peer_center_client().await?; + let resp = peer_center_client + .get_global_peer_map(GetGlobalPeerMapRequest::default()) + .await? + .into_inner(); + + #[derive(tabled::Tabled)] + struct PeerCenterTableItem { + node_id: String, + direct_peers: String, + } + + let mut table_rows = vec![]; + for (k, v) in resp.global_peer_map.iter() { + let node_id = k; + let direct_peers = v + .direct_peers + .iter() + .map(|(k, v)| { + format!( + "{}:{:?}", + k, + LatencyLevel::try_from(v.latency_level).unwrap() + ) + }) + .collect::>(); + table_rows.push(PeerCenterTableItem { + node_id: node_id.clone(), + direct_peers: direct_peers.join("\n"), + }); + } + + println!( + "{}", + tabled::Table::new(table_rows) + .with(Style::modern()) + .to_string() + ); + } } Ok(()) diff --git a/easytier-core/build.rs b/easytier-core/build.rs index dd8a4fb1..e39d77a3 100644 --- a/easytier-core/build.rs +++ b/easytier-core/build.rs @@ -92,6 +92,9 @@ fn main() -> Result<(), Box> { tonic_build::configure() .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") + .type_attribute("cli.DirectConnectedPeerInfo", "#[derive(Hash)]") + .type_attribute("cli.PeerInfoForGlobalMap", "#[derive(Hash)]") + .btree_map(&["."]) .compile(&["proto/cli.proto"], &["proto/"]) .unwrap(); // tonic_build::compile_protos("proto/cli.proto")?; diff --git a/easytier-core/proto/cli.proto b/easytier-core/proto/cli.proto index f7e6f40c..ebbd5685 100644 --- a/easytier-core/proto/cli.proto +++ b/easytier-core/proto/cli.proto @@ -115,3 +115,29 @@ service ConnectorManageRpc { rpc ListConnector (ListConnectorRequest) returns (ListConnectorResponse); rpc ManageConnector (ManageConnectorRequest) returns (ManageConnectorResponse); } + +enum LatencyLevel { + VeryLow = 0; + Low = 1; + Normal = 2; + High = 3; + VeryHigh = 4; +} + +message DirectConnectedPeerInfo { + LatencyLevel latency_level = 2; +} + +message PeerInfoForGlobalMap { + map direct_peers = 1; +} + +message GetGlobalPeerMapRequest {} + +message GetGlobalPeerMapResponse { + map global_peer_map = 1; +} + +service PeerCenterRpc { + rpc GetGlobalPeerMap (GetGlobalPeerMapRequest) returns (GetGlobalPeerMapResponse); +} diff --git a/easytier-core/src/instance/instance.rs b/easytier-core/src/instance/instance.rs index e18319cf..ffd1d684 100644 --- a/easytier-core/src/instance/instance.rs +++ b/easytier-core/src/instance/instance.rs @@ -20,6 +20,7 @@ use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManage use crate::connector::udp_hole_punch::UdpHolePunchConnector; use crate::gateway::icmp_proxy::IcmpProxy; use crate::gateway::tcp_proxy::TcpProxy; +use crate::peer_center::instance::PeerCenterInstance; use crate::peers::peer_manager::PeerManager; use crate::peers::rpc_service::PeerManagerRpcService; use crate::tunnels::SinkItem; @@ -85,6 +86,8 @@ pub struct Instance { tcp_proxy: Arc, icmp_proxy: Arc, + peer_center: Arc, + global_ctx: ArcGlobalCtx, } @@ -141,6 +144,8 @@ impl Instance { let arc_tcp_proxy = TcpProxy::new(global_ctx.clone(), peer_manager.clone()); let arc_icmp_proxy = IcmpProxy::new(global_ctx.clone(), peer_manager.clone()).unwrap(); + let peer_center = Arc::new(PeerCenterInstance::new(peer_manager.clone())); + Instance { inst_name: inst_name.to_string(), id, @@ -158,6 +163,8 @@ impl Instance { tcp_proxy: arc_tcp_proxy, icmp_proxy: arc_icmp_proxy, + peer_center, + global_ctx, } } @@ -281,6 +288,8 @@ impl Instance { self.udp_hole_puncher.lock().await.run().await?; + self.peer_center.init().await; + Ok(()) } @@ -312,6 +321,7 @@ impl Instance { let peer_mgr = self.peer_manager.clone(); let conn_manager = self.conn_manager.clone(); let net_ns = self.global_ctx.net_ns.clone(); + let peer_center = self.peer_center.clone(); self.tasks.spawn(async move { let _g = net_ns.guard(); @@ -327,6 +337,11 @@ impl Instance { ConnectorManagerRpcService(conn_manager.clone()), ), ) + .add_service( + crate::rpc::peer_center_rpc_server::PeerCenterRpcServer::new( + peer_center.get_rpc_service(), + ), + ) .serve(addr) .await .unwrap(); diff --git a/easytier-core/src/peer_center/instance.rs b/easytier-core/src/peer_center/instance.rs index ee54cd6c..42075e42 100644 --- a/easytier-core/src/peer_center/instance.rs +++ b/easytier-core/src/peer_center/instance.rs @@ -9,20 +9,27 @@ use std::{ }; use futures::Future; -use tokio::{sync::Mutex, task::JoinSet}; +use tokio::{ + sync::{Mutex, RwLock}, + task::JoinSet, +}; use tracing::Instrument; -use crate::peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService, PeerId}; +use crate::{ + peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService, PeerId}, + rpc::{GetGlobalPeerMapRequest, GetGlobalPeerMapResponse}, +}; use super::{ server::PeerCenterServer, - service::{PeerCenterService, PeerCenterServiceClient, PeerInfoForGlobalMap}, + service::{GlobalPeerMap, PeerCenterService, PeerCenterServiceClient, PeerInfoForGlobalMap}, Digest, Error, }; -pub struct PeerCenterClient { +struct PeerCenterBase { peer_mgr: Arc, tasks: Arc>>, + lock: Arc>, } static SERVICE_ID: u32 = 5; @@ -32,7 +39,7 @@ struct PeridicJobCtx { job_ctx: T, } -impl PeerCenterClient { +impl PeerCenterBase { pub async fn init(&self) -> Result<(), Error> { self.peer_mgr.get_peer_rpc_mgr().run_service( SERVICE_ID, @@ -67,21 +74,22 @@ impl PeerCenterClient { ) -> () { let my_node_id = self.peer_mgr.my_node_id(); let peer_mgr = self.peer_mgr.clone(); + let lock = self.lock.clone(); self.tasks.lock().await.spawn( async move { let ctx = Arc::new(PeridicJobCtx { peer_mgr: peer_mgr.clone(), job_ctx, }); - tracing::warn!(?my_node_id, "before periodic job loop"); loop { let Some(center_peer) = Self::select_center_peer(&peer_mgr).await else { tracing::warn!("no center peer found, sleep 1 second"); tokio::time::sleep(Duration::from_secs(1)).await; continue; }; - tracing::warn!(?center_peer, "run periodic job"); + tracing::info!(?center_peer, "run periodic job"); let rpc_mgr = peer_mgr.get_peer_rpc_mgr(); + let _g = lock.lock().await; let ret = rpc_mgr .do_client_rpc_scoped(SERVICE_ID, center_peer, |c| async { let client = @@ -90,6 +98,7 @@ impl PeerCenterClient { job_fn(client, ctx.clone()).await }) .await; + drop(_g); let Ok(sleep_time_ms) = ret else { tracing::error!("periodic job to center server rpc failed: {:?}", ret); @@ -106,35 +115,85 @@ impl PeerCenterClient { ); } - pub async fn new(peer_mgr: Arc) -> Self { - PeerCenterClient { + pub fn new(peer_mgr: Arc) -> Self { + PeerCenterBase { peer_mgr, tasks: Arc::new(Mutex::new(JoinSet::new())), + lock: Arc::new(Mutex::new(())), } } } -struct PeerCenterInstance { +pub struct PeerCenterInstanceService { + global_peer_map: Arc>, + global_peer_map_digest: Arc>, +} + +#[tonic::async_trait] +impl crate::rpc::cli::peer_center_rpc_server::PeerCenterRpc for PeerCenterInstanceService { + async fn get_global_peer_map( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + let global_peer_map = self.global_peer_map.read().await.clone(); + Ok(tonic::Response::new(GetGlobalPeerMapResponse { + global_peer_map: global_peer_map + .map + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(), + })) + } +} + +pub struct PeerCenterInstance { peer_mgr: Arc, - client: Arc, + + client: Arc, + global_peer_map: Arc>, + global_peer_map_digest: Arc>, } impl PeerCenterInstance { - pub async fn new(peer_mgr: Arc) -> Self { - let client = Arc::new(PeerCenterClient::new(peer_mgr.clone()).await); - client.init().await.unwrap(); + pub fn new(peer_mgr: Arc) -> Self { + PeerCenterInstance { + peer_mgr: peer_mgr.clone(), + client: Arc::new(PeerCenterBase::new(peer_mgr.clone())), + global_peer_map: Arc::new(RwLock::new(GlobalPeerMap::new())), + global_peer_map_digest: Arc::new(RwLock::new(Digest::default())), + } + } - PeerCenterInstance { peer_mgr, client } + pub async fn init(&self) { + self.client.init().await.unwrap(); + self.init_get_global_info_job().await; + self.init_report_peers_job().await; } async fn init_get_global_info_job(&self) { + struct Ctx { + global_peer_map: Arc>, + global_peer_map_digest: Arc>, + } + + let ctx = Arc::new(Ctx { + global_peer_map: self.global_peer_map.clone(), + global_peer_map_digest: self.global_peer_map_digest.clone(), + }); + self.client - .init_periodic_job({}, |client, _ctx| async move { + .init_periodic_job(ctx, |client, ctx| async move { + let mut rpc_ctx = tarpc::context::current(); + rpc_ctx.deadline = SystemTime::now() + Duration::from_secs(3); + let ret = client - .get_global_peer_map(tarpc::context::current(), 0) + .get_global_peer_map( + rpc_ctx, + ctx.job_ctx.global_peer_map_digest.read().await.clone(), + ) .await?; - let Ok(global_peer_map) = ret else { + let Ok(resp) = ret else { tracing::error!( "get global info from center server got error result: {:?}", ret @@ -142,7 +201,18 @@ impl PeerCenterInstance { return Ok(1000); }; - tracing::warn!("get global info from center server: {:?}", global_peer_map); + let Some(resp) = resp else { + return Ok(1000); + }; + + tracing::info!( + "get global info from center server: {:?}, digest: {:?}", + resp.global_peer_map, + resp.digest + ); + + *ctx.job_ctx.global_peer_map.write().await = resp.global_peer_map; + *ctx.job_ctx.global_peer_map_digest.write().await = resp.digest; Ok(5000) }) @@ -196,10 +266,19 @@ impl PeerCenterInstance { }) .await; } + + pub fn get_rpc_service(&self) -> PeerCenterInstanceService { + PeerCenterInstanceService { + global_peer_map: self.global_peer_map.clone(), + global_peer_map_digest: self.global_peer_map_digest.clone(), + } + } } #[cfg(test)] mod tests { + use std::ops::Deref; + use crate::{ peer_center::server::get_global_data, peers::tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, @@ -213,13 +292,14 @@ mod tests { let peer_mgr_b = create_mock_peer_manager().await; let peer_mgr_c = create_mock_peer_manager().await; - let peer_center_a = PeerCenterInstance::new(peer_mgr_a.clone()).await; - let peer_center_b = PeerCenterInstance::new(peer_mgr_b.clone()).await; - let peer_center_c = PeerCenterInstance::new(peer_mgr_c.clone()).await; + let peer_center_a = PeerCenterInstance::new(peer_mgr_a.clone()); + let peer_center_b = PeerCenterInstance::new(peer_mgr_b.clone()); + let peer_center_c = PeerCenterInstance::new(peer_mgr_c.clone()); - peer_center_a.init_report_peers_job().await; - peer_center_b.init_report_peers_job().await; - peer_center_c.init_report_peers_job().await; + let peer_centers = vec![&peer_center_a, &peer_center_b, &peer_center_c]; + for pc in peer_centers.iter() { + pc.init().await; + } connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; @@ -228,7 +308,7 @@ mod tests { .await .unwrap(); - let center_peer = PeerCenterClient::select_center_peer(&peer_mgr_a) + let center_peer = PeerCenterBase::select_center_peer(&peer_mgr_a) .await .unwrap(); let center_data = get_global_data(center_peer); @@ -248,5 +328,29 @@ mod tests { } tokio::time::sleep(Duration::from_millis(100)).await; } + + let mut digest = None; + for pc in peer_centers.iter() { + let rpc_service = pc.get_rpc_service(); + let now = std::time::Instant::now(); + while now.elapsed().as_secs() < 10 { + if rpc_service.global_peer_map.read().await.map.len() == 3 { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + assert_eq!(rpc_service.global_peer_map.read().await.map.len(), 3); + println!("rpc service ready, {:#?}", rpc_service.global_peer_map); + + if digest.is_none() { + digest = Some(rpc_service.global_peer_map_digest.read().await.clone()); + } else { + let v = rpc_service.global_peer_map_digest.read().await; + assert_eq!(digest.as_ref().unwrap(), v.deref()); + } + } + + let global_digest = get_global_data(center_peer).read().await.digest.clone(); + assert_eq!(digest.as_ref().unwrap(), &global_digest); } } diff --git a/easytier-core/src/peer_center/mod.rs b/easytier-core/src/peer_center/mod.rs index 32dbd377..3239fca8 100644 --- a/easytier-core/src/peer_center/mod.rs +++ b/easytier-core/src/peer_center/mod.rs @@ -5,7 +5,7 @@ // peer center is not guaranteed to be stable and can be changed when peer enter or leave. // it's used to reduce the cost to exchange infos between peers. -mod instance; +pub mod instance; mod server; mod service; diff --git a/easytier-core/src/peer_center/server.rs b/easytier-core/src/peer_center/server.rs index efd5c4ef..94872829 100644 --- a/easytier-core/src/peer_center/server.rs +++ b/easytier-core/src/peer_center/server.rs @@ -5,12 +5,12 @@ use std::{ use dashmap::DashMap; use once_cell::sync::Lazy; -use tokio::sync::RwLock; +use tokio::{sync::RwLock, task::JoinSet}; use crate::peers::PeerId; use super::{ - service::{GlobalPeerMap, PeerCenterService, PeerInfoForGlobalMap}, + service::{GetGlobalPeerMapResponse, GlobalPeerMap, PeerCenterService, PeerInfoForGlobalMap}, Digest, Error, }; @@ -18,6 +18,7 @@ pub(crate) struct PeerCenterServerGlobalData { pub global_peer_map: GlobalPeerMap, pub digest: Digest, pub update_time: std::time::Instant, + pub peer_update_time: DashMap, } impl PeerCenterServerGlobalData { @@ -26,6 +27,7 @@ impl PeerCenterServerGlobalData { global_peer_map: GlobalPeerMap::new(), digest: Digest::default(), update_time: std::time::Instant::now(), + peer_update_time: DashMap::new(), } } } @@ -47,13 +49,41 @@ pub struct PeerCenterServer { // every peer has its own server, so use per-struct dash map is ok. my_node_id: PeerId, digest_map: DashMap, + + tasks: Arc>, } impl PeerCenterServer { pub fn new(my_node_id: PeerId) -> Self { + let mut tasks = JoinSet::new(); + tasks.spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + PeerCenterServer::clean_outdated_peer(my_node_id).await; + } + }); + PeerCenterServer { my_node_id, digest_map: DashMap::new(), + + tasks: Arc::new(tasks), + } + } + + async fn clean_outdated_peer(my_node_id: PeerId) { + let data = get_global_data(my_node_id); + let mut locked_data = data.write().await; + let now = std::time::Instant::now(); + let mut to_remove = Vec::new(); + for kv in locked_data.peer_update_time.iter() { + if now.duration_since(*kv.value()).as_secs() > 10 { + to_remove.push(*kv.key()); + } + } + for peer_id in to_remove { + locked_data.global_peer_map.map.remove(&peer_id); + locked_data.peer_update_time.remove(&peer_id); } } } @@ -70,6 +100,12 @@ impl PeerCenterService for PeerCenterServer { ) -> Result<(), Error> { tracing::warn!("receive report_peers"); + let data = get_global_data(self.my_node_id); + let mut locked_data = data.write().await; + locked_data + .peer_update_time + .insert(my_peer_id, std::time::Instant::now()); + let old_digest = self.digest_map.get(&my_peer_id); // if digest match, no need to update if let Some(old_digest) = old_digest { @@ -83,8 +119,6 @@ impl PeerCenterService for PeerCenterServer { } self.digest_map.insert(my_peer_id, digest); - let data = get_global_data(self.my_node_id); - let mut locked_data = data.write().await; locked_data .global_peer_map .map @@ -93,6 +127,7 @@ impl PeerCenterService for PeerCenterServer { let mut hasher = std::collections::hash_map::DefaultHasher::new(); locked_data.global_peer_map.map.hash(&mut hasher); locked_data.digest = hasher.finish() as Digest; + locked_data.update_time = std::time::Instant::now(); Ok(()) } @@ -101,7 +136,7 @@ impl PeerCenterService for PeerCenterServer { self, _: tarpc::context::Context, digest: Digest, - ) -> Result, Error> { + ) -> Result, Error> { let data = get_global_data(self.my_node_id); if digest == data.read().await.digest { return Ok(None); @@ -109,6 +144,9 @@ impl PeerCenterService for PeerCenterServer { let data = get_global_data(self.my_node_id); let locked_data = data.read().await; - Ok(Some(locked_data.global_peer_map.clone())) + Ok(Some(GetGlobalPeerMapResponse { + global_peer_map: locked_data.global_peer_map.clone(), + digest: locked_data.digest, + })) } } diff --git a/easytier-core/src/peer_center/service.rs b/easytier-core/src/peer_center/service.rs index 58b07a8b..61c735c7 100644 --- a/easytier-core/src/peer_center/service.rs +++ b/easytier-core/src/peer_center/service.rs @@ -1,18 +1,11 @@ use std::collections::BTreeMap; -use crate::peers::PeerId; +use crate::{peers::PeerId, rpc::DirectConnectedPeerInfo}; use super::{Digest, Error}; use crate::rpc::PeerInfo; -#[derive(Debug, Clone, Hash, serde::Deserialize, serde::Serialize)] -pub enum LatencyLevel { - VeryLow, - Low, - Normal, - High, - VeryHigh, -} +pub type LatencyLevel = crate::rpc::cli::LatencyLevel; impl LatencyLevel { pub const fn from_latency_ms(lat_ms: u32) -> Self { @@ -30,33 +23,25 @@ impl LatencyLevel { } } -#[derive(Debug, Clone, Hash, serde::Deserialize, serde::Serialize)] -pub struct PeerConnInfoForGlobalMap { - to_peer_id: PeerId, - latency_level: LatencyLevel, -} - -#[derive(Debug, Clone, Hash, serde::Deserialize, serde::Serialize)] -pub struct PeerInfoForGlobalMap { - pub direct_peers: BTreeMap>, -} +pub type PeerInfoForGlobalMap = crate::rpc::cli::PeerInfoForGlobalMap; impl From> for PeerInfoForGlobalMap { fn from(peers: Vec) -> Self { let mut peer_map = BTreeMap::new(); for peer in peers { - let mut conn_info = Vec::new(); - for conn in peer.conns { - conn_info.push(PeerConnInfoForGlobalMap { - to_peer_id: conn.peer_id.parse().unwrap(), - latency_level: LatencyLevel::from_latency_ms( - conn.stats.unwrap().latency_us as u32 / 1000, - ), - }); - } + let min_lat = peer + .conns + .iter() + .map(|conn| conn.stats.as_ref().unwrap().latency_us) + .min() + .unwrap_or(0); + + let dp_info = DirectConnectedPeerInfo { + latency_level: LatencyLevel::from_latency_ms(min_lat as u32 / 1000) as i32, + }; + // sort conn info so hash result is stable - conn_info.sort_by(|a, b| a.to_peer_id.cmp(&b.to_peer_id)); - peer_map.insert(peer.peer_id.parse().unwrap(), conn_info); + peer_map.insert(peer.peer_id, dp_info); } PeerInfoForGlobalMap { direct_peers: peer_map, @@ -78,6 +63,12 @@ impl GlobalPeerMap { } } +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct GetGlobalPeerMapResponse { + pub global_peer_map: GlobalPeerMap, + pub digest: Digest, +} + #[tarpc::service] pub trait PeerCenterService { // report center server which peer is directly connected to me @@ -88,5 +79,6 @@ pub trait PeerCenterService { digest: Digest, ) -> Result<(), Error>; - async fn get_global_peer_map(digest: Digest) -> Result, Error>; + async fn get_global_peer_map(digest: Digest) + -> Result, Error>; }