Compare commits

...

7 Commits

Author SHA1 Message Date
Sijie.Sun
c23b544c34 tcp accept should retry when encoutering some kinds of error (#565)
* tcp accept should retry when encoutering some kinds of error

bump version to v2.1.2

* persistent temporary machine id
2025-01-14 08:55:48 +08:00
Sijie.Sun
9d76b86f49 fix bugs (#561)
1. if peers disconnected before stop session, may crash at the assert.
2. bind_device flag should take effect on manual connector.
2025-01-12 00:16:38 +08:00
Sijie.Sun
bb0ccca3e5 allow manually specify public address of listeners (#556) 2025-01-10 09:25:14 +08:00
Sijie.Sun
306817ae9a allow listener retry listen (#554) 2025-01-09 00:01:41 +08:00
Sijie.Sun
d2ec60e108 batch recv for udp proxy (#552) 2025-01-07 23:52:18 +08:00
Sijie.Sun
e016aeddeb optimize pingpong conn close condition (#549)
if received some packets from peer, only close conn after enough
rounds of pingpong
2025-01-07 22:42:57 +08:00
Sijie.Sun
a4419a31fd fix peer rpc compatibility issue (#548)
every rpc packet should contains descriptor if sent to old version et.
2025-01-06 23:30:56 +08:00
25 changed files with 591 additions and 309 deletions

View File

@@ -21,7 +21,7 @@ on:
version:
description: 'Version for this release'
type: string
default: 'v2.1.1'
default: 'v2.1.2'
required: true
make_latest:
description: 'Mark this release as latest'

4
Cargo.lock generated
View File

@@ -1830,7 +1830,7 @@ checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
[[package]]
name = "easytier"
version = "2.1.1"
version = "2.1.2"
dependencies = [
"aes-gcm",
"anyhow",
@@ -1926,7 +1926,7 @@ dependencies = [
[[package]]
name = "easytier-gui"
version = "2.1.1"
version = "2.1.2"
dependencies = [
"anyhow",
"chrono",

View File

@@ -31,6 +31,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
- **High Availability**: Supports multi-path and switches to healthy paths when high packet loss or network errors are detected.
- **IPv6 Support**: Supports networking using IPv6.
- **Multiple Protocol Types**: Supports communication between nodes using protocols such as WebSocket and QUIC.
- **Web Management Interface**: Provides a [web-based management](https://easytier.cn/web) interface for easy configuration and monitoring.
## Installation
@@ -52,7 +53,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
4. **Install by Docker Compose**
Please visit the [EasyTier Official Website](https://www.easytier.top/en/) to view the full documentation.
Please visit the [EasyTier Official Website](https://www.easytier.cn/en/) to view the full documentation.
5. **Install by script (For Linux Only)**
@@ -200,20 +201,20 @@ Subnet proxy information will automatically sync to each node in the virtual net
### Networking without Public IP
EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.top:11010``.
EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.cn:11010``.
When using shared nodes, each node entering the network needs to provide the same ``--network-name`` and ``--network-secret`` parameters as the unique identifier of the network.
Taking two nodes as an example, Node A executes:
```sh
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
```
Node B executes
```sh
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
```
After the command is successfully executed, Node A can access Node B through the virtual IP 10.144.144.2.
@@ -286,7 +287,7 @@ Run you own public server cluster is exactly same as running an virtual network,
You can also join the official public server cluster with following command:
```
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
```
@@ -296,10 +297,8 @@ You can use ``easytier-core --help`` to view all configuration items
## Roadmap
- [ ] Improve documentation and user guides.
- [ ] Support features such as encryption, TCP hole punching, etc.
- [ ] Support features such TCP hole punching, KCP, FEC etc.
- [ ] Support iOS.
- [ ] Support Web configuration management.
## Community and Contribution

View File

@@ -8,7 +8,7 @@
[简体中文](/README_CN.md) | [English](/README.md)
**请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。**
**请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。**
一个简单、安全、去中心化的内网穿透 VPN 组网方案,使用 Rust 语言和 Tokio 框架实现。
@@ -31,6 +31,7 @@
- **高可用性**:支持多路径和在检测到高丢包率或网络错误时切换到健康路径。
- **IPV6 支持**:支持利用 IPV6 组网。
- **多协议类型**: 支持使用 WebSocket、QUIC 等协议进行节点间通信。
- **Web 管理界面**:支持通过 [Web 界面](https://easytier.cn)管理节点。
## 安装
@@ -52,7 +53,7 @@
4. **通过Docker Compose安装**
请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。
请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。
5. **使用一键脚本安装 (仅适用于 Linux)**
@@ -199,20 +200,20 @@ sudo easytier-core --ipv4 10.144.144.2 -n 10.1.1.0/24
### 无公网IP组网
EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.top:11010``。
EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.cn:11010``。
使用共享节点时,需要每个入网节点提供相同的 ``--network-name`` 和 ``--network-secret`` 参数,作为网络的唯一标识。
以双节点为例,节点 A 执行:
```sh
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
```
节点 B 执行
```sh
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
```
命令执行成功后,节点 A 即可通过虚拟 IP 10.144.144.2 访问节点 B。
@@ -289,7 +290,7 @@ connected_clients:
也可以使用以下命令加入官方公共服务器集群,后续将实现公共服务器集群的节点间负载均衡:
```
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
```
### 其他配置
@@ -299,9 +300,8 @@ sudo easytier-core --network-name easytier --network-secret easytier -p tcp://pu
## 路线图
- [ ] 完善文档和用户指南。
- [ ] 支持 TCP 打洞等特性。
- [ ] 支持 TCP 打洞、KCP、FEC 等特性。
- [ ] 支持 iOS。
- [ ] 支持 Web 配置管理。
## 社区和贡献

View File

@@ -1,7 +1,7 @@
{
"name": "easytier-gui",
"type": "module",
"version": "2.1.1",
"version": "2.1.2",
"private": true,
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
"scripts": {

View File

@@ -1,6 +1,6 @@
[package]
name = "easytier-gui"
version = "2.1.1"
version = "2.1.2"
description = "EasyTier GUI"
authors = ["you"]
edition = "2021"

View File

@@ -17,7 +17,7 @@
"createUpdaterArtifacts": false
},
"productName": "easytier-gui",
"version": "2.1.1",
"version": "2.1.2",
"identifier": "com.kkrainbow.easytier",
"plugins": {},
"app": {

View File

@@ -3,7 +3,7 @@ name = "easytier"
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
homepage = "https://github.com/EasyTier/EasyTier"
repository = "https://github.com/EasyTier/EasyTier"
version = "2.1.1"
version = "2.1.2"
edition = "2021"
authors = ["kkrainbow"]
keywords = ["vpn", "p2p", "network", "easytier"]

View File

@@ -134,6 +134,12 @@ core_clap:
compression:
en: "compression algorithm to use, support none, zstd. default is none"
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none"
mapped_listeners:
en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple."
zh-CN: "手动指定监听器的公网地址其他节点可以使用该地址连接到本节点。例如tcp://123.123.123.123:11223可以指定多个。"
bind_device:
en: "bind the connector socket to physical devices to avoid routing issues. e.g.: subnet proxy segment conflicts with a node's segment, after binding the physical device, it can communicate with the node normally."
zh-CN: "将连接器的套接字绑定到物理设备以避免路由问题。比如子网代理网段与某节点的网段冲突,绑定物理设备后可以与该节点正常通信。"
core_app:
panic_backtrace_save:

View File

@@ -27,8 +27,9 @@ pub fn gen_default_flags() -> Flags {
relay_all_peer_rpc: false,
disable_udp_hole_punching: false,
ipv6_listener: "udp://[::]:0".to_string(),
multi_thread: false,
multi_thread: true,
data_compress_algo: CompressionAlgoPb::None.into(),
bind_device: true,
}
}
@@ -72,6 +73,9 @@ pub trait ConfigLoader: Send + Sync {
fn get_listeners(&self) -> Vec<url::Url>;
fn set_listeners(&self, listeners: Vec<url::Url>);
fn get_mapped_listeners(&self) -> Vec<url::Url>;
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>);
fn get_rpc_portal(&self) -> Option<SocketAddr>;
fn set_rpc_portal(&self, addr: SocketAddr);
@@ -183,6 +187,7 @@ struct Config {
dhcp: Option<bool>,
network_identity: Option<NetworkIdentity>,
listeners: Option<Vec<url::Url>>,
mapped_listeners: Option<Vec<url::Url>>,
exit_nodes: Option<Vec<Ipv4Addr>>,
peer: Option<Vec<PeerConfig>>,
@@ -472,6 +477,19 @@ impl ConfigLoader for TomlConfigLoader {
self.config.lock().unwrap().listeners = Some(listeners);
}
fn get_mapped_listeners(&self) -> Vec<url::Url> {
self.config
.lock()
.unwrap()
.mapped_listeners
.clone()
.unwrap_or_default()
}
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>) {
self.config.lock().unwrap().mapped_listeners = listeners;
}
fn get_rpc_portal(&self) -> Option<SocketAddr> {
self.config.lock().unwrap().rpc_portal
}

View File

@@ -230,7 +230,10 @@ impl GlobalCtx {
}
pub fn add_running_listener(&self, url: url::Url) {
self.running_listeners.lock().unwrap().push(url);
let mut l = self.running_listeners.lock().unwrap();
if !l.contains(&url) {
l.push(url);
}
}
pub fn get_vpn_portal_cidr(&self) -> Option<cidr::Ipv4Cidr> {

View File

@@ -1,6 +1,7 @@
use std::{
fmt::Debug,
future,
io::Write as _,
sync::{Arc, Mutex},
};
use tokio::task::JoinSet;
@@ -81,7 +82,17 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
}
pub fn get_machine_id() -> uuid::Uuid {
// TODO: load from local file
// a path same as the binary
let machine_id_file = std::env::current_exe()
.map(|x| x.with_file_name("et_machine_id"))
.unwrap_or_else(|_| std::path::PathBuf::from("et_machine_id"));
// try load from local file
if let Ok(mid) = std::fs::read_to_string(&machine_id_file) {
if let Ok(mid) = uuid::Uuid::parse_str(mid.trim()) {
return mid;
}
}
#[cfg(any(
target_os = "linux",
@@ -95,7 +106,7 @@ pub fn get_machine_id() -> uuid::Uuid {
crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b);
uuid::Uuid::from_bytes(b)
})
.unwrap_or(uuid::Uuid::new_v4());
.ok();
#[cfg(not(any(
target_os = "linux",
@@ -103,9 +114,18 @@ pub fn get_machine_id() -> uuid::Uuid {
target_os = "windows",
target_os = "freebsd"
)))]
let gen_mid = None;
if gen_mid.is_some() {
return gen_mid.unwrap();
}
let gen_mid = uuid::Uuid::new_v4();
// TODO: save to local file
// try save to local file
if let Ok(mut file) = std::fs::File::create(machine_id_file) {
let _ = file.write_all(gen_mid.to_string().as_bytes());
}
gen_mid
}

View File

@@ -1,6 +1,13 @@
// try connect peers directly, with either its public ip or lan ip
use std::{net::SocketAddr, sync::Arc, time::Duration};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
@@ -29,6 +36,8 @@ use super::create_connector_by_url;
pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1;
pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
static TESTING: AtomicBool = AtomicBool::new(false);
#[async_trait::async_trait]
pub trait PeerManagerForDirectConnector {
async fn list_peers(&self) -> Vec<PeerId>;
@@ -182,7 +191,7 @@ impl DirectConnectorManager {
// let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?;
if peer_id != dst_peer_id {
if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) {
tracing::info!(
"connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}",
addr,
@@ -279,87 +288,103 @@ impl DirectConnectorManager {
let listener_host = listener.socket_addrs(|| None).unwrap().pop();
match listener_host {
Some(SocketAddr::V4(_)) => {
ip_list.interface_ipv4s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
Some(SocketAddr::V4(s_addr)) => {
if s_addr.ip().is_unspecified() {
ip_list.interface_ipv4s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
if let Some(public_ipv4) = ip_list.public_ipv4 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(public_ipv4.to_string().as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv4,
?listener,
?dst_peer_id,
"failed to set host for public ipv4"
);
if let Some(public_ipv4) = ip_list.public_ipv4 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(public_ipv4.to_string().as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv4,
?listener,
?dst_peer_id,
"failed to set host for public ipv4"
);
}
}
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
listener.to_string(),
));
}
}
Some(SocketAddr::V6(_)) => {
ip_list.interface_ipv6s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv6"
);
}
});
Some(SocketAddr::V6(s_addr)) => {
if s_addr.ip().is_unspecified() {
ip_list.interface_ipv6s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv6"
);
}
});
if let Some(public_ipv6) = ip_list.public_ipv6 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv6,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
if let Some(public_ipv6) = ip_list.public_ipv6 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv6,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
}
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
listener.to_string(),
));
}
}
p => {
@@ -452,6 +477,49 @@ mod tests {
proto::peer_rpc::GetIpListResponse,
};
use super::TESTING;
#[tokio::test]
async fn direct_connector_mapped_listener() {
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
let p_c = create_mock_peer_manager().await;
let p_x = create_mock_peer_manager().await;
connect_peer_manager(p_a.clone(), p_b.clone()).await;
connect_peer_manager(p_b.clone(), p_c.clone()).await;
connect_peer_manager(p_c.clone(), p_x.clone()).await;
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
wait_route_appear(p_a.clone(), p_x.clone()).await.unwrap();
let mut f = p_a.get_global_ctx().get_flags();
f.bind_device = false;
p_a.get_global_ctx().config.set_flags(f);
p_c.get_global_ctx()
.config
.set_mapped_listeners(Some(vec!["tcp://127.0.0.1:11334".parse().unwrap()]));
p_x.get_global_ctx()
.config
.set_listeners(vec!["tcp://0.0.0.0:11334".parse().unwrap()]);
let mut lis_x = ListenerManager::new(p_x.get_global_ctx(), p_x.clone());
lis_x.prepare_listeners().await.unwrap();
lis_x.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let mut dm_a = DirectConnectorManager::new(p_a.get_global_ctx(), p_a.clone());
let mut dm_c = DirectConnectorManager::new(p_c.get_global_ctx(), p_c.clone());
dm_a.run_as_client();
dm_c.run_as_server();
// p_c's mapped listener is p_x's listener, so p_a should connect to p_x directly
wait_route_appear_with_cost(p_a.clone(), p_x.my_peer_id(), Some(1))
.await
.unwrap();
}
#[rstest::rstest]
#[tokio::test]
async fn direct_connector_basic_test(

View File

@@ -297,12 +297,14 @@ impl ManualConnectorManager {
connector.lock().await.set_ip_version(ip_version);
set_bind_addr_for_peer_connector(
connector.lock().await.as_mut(),
ip_version == IpVersion::V4,
&ip_collector,
)
.await;
if data.global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
connector.lock().await.as_mut(),
ip_version == IpVersion::V4,
&ip_collector,
)
.await;
}
data.global_ctx.issue_event(GlobalCtxEvent::Connecting(
connector.lock().await.remote_url().clone(),

View File

@@ -56,23 +56,27 @@ pub async fn create_connector_by_url(
"tcp" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "tcp")?;
let mut connector = TcpTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
"udp" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "udp")?;
let mut connector = UdpTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
"ring" => {
@@ -84,12 +88,14 @@ pub async fn create_connector_by_url(
"quic" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "quic")?;
let mut connector = QUICTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
#[cfg(feature = "wireguard")]
@@ -101,12 +107,14 @@ pub async fn create_connector_by_url(
&nid.network_secret.unwrap_or_default(),
);
let mut connector = WgTunnelConnector::new(url, wg_config);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
#[cfg(feature = "websocket")]
@@ -114,12 +122,14 @@ pub async fn create_connector_by_url(
use crate::tunnel::{FromUrl, IpVersion};
let dst_addr = SocketAddr::from_url(url.clone(), IpVersion::Both)?;
let mut connector = crate::tunnel::websocket::WSTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
_ => {

View File

@@ -123,6 +123,13 @@ struct Cli {
)]
listeners: Vec<String>,
#[arg(
long,
help = t!("core_clap.mapped_listeners").to_string(),
num_args = 0..
)]
mapped_listeners: Vec<String>,
#[arg(
long,
help = t!("core_clap.no_listener").to_string(),
@@ -185,7 +192,7 @@ struct Cli {
#[arg(
long,
help = t!("core_clap.multi_thread").to_string(),
default_value = "false"
default_value = "true"
)]
multi_thread: bool,
@@ -300,6 +307,12 @@ struct Cli {
default_value = "none",
)]
compression: String,
#[arg(
long,
help = t!("core_clap.bind_device").to_string()
)]
bind_device: Option<bool>,
}
rust_i18n::i18n!("locales", fallback = "en");
@@ -422,6 +435,23 @@ impl TryFrom<&Cli> for TomlConfigLoader {
.collect(),
);
cfg.set_mapped_listeners(Some(
cli.mapped_listeners
.iter()
.map(|s| {
s.parse()
.with_context(|| format!("mapped listener is not a valid url: {}", s))
.unwrap()
})
.map(|s: url::Url| {
if s.port().is_none() {
panic!("mapped listener port is missing: {}", s);
}
s
})
.collect(),
));
for n in cli.proxy_networks.iter() {
cfg.add_proxy_cidr(
n.parse()
@@ -534,6 +564,9 @@ impl TryFrom<&Cli> for TomlConfigLoader {
),
}
.into();
if let Some(bind_device) = cli.bind_device {
f.bind_device = bind_device;
}
cfg.set_flags(f);
cfg.set_exit_nodes(cli.exit_nodes.clone());

View File

@@ -4,6 +4,7 @@ use std::{
time::Duration,
};
use bytes::{BufMut, BytesMut};
use cidr::Ipv4Inet;
use crossbeam::atomic::AtomicCell;
use dashmap::DashMap;
@@ -24,11 +25,11 @@ use tokio::{
use tracing::Level;
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
common::{error::Error, global_ctx::ArcGlobalCtx, scoped_task::ScopedTask, PeerId},
gateway::ip_reassembler::compose_ipv4_packet,
peers::{peer_manager::PeerManager, PeerPacketFilter},
tunnel::{
common::setup_sokcet2,
common::{reserve_buf, setup_sokcet2},
packet_def::{PacketType, ZCPacket},
},
};
@@ -139,59 +140,81 @@ impl UdpNatEntry {
mut packet_sender: Sender<ZCPacket>,
virtual_ipv4: Ipv4Addr,
) {
let mut buf = [0u8; 65536];
let mut udp_body: &mut [u8] = unsafe { std::mem::transmute(&mut buf[20 + 8..]) };
let mut ip_id = 1;
let (s, mut r) = tachyonix::channel(128);
loop {
let (len, src_socket) = match timeout(
Duration::from_secs(120),
self.socket.recv_from(&mut udp_body),
)
.await
{
Ok(Ok(x)) => x,
Ok(Err(err)) => {
tracing::error!(?err, "udp nat recv failed");
let self_clone = self.clone();
let recv_task = ScopedTask::from(tokio::spawn(async move {
let mut cur_buf = BytesMut::new();
loop {
if self_clone
.stopped
.load(std::sync::atomic::Ordering::Relaxed)
{
break;
}
Err(err) => {
tracing::error!(?err, "udp nat recv timeout");
break;
reserve_buf(&mut cur_buf, 64 * 1024 + 28, 128 * 1024 + 28);
assert_eq!(cur_buf.len(), 0);
unsafe {
cur_buf.advance_mut(28);
}
};
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
let (len, src_socket) = match timeout(
Duration::from_secs(120),
self_clone.socket.recv_buf_from(&mut cur_buf),
)
.await
{
Ok(Ok(x)) => x,
Ok(Err(err)) => {
tracing::error!(?err, "udp nat recv failed");
break;
}
Err(err) => {
tracing::error!(?err, "udp nat recv timeout");
break;
}
};
if self.stopped.load(std::sync::atomic::Ordering::Relaxed) {
break;
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
let ret_buf = cur_buf.split();
s.send((ret_buf, len, src_socket)).await.unwrap();
}
}));
let SocketAddr::V4(mut src_v4) = src_socket else {
continue;
};
let self_clone = self.clone();
let send_task = ScopedTask::from(tokio::spawn(async move {
let mut ip_id = 1;
while let Ok((mut packet, len, src_socket)) = r.recv().await {
let SocketAddr::V4(mut src_v4) = src_socket else {
continue;
};
self.mark_active();
self_clone.mark_active();
if src_v4.ip().is_loopback() {
src_v4.set_ip(virtual_ipv4);
if src_v4.ip().is_loopback() {
src_v4.set_ip(virtual_ipv4);
}
let Ok(_) = Self::compose_ipv4_packet(
&self_clone,
&mut packet_sender,
&mut packet,
&src_v4,
len,
1280,
ip_id,
)
.await
else {
break;
};
ip_id = ip_id.wrapping_add(1);
}
}));
let Ok(_) = Self::compose_ipv4_packet(
&self,
&mut packet_sender,
&mut buf,
&src_v4,
len,
1200,
ip_id,
)
.await
else {
break;
};
ip_id = ip_id.wrapping_add(1);
}
let _ = tokio::join!(recv_task, send_task);
self.stop();
}

View File

@@ -1,8 +1,7 @@
use std::{fmt::Debug, sync::Arc};
use anyhow::Context;
use async_trait::async_trait;
use tokio::{sync::Mutex, task::JoinSet};
use tokio::task::JoinSet;
#[cfg(feature = "quic")]
use crate::tunnel::quic::QUICTunnelListener;
@@ -63,16 +62,20 @@ impl TunnelHandlerForListener for PeerManager {
}
}
#[derive(Debug, Clone)]
struct Listener {
inner: Arc<Mutex<dyn TunnelListener>>,
pub trait ListenerCreatorTrait: Fn() -> Box<dyn TunnelListener> + Send + Sync {}
impl<T: Send + Sync> ListenerCreatorTrait for T where T: Fn() -> Box<dyn TunnelListener> + Send {}
pub type ListenerCreator = Box<dyn ListenerCreatorTrait>;
#[derive(Clone)]
struct ListenerFactory {
creator_fn: Arc<ListenerCreator>,
must_succ: bool,
}
pub struct ListenerManager<H> {
global_ctx: ArcGlobalCtx,
net_ns: NetNS,
listeners: Vec<Listener>,
listeners: Vec<ListenerFactory>,
peer_manager: Arc<H>,
tasks: JoinSet<()>,
@@ -90,31 +93,39 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
}
pub async fn prepare_listeners(&mut self) -> Result<(), Error> {
let self_id = self.global_ctx.get_id();
self.add_listener(
RingTunnelListener::new(
format!("ring://{}", self.global_ctx.get_id())
.parse()
.unwrap(),
),
move || {
Box::new(RingTunnelListener::new(
format!("ring://{}", self_id).parse().unwrap(),
))
},
true,
)
.await?;
for l in self.global_ctx.config.get_listener_uris().iter() {
let Ok(lis) = get_listener_by_url(l, self.global_ctx.clone()) else {
let l = l.clone();
let Ok(_) = get_listener_by_url(&l, self.global_ctx.clone()) else {
let msg = format!("failed to get listener by url: {}, maybe not supported", l);
self.global_ctx
.issue_event(GlobalCtxEvent::ListenerAddFailed(l.clone(), msg));
continue;
};
self.add_listener(lis, true).await?;
let ctx = self.global_ctx.clone();
self.add_listener(move || get_listener_by_url(&l, ctx.clone()).unwrap(), true)
.await?;
}
if self.global_ctx.config.get_flags().enable_ipv6 {
let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone();
let _ = self
.add_listener(
UdpTunnelListener::new(ipv6_listener.parse().unwrap()),
move || {
Box::new(UdpTunnelListener::new(
ipv6_listener.clone().parse().unwrap(),
))
},
false,
)
.await?;
@@ -123,85 +134,91 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
Ok(())
}
pub async fn add_listener<L>(&mut self, listener: L, must_succ: bool) -> Result<(), Error>
where
L: TunnelListener + 'static,
{
let listener = Arc::new(Mutex::new(listener));
self.listeners.push(Listener {
inner: listener,
pub async fn add_listener<C: ListenerCreatorTrait + 'static>(
&mut self,
creator: C,
must_succ: bool,
) -> Result<(), Error> {
self.listeners.push(ListenerFactory {
creator_fn: Arc::new(Box::new(creator)),
must_succ,
});
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(creator))]
async fn run_listener(
listener: Arc<Mutex<dyn TunnelListener>>,
creator: Arc<ListenerCreator>,
peer_manager: Arc<H>,
global_ctx: ArcGlobalCtx,
) {
let mut l = listener.lock().await;
global_ctx.add_running_listener(l.local_url());
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
loop {
let ret = match l.accept().await {
Ok(ret) => ret,
let mut l = (creator)();
let _g = global_ctx.net_ns.guard();
match l.listen().await {
Ok(_) => {
global_ctx.add_running_listener(l.local_url());
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
}
Err(e) => {
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed(
global_ctx.issue_event(GlobalCtxEvent::ListenerAddFailed(
l.local_url(),
e.to_string(),
format!("error: {:?}, retry listen later...", e),
));
tracing::error!(?e, ?l, "listener accept error");
tracing::error!(?e, ?l, "listener listen error");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
};
}
loop {
let ret = match l.accept().await {
Ok(ret) => ret,
Err(e) => {
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed(
l.local_url(),
format!("error: {:?}, retry listen later...", e),
));
tracing::error!(?e, ?l, "listener accept error");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
};
let tunnel_info = ret.info().unwrap();
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
tunnel_info
.local_addr
.clone()
.unwrap_or_default()
.to_string(),
tunnel_info
.remote_addr
.clone()
.unwrap_or_default()
.to_string(),
));
tracing::info!(ret = ?ret, "conn accepted");
let peer_manager = peer_manager.clone();
let global_ctx = global_ctx.clone();
tokio::spawn(async move {
let server_ret = peer_manager.handle_tunnel(ret).await;
if let Err(e) = &server_ret {
global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
tunnel_info.local_addr.unwrap_or_default().to_string(),
tunnel_info.remote_addr.unwrap_or_default().to_string(),
e.to_string(),
));
tracing::error!(error = ?e, "handle conn error");
}
});
let tunnel_info = ret.info().unwrap();
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
tunnel_info
.local_addr
.clone()
.unwrap_or_default()
.to_string(),
tunnel_info
.remote_addr
.clone()
.unwrap_or_default()
.to_string(),
));
tracing::info!(ret = ?ret, "conn accepted");
let peer_manager = peer_manager.clone();
let global_ctx = global_ctx.clone();
tokio::spawn(async move {
let server_ret = peer_manager.handle_tunnel(ret).await;
if let Err(e) = &server_ret {
global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
tunnel_info.local_addr.unwrap_or_default().to_string(),
tunnel_info.remote_addr.unwrap_or_default().to_string(),
e.to_string(),
));
tracing::error!(error = ?e, "handle conn error");
}
});
}
}
}
pub async fn run(&mut self) -> Result<(), Error> {
for listener in &self.listeners {
let _guard = self.net_ns.guard();
let addr = listener.inner.lock().await.local_url();
tracing::warn!("run listener: {:?}", listener);
listener
.inner
.lock()
.await
.listen()
.await
.with_context(|| format!("failed to add listener {}", addr))?;
self.tasks.spawn(Self::run_listener(
listener.inner.clone(),
listener.creator_fn.clone(),
self.peer_manager.clone(),
self.global_ctx.clone(),
));
@@ -213,12 +230,14 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicI32, Ordering};
use futures::{SinkExt, StreamExt};
use tokio::time::timeout;
use crate::{
common::global_ctx::tests::get_mock_global_ctx,
tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector},
tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector, TunnelError},
};
use super::*;
@@ -245,12 +264,18 @@ mod tests {
let ring_id = format!("ring://{}", uuid::Uuid::new_v4());
let ring_id_clone = ring_id.clone();
listener_mgr
.add_listener(RingTunnelListener::new(ring_id.parse().unwrap()), true)
.add_listener(
move || Box::new(RingTunnelListener::new(ring_id_clone.parse().unwrap())),
true,
)
.await
.unwrap();
listener_mgr.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let connect_once = |ring_id| async move {
let tunnel = RingTunnelConnector::new(ring_id).connect().await.unwrap();
let (mut recv, _send) = tunnel.split();
@@ -269,4 +294,60 @@ mod tests {
.await
.unwrap();
}
#[tokio::test]
async fn retry_listen() {
let counter = Arc::new(AtomicI32::new(0));
let drop_counter = Arc::new(AtomicI32::new(0));
struct MockListener {
counter: Arc<AtomicI32>,
drop_counter: Arc<AtomicI32>,
}
#[async_trait::async_trait]
impl TunnelListener for MockListener {
fn local_url(&self) -> url::Url {
"mock://".parse().unwrap()
}
async fn listen(&mut self) -> Result<(), TunnelError> {
self.counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Err(TunnelError::BufferFull)
}
}
impl Drop for MockListener {
fn drop(&mut self) {
self.drop_counter.fetch_add(1, Ordering::Relaxed);
}
}
let handler = Arc::new(MockListenerHandler {});
let mut listener_mgr = ListenerManager::new(get_mock_global_ctx(), handler.clone());
let counter_clone = counter.clone();
let drop_counter_clone = drop_counter.clone();
listener_mgr
.add_listener(
move || {
Box::new(MockListener {
counter: counter_clone.clone(),
drop_counter: drop_counter_clone.clone(),
})
},
true,
)
.await
.unwrap();
listener_mgr.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
assert!(counter.load(Ordering::Relaxed) >= 2);
assert!(drop_counter.load(Ordering::Relaxed) >= 1);
}
}

View File

@@ -484,13 +484,6 @@ mod tests {
let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
let c = TunnelWithFilter::new(c, c_recorder.clone());
let s = if drop_both {
let s_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
Box::new(TunnelWithFilter::new(s, s_recorder.clone()))
} else {
s
};
let c_peer_id = new_peer_id();
let s_peer_id = new_peer_id();
@@ -506,6 +499,7 @@ mod tests {
s_peer
.start_recv_loop(tokio::sync::mpsc::channel(200).0)
.await;
// do not start ping for s, s only reponde to ping from c
assert!(c_ret.is_ok());
assert!(s_ret.is_ok());
@@ -547,7 +541,7 @@ mod tests {
#[tokio::test]
async fn peer_conn_pingpong_bothside_timeout() {
peer_conn_pingpong_test_common(4, 12, true, true).await;
peer_conn_pingpong_test_common(3, 14, true, true).await;
}
#[tokio::test]

View File

@@ -289,29 +289,29 @@ impl PeerConnPinger {
"pingpong task recv pingpong_once result"
);
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) {
let current_rx_packets = throughput.rx_packets();
let need_close = if last_rx_packets != current_rx_packets {
// if we receive some packet from peers, we should relax the condition
counter > 50 && loss_rate_1 > 0.5
let current_rx_packets = throughput.rx_packets();
if last_rx_packets != current_rx_packets {
// if we receive some packet from peers, reset the counter to avoid conn close.
// conn will close only if we have 5 continous round pingpong loss after no packet received.
counter = 0;
}
// TODO: wait more time to see if the loss rate is still high after no rx
} else {
true
};
tracing::debug!(
"counter: {}, loss_rate_1: {}, loss_rate_20: {}, cur_rx_packets: {}, last_rx: {}, node_id: {}",
counter, loss_rate_1, loss_rate_20, current_rx_packets, last_rx_packets, my_node_id
);
if need_close {
tracing::warn!(
?ret,
?self,
?loss_rate_1,
?loss_rate_20,
?last_rx_packets,
?current_rx_packets,
"pingpong loss rate too high, closing"
);
break;
}
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 100 && loss_rate_1 > 0.35) {
tracing::warn!(
?ret,
?self,
?loss_rate_1,
?loss_rate_20,
?last_rx_packets,
?current_rx_packets,
"pingpong loss rate too high, closing"
);
break;
}
last_rx_packets = throughput.rx_packets();

View File

@@ -1739,7 +1739,6 @@ impl RouteSessionManager {
continue;
}
let _ = self.stop_session(*peer_id);
assert_ne!(Some(*peer_id), cur_dst_peer_id_to_initiate);
}
}

View File

@@ -24,8 +24,10 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer {
let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await;
ret.listeners = self
.global_ctx
.get_running_listeners()
.config
.get_mapped_listeners()
.into_iter()
.chain(self.global_ctx.get_running_listeners().into_iter())
.map(Into::into)
.collect();
Ok(ret)

View File

@@ -21,6 +21,7 @@ message FlagsInConfig {
string ipv6_listener = 14;
bool multi_thread = 15;
CompressionAlgoPb data_compress_algo = 16;
bool bind_device = 17;
}
message RpcDescriptor {

View File

@@ -159,7 +159,10 @@ pub fn build_rpc_packet(
let cur_packet = RpcPacket {
from_peer,
to_peer,
descriptor: if cur_offset == 0 {
descriptor: if cur_offset == 0
|| compression_info.algo == CompressionAlgoPb::None as i32
{
// old version must have descriptor on every piece
Some(rpc_desc.clone())
} else {
None

View File

@@ -28,6 +28,30 @@ impl TcpTunnelListener {
listener: None,
}
}
async fn do_accept(&mut self) -> Result<Box<dyn Tunnel>, std::io::Error> {
let listener = self.listener.as_ref().unwrap();
let (stream, _) = listener.accept().await?;
if let Err(e) = stream.set_nodelay(true) {
tracing::warn!(?e, "set_nodelay fail in accept");
}
let info = TunnelInfo {
tunnel_type: "tcp".to_owned(),
local_addr: Some(self.local_url().into()),
remote_addr: Some(
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
),
};
let (r, w) = stream.into_split();
Ok(Box::new(TunnelWrapper::new(
FramedReader::new(r, TCP_MTU_BYTES),
FramedWriter::new(w),
Some(info),
)))
}
}
#[async_trait]
@@ -57,27 +81,23 @@ impl TunnelListener for TcpTunnelListener {
}
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> {
let listener = self.listener.as_ref().unwrap();
let (stream, _) = listener.accept().await?;
if let Err(e) = stream.set_nodelay(true) {
tracing::warn!(?e, "set_nodelay fail in accept");
loop {
match self.do_accept().await {
Ok(ret) => return Ok(ret),
Err(e) => {
use std::io::ErrorKind::*;
if matches!(
e.kind(),
NotConnected | ConnectionAborted | ConnectionRefused | ConnectionReset
) {
tracing::warn!(?e, "accept fail with retryable error: {:?}", e);
continue;
}
tracing::warn!(?e, "accept fail");
return Err(e.into());
}
}
}
let info = TunnelInfo {
tunnel_type: "tcp".to_owned(),
local_addr: Some(self.local_url().into()),
remote_addr: Some(
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
),
};
let (r, w) = stream.into_split();
Ok(Box::new(TunnelWrapper::new(
FramedReader::new(r, TCP_MTU_BYTES),
FramedWriter::new(w),
Some(info),
)))
}
fn local_url(&self) -> url::Url {