Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c23b544c34 | ||
|
|
9d76b86f49 | ||
|
|
bb0ccca3e5 | ||
|
|
306817ae9a | ||
|
|
d2ec60e108 | ||
|
|
e016aeddeb | ||
|
|
a4419a31fd |
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -21,7 +21,7 @@ on:
|
||||
version:
|
||||
description: 'Version for this release'
|
||||
type: string
|
||||
default: 'v2.1.1'
|
||||
default: 'v2.1.2'
|
||||
required: true
|
||||
make_latest:
|
||||
description: 'Mark this release as latest'
|
||||
|
||||
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -1830,7 +1830,7 @@ checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
|
||||
|
||||
[[package]]
|
||||
name = "easytier"
|
||||
version = "2.1.1"
|
||||
version = "2.1.2"
|
||||
dependencies = [
|
||||
"aes-gcm",
|
||||
"anyhow",
|
||||
@@ -1926,7 +1926,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "easytier-gui"
|
||||
version = "2.1.1"
|
||||
version = "2.1.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
|
||||
15
README.md
15
README.md
@@ -31,6 +31,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
|
||||
- **High Availability**: Supports multi-path and switches to healthy paths when high packet loss or network errors are detected.
|
||||
- **IPv6 Support**: Supports networking using IPv6.
|
||||
- **Multiple Protocol Types**: Supports communication between nodes using protocols such as WebSocket and QUIC.
|
||||
- **Web Management Interface**: Provides a [web-based management](https://easytier.cn/web) interface for easy configuration and monitoring.
|
||||
|
||||
## Installation
|
||||
|
||||
@@ -52,7 +53,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
|
||||
|
||||
4. **Install by Docker Compose**
|
||||
|
||||
Please visit the [EasyTier Official Website](https://www.easytier.top/en/) to view the full documentation.
|
||||
Please visit the [EasyTier Official Website](https://www.easytier.cn/en/) to view the full documentation.
|
||||
|
||||
5. **Install by script (For Linux Only)**
|
||||
|
||||
@@ -200,20 +201,20 @@ Subnet proxy information will automatically sync to each node in the virtual net
|
||||
|
||||
### Networking without Public IP
|
||||
|
||||
EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.top:11010``.
|
||||
EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.cn:11010``.
|
||||
|
||||
When using shared nodes, each node entering the network needs to provide the same ``--network-name`` and ``--network-secret`` parameters as the unique identifier of the network.
|
||||
|
||||
Taking two nodes as an example, Node A executes:
|
||||
|
||||
```sh
|
||||
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
|
||||
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
|
||||
```
|
||||
|
||||
Node B executes
|
||||
|
||||
```sh
|
||||
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
|
||||
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
|
||||
```
|
||||
|
||||
After the command is successfully executed, Node A can access Node B through the virtual IP 10.144.144.2.
|
||||
@@ -286,7 +287,7 @@ Run you own public server cluster is exactly same as running an virtual network,
|
||||
You can also join the official public server cluster with following command:
|
||||
|
||||
```
|
||||
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010
|
||||
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
|
||||
```
|
||||
|
||||
|
||||
@@ -296,10 +297,8 @@ You can use ``easytier-core --help`` to view all configuration items
|
||||
|
||||
## Roadmap
|
||||
|
||||
- [ ] Improve documentation and user guides.
|
||||
- [ ] Support features such as encryption, TCP hole punching, etc.
|
||||
- [ ] Support features such TCP hole punching, KCP, FEC etc.
|
||||
- [ ] Support iOS.
|
||||
- [ ] Support Web configuration management.
|
||||
|
||||
## Community and Contribution
|
||||
|
||||
|
||||
16
README_CN.md
16
README_CN.md
@@ -8,7 +8,7 @@
|
||||
|
||||
[简体中文](/README_CN.md) | [English](/README.md)
|
||||
|
||||
**请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。**
|
||||
**请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。**
|
||||
|
||||
一个简单、安全、去中心化的内网穿透 VPN 组网方案,使用 Rust 语言和 Tokio 框架实现。
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
- **高可用性**:支持多路径和在检测到高丢包率或网络错误时切换到健康路径。
|
||||
- **IPV6 支持**:支持利用 IPV6 组网。
|
||||
- **多协议类型**: 支持使用 WebSocket、QUIC 等协议进行节点间通信。
|
||||
- **Web 管理界面**:支持通过 [Web 界面](https://easytier.cn)管理节点。
|
||||
|
||||
## 安装
|
||||
|
||||
@@ -52,7 +53,7 @@
|
||||
|
||||
4. **通过Docker Compose安装**
|
||||
|
||||
请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。
|
||||
请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。
|
||||
|
||||
5. **使用一键脚本安装 (仅适用于 Linux)**
|
||||
|
||||
@@ -199,20 +200,20 @@ sudo easytier-core --ipv4 10.144.144.2 -n 10.1.1.0/24
|
||||
|
||||
### 无公网IP组网
|
||||
|
||||
EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.top:11010``。
|
||||
EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.cn:11010``。
|
||||
|
||||
使用共享节点时,需要每个入网节点提供相同的 ``--network-name`` 和 ``--network-secret`` 参数,作为网络的唯一标识。
|
||||
|
||||
以双节点为例,节点 A 执行:
|
||||
|
||||
```sh
|
||||
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
|
||||
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
|
||||
```
|
||||
|
||||
节点 B 执行
|
||||
|
||||
```sh
|
||||
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
|
||||
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
|
||||
```
|
||||
|
||||
命令执行成功后,节点 A 即可通过虚拟 IP 10.144.144.2 访问节点 B。
|
||||
@@ -289,7 +290,7 @@ connected_clients:
|
||||
也可以使用以下命令加入官方公共服务器集群,后续将实现公共服务器集群的节点间负载均衡:
|
||||
|
||||
```
|
||||
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010
|
||||
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
|
||||
```
|
||||
|
||||
### 其他配置
|
||||
@@ -299,9 +300,8 @@ sudo easytier-core --network-name easytier --network-secret easytier -p tcp://pu
|
||||
## 路线图
|
||||
|
||||
- [ ] 完善文档和用户指南。
|
||||
- [ ] 支持 TCP 打洞等特性。
|
||||
- [ ] 支持 TCP 打洞、KCP、FEC 等特性。
|
||||
- [ ] 支持 iOS。
|
||||
- [ ] 支持 Web 配置管理。
|
||||
|
||||
## 社区和贡献
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "easytier-gui",
|
||||
"type": "module",
|
||||
"version": "2.1.1",
|
||||
"version": "2.1.2",
|
||||
"private": true,
|
||||
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
|
||||
"scripts": {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "easytier-gui"
|
||||
version = "2.1.1"
|
||||
version = "2.1.2"
|
||||
description = "EasyTier GUI"
|
||||
authors = ["you"]
|
||||
edition = "2021"
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
"createUpdaterArtifacts": false
|
||||
},
|
||||
"productName": "easytier-gui",
|
||||
"version": "2.1.1",
|
||||
"version": "2.1.2",
|
||||
"identifier": "com.kkrainbow.easytier",
|
||||
"plugins": {},
|
||||
"app": {
|
||||
|
||||
@@ -3,7 +3,7 @@ name = "easytier"
|
||||
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
|
||||
homepage = "https://github.com/EasyTier/EasyTier"
|
||||
repository = "https://github.com/EasyTier/EasyTier"
|
||||
version = "2.1.1"
|
||||
version = "2.1.2"
|
||||
edition = "2021"
|
||||
authors = ["kkrainbow"]
|
||||
keywords = ["vpn", "p2p", "network", "easytier"]
|
||||
|
||||
@@ -134,6 +134,12 @@ core_clap:
|
||||
compression:
|
||||
en: "compression algorithm to use, support none, zstd. default is none"
|
||||
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none"
|
||||
mapped_listeners:
|
||||
en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple."
|
||||
zh-CN: "手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。例如:tcp://123.123.123.123:11223,可以指定多个。"
|
||||
bind_device:
|
||||
en: "bind the connector socket to physical devices to avoid routing issues. e.g.: subnet proxy segment conflicts with a node's segment, after binding the physical device, it can communicate with the node normally."
|
||||
zh-CN: "将连接器的套接字绑定到物理设备以避免路由问题。比如子网代理网段与某节点的网段冲突,绑定物理设备后可以与该节点正常通信。"
|
||||
|
||||
core_app:
|
||||
panic_backtrace_save:
|
||||
|
||||
@@ -27,8 +27,9 @@ pub fn gen_default_flags() -> Flags {
|
||||
relay_all_peer_rpc: false,
|
||||
disable_udp_hole_punching: false,
|
||||
ipv6_listener: "udp://[::]:0".to_string(),
|
||||
multi_thread: false,
|
||||
multi_thread: true,
|
||||
data_compress_algo: CompressionAlgoPb::None.into(),
|
||||
bind_device: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,6 +73,9 @@ pub trait ConfigLoader: Send + Sync {
|
||||
fn get_listeners(&self) -> Vec<url::Url>;
|
||||
fn set_listeners(&self, listeners: Vec<url::Url>);
|
||||
|
||||
fn get_mapped_listeners(&self) -> Vec<url::Url>;
|
||||
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>);
|
||||
|
||||
fn get_rpc_portal(&self) -> Option<SocketAddr>;
|
||||
fn set_rpc_portal(&self, addr: SocketAddr);
|
||||
|
||||
@@ -183,6 +187,7 @@ struct Config {
|
||||
dhcp: Option<bool>,
|
||||
network_identity: Option<NetworkIdentity>,
|
||||
listeners: Option<Vec<url::Url>>,
|
||||
mapped_listeners: Option<Vec<url::Url>>,
|
||||
exit_nodes: Option<Vec<Ipv4Addr>>,
|
||||
|
||||
peer: Option<Vec<PeerConfig>>,
|
||||
@@ -472,6 +477,19 @@ impl ConfigLoader for TomlConfigLoader {
|
||||
self.config.lock().unwrap().listeners = Some(listeners);
|
||||
}
|
||||
|
||||
fn get_mapped_listeners(&self) -> Vec<url::Url> {
|
||||
self.config
|
||||
.lock()
|
||||
.unwrap()
|
||||
.mapped_listeners
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>) {
|
||||
self.config.lock().unwrap().mapped_listeners = listeners;
|
||||
}
|
||||
|
||||
fn get_rpc_portal(&self) -> Option<SocketAddr> {
|
||||
self.config.lock().unwrap().rpc_portal
|
||||
}
|
||||
|
||||
@@ -230,7 +230,10 @@ impl GlobalCtx {
|
||||
}
|
||||
|
||||
pub fn add_running_listener(&self, url: url::Url) {
|
||||
self.running_listeners.lock().unwrap().push(url);
|
||||
let mut l = self.running_listeners.lock().unwrap();
|
||||
if !l.contains(&url) {
|
||||
l.push(url);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_vpn_portal_cidr(&self) -> Option<cidr::Ipv4Cidr> {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
future,
|
||||
io::Write as _,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use tokio::task::JoinSet;
|
||||
@@ -81,7 +82,17 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
|
||||
}
|
||||
|
||||
pub fn get_machine_id() -> uuid::Uuid {
|
||||
// TODO: load from local file
|
||||
// a path same as the binary
|
||||
let machine_id_file = std::env::current_exe()
|
||||
.map(|x| x.with_file_name("et_machine_id"))
|
||||
.unwrap_or_else(|_| std::path::PathBuf::from("et_machine_id"));
|
||||
|
||||
// try load from local file
|
||||
if let Ok(mid) = std::fs::read_to_string(&machine_id_file) {
|
||||
if let Ok(mid) = uuid::Uuid::parse_str(mid.trim()) {
|
||||
return mid;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(
|
||||
target_os = "linux",
|
||||
@@ -95,7 +106,7 @@ pub fn get_machine_id() -> uuid::Uuid {
|
||||
crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b);
|
||||
uuid::Uuid::from_bytes(b)
|
||||
})
|
||||
.unwrap_or(uuid::Uuid::new_v4());
|
||||
.ok();
|
||||
|
||||
#[cfg(not(any(
|
||||
target_os = "linux",
|
||||
@@ -103,9 +114,18 @@ pub fn get_machine_id() -> uuid::Uuid {
|
||||
target_os = "windows",
|
||||
target_os = "freebsd"
|
||||
)))]
|
||||
let gen_mid = None;
|
||||
|
||||
if gen_mid.is_some() {
|
||||
return gen_mid.unwrap();
|
||||
}
|
||||
|
||||
let gen_mid = uuid::Uuid::new_v4();
|
||||
|
||||
// TODO: save to local file
|
||||
// try save to local file
|
||||
if let Ok(mut file) = std::fs::File::create(machine_id_file) {
|
||||
let _ = file.write_all(gen_mid.to_string().as_bytes());
|
||||
}
|
||||
|
||||
gen_mid
|
||||
}
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
// try connect peers directly, with either its public ip or lan ip
|
||||
|
||||
use std::{net::SocketAddr, sync::Arc, time::Duration};
|
||||
use std::{
|
||||
net::SocketAddr,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
|
||||
@@ -29,6 +36,8 @@ use super::create_connector_by_url;
|
||||
pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1;
|
||||
pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
|
||||
|
||||
static TESTING: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait PeerManagerForDirectConnector {
|
||||
async fn list_peers(&self) -> Vec<PeerId>;
|
||||
@@ -182,7 +191,7 @@ impl DirectConnectorManager {
|
||||
|
||||
// let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?;
|
||||
|
||||
if peer_id != dst_peer_id {
|
||||
if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) {
|
||||
tracing::info!(
|
||||
"connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}",
|
||||
addr,
|
||||
@@ -279,87 +288,103 @@ impl DirectConnectorManager {
|
||||
|
||||
let listener_host = listener.socket_addrs(|| None).unwrap().pop();
|
||||
match listener_host {
|
||||
Some(SocketAddr::V4(_)) => {
|
||||
ip_list.interface_ipv4s.iter().for_each(|ip| {
|
||||
let mut addr = (*listener).clone();
|
||||
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
addr.to_string(),
|
||||
));
|
||||
} else {
|
||||
tracing::error!(
|
||||
?ip,
|
||||
?listener,
|
||||
?dst_peer_id,
|
||||
"failed to set host for interface ipv4"
|
||||
);
|
||||
}
|
||||
});
|
||||
Some(SocketAddr::V4(s_addr)) => {
|
||||
if s_addr.ip().is_unspecified() {
|
||||
ip_list.interface_ipv4s.iter().for_each(|ip| {
|
||||
let mut addr = (*listener).clone();
|
||||
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
addr.to_string(),
|
||||
));
|
||||
} else {
|
||||
tracing::error!(
|
||||
?ip,
|
||||
?listener,
|
||||
?dst_peer_id,
|
||||
"failed to set host for interface ipv4"
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(public_ipv4) = ip_list.public_ipv4 {
|
||||
let mut addr = (*listener).clone();
|
||||
if addr
|
||||
.set_host(Some(public_ipv4.to_string().as_str()))
|
||||
.is_ok()
|
||||
{
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
addr.to_string(),
|
||||
));
|
||||
} else {
|
||||
tracing::error!(
|
||||
?public_ipv4,
|
||||
?listener,
|
||||
?dst_peer_id,
|
||||
"failed to set host for public ipv4"
|
||||
);
|
||||
if let Some(public_ipv4) = ip_list.public_ipv4 {
|
||||
let mut addr = (*listener).clone();
|
||||
if addr
|
||||
.set_host(Some(public_ipv4.to_string().as_str()))
|
||||
.is_ok()
|
||||
{
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
addr.to_string(),
|
||||
));
|
||||
} else {
|
||||
tracing::error!(
|
||||
?public_ipv4,
|
||||
?listener,
|
||||
?dst_peer_id,
|
||||
"failed to set host for public ipv4"
|
||||
);
|
||||
}
|
||||
}
|
||||
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
listener.to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
Some(SocketAddr::V6(_)) => {
|
||||
ip_list.interface_ipv6s.iter().for_each(|ip| {
|
||||
let mut addr = (*listener).clone();
|
||||
if addr
|
||||
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
|
||||
.is_ok()
|
||||
{
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
addr.to_string(),
|
||||
));
|
||||
} else {
|
||||
tracing::error!(
|
||||
?ip,
|
||||
?listener,
|
||||
?dst_peer_id,
|
||||
"failed to set host for interface ipv6"
|
||||
);
|
||||
}
|
||||
});
|
||||
Some(SocketAddr::V6(s_addr)) => {
|
||||
if s_addr.ip().is_unspecified() {
|
||||
ip_list.interface_ipv6s.iter().for_each(|ip| {
|
||||
let mut addr = (*listener).clone();
|
||||
if addr
|
||||
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
|
||||
.is_ok()
|
||||
{
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
addr.to_string(),
|
||||
));
|
||||
} else {
|
||||
tracing::error!(
|
||||
?ip,
|
||||
?listener,
|
||||
?dst_peer_id,
|
||||
"failed to set host for interface ipv6"
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(public_ipv6) = ip_list.public_ipv6 {
|
||||
let mut addr = (*listener).clone();
|
||||
if addr
|
||||
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
|
||||
.is_ok()
|
||||
{
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
addr.to_string(),
|
||||
));
|
||||
} else {
|
||||
tracing::error!(
|
||||
?public_ipv6,
|
||||
?listener,
|
||||
?dst_peer_id,
|
||||
"failed to set host for public ipv6"
|
||||
);
|
||||
if let Some(public_ipv6) = ip_list.public_ipv6 {
|
||||
let mut addr = (*listener).clone();
|
||||
if addr
|
||||
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
|
||||
.is_ok()
|
||||
{
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
addr.to_string(),
|
||||
));
|
||||
} else {
|
||||
tracing::error!(
|
||||
?public_ipv6,
|
||||
?listener,
|
||||
?dst_peer_id,
|
||||
"failed to set host for public ipv6"
|
||||
);
|
||||
}
|
||||
}
|
||||
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
|
||||
tasks.spawn(Self::try_connect_to_ip(
|
||||
data.clone(),
|
||||
dst_peer_id.clone(),
|
||||
listener.to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
p => {
|
||||
@@ -452,6 +477,49 @@ mod tests {
|
||||
proto::peer_rpc::GetIpListResponse,
|
||||
};
|
||||
|
||||
use super::TESTING;
|
||||
|
||||
#[tokio::test]
|
||||
async fn direct_connector_mapped_listener() {
|
||||
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
let p_a = create_mock_peer_manager().await;
|
||||
let p_b = create_mock_peer_manager().await;
|
||||
let p_c = create_mock_peer_manager().await;
|
||||
let p_x = create_mock_peer_manager().await;
|
||||
connect_peer_manager(p_a.clone(), p_b.clone()).await;
|
||||
connect_peer_manager(p_b.clone(), p_c.clone()).await;
|
||||
connect_peer_manager(p_c.clone(), p_x.clone()).await;
|
||||
|
||||
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
|
||||
wait_route_appear(p_a.clone(), p_x.clone()).await.unwrap();
|
||||
|
||||
let mut f = p_a.get_global_ctx().get_flags();
|
||||
f.bind_device = false;
|
||||
p_a.get_global_ctx().config.set_flags(f);
|
||||
|
||||
p_c.get_global_ctx()
|
||||
.config
|
||||
.set_mapped_listeners(Some(vec!["tcp://127.0.0.1:11334".parse().unwrap()]));
|
||||
|
||||
p_x.get_global_ctx()
|
||||
.config
|
||||
.set_listeners(vec!["tcp://0.0.0.0:11334".parse().unwrap()]);
|
||||
let mut lis_x = ListenerManager::new(p_x.get_global_ctx(), p_x.clone());
|
||||
lis_x.prepare_listeners().await.unwrap();
|
||||
lis_x.run().await.unwrap();
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let mut dm_a = DirectConnectorManager::new(p_a.get_global_ctx(), p_a.clone());
|
||||
let mut dm_c = DirectConnectorManager::new(p_c.get_global_ctx(), p_c.clone());
|
||||
dm_a.run_as_client();
|
||||
dm_c.run_as_server();
|
||||
// p_c's mapped listener is p_x's listener, so p_a should connect to p_x directly
|
||||
|
||||
wait_route_appear_with_cost(p_a.clone(), p_x.my_peer_id(), Some(1))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[rstest::rstest]
|
||||
#[tokio::test]
|
||||
async fn direct_connector_basic_test(
|
||||
|
||||
@@ -297,12 +297,14 @@ impl ManualConnectorManager {
|
||||
|
||||
connector.lock().await.set_ip_version(ip_version);
|
||||
|
||||
set_bind_addr_for_peer_connector(
|
||||
connector.lock().await.as_mut(),
|
||||
ip_version == IpVersion::V4,
|
||||
&ip_collector,
|
||||
)
|
||||
.await;
|
||||
if data.global_ctx.config.get_flags().bind_device {
|
||||
set_bind_addr_for_peer_connector(
|
||||
connector.lock().await.as_mut(),
|
||||
ip_version == IpVersion::V4,
|
||||
&ip_collector,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
data.global_ctx.issue_event(GlobalCtxEvent::Connecting(
|
||||
connector.lock().await.remote_url().clone(),
|
||||
|
||||
@@ -56,23 +56,27 @@ pub async fn create_connector_by_url(
|
||||
"tcp" => {
|
||||
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "tcp")?;
|
||||
let mut connector = TcpTunnelConnector::new(url);
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
if global_ctx.config.get_flags().bind_device {
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return Ok(Box::new(connector));
|
||||
}
|
||||
"udp" => {
|
||||
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "udp")?;
|
||||
let mut connector = UdpTunnelConnector::new(url);
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
if global_ctx.config.get_flags().bind_device {
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return Ok(Box::new(connector));
|
||||
}
|
||||
"ring" => {
|
||||
@@ -84,12 +88,14 @@ pub async fn create_connector_by_url(
|
||||
"quic" => {
|
||||
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "quic")?;
|
||||
let mut connector = QUICTunnelConnector::new(url);
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
if global_ctx.config.get_flags().bind_device {
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return Ok(Box::new(connector));
|
||||
}
|
||||
#[cfg(feature = "wireguard")]
|
||||
@@ -101,12 +107,14 @@ pub async fn create_connector_by_url(
|
||||
&nid.network_secret.unwrap_or_default(),
|
||||
);
|
||||
let mut connector = WgTunnelConnector::new(url, wg_config);
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
if global_ctx.config.get_flags().bind_device {
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return Ok(Box::new(connector));
|
||||
}
|
||||
#[cfg(feature = "websocket")]
|
||||
@@ -114,12 +122,14 @@ pub async fn create_connector_by_url(
|
||||
use crate::tunnel::{FromUrl, IpVersion};
|
||||
let dst_addr = SocketAddr::from_url(url.clone(), IpVersion::Both)?;
|
||||
let mut connector = crate::tunnel::websocket::WSTunnelConnector::new(url);
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
if global_ctx.config.get_flags().bind_device {
|
||||
set_bind_addr_for_peer_connector(
|
||||
&mut connector,
|
||||
dst_addr.is_ipv4(),
|
||||
&global_ctx.get_ip_collector(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return Ok(Box::new(connector));
|
||||
}
|
||||
_ => {
|
||||
|
||||
@@ -123,6 +123,13 @@ struct Cli {
|
||||
)]
|
||||
listeners: Vec<String>,
|
||||
|
||||
#[arg(
|
||||
long,
|
||||
help = t!("core_clap.mapped_listeners").to_string(),
|
||||
num_args = 0..
|
||||
)]
|
||||
mapped_listeners: Vec<String>,
|
||||
|
||||
#[arg(
|
||||
long,
|
||||
help = t!("core_clap.no_listener").to_string(),
|
||||
@@ -185,7 +192,7 @@ struct Cli {
|
||||
#[arg(
|
||||
long,
|
||||
help = t!("core_clap.multi_thread").to_string(),
|
||||
default_value = "false"
|
||||
default_value = "true"
|
||||
)]
|
||||
multi_thread: bool,
|
||||
|
||||
@@ -300,6 +307,12 @@ struct Cli {
|
||||
default_value = "none",
|
||||
)]
|
||||
compression: String,
|
||||
|
||||
#[arg(
|
||||
long,
|
||||
help = t!("core_clap.bind_device").to_string()
|
||||
)]
|
||||
bind_device: Option<bool>,
|
||||
}
|
||||
|
||||
rust_i18n::i18n!("locales", fallback = "en");
|
||||
@@ -422,6 +435,23 @@ impl TryFrom<&Cli> for TomlConfigLoader {
|
||||
.collect(),
|
||||
);
|
||||
|
||||
cfg.set_mapped_listeners(Some(
|
||||
cli.mapped_listeners
|
||||
.iter()
|
||||
.map(|s| {
|
||||
s.parse()
|
||||
.with_context(|| format!("mapped listener is not a valid url: {}", s))
|
||||
.unwrap()
|
||||
})
|
||||
.map(|s: url::Url| {
|
||||
if s.port().is_none() {
|
||||
panic!("mapped listener port is missing: {}", s);
|
||||
}
|
||||
s
|
||||
})
|
||||
.collect(),
|
||||
));
|
||||
|
||||
for n in cli.proxy_networks.iter() {
|
||||
cfg.add_proxy_cidr(
|
||||
n.parse()
|
||||
@@ -534,6 +564,9 @@ impl TryFrom<&Cli> for TomlConfigLoader {
|
||||
),
|
||||
}
|
||||
.into();
|
||||
if let Some(bind_device) = cli.bind_device {
|
||||
f.bind_device = bind_device;
|
||||
}
|
||||
cfg.set_flags(f);
|
||||
|
||||
cfg.set_exit_nodes(cli.exit_nodes.clone());
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use cidr::Ipv4Inet;
|
||||
use crossbeam::atomic::AtomicCell;
|
||||
use dashmap::DashMap;
|
||||
@@ -24,11 +25,11 @@ use tokio::{
|
||||
use tracing::Level;
|
||||
|
||||
use crate::{
|
||||
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
|
||||
common::{error::Error, global_ctx::ArcGlobalCtx, scoped_task::ScopedTask, PeerId},
|
||||
gateway::ip_reassembler::compose_ipv4_packet,
|
||||
peers::{peer_manager::PeerManager, PeerPacketFilter},
|
||||
tunnel::{
|
||||
common::setup_sokcet2,
|
||||
common::{reserve_buf, setup_sokcet2},
|
||||
packet_def::{PacketType, ZCPacket},
|
||||
},
|
||||
};
|
||||
@@ -139,59 +140,81 @@ impl UdpNatEntry {
|
||||
mut packet_sender: Sender<ZCPacket>,
|
||||
virtual_ipv4: Ipv4Addr,
|
||||
) {
|
||||
let mut buf = [0u8; 65536];
|
||||
let mut udp_body: &mut [u8] = unsafe { std::mem::transmute(&mut buf[20 + 8..]) };
|
||||
let mut ip_id = 1;
|
||||
let (s, mut r) = tachyonix::channel(128);
|
||||
|
||||
loop {
|
||||
let (len, src_socket) = match timeout(
|
||||
Duration::from_secs(120),
|
||||
self.socket.recv_from(&mut udp_body),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(x)) => x,
|
||||
Ok(Err(err)) => {
|
||||
tracing::error!(?err, "udp nat recv failed");
|
||||
let self_clone = self.clone();
|
||||
let recv_task = ScopedTask::from(tokio::spawn(async move {
|
||||
let mut cur_buf = BytesMut::new();
|
||||
loop {
|
||||
if self_clone
|
||||
.stopped
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
{
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(?err, "udp nat recv timeout");
|
||||
break;
|
||||
|
||||
reserve_buf(&mut cur_buf, 64 * 1024 + 28, 128 * 1024 + 28);
|
||||
assert_eq!(cur_buf.len(), 0);
|
||||
unsafe {
|
||||
cur_buf.advance_mut(28);
|
||||
}
|
||||
};
|
||||
|
||||
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
|
||||
let (len, src_socket) = match timeout(
|
||||
Duration::from_secs(120),
|
||||
self_clone.socket.recv_buf_from(&mut cur_buf),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(x)) => x,
|
||||
Ok(Err(err)) => {
|
||||
tracing::error!(?err, "udp nat recv failed");
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(?err, "udp nat recv timeout");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if self.stopped.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
break;
|
||||
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
|
||||
|
||||
let ret_buf = cur_buf.split();
|
||||
s.send((ret_buf, len, src_socket)).await.unwrap();
|
||||
}
|
||||
}));
|
||||
|
||||
let SocketAddr::V4(mut src_v4) = src_socket else {
|
||||
continue;
|
||||
};
|
||||
let self_clone = self.clone();
|
||||
let send_task = ScopedTask::from(tokio::spawn(async move {
|
||||
let mut ip_id = 1;
|
||||
while let Ok((mut packet, len, src_socket)) = r.recv().await {
|
||||
let SocketAddr::V4(mut src_v4) = src_socket else {
|
||||
continue;
|
||||
};
|
||||
|
||||
self.mark_active();
|
||||
self_clone.mark_active();
|
||||
|
||||
if src_v4.ip().is_loopback() {
|
||||
src_v4.set_ip(virtual_ipv4);
|
||||
if src_v4.ip().is_loopback() {
|
||||
src_v4.set_ip(virtual_ipv4);
|
||||
}
|
||||
|
||||
let Ok(_) = Self::compose_ipv4_packet(
|
||||
&self_clone,
|
||||
&mut packet_sender,
|
||||
&mut packet,
|
||||
&src_v4,
|
||||
len,
|
||||
1280,
|
||||
ip_id,
|
||||
)
|
||||
.await
|
||||
else {
|
||||
break;
|
||||
};
|
||||
ip_id = ip_id.wrapping_add(1);
|
||||
}
|
||||
}));
|
||||
|
||||
let Ok(_) = Self::compose_ipv4_packet(
|
||||
&self,
|
||||
&mut packet_sender,
|
||||
&mut buf,
|
||||
&src_v4,
|
||||
len,
|
||||
1200,
|
||||
ip_id,
|
||||
)
|
||||
.await
|
||||
else {
|
||||
break;
|
||||
};
|
||||
ip_id = ip_id.wrapping_add(1);
|
||||
}
|
||||
let _ = tokio::join!(recv_task, send_task);
|
||||
|
||||
self.stop();
|
||||
}
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use tokio::{sync::Mutex, task::JoinSet};
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
#[cfg(feature = "quic")]
|
||||
use crate::tunnel::quic::QUICTunnelListener;
|
||||
@@ -63,16 +62,20 @@ impl TunnelHandlerForListener for PeerManager {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Listener {
|
||||
inner: Arc<Mutex<dyn TunnelListener>>,
|
||||
pub trait ListenerCreatorTrait: Fn() -> Box<dyn TunnelListener> + Send + Sync {}
|
||||
impl<T: Send + Sync> ListenerCreatorTrait for T where T: Fn() -> Box<dyn TunnelListener> + Send {}
|
||||
pub type ListenerCreator = Box<dyn ListenerCreatorTrait>;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ListenerFactory {
|
||||
creator_fn: Arc<ListenerCreator>,
|
||||
must_succ: bool,
|
||||
}
|
||||
|
||||
pub struct ListenerManager<H> {
|
||||
global_ctx: ArcGlobalCtx,
|
||||
net_ns: NetNS,
|
||||
listeners: Vec<Listener>,
|
||||
listeners: Vec<ListenerFactory>,
|
||||
peer_manager: Arc<H>,
|
||||
|
||||
tasks: JoinSet<()>,
|
||||
@@ -90,31 +93,39 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
|
||||
}
|
||||
|
||||
pub async fn prepare_listeners(&mut self) -> Result<(), Error> {
|
||||
let self_id = self.global_ctx.get_id();
|
||||
self.add_listener(
|
||||
RingTunnelListener::new(
|
||||
format!("ring://{}", self.global_ctx.get_id())
|
||||
.parse()
|
||||
.unwrap(),
|
||||
),
|
||||
move || {
|
||||
Box::new(RingTunnelListener::new(
|
||||
format!("ring://{}", self_id).parse().unwrap(),
|
||||
))
|
||||
},
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
|
||||
for l in self.global_ctx.config.get_listener_uris().iter() {
|
||||
let Ok(lis) = get_listener_by_url(l, self.global_ctx.clone()) else {
|
||||
let l = l.clone();
|
||||
let Ok(_) = get_listener_by_url(&l, self.global_ctx.clone()) else {
|
||||
let msg = format!("failed to get listener by url: {}, maybe not supported", l);
|
||||
self.global_ctx
|
||||
.issue_event(GlobalCtxEvent::ListenerAddFailed(l.clone(), msg));
|
||||
continue;
|
||||
};
|
||||
self.add_listener(lis, true).await?;
|
||||
let ctx = self.global_ctx.clone();
|
||||
self.add_listener(move || get_listener_by_url(&l, ctx.clone()).unwrap(), true)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if self.global_ctx.config.get_flags().enable_ipv6 {
|
||||
let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone();
|
||||
let _ = self
|
||||
.add_listener(
|
||||
UdpTunnelListener::new(ipv6_listener.parse().unwrap()),
|
||||
move || {
|
||||
Box::new(UdpTunnelListener::new(
|
||||
ipv6_listener.clone().parse().unwrap(),
|
||||
))
|
||||
},
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
@@ -123,85 +134,91 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add_listener<L>(&mut self, listener: L, must_succ: bool) -> Result<(), Error>
|
||||
where
|
||||
L: TunnelListener + 'static,
|
||||
{
|
||||
let listener = Arc::new(Mutex::new(listener));
|
||||
self.listeners.push(Listener {
|
||||
inner: listener,
|
||||
pub async fn add_listener<C: ListenerCreatorTrait + 'static>(
|
||||
&mut self,
|
||||
creator: C,
|
||||
must_succ: bool,
|
||||
) -> Result<(), Error> {
|
||||
self.listeners.push(ListenerFactory {
|
||||
creator_fn: Arc::new(Box::new(creator)),
|
||||
must_succ,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
#[tracing::instrument(skip(creator))]
|
||||
async fn run_listener(
|
||||
listener: Arc<Mutex<dyn TunnelListener>>,
|
||||
creator: Arc<ListenerCreator>,
|
||||
peer_manager: Arc<H>,
|
||||
global_ctx: ArcGlobalCtx,
|
||||
) {
|
||||
let mut l = listener.lock().await;
|
||||
global_ctx.add_running_listener(l.local_url());
|
||||
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
|
||||
loop {
|
||||
let ret = match l.accept().await {
|
||||
Ok(ret) => ret,
|
||||
let mut l = (creator)();
|
||||
let _g = global_ctx.net_ns.guard();
|
||||
match l.listen().await {
|
||||
Ok(_) => {
|
||||
global_ctx.add_running_listener(l.local_url());
|
||||
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
|
||||
}
|
||||
Err(e) => {
|
||||
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed(
|
||||
global_ctx.issue_event(GlobalCtxEvent::ListenerAddFailed(
|
||||
l.local_url(),
|
||||
e.to_string(),
|
||||
format!("error: {:?}, retry listen later...", e),
|
||||
));
|
||||
tracing::error!(?e, ?l, "listener accept error");
|
||||
tracing::error!(?e, ?l, "listener listen error");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
loop {
|
||||
let ret = match l.accept().await {
|
||||
Ok(ret) => ret,
|
||||
Err(e) => {
|
||||
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed(
|
||||
l.local_url(),
|
||||
format!("error: {:?}, retry listen later...", e),
|
||||
));
|
||||
tracing::error!(?e, ?l, "listener accept error");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let tunnel_info = ret.info().unwrap();
|
||||
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
|
||||
tunnel_info
|
||||
.local_addr
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.to_string(),
|
||||
tunnel_info
|
||||
.remote_addr
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.to_string(),
|
||||
));
|
||||
tracing::info!(ret = ?ret, "conn accepted");
|
||||
let peer_manager = peer_manager.clone();
|
||||
let global_ctx = global_ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
let server_ret = peer_manager.handle_tunnel(ret).await;
|
||||
if let Err(e) = &server_ret {
|
||||
global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
|
||||
tunnel_info.local_addr.unwrap_or_default().to_string(),
|
||||
tunnel_info.remote_addr.unwrap_or_default().to_string(),
|
||||
e.to_string(),
|
||||
));
|
||||
tracing::error!(error = ?e, "handle conn error");
|
||||
}
|
||||
});
|
||||
let tunnel_info = ret.info().unwrap();
|
||||
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
|
||||
tunnel_info
|
||||
.local_addr
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.to_string(),
|
||||
tunnel_info
|
||||
.remote_addr
|
||||
.clone()
|
||||
.unwrap_or_default()
|
||||
.to_string(),
|
||||
));
|
||||
tracing::info!(ret = ?ret, "conn accepted");
|
||||
let peer_manager = peer_manager.clone();
|
||||
let global_ctx = global_ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
let server_ret = peer_manager.handle_tunnel(ret).await;
|
||||
if let Err(e) = &server_ret {
|
||||
global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
|
||||
tunnel_info.local_addr.unwrap_or_default().to_string(),
|
||||
tunnel_info.remote_addr.unwrap_or_default().to_string(),
|
||||
e.to_string(),
|
||||
));
|
||||
tracing::error!(error = ?e, "handle conn error");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) -> Result<(), Error> {
|
||||
for listener in &self.listeners {
|
||||
let _guard = self.net_ns.guard();
|
||||
let addr = listener.inner.lock().await.local_url();
|
||||
tracing::warn!("run listener: {:?}", listener);
|
||||
listener
|
||||
.inner
|
||||
.lock()
|
||||
.await
|
||||
.listen()
|
||||
.await
|
||||
.with_context(|| format!("failed to add listener {}", addr))?;
|
||||
self.tasks.spawn(Self::run_listener(
|
||||
listener.inner.clone(),
|
||||
listener.creator_fn.clone(),
|
||||
self.peer_manager.clone(),
|
||||
self.global_ctx.clone(),
|
||||
));
|
||||
@@ -213,12 +230,14 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::atomic::{AtomicI32, Ordering};
|
||||
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use tokio::time::timeout;
|
||||
|
||||
use crate::{
|
||||
common::global_ctx::tests::get_mock_global_ctx,
|
||||
tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector},
|
||||
tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector, TunnelError},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
@@ -245,12 +264,18 @@ mod tests {
|
||||
|
||||
let ring_id = format!("ring://{}", uuid::Uuid::new_v4());
|
||||
|
||||
let ring_id_clone = ring_id.clone();
|
||||
listener_mgr
|
||||
.add_listener(RingTunnelListener::new(ring_id.parse().unwrap()), true)
|
||||
.add_listener(
|
||||
move || Box::new(RingTunnelListener::new(ring_id_clone.parse().unwrap())),
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
listener_mgr.run().await.unwrap();
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
let connect_once = |ring_id| async move {
|
||||
let tunnel = RingTunnelConnector::new(ring_id).connect().await.unwrap();
|
||||
let (mut recv, _send) = tunnel.split();
|
||||
@@ -269,4 +294,60 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn retry_listen() {
|
||||
let counter = Arc::new(AtomicI32::new(0));
|
||||
let drop_counter = Arc::new(AtomicI32::new(0));
|
||||
struct MockListener {
|
||||
counter: Arc<AtomicI32>,
|
||||
drop_counter: Arc<AtomicI32>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TunnelListener for MockListener {
|
||||
fn local_url(&self) -> url::Url {
|
||||
"mock://".parse().unwrap()
|
||||
}
|
||||
|
||||
async fn listen(&mut self) -> Result<(), TunnelError> {
|
||||
self.counter.fetch_add(1, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
Err(TunnelError::BufferFull)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MockListener {
|
||||
fn drop(&mut self) {
|
||||
self.drop_counter.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
let handler = Arc::new(MockListenerHandler {});
|
||||
let mut listener_mgr = ListenerManager::new(get_mock_global_ctx(), handler.clone());
|
||||
let counter_clone = counter.clone();
|
||||
let drop_counter_clone = drop_counter.clone();
|
||||
listener_mgr
|
||||
.add_listener(
|
||||
move || {
|
||||
Box::new(MockListener {
|
||||
counter: counter_clone.clone(),
|
||||
drop_counter: drop_counter_clone.clone(),
|
||||
})
|
||||
},
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
listener_mgr.run().await.unwrap();
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
|
||||
|
||||
assert!(counter.load(Ordering::Relaxed) >= 2);
|
||||
assert!(drop_counter.load(Ordering::Relaxed) >= 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -484,13 +484,6 @@ mod tests {
|
||||
let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
|
||||
let c = TunnelWithFilter::new(c, c_recorder.clone());
|
||||
|
||||
let s = if drop_both {
|
||||
let s_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
|
||||
Box::new(TunnelWithFilter::new(s, s_recorder.clone()))
|
||||
} else {
|
||||
s
|
||||
};
|
||||
|
||||
let c_peer_id = new_peer_id();
|
||||
let s_peer_id = new_peer_id();
|
||||
|
||||
@@ -506,6 +499,7 @@ mod tests {
|
||||
s_peer
|
||||
.start_recv_loop(tokio::sync::mpsc::channel(200).0)
|
||||
.await;
|
||||
// do not start ping for s, s only reponde to ping from c
|
||||
|
||||
assert!(c_ret.is_ok());
|
||||
assert!(s_ret.is_ok());
|
||||
@@ -547,7 +541,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn peer_conn_pingpong_bothside_timeout() {
|
||||
peer_conn_pingpong_test_common(4, 12, true, true).await;
|
||||
peer_conn_pingpong_test_common(3, 14, true, true).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -289,29 +289,29 @@ impl PeerConnPinger {
|
||||
"pingpong task recv pingpong_once result"
|
||||
);
|
||||
|
||||
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) {
|
||||
let current_rx_packets = throughput.rx_packets();
|
||||
let need_close = if last_rx_packets != current_rx_packets {
|
||||
// if we receive some packet from peers, we should relax the condition
|
||||
counter > 50 && loss_rate_1 > 0.5
|
||||
let current_rx_packets = throughput.rx_packets();
|
||||
if last_rx_packets != current_rx_packets {
|
||||
// if we receive some packet from peers, reset the counter to avoid conn close.
|
||||
// conn will close only if we have 5 continous round pingpong loss after no packet received.
|
||||
counter = 0;
|
||||
}
|
||||
|
||||
// TODO: wait more time to see if the loss rate is still high after no rx
|
||||
} else {
|
||||
true
|
||||
};
|
||||
tracing::debug!(
|
||||
"counter: {}, loss_rate_1: {}, loss_rate_20: {}, cur_rx_packets: {}, last_rx: {}, node_id: {}",
|
||||
counter, loss_rate_1, loss_rate_20, current_rx_packets, last_rx_packets, my_node_id
|
||||
);
|
||||
|
||||
if need_close {
|
||||
tracing::warn!(
|
||||
?ret,
|
||||
?self,
|
||||
?loss_rate_1,
|
||||
?loss_rate_20,
|
||||
?last_rx_packets,
|
||||
?current_rx_packets,
|
||||
"pingpong loss rate too high, closing"
|
||||
);
|
||||
break;
|
||||
}
|
||||
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 100 && loss_rate_1 > 0.35) {
|
||||
tracing::warn!(
|
||||
?ret,
|
||||
?self,
|
||||
?loss_rate_1,
|
||||
?loss_rate_20,
|
||||
?last_rx_packets,
|
||||
?current_rx_packets,
|
||||
"pingpong loss rate too high, closing"
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
last_rx_packets = throughput.rx_packets();
|
||||
|
||||
@@ -1739,7 +1739,6 @@ impl RouteSessionManager {
|
||||
continue;
|
||||
}
|
||||
let _ = self.stop_session(*peer_id);
|
||||
assert_ne!(Some(*peer_id), cur_dst_peer_id_to_initiate);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -24,8 +24,10 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer {
|
||||
let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await;
|
||||
ret.listeners = self
|
||||
.global_ctx
|
||||
.get_running_listeners()
|
||||
.config
|
||||
.get_mapped_listeners()
|
||||
.into_iter()
|
||||
.chain(self.global_ctx.get_running_listeners().into_iter())
|
||||
.map(Into::into)
|
||||
.collect();
|
||||
Ok(ret)
|
||||
|
||||
@@ -21,6 +21,7 @@ message FlagsInConfig {
|
||||
string ipv6_listener = 14;
|
||||
bool multi_thread = 15;
|
||||
CompressionAlgoPb data_compress_algo = 16;
|
||||
bool bind_device = 17;
|
||||
}
|
||||
|
||||
message RpcDescriptor {
|
||||
|
||||
@@ -159,7 +159,10 @@ pub fn build_rpc_packet(
|
||||
let cur_packet = RpcPacket {
|
||||
from_peer,
|
||||
to_peer,
|
||||
descriptor: if cur_offset == 0 {
|
||||
descriptor: if cur_offset == 0
|
||||
|| compression_info.algo == CompressionAlgoPb::None as i32
|
||||
{
|
||||
// old version must have descriptor on every piece
|
||||
Some(rpc_desc.clone())
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -28,6 +28,30 @@ impl TcpTunnelListener {
|
||||
listener: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_accept(&mut self) -> Result<Box<dyn Tunnel>, std::io::Error> {
|
||||
let listener = self.listener.as_ref().unwrap();
|
||||
let (stream, _) = listener.accept().await?;
|
||||
|
||||
if let Err(e) = stream.set_nodelay(true) {
|
||||
tracing::warn!(?e, "set_nodelay fail in accept");
|
||||
}
|
||||
|
||||
let info = TunnelInfo {
|
||||
tunnel_type: "tcp".to_owned(),
|
||||
local_addr: Some(self.local_url().into()),
|
||||
remote_addr: Some(
|
||||
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
|
||||
),
|
||||
};
|
||||
|
||||
let (r, w) = stream.into_split();
|
||||
Ok(Box::new(TunnelWrapper::new(
|
||||
FramedReader::new(r, TCP_MTU_BYTES),
|
||||
FramedWriter::new(w),
|
||||
Some(info),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -57,27 +81,23 @@ impl TunnelListener for TcpTunnelListener {
|
||||
}
|
||||
|
||||
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> {
|
||||
let listener = self.listener.as_ref().unwrap();
|
||||
let (stream, _) = listener.accept().await?;
|
||||
|
||||
if let Err(e) = stream.set_nodelay(true) {
|
||||
tracing::warn!(?e, "set_nodelay fail in accept");
|
||||
loop {
|
||||
match self.do_accept().await {
|
||||
Ok(ret) => return Ok(ret),
|
||||
Err(e) => {
|
||||
use std::io::ErrorKind::*;
|
||||
if matches!(
|
||||
e.kind(),
|
||||
NotConnected | ConnectionAborted | ConnectionRefused | ConnectionReset
|
||||
) {
|
||||
tracing::warn!(?e, "accept fail with retryable error: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
tracing::warn!(?e, "accept fail");
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let info = TunnelInfo {
|
||||
tunnel_type: "tcp".to_owned(),
|
||||
local_addr: Some(self.local_url().into()),
|
||||
remote_addr: Some(
|
||||
super::build_url_from_socket_addr(&stream.peer_addr()?.to_string(), "tcp").into(),
|
||||
),
|
||||
};
|
||||
|
||||
let (r, w) = stream.into_split();
|
||||
Ok(Box::new(TunnelWrapper::new(
|
||||
FramedReader::new(r, TCP_MTU_BYTES),
|
||||
FramedWriter::new(w),
|
||||
Some(info),
|
||||
)))
|
||||
}
|
||||
|
||||
fn local_url(&self) -> url::Url {
|
||||
|
||||
Reference in New Issue
Block a user