From 98d321f8ac318c1dae5f5d719bd6897af4b44a7e Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Sat, 8 Mar 2025 17:01:15 +0800 Subject: [PATCH] fix kcp traffic not encrypted --- easytier/src/peers/peer_manager.rs | 34 +++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 3a96843a..e6b7f0a5 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -417,6 +417,7 @@ impl PeerManager { let foreign_client = self.foreign_network_client.clone(); let foreign_mgr = self.foreign_network_manager.clone(); let encryptor = self.encryptor.clone(); + let compress_algo = self.data_compress_algo; self.tasks.lock().await.spawn(async move { tracing::trace!("start_peer_recv"); while let Ok(ret) = recv_packet_from_chan(&mut recv).await { @@ -447,6 +448,16 @@ impl PeerManager { } hdr.forward_counter += 1; + + if from_peer_id == my_peer_id + && (hdr.packet_type == PacketType::Data as u8 + || hdr.packet_type == PacketType::KcpSrc as u8 + || hdr.packet_type == PacketType::KcpDst as u8) + { + let _ = Self::try_compress_and_encrypt(compress_algo, &encryptor, &mut ret) + .await; + } + tracing::trace!(?to_peer_id, ?my_peer_id, "need forward"); let ret = Self::send_msg_internal(&peers, &foreign_client, ret, to_peer_id).await; @@ -757,6 +768,20 @@ impl PeerManager { (dst_peers, is_exit_node) } + pub async fn try_compress_and_encrypt( + compress_algo: CompressorAlgo, + encryptor: &Box, + msg: &mut ZCPacket, + ) -> Result<(), Error> { + let compressor = DefaultCompressor {}; + compressor + .compress(msg, compress_algo) + .await + .with_context(|| "compress failed")?; + encryptor.encrypt(msg).with_context(|| "encrypt failed")?; + Ok(()) + } + pub async fn send_msg_ipv4(&self, mut msg: ZCPacket, ipv4_addr: Ipv4Addr) -> Result<(), Error> { tracing::trace!( "do send_msg in peer manager, msg: {:?}, ipv4_addr: {}", @@ -788,14 +813,7 @@ impl PeerManager { return Ok(()); } - let compressor = DefaultCompressor {}; - compressor - .compress(&mut msg, self.data_compress_algo) - .await - .with_context(|| "compress failed")?; - self.encryptor - .encrypt(&mut msg) - .with_context(|| "encrypt failed")?; + Self::try_compress_and_encrypt(self.data_compress_algo, &self.encryptor, &mut msg).await?; let is_latency_first = self.global_ctx.get_flags().latency_first; msg.mut_peer_manager_header()