diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 3d0e8093..4593234e 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -191,7 +191,7 @@ jobs: if [[ $OS =~ ^windows.*$ ]]; then SUFFIX=.exe CORE_FEATURES="--features=mimalloc" - elif [[ $TARGET =~ ^riscv64.*$ || $TARGET =~ ^loongarch64.*$ ]]; then + elif [[ $TARGET =~ ^riscv64.*$ || $TARGET =~ ^loongarch64.*$ || $TARGET =~ ^aarch64.*$ ]]; then CORE_FEATURES="--features=mimalloc" else CORE_FEATURES="--features=jemalloc" diff --git a/Cargo.lock b/Cargo.lock index 2c4d6c1a..93e1af51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -312,16 +312,6 @@ dependencies = [ "zstd-safe", ] -[[package]] -name = "async-event" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1222afd3d2bce3995035054046a279ae7aa154d70d0766cea050073f3fd7ddf" -dependencies = [ - "loom 0.5.6", - "pin-project-lite", -] - [[package]] name = "async-executor" version = "1.13.0" @@ -1939,16 +1929,6 @@ dependencies = [ "syn 2.0.87", ] -[[package]] -name = "diatomic-waker" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28025fb55a9d815acf7b0877555f437254f373036eec6ed265116c7a5c0825e9" -dependencies = [ - "loom 0.5.6", - "waker-fn", -] - [[package]] name = "digest" version = "0.10.7" @@ -2197,7 +2177,6 @@ dependencies = [ "stun_codec", "sys-locale", "tabled", - "tachyonix", "tempfile", "thiserror 1.0.63", "thunk-rs", @@ -3012,19 +2991,6 @@ dependencies = [ "x11", ] -[[package]] -name = "generator" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" -dependencies = [ - "cc", - "libc", - "log", - "rustversion", - "windows 0.48.0", -] - [[package]] name = "generator" version = "0.8.4" @@ -4266,7 +4232,7 @@ dependencies = [ [[package]] name = "kcp-sys" version = "0.1.0" -source = "git+https://github.com/EasyTier/kcp-sys?rev=0f0a0558391ba391c089806c23f369651f6c9eeb#0f0a0558391ba391c089806c23f369651f6c9eeb" +source = "git+https://github.com/EasyTier/kcp-sys?rev=71eff18c573a4a71bf99c7fabc6a8b9f211c84c1#71eff18c573a4a71bf99c7fabc6a8b9f211c84c1" dependencies = [ "anyhow", "auto_impl", @@ -4484,19 +4450,6 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" -[[package]] -name = "loom" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" -dependencies = [ - "cfg-if", - "generator 0.7.5", - "scoped-tls", - "tracing", - "tracing-subscriber", -] - [[package]] name = "loom" version = "0.7.2" @@ -4504,7 +4457,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" dependencies = [ "cfg-if", - "generator 0.8.4", + "generator", "scoped-tls", "tracing", "tracing-subscriber", @@ -4747,7 +4700,7 @@ dependencies = [ "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", - "loom 0.7.2", + "loom", "parking_lot", "portable-atomic", "rustc_version", @@ -8413,20 +8366,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "tachyonix" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1924ef47bc3b427ea2a0b55ba97d0e9116e9103483ecd75a43f47a66443527c5" -dependencies = [ - "async-event", - "crossbeam-utils", - "diatomic-waker", - "futures-core", - "loom 0.5.6", - "pin-project-lite", -] - [[package]] name = "tagptr" version = "0.2.0" @@ -9935,12 +9874,6 @@ dependencies = [ "libc", ] -[[package]] -name = "waker-fn" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" - [[package]] name = "walkdir" version = "2.5.0" @@ -10386,15 +10319,6 @@ dependencies = [ "windows-version", ] -[[package]] -name = "windows" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" -dependencies = [ - "windows-targets 0.48.5", -] - [[package]] name = "windows" version = "0.52.0" diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 72a1a41b..4fd23a9a 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -63,7 +63,6 @@ timedmap = "=1.0.1" zerocopy = { version = "0.7.32", features = ["derive", "simd"] } bytes = "1.5.0" pin-project-lite = "0.2.13" -tachyonix = "0.3.0" quinn = { version = "0.11.8", optional = true, features = ["ring"] } @@ -193,7 +192,7 @@ service-manager = { git = "https://github.com/chipsenkbeil/service-manager-rs.gi zstd = { version = "0.13" } -kcp-sys = { git = "https://github.com/EasyTier/kcp-sys", rev = "0f0a0558391ba391c089806c23f369651f6c9eeb" } +kcp-sys = { git = "https://github.com/EasyTier/kcp-sys", rev = "71eff18c573a4a71bf99c7fabc6a8b9f211c84c1" } prost-reflect = { version = "0.14.5", default-features = false, features = [ "derive", @@ -258,6 +257,7 @@ jemallocator = { package = "tikv-jemallocator", version = "0.6.0", optional = tr "unprefixed_malloc_on_supported_platforms" ] } jemalloc-ctl = { package = "tikv-jemalloc-ctl", version = "0.6.0", optional = true, features = [ + "use_std" ] } [target.'cfg(not(target_os = "macos"))'.dependencies] diff --git a/easytier/src/common/acl_processor.rs b/easytier/src/common/acl_processor.rs index b8ffaabc..51743a3f 100644 --- a/easytier/src/common/acl_processor.rs +++ b/easytier/src/common/acl_processor.rs @@ -3,7 +3,7 @@ use std::{ net::{IpAddr, SocketAddr}, str::FromStr as _, sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use crate::common::{config::ConfigLoader, global_ctx::ArcGlobalCtx, token_bucket::TokenBucket}; @@ -28,6 +28,12 @@ impl RateLimitKey { } } +/// Value wrapper for rate limiters with last update timestamp +pub struct RateLimitValue { + pub token_bucket: Arc, + pub last_update: Instant, +} + // Performance-optimized rule identifier to avoid string allocations #[derive(Debug, Clone, PartialEq, Eq)] pub enum RuleId { @@ -104,7 +110,7 @@ impl AclCacheKey { pub struct AclCacheEntry { pub action: Action, pub matched_rule: RuleId, - pub last_access: u64, + pub last_access: std::time::Instant, // New fields to track rule characteristics for proper cache behavior pub conn_track_key: Option, pub rate_limit_keys: Vec, @@ -188,7 +194,7 @@ impl AclLogContext { pub type SharedState = ( Arc>, - Arc>>, + Arc>, Arc>, ); @@ -209,7 +215,7 @@ pub struct AclProcessor { conn_track: Arc>, // Rate limiting buckets per rule using TokenBucket with optimized keys - rate_limiters: Arc>>, + rate_limiters: Arc>, // Rule lookup cache with LRU cleanup rule_cache: Arc>, @@ -234,7 +240,7 @@ impl AclProcessor { pub fn new_with_shared_state( acl_config: Acl, conn_track: Option>>, - rate_limiters: Option>>>, + rate_limiters: Option>>, stats: Option>>, ) -> Self { let (inbound_rules, outbound_rules, forward_rules) = Self::build_rules(&acl_config); @@ -261,7 +267,7 @@ impl AclProcessor { conn_track: conn_track.unwrap_or_else(|| Arc::new(DashMap::new())), rate_limiters: rate_limiters.unwrap_or_else(|| Arc::new(DashMap::new())), rule_cache: Arc::new(DashMap::new()), // Always start with fresh cache - cache_max_size: 10000, // Limit cache to 10k entries + cache_max_size: 1024, // Limit cache to 1k entries cache_cleanup_interval: Duration::from_secs(20), // Cleanup every 5 minutes stats: stats.unwrap_or_else(|| Arc::new(DashMap::new())), tasks, @@ -362,6 +368,7 @@ impl AclProcessor { /// Start periodic cache cleanup task fn start_cache_cleanup_task(&mut self) { + let rate_limiters = self.rate_limiters.clone(); let rule_cache = self.rule_cache.clone(); let cache_max_size = self.cache_max_size; let cleanup_interval = self.cache_cleanup_interval; @@ -371,6 +378,10 @@ impl AclProcessor { loop { interval.tick().await; Self::cleanup_cache(&rule_cache, cache_max_size); + rule_cache.shrink_to_fit(); + + rate_limiters.retain(|_, v| v.last_update.elapsed() < cleanup_interval); + rate_limiters.shrink_to_fit(); } }); @@ -380,19 +391,26 @@ impl AclProcessor { loop { interval.tick().await; Self::cleanup_expired_connections(conn_track.clone(), 60); + conn_track.shrink_to_fit(); } }); } /// Clean up cache using LRU strategy fn cleanup_cache(cache: &DashMap, max_size: usize) { + // remove cache not be used in last 15 second + let expired_timepoint = Instant::now() + .checked_sub(Duration::from_secs(15)) + .unwrap_or(Instant::now()); + cache.retain(|_, entry| entry.last_access > expired_timepoint); + let current_size = cache.len(); if current_size <= max_size { return; } // Remove oldest entries (LRU cleanup) - let mut entries: Vec<(AclCacheKey, u64)> = cache + let mut entries: Vec<(AclCacheKey, std::time::Instant)> = cache .iter() .map(|entry| (entry.key().clone(), entry.value().last_access)) .collect(); @@ -472,10 +490,7 @@ impl AclProcessor { // If cache hit and can skip checks, return cached result if let Some(mut cached) = self.rule_cache.get_mut(&cache_key) { // Update last access time for LRU - cached.last_access = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); + cached.last_access = Instant::now(); self.increment_stat(AclStatKey::CacheHits); return self.process_packet_with_cache_entry(packet_info, &cached); @@ -499,10 +514,7 @@ impl AclProcessor { let mut cache_entry = AclCacheEntry { action: Action::Allow, matched_rule: RuleId::Default, - last_access: SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(), + last_access: Instant::now(), conn_track_key: None, rate_limit_keys: vec![], chain_type, @@ -774,19 +786,26 @@ impl AclProcessor { return true; // No rate limiting } - let bucket = self + let mut rate_limiter = self .rate_limiters .entry(rule_key.clone()) .or_insert_with(|| { if !allow_create { panic!("Rate limit bucket not found"); } - TokenBucket::new(burst as u64, rate as u64, Duration::from_millis(10)) - }) - .clone(); + RateLimitValue { + token_bucket: TokenBucket::new( + burst as u64, + rate as u64, + Duration::from_millis(10), + ), + last_update: Instant::now(), + } + }); // Try to consume 1 token (1 packet) - bucket.try_consume(1) + rate_limiter.last_update = Instant::now(); + rate_limiter.token_bucket.try_consume(1) } /// Convert proto Rule to FastLookupRule diff --git a/easytier/src/common/mod.rs b/easytier/src/common/mod.rs index dc2b9c47..bb85b247 100644 --- a/easytier/src/common/mod.rs +++ b/easytier/src/common/mod.rs @@ -156,6 +156,16 @@ pub fn get_machine_id() -> uuid::Uuid { gen_mid } +pub fn shrink_dashmap( + map: &dashmap::DashMap, + threshold: Option, +) { + let threshold = threshold.unwrap_or(16); + if map.capacity() - map.len() > threshold { + map.shrink_to_fit(); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/easytier/src/common/stats_manager.rs b/easytier/src/common/stats_manager.rs index 39cc6737..9ce7ab58 100644 --- a/easytier/src/common/stats_manager.rs +++ b/easytier/src/common/stats_manager.rs @@ -527,6 +527,7 @@ impl StatsManager { counters.retain(|_, metric_data: &mut Arc| unsafe { metric_data.get_last_updated() > cutoff_time }); + counters.shrink_to_fit(); } }); diff --git a/easytier/src/common/token_bucket.rs b/easytier/src/common/token_bucket.rs index b3b2f503..ef81b8de 100644 --- a/easytier/src/common/token_bucket.rs +++ b/easytier/src/common/token_bucket.rs @@ -178,6 +178,7 @@ impl TokenBucketManager { loop { // Retain only buckets that are still in use buckets_clone.retain(|_, bucket| Arc::::strong_count(bucket) > 1); + buckets_clone.shrink_to_fit(); // Sleep for a while before next retention check tokio::time::sleep(Duration::from_secs(5)).await; } diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 50e42be5..d6ce205f 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -7,7 +7,7 @@ use std::{ net::{IpAddr, SocketAddr}, path::PathBuf, process::ExitCode, - sync::Arc, + sync::{atomic::AtomicBool, Arc}, }; use anyhow::Context; @@ -27,6 +27,7 @@ use easytier::{ stun::MockStunInfoCollector, }, connector::create_connector_by_url, + defer, instance_manager::NetworkInstanceManager, launcher::{add_proxy_network_to_config, ConfigSource}, proto::common::{CompressionAlgoPb, NatType}, @@ -57,7 +58,12 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; #[cfg(feature = "jemalloc-prof")] #[allow(non_upper_case_globals)] #[export_name = "malloc_conf"] -pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0"; +pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19,retain:false\0"; + +#[cfg(not(feature = "jemalloc-prof"))] +#[allow(non_upper_case_globals)] +#[export_name = "malloc_conf"] +pub static malloc_conf: &[u8] = b"retain:false\0"; fn set_prof_active(_active: bool) { #[cfg(feature = "jemalloc-prof")] @@ -68,16 +74,21 @@ fn set_prof_active(_active: bool) { } } +fn get_dump_profile_path(cur_allocated: usize, suffix: &str) -> String { + format!( + "profile-{}-{}.{}", + cur_allocated, + chrono::Local::now().format("%Y-%m-%d-%H-%M-%S"), + suffix + ) +} + fn dump_profile(_cur_allocated: usize) { #[cfg(feature = "jemalloc-prof")] { const PROF_DUMP: &[u8] = b"prof.dump\0"; static mut PROF_DUMP_FILE_NAME: [u8; 128] = [0; 128]; - let file_name_str = format!( - "profile-{}-{}.out", - _cur_allocated, - chrono::Local::now().format("%Y-%m-%d-%H-%M-%S") - ); + let file_name_str = get_dump_profile_path(_cur_allocated, "out"); // copy file name to PROF_DUMP let file_name = file_name_str.as_bytes(); let len = file_name.len(); @@ -1127,6 +1138,7 @@ fn win_service_main(arg: Vec) { } async fn run_main(cli: Cli) -> anyhow::Result<()> { + defer!(dump_profile(0);); init_logger(&cli.logging_options, true)?; let manager = Arc::new(NetworkInstanceManager::new()); @@ -1255,7 +1267,7 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> { Ok(()) } -fn memory_monitor() { +fn memory_monitor(_force_dump: Arc) { #[cfg(feature = "jemalloc-prof")] { let mut last_peak_size = 0; @@ -1273,9 +1285,10 @@ fn memory_monitor() { ); // dump every 75MB - if last_peak_size > 0 + if (last_peak_size > 0 && new_heap_size > last_peak_size - && new_heap_size - last_peak_size > 75 * 1024 * 1024 + && new_heap_size - last_peak_size > 10 * 1024 * 1024) + || _force_dump.load(std::sync::atomic::Ordering::Relaxed) { println!( "heap size increased: {} bytes, time: {}", @@ -1284,6 +1297,14 @@ fn memory_monitor() { ); dump_profile(new_heap_size); last_peak_size = new_heap_size; + if _force_dump.load(std::sync::atomic::Ordering::Relaxed) { + // also dump whole jemalloc stats + use jemalloc_ctl::stats_print::stats_print; + let tmp_file = get_dump_profile_path(new_heap_size, "stats"); + let mut file = std::fs::File::create(tmp_file).unwrap(); + let _ = stats_print(&mut file, Default::default()); + _force_dump.store(false, std::sync::atomic::Ordering::Relaxed); + } } if last_peak_size == 0 { @@ -1318,7 +1339,20 @@ async fn main() -> ExitCode { }; set_prof_active(true); - let _monitor = std::thread::spawn(memory_monitor); + // register a signal handler to set force dump when signal usr1 is received + let force_dump = Arc::new(AtomicBool::new(false)); + #[cfg(all(feature = "jemalloc-prof", not(target_os = "windows")))] + { + let force_dump_clone = force_dump.clone(); + let mut sigusr1 = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::user_defined1()).unwrap(); + tokio::task::spawn(async move { + while sigusr1.recv().await.is_some() { + force_dump_clone.store(true, std::sync::atomic::Ordering::Relaxed); + } + }); + } + let _monitor = std::thread::spawn(move || memory_monitor(force_dump)); let cli = parse_cli(); @@ -1346,8 +1380,6 @@ async fn main() -> ExitCode { } println!("Stopping easytier..."); - - dump_profile(0); set_prof_active(false); ExitCode::from(ret_code) diff --git a/easytier/src/gateway/icmp_proxy.rs b/easytier/src/gateway/icmp_proxy.rs index a9738eec..8be16ebe 100644 --- a/easytier/src/gateway/icmp_proxy.rs +++ b/easytier/src/gateway/icmp_proxy.rs @@ -255,6 +255,7 @@ impl IcmpProxy { loop { tokio::time::sleep(std::time::Duration::from_secs(1)).await; nat_table.retain(|_, v| v.start_time.elapsed().as_secs() < 20); + nat_table.shrink_to_fit(); } } .instrument(tracing::info_span!("icmp proxy nat table cleaner")), diff --git a/easytier/src/gateway/ip_reassembler.rs b/easytier/src/gateway/ip_reassembler.rs index 66f4b95a..be579d7c 100644 --- a/easytier/src/gateway/ip_reassembler.rs +++ b/easytier/src/gateway/ip_reassembler.rs @@ -187,6 +187,7 @@ impl IpReassembler { pub fn remove_expired_packets(&self) { let timeout = self.timeout; self.packets.retain(|_, v| v.timestamp.elapsed() <= timeout); + self.packets.shrink_to_fit(); } } diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index adaa7f6e..045e9c28 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -491,6 +491,9 @@ impl KcpProxyDst { ); crate::defer! { proxy_entries.remove(&conn_id); + if proxy_entries.capacity() - proxy_entries.len() > 16 { + proxy_entries.shrink_to_fit(); + } } let src_ip = src_socket.ip(); diff --git a/easytier/src/gateway/quic_proxy.rs b/easytier/src/gateway/quic_proxy.rs index 1e693f10..d4edc402 100644 --- a/easytier/src/gateway/quic_proxy.rs +++ b/easytier/src/gateway/quic_proxy.rs @@ -333,6 +333,9 @@ impl QUICProxyDst { let remote_addr = conn.remote_address(); defer!( proxy_entries.remove(&remote_addr); + if proxy_entries.capacity() - proxy_entries.len() > 16 { + proxy_entries.shrink_to_fit(); + } ); let ret = timeout( std::time::Duration::from_secs(10), diff --git a/easytier/src/gateway/socks5.rs b/easytier/src/gateway/socks5.rs index 8112e28e..13c1876e 100644 --- a/easytier/src/gateway/socks5.rs +++ b/easytier/src/gateway/socks5.rs @@ -958,6 +958,7 @@ impl Socks5Server { let udp_client_map = self.udp_client_map.clone(); let udp_forward_task = self.udp_forward_task.clone(); let entries = self.entries.clone(); + let cancel_tokens = self.cancel_tokens.clone(); self.tasks.lock().unwrap().spawn(async move { loop { tokio::time::sleep(Duration::from_secs(30)).await; @@ -972,6 +973,11 @@ impl Socks5Server { } _ => true, }); + + udp_client_map.shrink_to_fit(); + udp_forward_task.shrink_to_fit(); + entries.shrink_to_fit(); + cancel_tokens.shrink_to_fit(); } }); diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index c6099632..819d5bd9 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -513,6 +513,7 @@ impl TcpProxy { true } }); + syn_map.shrink_to_fit(); tokio::time::sleep(Duration::from_secs(10)).await; } }; @@ -706,6 +707,12 @@ impl TcpProxy { ) { conn_map.remove(&nat_entry.id); addr_conn_map.remove_if(&nat_entry.src, |_, entry| entry.id == nat_entry.id); + if conn_map.capacity() - conn_map.len() > 16 { + conn_map.shrink_to_fit(); + } + if addr_conn_map.capacity() - addr_conn_map.len() > 16 { + addr_conn_map.shrink_to_fit(); + } } async fn connect_to_nat_dst( diff --git a/easytier/src/gateway/udp_proxy.rs b/easytier/src/gateway/udp_proxy.rs index c7109050..aecaaae4 100644 --- a/easytier/src/gateway/udp_proxy.rs +++ b/easytier/src/gateway/udp_proxy.rs @@ -14,7 +14,7 @@ use pnet::packet::{ udp::{self, MutableUdpPacket}, Packet, }; -use tachyonix::{channel, Receiver, Sender, TrySendError}; +use tokio::sync::mpsc::{channel, error::TrySendError, Receiver, Sender}; use tokio::{ net::UdpSocket, sync::Mutex, @@ -144,7 +144,7 @@ impl UdpNatEntry { real_ipv4: Ipv4Addr, mapped_ipv4: Ipv4Addr, ) { - let (s, mut r) = tachyonix::channel(128); + let (s, mut r) = channel(128); let self_clone = self.clone(); let recv_task = ScopedTask::from(tokio::spawn(async move { @@ -190,7 +190,7 @@ impl UdpNatEntry { let self_clone = self.clone(); let send_task = ScopedTask::from(tokio::spawn(async move { let mut ip_id = 1; - while let Ok((mut packet, len, src_socket)) = r.recv().await { + while let Some((mut packet, len, src_socket)) = r.recv().await { let SocketAddr::V4(mut src_v4) = src_socket else { continue; }; @@ -422,6 +422,7 @@ impl UdpProxy { true } }); + nat_table.shrink_to_fit(); } }); @@ -438,7 +439,7 @@ impl UdpProxy { let peer_manager = self.peer_manager.clone(); let is_latency_first = self.global_ctx.get_flags().latency_first; self.tasks.lock().await.spawn(async move { - while let Ok(mut msg) = receiver.recv().await { + while let Some(mut msg) = receiver.recv().await { let hdr = msg.mut_peer_manager_header().unwrap(); hdr.set_latency_first(is_latency_first); let to_peer_id = hdr.to_peer_id.into(); diff --git a/easytier/src/instance/dns_server/server_instance.rs b/easytier/src/instance/dns_server/server_instance.rs index f7789df3..dc83b0fc 100644 --- a/easytier/src/instance/dns_server/server_instance.rs +++ b/easytier/src/instance/dns_server/server_instance.rs @@ -442,6 +442,7 @@ impl RpcServerHook for MagicDnsServerInstanceData { item.value_mut().remove(&remote_addr); } self.route_infos.retain(|_, v| !v.is_empty()); + self.route_infos.shrink_to_fit(); self.update().await; } } diff --git a/easytier/src/instance_manager.rs b/easytier/src/instance_manager.rs index d2dc7a70..dd454c63 100644 --- a/easytier/src/instance_manager.rs +++ b/easytier/src/instance_manager.rs @@ -84,6 +84,7 @@ impl NetworkInstanceManager { } stop_check_notifier.notify_one(); instance_stop_tasks.remove(&instance_id); + instance_stop_tasks.shrink_to_fit(); })), ); Ok(()) @@ -112,6 +113,7 @@ impl NetworkInstanceManager { instance_ids: Vec, ) -> Result, anyhow::Error> { self.instance_map.retain(|k, _| instance_ids.contains(k)); + self.instance_map.shrink_to_fit(); Ok(self.list_network_instance_ids()) } @@ -120,6 +122,7 @@ impl NetworkInstanceManager { instance_ids: Vec, ) -> Result, anyhow::Error> { self.instance_map.retain(|k, _| !instance_ids.contains(k)); + self.instance_map.shrink_to_fit(); Ok(self.list_network_instance_ids()) } diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index b3268fec..f2c84016 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -132,12 +132,6 @@ impl EasyTierLauncher { let mut instance = Instance::new(cfg); let mut tasks = JoinSet::new(); - api_service - .write() - .unwrap() - .replace(Arc::new(instance.get_api_rpc_service())); - drop(api_service); - // Subscribe to global context events let global_ctx = instance.get_global_ctx(); let data_c = data.clone(); @@ -163,6 +157,13 @@ impl EasyTierLauncher { Self::run_routine_for_android(&instance, &data, &mut tasks).await; instance.run().await?; + + api_service + .write() + .unwrap() + .replace(Arc::new(instance.get_api_rpc_service())); + drop(api_service); + stop_signal.notified().await; tasks.abort_all(); diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 150ec3ad..01edc4e6 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -24,7 +24,7 @@ use crate::{ config::{ConfigLoader, TomlConfigLoader}, error::Error, global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent, NetworkIdentity}, - join_joinset_background, + join_joinset_background, shrink_dashmap, stats_manager::{LabelSet, LabelType, MetricName, StatsManager}, token_bucket::TokenBucket, PeerId, @@ -448,6 +448,9 @@ impl ForeignNetworkManagerData { { self.network_peer_last_update.remove(network_name); } + shrink_dashmap(&self.peer_network_map, None); + shrink_dashmap(&self.network_peer_maps, None); + shrink_dashmap(&self.network_peer_last_update, None); } async fn clear_no_conn_peer(&self, network_name: &String) { diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index 09ccd8d9..e1cb1f1d 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -11,7 +11,6 @@ use super::{ peer_conn::{PeerConn, PeerConnId}, PacketRecvChan, }; -use crate::{common::scoped_task::ScopedTask, proto::api::instance::PeerConnInfo}; use crate::{ common::{ error::Error, @@ -20,6 +19,10 @@ use crate::{ }, tunnel::packet_def::ZCPacket, }; +use crate::{ + common::{scoped_task::ScopedTask, shrink_dashmap}, + proto::api::instance::PeerConnInfo, +}; type ArcPeerConn = Arc; type ConnMap = Arc>; @@ -72,6 +75,7 @@ impl Peer { global_ctx_copy.issue_event(GlobalCtxEvent::PeerConnRemoved( conn.get_conn_info(), )); + shrink_dashmap(&conns_copy, Some(4)); } } diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 8e8f5544..c67c7a7f 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -24,6 +24,7 @@ use crate::{ constants::EASYTIER_VERSION, error::Error, global_ctx::{ArcGlobalCtx, NetworkIdentity}, + shrink_dashmap, stats_manager::{CounterHandle, LabelSet, LabelType, MetricName}, stun::StunInfoCollectorTrait, PeerId, @@ -514,6 +515,7 @@ impl PeerManager { } self.reserved_my_peer_id_map.remove(&peer_network_name); + shrink_dashmap(&self.reserved_my_peer_id_map, None); tracing::info!("add tunnel as server done"); Ok(()) diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index 7f49d46c..3b2af1d6 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -11,7 +11,7 @@ use crate::{ common::{ error::Error, global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, - PeerId, + shrink_dashmap, PeerId, }, proto::{ api::instance::{self, PeerConnInfo}, @@ -84,6 +84,7 @@ impl PeerMap { if let Some(alive_conns) = alive_conns_weak.upgrade() { alive_conns.remove(&(conn_info.peer_id, conn_id)).unwrap(); alive_conn_count = alive_conns.len(); + shrink_dashmap(&alive_conns, None); } tracing::debug!( ?conn_id, @@ -295,6 +296,8 @@ impl PeerMap { pub async fn close_peer(&self, peer_id: PeerId) -> Result<(), TunnelError> { let remove_ret = self.peer_map.remove(&peer_id); + shrink_dashmap(&self.peer_map, None); + self.global_ctx .issue_event(GlobalCtxEvent::PeerRemoved(peer_id)); tracing::info!( diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 155dfa66..75da7b00 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -31,7 +31,7 @@ use tokio::{ use crate::{ common::{ config::NetworkIdentity, constants::EASYTIER_VERSION, global_ctx::ArcGlobalCtx, - stun::StunInfoCollectorTrait, PeerId, + shrink_dashmap, stun::StunInfoCollectorTrait, PeerId, }, peers::route_trait::{Route, RouteInterfaceBox}, proto::{ @@ -335,6 +335,14 @@ impl SyncedRouteInfo { self.foreign_network.retain(|k, _| k.peer_id != peer_id); self.group_trust_map.remove(&peer_id); self.group_trust_map_cache.remove(&peer_id); + + shrink_dashmap(&self.peer_infos, None); + shrink_dashmap(&self.raw_peer_infos, None); + shrink_dashmap(&self.conn_map, None); + shrink_dashmap(&self.foreign_network, None); + shrink_dashmap(&self.group_trust_map, None); + shrink_dashmap(&self.group_trust_map_cache, None); + self.version.inc(); } @@ -863,6 +871,12 @@ impl RouteTable { // remove cidr map for peers we cannot reach. self.next_hop_map.contains_key(&v.peer_id) }); + + shrink_dashmap(&self.peer_infos, None); + shrink_dashmap(&self.next_hop_map, None); + shrink_dashmap(&self.ipv4_peer_id_map, None); + shrink_dashmap(&self.ipv6_peer_id_map, None); + shrink_dashmap(&self.cidr_peer_id_map, None); } fn gen_next_hop_map_with_least_hop( @@ -1090,14 +1104,47 @@ impl Debug for SessionTask { } } +#[derive(Debug)] +struct VersionAndTouchTime { + version: AtomicVersion, + touch_time: AtomicCell, +} + +impl Default for VersionAndTouchTime { + fn default() -> Self { + VersionAndTouchTime { + version: AtomicVersion::new(), + touch_time: AtomicCell::new(Instant::now()), + } + } +} + +impl VersionAndTouchTime { + fn touch(&self) { + self.touch_time.store(Instant::now()); + } + + fn get(&self) -> Version { + self.version.get() + } + + fn set_if_larger(&self, version: Version) { + self.version.set_if_larger(version); + } + + fn is_expired(&self) -> bool { + self.touch_time.load().elapsed() > Duration::from_secs(60) + } +} + // if we need to sync route info with one peer, we create a SyncRouteSession with that peer. #[derive(Debug)] struct SyncRouteSession { my_peer_id: PeerId, dst_peer_id: PeerId, - dst_saved_peer_info_versions: DashMap, - dst_saved_conn_bitmap_version: DashMap, - dst_saved_foreign_network_versions: DashMap, + dst_saved_peer_info_versions: DashMap, + dst_saved_conn_bitmap_version: DashMap, + dst_saved_foreign_network_versions: DashMap, my_session_id: AtomicSessionId, dst_session_id: AtomicSessionId, @@ -1145,33 +1192,89 @@ impl SyncRouteSession { } self.dst_saved_peer_info_versions .get(&peer_id) - .map(|v| v.get() >= version) + .map(|v| { + v.touch(); + v.get() >= version + }) .unwrap_or(false) } - fn update_dst_saved_peer_info_version(&self, infos: &[RoutePeerInfo]) { + fn check_saved_conn_version_update_to_date(&self, peer_id: PeerId, version: Version) -> bool { + if version == 0 || peer_id == self.dst_peer_id { + // never send version 0 conn bitmap to dst peer. + return true; + } + self.dst_saved_conn_bitmap_version + .get(&peer_id) + .map(|v| { + v.touch(); + v.get() >= version + }) + .unwrap_or(false) + } + + fn check_saved_foreign_network_version_update_to_date( + &self, + foreign_network_key: &ForeignNetworkRouteInfoKey, + version: Version, + ) -> bool { + if version == 0 || foreign_network_key.peer_id == self.dst_peer_id { + // never send version 0 foreign network to dst peer. + return true; + } + + self.dst_saved_foreign_network_versions + .get(foreign_network_key) + .map(|x| { + x.touch(); + x.get() >= version + }) + .unwrap_or(false) + } + + fn update_dst_saved_peer_info_version(&self, infos: &[RoutePeerInfo], dst_peer_id: PeerId) { for info in infos.iter() { + if info.peer_id == dst_peer_id { + // we never send dst peer info to dst peer, so no need to store it. + continue; + } + self.dst_saved_peer_info_versions .entry(info.peer_id) - .or_insert_with(AtomicVersion::new) + .or_default() .set_if_larger(info.version); } } - fn update_dst_saved_conn_bitmap_version(&self, conn_bitmap: &RouteConnBitmap) { + fn update_dst_saved_conn_bitmap_version( + &self, + conn_bitmap: &RouteConnBitmap, + dst_peer_id: PeerId, + ) { for (peer_id, version) in conn_bitmap.peer_ids.iter() { + if *peer_id == dst_peer_id { + continue; + } + self.dst_saved_conn_bitmap_version .entry(*peer_id) - .or_insert_with(AtomicVersion::new) + .or_default() .set_if_larger(*version); } } - fn update_dst_saved_foreign_network_version(&self, foreign_network: &RouteForeignNetworkInfos) { + fn update_dst_saved_foreign_network_version( + &self, + foreign_network: &RouteForeignNetworkInfos, + dst_peer_id: PeerId, + ) { for item in foreign_network.infos.iter() { + if item.key.as_ref().unwrap().peer_id == dst_peer_id { + continue; + } self.dst_saved_foreign_network_versions .entry(item.key.clone().unwrap()) - .or_insert_with(AtomicVersion::new) + .or_default() .set_if_larger(item.value.as_ref().unwrap().version); } } @@ -1190,6 +1293,20 @@ impl SyncRouteSession { } } + fn clean_dst_saved_map(&self) { + self.dst_saved_peer_info_versions + .retain(|_, v| !v.is_expired()); + self.dst_saved_peer_info_versions.shrink_to_fit(); + + self.dst_saved_conn_bitmap_version + .retain(|_, v| !v.is_expired()); + self.dst_saved_conn_bitmap_version.shrink_to_fit(); + + self.dst_saved_foreign_network_versions + .retain(|_, v| !v.is_expired()); + self.dst_saved_foreign_network_versions.shrink_to_fit(); + } + fn short_debug_string(&self) -> String { format!( "session_dst_peer: {:?}, my_session_id: {:?}, dst_session_id: {:?}, we_are_initiator: {:?}, dst_is_initiator: {:?}, rpc_tx_count: {:?}, rpc_rx_count: {:?}, task: {:?}", @@ -1305,6 +1422,7 @@ impl PeerRouteServiceImpl { fn remove_session(&self, dst_peer_id: PeerId) { self.sessions.remove(&dst_peer_id); + shrink_dashmap(&self.sessions, None); } fn list_session_peers(&self) -> Vec { @@ -1509,14 +1627,14 @@ impl PeerRouteServiceImpl { fn build_route_info(&self, session: &SyncRouteSession) -> Option> { let mut route_infos = Vec::new(); for item in self.synced_route_info.peer_infos.iter() { - if session - .check_saved_peer_info_update_to_date(item.value().peer_id, item.value().version) - { + // do not send unreachable peer info to dst peer. + if !self.route_table.peer_reachable(*item.key()) { continue; } - // do not send unreachable peer info to dst peer. - if !self.route_table.peer_reachable(*item.key()) { + if session + .check_saved_peer_info_update_to_date(item.value().peer_id, item.value().version) + { continue; } @@ -1533,14 +1651,10 @@ impl PeerRouteServiceImpl { fn build_conn_bitmap(&self, session: &SyncRouteSession) -> Option { let mut need_update = false; for (peer_id, local_version) in self.cached_local_conn_map.lock().unwrap().peer_ids.iter() { - let peer_version = session - .dst_saved_conn_bitmap_version - .get(peer_id) - .map(|item| item.get()); - if peer_version.is_none() || peer_version.unwrap() < *local_version { - need_update = true; - break; + if session.check_saved_conn_version_update_to_date(*peer_id, *local_version) { + continue; } + need_update = true; } if !need_update { @@ -1556,15 +1670,10 @@ impl PeerRouteServiceImpl { ) -> Option { let mut foreign_networks = RouteForeignNetworkInfos::default(); for item in self.synced_route_info.foreign_network.iter() { - if item.key().peer_id == session.dst_peer_id { - continue; - } - if session - .dst_saved_foreign_network_versions - .get(item.key()) - .map(|x| x.get() >= item.value().version) - .unwrap_or(false) - { + if session.check_saved_foreign_network_version_update_to_date( + item.key(), + item.value().version, + ) { continue; } @@ -1780,15 +1889,15 @@ impl PeerRouteServiceImpl { session.update_dst_session_id(resp.session_id); if let Some(peer_infos) = &peer_infos { - session.update_dst_saved_peer_info_version(peer_infos); + session.update_dst_saved_peer_info_version(peer_infos, dst_peer_id); } if let Some(conn_bitmap) = &conn_bitmap { - session.update_dst_saved_conn_bitmap_version(conn_bitmap); + session.update_dst_saved_conn_bitmap_version(conn_bitmap, dst_peer_id); } if let Some(foreign_network) = &foreign_network { - session.update_dst_saved_foreign_network_version(foreign_network); + session.update_dst_saved_foreign_network_version(foreign_network, dst_peer_id); } } } @@ -1816,6 +1925,14 @@ impl PeerRouteServiceImpl { .map(|groups| groups.value().clone()) .unwrap_or_default() } + + fn clean_dst_saved_map(&self, dst_peer_id: PeerId) { + let Some(session) = self.get_session(dst_peer_id) else { + return; + }; + + session.clean_dst_saved_map(); + } } impl Drop for PeerRouteServiceImpl { @@ -1919,6 +2036,7 @@ impl RouteSessionManager { mut sync_now: tokio::sync::broadcast::Receiver<()>, ) { let mut last_sync = Instant::now(); + let mut last_clean_dst_saved_map = Instant::now(); loop { loop { let Some(service_impl) = service_impl.clone().upgrade() else { @@ -1941,6 +2059,10 @@ impl RouteSessionManager { .sync_route_with_peer(dst_peer_id, peer_rpc.clone(), sync_as_initiator) .await { + if last_clean_dst_saved_map.elapsed().as_secs() > 60 { + last_clean_dst_saved_map = Instant::now(); + service_impl.clean_dst_saved_map(dst_peer_id); + } break; } @@ -2159,13 +2281,13 @@ impl RouteSessionManager { peer_infos, &service_impl.global_ctx.get_acl_group_declarations(), ); - session.update_dst_saved_peer_info_version(peer_infos); + session.update_dst_saved_peer_info_version(peer_infos, from_peer_id); need_update_route_table = true; } if let Some(conn_bitmap) = &conn_bitmap { service_impl.synced_route_info.update_conn_map(conn_bitmap); - session.update_dst_saved_conn_bitmap_version(conn_bitmap); + session.update_dst_saved_conn_bitmap_version(conn_bitmap, from_peer_id); need_update_route_table = true; } @@ -2177,7 +2299,7 @@ impl RouteSessionManager { service_impl .synced_route_info .update_foreign_network(foreign_network); - session.update_dst_saved_foreign_network_version(foreign_network); + session.update_dst_saved_foreign_network_version(foreign_network, from_peer_id); } if need_update_route_table || foreign_network.is_some() { @@ -2643,8 +2765,7 @@ mod tests { .get(&p_a.my_peer_id()) .unwrap() .value() - .0 - .load(Ordering::Relaxed) + .get() ); assert_eq!((1, 1), get_rpc_counter(&r_a, p_b.my_peer_id())); diff --git a/easytier/src/peers/peer_task.rs b/easytier/src/peers/peer_task.rs index aa172b10..d5250ae3 100644 --- a/easytier/src/peers/peer_task.rs +++ b/easytier/src/peers/peer_task.rs @@ -96,6 +96,7 @@ where } } } + peer_task_map.shrink_to_fit(); } if !peers_to_connect.is_empty() { diff --git a/easytier/src/proto/rpc_impl/client.rs b/easytier/src/proto/rpc_impl/client.rs index d1763386..2a6af8b0 100644 --- a/easytier/src/proto/rpc_impl/client.rs +++ b/easytier/src/proto/rpc_impl/client.rs @@ -10,6 +10,7 @@ use tokio::task::JoinSet; use tokio::time::timeout; use tokio_stream::StreamExt; +use crate::common::shrink_dashmap; use crate::common::{ stats_manager::{LabelSet, LabelType, MetricName, StatsManager}, PeerId, @@ -131,6 +132,7 @@ impl Client { } true }); + peer_infos.shrink_to_fit(); } }); @@ -245,7 +247,7 @@ impl Client { .with_label_type(LabelType::ServiceName(desc.name().to_string())) .with_label_type(LabelType::MethodName(method.name().to_string())); - defer!(self.inflight_requests.remove(&key);); + defer!(self.inflight_requests.remove(&key); shrink_dashmap(&self.inflight_requests, Some(4));); self.inflight_requests.insert( key.clone(), InflightRequest { diff --git a/easytier/src/proto/rpc_impl/server.rs b/easytier/src/proto/rpc_impl/server.rs index c91ebfdb..611f64b9 100644 --- a/easytier/src/proto/rpc_impl/server.rs +++ b/easytier/src/proto/rpc_impl/server.rs @@ -82,16 +82,9 @@ impl Server { registry: Arc, stats_manager: Arc, ) -> Self { - let (ring_a, ring_b) = create_ring_tunnel_pair(); - - Self { - registry, - mpsc: Mutex::new(Some(MpscTunnel::new(ring_a, None))), - transport: Mutex::new(MpscTunnel::new(ring_b, None)), - tasks: Arc::new(Mutex::new(JoinSet::new())), - packet_mergers: Arc::new(DashMap::new()), - stats_manager: Some(stats_manager), - } + let mut ret = Self::new_with_registry(registry); + ret.stats_manager = Some(stats_manager); + ret } pub fn registry(&self) -> &ServiceRegistry { @@ -176,6 +169,7 @@ impl Server { loop { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; packet_mergers.retain(|_, v| v.last_updated().elapsed().as_secs() < 10); + packet_mergers.shrink_to_fit(); } }); } diff --git a/easytier/src/tunnel/udp.rs b/easytier/src/tunnel/udp.rs index cb0cc0ea..52b2995a 100644 --- a/easytier/src/tunnel/udp.rs +++ b/easytier/src/tunnel/udp.rs @@ -23,7 +23,7 @@ use tracing::{instrument, Instrument}; use super::{packet_def::V6HolePunchPacket, TunnelInfo}; use crate::{ - common::{join_joinset_background, scoped_task::ScopedTask}, + common::{join_joinset_background, scoped_task::ScopedTask, shrink_dashmap}, tunnel::{ build_url_from_socket_addr, common::{reserve_buf, TunnelWrapper}, @@ -569,7 +569,10 @@ impl TunnelListener for UdpTunnelListener { if let Some(err) = err { tracing::error!(?err, "udp close event error"); } - sock_map.upgrade().map(|v| v.remove(&dst_addr)); + if let Some(sock_map) = sock_map.upgrade() { + sock_map.remove(&dst_addr); + shrink_dashmap(&sock_map, None); + } } }); diff --git a/easytier/src/tunnel/wireguard.rs b/easytier/src/tunnel/wireguard.rs index 96705f42..dfec589c 100644 --- a/easytier/src/tunnel/wireguard.rs +++ b/easytier/src/tunnel/wireguard.rs @@ -21,10 +21,13 @@ use rand::RngCore; use tokio::{net::UdpSocket, sync::Mutex, task::JoinSet}; use super::TunnelInfo; -use crate::tunnel::{ - build_url_from_socket_addr, - common::TunnelWrapper, - packet_def::{ZCPacket, WG_TUNNEL_HEADER_SIZE}, +use crate::{ + common::shrink_dashmap, + tunnel::{ + build_url_from_socket_addr, + common::TunnelWrapper, + packet_def::{ZCPacket, WG_TUNNEL_HEADER_SIZE}, + }, }; use super::{ @@ -491,12 +494,13 @@ impl WgTunnelListener { ) { let mut tasks = JoinSet::new(); - let peer_map_clone = peer_map.clone(); + let peer_map_clone: Arc>> = peer_map.clone(); tasks.spawn(async move { loop { peer_map_clone.retain(|_, peer| { peer.access_time.load().elapsed().as_secs() < 61 && !peer.stopped() }); + shrink_dashmap(&peer_map_clone, None); tokio::time::sleep(Duration::from_secs(1)).await; } }); diff --git a/easytier/src/vpn_portal/wireguard.rs b/easytier/src/vpn_portal/wireguard.rs index a8710891..95ca55be 100644 --- a/easytier/src/vpn_portal/wireguard.rs +++ b/easytier/src/vpn_portal/wireguard.rs @@ -16,7 +16,7 @@ use crate::{ common::{ config::NetworkIdentity, global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, - join_joinset_background, + join_joinset_background, shrink_dashmap, }, peers::{peer_manager::PeerManager, PeerPacketFilter}, tunnel::{ @@ -144,6 +144,7 @@ impl WireGuardImpl { "The wg client changed its endpoint address, not removing from table" ), } + shrink_dashmap(&wg_peer_ip_table, None); } peer_mgr