diff --git a/easytier-core/src/tunnels/stats.rs b/easytier-core/src/tunnels/stats.rs index c0680bcc..9765570f 100644 --- a/easytier-core/src/tunnels/stats.rs +++ b/easytier-core/src/tunnels/stats.rs @@ -1,9 +1,12 @@ -use std::sync::atomic::{AtomicU32, AtomicU64}; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::Relaxed}; pub struct WindowLatency { latency_us_window: Vec, latency_us_window_index: AtomicU32, - latency_us_window_size: AtomicU32, + latency_us_window_size: u32, + + sum: AtomicU32, + count: AtomicU32, } impl WindowLatency { @@ -11,39 +14,32 @@ impl WindowLatency { Self { latency_us_window: (0..window_size).map(|_| AtomicU32::new(0)).collect(), latency_us_window_index: AtomicU32::new(0), - latency_us_window_size: AtomicU32::new(window_size), + latency_us_window_size: window_size, + + sum: AtomicU32::new(0), + count: AtomicU32::new(0), } } pub fn record_latency(&self, latency_us: u32) { - let index = self - .latency_us_window_index - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let index = index - % self - .latency_us_window_size - .load(std::sync::atomic::Ordering::Relaxed); - self.latency_us_window[index as usize] - .store(latency_us, std::sync::atomic::Ordering::Relaxed); + let index = self.latency_us_window_index.fetch_add(1, Relaxed); + if index < self.latency_us_window_size { + self.count.fetch_add(1, Relaxed); + } + + let index = index % self.latency_us_window_size; + let old_lat = self.latency_us_window[index as usize].swap(latency_us, Relaxed); + + if old_lat < latency_us { + self.sum.fetch_add(latency_us - old_lat, Relaxed); + } else { + self.sum.fetch_sub(old_lat - latency_us, Relaxed); + } } pub fn get_latency_us + std::ops::Div>(&self) -> T { - let window_size = self - .latency_us_window_size - .load(std::sync::atomic::Ordering::Relaxed); - let mut sum = 0; - let mut count = 0; - for i in 0..window_size { - if i >= self - .latency_us_window_index - .load(std::sync::atomic::Ordering::Relaxed) - { - break; - } - sum += self.latency_us_window[i as usize].load(std::sync::atomic::Ordering::Relaxed); - count += 1; - } - + let count = self.count.load(Relaxed); + let sum = self.sum.load(Relaxed); if count == 0 { 0.into() } else { @@ -72,32 +68,28 @@ impl Throughput { } pub fn tx_bytes(&self) -> u64 { - self.tx_bytes.load(std::sync::atomic::Ordering::Relaxed) + self.tx_bytes.load(Relaxed) } pub fn rx_bytes(&self) -> u64 { - self.rx_bytes.load(std::sync::atomic::Ordering::Relaxed) + self.rx_bytes.load(Relaxed) } pub fn tx_packets(&self) -> u64 { - self.tx_packets.load(std::sync::atomic::Ordering::Relaxed) + self.tx_packets.load(Relaxed) } pub fn rx_packets(&self) -> u64 { - self.rx_packets.load(std::sync::atomic::Ordering::Relaxed) + self.rx_packets.load(Relaxed) } pub fn record_tx_bytes(&self, bytes: u64) { - self.tx_bytes - .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed); - self.tx_packets - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.tx_bytes.fetch_add(bytes, Relaxed); + self.tx_packets.fetch_add(1, Relaxed); } pub fn record_rx_bytes(&self, bytes: u64) { - self.rx_bytes - .fetch_add(bytes, std::sync::atomic::Ordering::Relaxed); - self.rx_packets - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.rx_bytes.fetch_add(bytes, Relaxed); + self.rx_packets.fetch_add(1, Relaxed); } }