Compare commits

...

14 Commits

Author SHA1 Message Date
sijie.sun
84bfac144c bump version to 2.4.1 2025-08-02 10:48:17 +08:00
Sijie.Sun
9eddb4b072 fix readme assets (#1182) 2025-08-01 23:58:01 +08:00
Sijie.Sun
4fca0f40fe update readme (#1181) 2025-08-01 23:52:27 +08:00
Sijie.Sun
43b9e6e6e9 fix macos elevate (#1177) 2025-08-01 09:36:10 +08:00
Sijie.Sun
583c768f40 fix exit code when error occcurs (#1173) 2025-07-30 23:05:22 +08:00
Tunglies
b1b2421561 fix: compiling with socket2::Type::RAW not found on macOS #1168 (#1169) 2025-07-30 00:33:38 +08:00
Sijie.Sun
3d610c0f0f Some Improvements (#1172)
1. do not exit when dns query failed on et startup.
2. do not send secret digest to client when secret mismatch.
2025-07-29 23:05:38 +08:00
Sijie.Sun
2ec88da823 cli for port forward and tcp whitelist (#1165) 2025-07-29 09:30:47 +08:00
Mg Pig
5514de1187 chore: update flake configuration (#1163) 2025-07-29 00:26:05 +08:00
Glavo
e70eed74e2 Add support for Linux RISC-V 64 (#1159) 2025-07-27 22:07:07 +08:00
Sijie.Sun
7dc5988620 avoid udp hole punch go through tun (#1155) 2025-07-26 14:39:03 +08:00
Sijie.Sun
354a4e1d7b fix acl not work with kcp&quic (#1152) 2025-07-26 14:38:10 +08:00
Sijie.Sun
5409c5bbe7 port range should not be converted to single port (#1154) 2025-07-26 14:13:13 +08:00
Sijie.Sun
33ff9554cd need encrypt rpc if dst is in peer map (#1151) 2025-07-25 22:28:47 +08:00
47 changed files with 1992 additions and 324 deletions

View File

@@ -19,6 +19,10 @@ SYSROOT = "/usr/local/ohos-sdk/linux/native/sysroot"
linker = "aarch64-unknown-linux-musl-gcc"
rustflags = ["-C", "target-feature=+crt-static"]
[target.riscv64gc-unknown-linux-musl]
linker = "riscv64-unknown-linux-musl-gcc"
rustflags = ["-C", "target-feature=+crt-static"]
[target.'cfg(all(windows, target_env = "msvc"))']
rustflags = ["-C", "target-feature=+crt-static"]

View File

@@ -83,6 +83,9 @@ jobs:
- TARGET: x86_64-unknown-linux-musl
OS: ubuntu-22.04
ARTIFACT_NAME: linux-x86_64
- TARGET: riscv64gc-unknown-linux-musl
OS: ubuntu-22.04
ARTIFACT_NAME: linux-riscv64
- TARGET: mips-unknown-linux-musl
OS: ubuntu-22.04
ARTIFACT_NAME: linux-mips
@@ -189,6 +192,8 @@ jobs:
if [[ $OS =~ ^windows.*$ ]]; then
SUFFIX=.exe
CORE_FEATURES="--features=mimalloc"
elif [[ $TARGET =~ ^riscv64.*$ ]]; then
CORE_FEATURES="--features=mimalloc"
else
CORE_FEATURES="--features=jemalloc"
fi
@@ -255,7 +260,7 @@ jobs:
TAG=$GITHUB_SHA
fi
if [[ $OS =~ ^ubuntu.*$ && ! $TARGET =~ ^.*freebsd$ && ! $TARGET =~ ^loongarch.*$ ]]; then
if [[ $OS =~ ^ubuntu.*$ && ! $TARGET =~ ^.*freebsd$ && ! $TARGET =~ ^loongarch.*$ && ! $TARGET =~ ^riscv64.*$ ]]; then
UPX_VERSION=4.2.4
curl -L https://github.com/upx/upx/releases/download/v${UPX_VERSION}/upx-${UPX_VERSION}-amd64_linux.tar.xz -s | tar xJvf -
cp upx-${UPX_VERSION}-amd64_linux/upx .

View File

@@ -11,7 +11,7 @@ on:
image_tag:
description: 'Tag for this image build'
type: string
default: 'v2.4.0'
default: 'v2.4.1'
required: true
mark_latest:
description: 'Mark this image as latest'

View File

@@ -15,6 +15,8 @@ if [[ $OS =~ ^ubuntu.*$ ]]; then
# if target is mips or mipsel, we should use soft-float version of musl
if [[ $TARGET =~ ^mips.*$ || $TARGET =~ ^mipsel.*$ ]]; then
MUSL_TARGET=${TARGET}sf
elif [[ $TARGET =~ ^riscv64gc-.*$ ]]; then
MUSL_TARGET=${TARGET/#riscv64gc-/riscv64-}
fi
if [[ $MUSL_TARGET =~ musl ]]; then
mkdir -p ./musl_gcc

View File

@@ -21,7 +21,7 @@ on:
version:
description: 'Version for this release'
type: string
default: 'v2.4.0'
default: 'v2.4.1'
required: true
make_latest:
description: 'Mark this release as latest'

1
.gitignore vendored
View File

@@ -12,6 +12,7 @@ target-*/
.vscode
/.idea
/.direnv/
# perf & flamegraph
perf.data

View File

@@ -37,7 +37,6 @@ Thank you for your interest in contributing to EasyTier! This document provides
# Core build dependencies
sudo apt-get update && sudo apt-get install -y \
musl-tools \
libappindicator3-dev \
llvm \
clang \
protobuf-compiler
@@ -53,6 +52,7 @@ sudo apt install -y \
librsvg2-dev \
libxdo-dev \
libssl-dev \
libappindicator3-dev \
patchelf
# Testing dependencies

View File

@@ -45,7 +45,6 @@
# 核心构建依赖
sudo apt-get update && sudo apt-get install -y \
musl-tools \
libappindicator3-dev \
llvm \
clang \
protobuf-compiler
@@ -61,6 +60,7 @@ sudo apt install -y \
librsvg2-dev \
libxdo-dev \
libssl-dev \
libappindicator3-dev \
patchelf
# 测试依赖

25
Cargo.lock generated
View File

@@ -1876,7 +1876,7 @@ dependencies = [
"libc",
"option-ext",
"redox_users 0.5.0",
"windows-sys 0.59.0",
"windows-sys 0.60.2",
]
[[package]]
@@ -2119,9 +2119,10 @@ dependencies = [
"dashmap",
"dunce",
"easytier",
"elevated-command",
"gethostname 1.0.2",
"libc",
"once_cell",
"security-framework-sys",
"serde",
"serde_json",
"tauri",
@@ -2137,6 +2138,8 @@ dependencies = [
"thunk-rs",
"tokio",
"uuid",
"winapi",
"windows 0.52.0",
]
[[package]]
@@ -2207,20 +2210,6 @@ dependencies = [
"serde",
]
[[package]]
name = "elevated-command"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54c410eccdcc5b759704fdb6a792afe6b01ab8a062e2c003ff2567e2697a94aa"
dependencies = [
"anyhow",
"base64 0.21.7",
"libc",
"log",
"winapi",
"windows 0.52.0",
]
[[package]]
name = "embed-resource"
version = "3.0.5"
@@ -7557,9 +7546,9 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.5.7"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",

View File

@@ -105,9 +105,9 @@ After successful execution, you can check the network status using `easytier-cli
```text
| ipv4 | hostname | cost | lat_ms | loss_rate | rx_bytes | tx_bytes | tunnel_proto | nat_type | id | version |
| ------------ | -------------- | ----- | ------ | --------- | -------- | -------- | ------------ | -------- | ---------- | --------------- |
| 10.126.126.1 | abc-1 | Local | * | * | * | * | udp | FullCone | 439804259 | 2.4.0-70e69a38~ |
| 10.126.126.2 | abc-2 | p2p | 3.452 | 0 | 17.33 kB | 20.42 kB | udp | FullCone | 390879727 | 2.4.0-70e69a38~ |
| | PublicServer_a | p2p | 27.796 | 0.000 | 50.01 kB | 67.46 kB | tcp | Unknown | 3771642457 | 2.4.0-70e69a38~ |
| 10.126.126.1 | abc-1 | Local | * | * | * | * | udp | FullCone | 439804259 | 2.4.1-70e69a38~ |
| 10.126.126.2 | abc-2 | p2p | 3.452 | 0 | 17.33 kB | 20.42 kB | udp | FullCone | 390879727 | 2.4.1-70e69a38~ |
| | PublicServer_a | p2p | 27.796 | 0.000 | 50.01 kB | 67.46 kB | tcp | Unknown | 3771642457 | 2.4.1-70e69a38~ |
```
You can test connectivity between nodes:
@@ -302,14 +302,18 @@ CDN acceleration and security protection for this project are sponsored by Tence
</a>
</p>
Special thanks to [Langlang Cloud](https://langlang.cloud/) for sponsoring our public servers.
Special thanks to [Langlang Cloud](https://langlangy.cn/?i26c5a5) and [RainCloud](https://www.rainyun.com/NjM0NzQ1_) for sponsoring our public servers.
<p align="center">
<a href="https://langlangy.cn/?i26c5a5" target="_blank">
<img src="assets/langlang.png" width="200">
</a>
<a href="https://langlangy.cn/?i26c5a5" target="_blank">
<img src="assets/raincloud.png" width="200">
</a>
</p>
If you find EasyTier helpful, please consider sponsoring us. Software development and maintenance require a lot of time and effort, and your sponsorship will help us better maintain and improve EasyTier.
<p align="center">

View File

@@ -106,9 +106,9 @@ sudo easytier-core -d --network-name abc --network-secret abc -p tcp://public.ea
```text
| ipv4 | hostname | cost | lat_ms | loss_rate | rx_bytes | tx_bytes | tunnel_proto | nat_type | id | version |
| ------------ | -------------- | ----- | ------ | --------- | -------- | -------- | ------------ | -------- | ---------- | --------------- |
| 10.126.126.1 | abc-1 | Local | * | * | * | * | udp | FullCone | 439804259 | 2.4.0-70e69a38~ |
| 10.126.126.2 | abc-2 | p2p | 3.452 | 0 | 17.33 kB | 20.42 kB | udp | FullCone | 390879727 | 2.4.0-70e69a38~ |
| | PublicServer_a | p2p | 27.796 | 0.000 | 50.01 kB | 67.46 kB | tcp | Unknown | 3771642457 | 2.4.0-70e69a38~ |
| 10.126.126.1 | abc-1 | Local | * | * | * | * | udp | FullCone | 439804259 | 2.4.1-70e69a38~ |
| 10.126.126.2 | abc-2 | p2p | 3.452 | 0 | 17.33 kB | 20.42 kB | udp | FullCone | 390879727 | 2.4.1-70e69a38~ |
| | PublicServer_a | p2p | 27.796 | 0.000 | 50.01 kB | 67.46 kB | tcp | Unknown | 3771642457 | 2.4.1-70e69a38~ |
```
您可以测试节点之间的连通性:
@@ -303,12 +303,15 @@ EasyTier 在 [LGPL-3.0](https://github.com/EasyTier/EasyTier/blob/main/LICENSE)
</a>
</p>
特别感谢 [浪浪云](https://langlang.cloud/) 赞助我们的公共服务器。
特别感谢 [浪浪云](https://langlangy.cn/?i26c5a5) 和 [雨云](https://www.rainyun.com/NjM0NzQ1_) 赞助我们的公共服务器。
<p align="center">
<a href="https://langlangy.cn/?i26c5a5" target="_blank">
<img src="assets/langlang.png" width="200">
</a>
<a href="https://langlangy.cn/?i26c5a5" target="_blank">
<img src="assets/raincloud.png" width="200">
</a>
</p>
如果您觉得 EasyTier 有帮助,请考虑赞助我们。软件开发和维护需要大量的时间和精力,您的赞助将帮助我们更好地维护和改进 EasyTier。

BIN
assets/raincloud.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

View File

@@ -1,6 +1,6 @@
id=easytier_magisk
name=EasyTier_Magisk
version=v2.4.0
version=v2.4.1
versionCode=1
author=EasyTier
description=easytier magisk module @EasyTier(https://github.com/EasyTier/EasyTier)

View File

@@ -1010,7 +1010,7 @@ dependencies = [
[[package]]
name = "easytier"
version = "2.4.0"
version = "2.4.1"
source = "git+https://github.com/EasyTier/EasyTier.git#a4bb555fac1046d0099c44676fa9d0d8cca55c99"
dependencies = [
"anyhow",

View File

@@ -1,7 +1,7 @@
{
"name": "easytier-gui",
"type": "module",
"version": "2.4.0",
"version": "2.4.1",
"private": true,
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
"scripts": {

View File

@@ -1,6 +1,6 @@
[package]
name = "easytier-gui"
version = "2.4.0"
version = "2.4.1"
description = "EasyTier GUI"
authors = ["you"]
edition = "2021"
@@ -40,7 +40,6 @@ chrono = { version = "0.4.37", features = ["serde"] }
once_cell = "1.18.0"
dashmap = "6.0"
elevated-command = "1.1.2"
gethostname = "1.0.2"
dunce = "1.0.4"
@@ -54,6 +53,15 @@ tauri-plugin-os = "2.3.0"
tauri-plugin-autostart = "2.5.0"
uuid = "1.17.0"
[target.'cfg(target_os = "windows")'.dependencies]
windows = { version = "0.52", features = ["Win32_Foundation", "Win32_UI_Shell", "Win32_UI_WindowsAndMessaging"] }
winapi = { version = "0.3.9", features = ["securitybaseapi", "processthreadsapi"] }
[target.'cfg(target_family = "unix")'.dependencies]
libc = "0.2"
[target.'cfg(target_os = "macos")'.dependencies]
security-framework-sys = "2.9.0"
[features]
# This feature is used for production builds or when a dev server is not specified, DO NOT REMOVE!!

View File

@@ -0,0 +1,97 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Luis Liu. All rights reserved.
* Licensed under the MIT License. See License in the project root for license information.
*--------------------------------------------------------------------------------------------*/
use super::Command;
use anyhow::{anyhow, Result};
use std::env;
use std::ffi::OsStr;
use std::path::PathBuf;
use std::process::{Command as StdCommand, Output};
use std::str::FromStr;
/// The implementation of state check and elevated executing varies on each platform
impl Command {
/// Check the state the current program running
///
/// Return `true` if the program is running as root, otherwise false
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
///
/// fn main() {
/// let is_elevated = Command::is_elevated();
///
/// }
/// ```
pub fn is_elevated() -> bool {
let uid = unsafe { libc::getuid() };
if uid == 0 {
true
} else {
false
}
}
/// Prompting the user with a graphical OS dialog for the root password,
/// excuting the command with escalated privileges, and return the output
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
/// use std::process::Command as StdCommand;
///
/// fn main() {
/// let mut cmd = StdCommand::new("path to the application");
/// let elevated_cmd = Command::new(cmd);
/// let output = elevated_cmd.output().unwrap();
/// }
/// ```
pub fn output(&self) -> Result<Output> {
let pkexec = PathBuf::from_str("/bin/pkexec")?;
let mut command = StdCommand::new(pkexec);
let display = env::var("DISPLAY");
let xauthority = env::var("XAUTHORITY");
let home = env::var("HOME");
command.arg("--disable-internal-agent");
if display.is_ok() || xauthority.is_ok() || home.is_ok() {
command.arg("env");
if let Ok(display) = display {
command.arg(format!("DISPLAY={}", display));
}
if let Ok(xauthority) = xauthority {
command.arg(format!("XAUTHORITY={}", xauthority));
}
if let Ok(home) = home {
command.arg(format!("HOME={}", home));
}
} else {
if self.cmd.get_envs().any(|(_, v)| v.is_some()) {
command.arg("env");
}
}
for (k, v) in self.cmd.get_envs() {
if let Some(value) = v {
command.arg(format!(
"{}={}",
k.to_str().ok_or(anyhow!("invalid key"))?,
value.to_str().ok_or(anyhow!("invalid value"))?
));
}
}
command.arg(self.cmd.get_program());
let args: Vec<&OsStr> = self.cmd.get_args().collect();
if !args.is_empty() {
command.args(args);
}
let output = command.output()?;
Ok(output)
}
}

View File

@@ -0,0 +1,182 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Luis Liu. All rights reserved.
* Licensed under the MIT License. See License in the project root for license information.
*--------------------------------------------------------------------------------------------*/
// Thanks to https://github.com/jorangreef/sudo-prompt/blob/master/index.js
// MIT License
//
// Copyright (c) 2015 Joran Dirk Greef
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// ...
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
use super::Command;
use anyhow::Result;
use std::env;
use std::path::PathBuf;
use std::process::{ExitStatus, Output};
use std::ffi::{CString, OsString};
use std::io;
use std::mem;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use std::ptr;
use libc::{fcntl, fileno, waitpid, EINTR, F_GETOWN};
use security_framework_sys::authorization::{
errAuthorizationSuccess, kAuthorizationFlagDefaults, kAuthorizationFlagDestroyRights,
AuthorizationCreate, AuthorizationExecuteWithPrivileges, AuthorizationFree, AuthorizationRef,
};
const ENV_PATH: &str = "PATH";
fn get_exe_path<P: AsRef<Path>>(exe_name: P) -> Option<PathBuf> {
let exe_name = exe_name.as_ref();
if exe_name.has_root() {
return Some(exe_name.into());
}
if let Ok(abs_path) = exe_name.canonicalize() {
if abs_path.is_file() {
return Some(abs_path);
}
}
env::var_os(ENV_PATH).and_then(|paths| {
env::split_paths(&paths)
.filter_map(|dir| {
let full_path = dir.join(exe_name);
if full_path.is_file() {
Some(full_path)
} else {
None
}
})
.next()
})
}
macro_rules! make_cstring {
($s:expr) => {
match CString::new($s.as_bytes()) {
Ok(s) => s,
Err(_) => {
return Err(io::Error::new(io::ErrorKind::Other, "null byte in string"));
}
}
};
}
unsafe fn gui_runas(prog: *const i8, argv: *const *const i8) -> i32 {
let mut authref: AuthorizationRef = ptr::null_mut();
let mut pipe: *mut libc::FILE = ptr::null_mut();
if AuthorizationCreate(
ptr::null(),
ptr::null(),
kAuthorizationFlagDefaults,
&mut authref,
) != errAuthorizationSuccess
{
return -1;
}
if AuthorizationExecuteWithPrivileges(
authref,
prog,
kAuthorizationFlagDefaults,
argv as *const *mut _,
&mut pipe,
) != errAuthorizationSuccess
{
AuthorizationFree(authref, kAuthorizationFlagDestroyRights);
return -1;
}
let pid = fcntl(fileno(pipe), F_GETOWN, 0);
let mut status = 0;
loop {
let r = waitpid(pid, &mut status, 0);
if r == -1 && io::Error::last_os_error().raw_os_error() == Some(EINTR) {
continue;
} else {
break;
}
}
AuthorizationFree(authref, kAuthorizationFlagDestroyRights);
status
}
fn runas_root_gui(cmd: &Command) -> io::Result<ExitStatus> {
let exe: OsString = match get_exe_path(&cmd.cmd.get_program()) {
Some(exe) => exe.into(),
None => unsafe {
return Ok(mem::transmute(!0));
},
};
let prog = make_cstring!(exe);
let mut args = vec![];
for arg in cmd.cmd.get_args() {
args.push(make_cstring!(arg))
}
let mut argv: Vec<_> = args.iter().map(|x| x.as_ptr()).collect();
argv.push(ptr::null());
unsafe { Ok(mem::transmute(gui_runas(prog.as_ptr(), argv.as_ptr()))) }
}
/// The implementation of state check and elevated executing varies on each platform
impl Command {
/// Check the state the current program running
///
/// Return `true` if the program is running as root, otherwise false
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
///
/// fn main() {
/// let is_elevated = Command::is_elevated();
///
/// }
/// ```
pub fn is_elevated() -> bool {
let uid = unsafe { libc::getuid() };
let euid = unsafe { libc::geteuid() };
match (uid, euid) {
(0, 0) => true,
(_, 0) => true,
(_, _) => false,
}
}
/// Prompting the user with a graphical OS dialog for the root password,
/// excuting the command with escalated privileges, and return the output
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
/// use std::process::Command as StdCommand;
///
/// fn main() {
/// let mut cmd = StdCommand::new("path to the application");
/// let elevated_cmd = Command::new(cmd);
/// let output = elevated_cmd.output().unwrap();
/// }
/// ```
pub fn output(&self) -> Result<Output> {
let status = runas_root_gui(self)?;
Ok(Output {
status,
stdout: Vec::new(),
stderr: Vec::new(),
})
}
}

View File

@@ -0,0 +1,182 @@
#![allow(dead_code)]
/*---------------------------------------------------------------------------------------------
* Copyright (c) Luis Liu. All rights reserved.
* Licensed under the MIT License. See License in the project root for license information.
*--------------------------------------------------------------------------------------------*/
use std::convert::From;
use std::process::Command as StdCommand;
/// Wrap of std::process::command and escalate privileges while executing
pub struct Command {
cmd: StdCommand,
#[allow(dead_code)]
icon: Option<Vec<u8>>,
#[allow(dead_code)]
name: Option<String>,
}
/// Command initialization shares the same logic across all the platforms
impl Command {
/// Constructs a new `Command` from a std::process::Command
/// instance, it would read the following configuration from
/// the instance while executing:
///
/// * The instance's path to the program
/// * The instance's arguments
/// * The instance's environment variables
///
/// So far, the new `Command` would only take the environment variables explicitly
/// set by std::process::Command::env and std::process::Command::env,
/// without the ones inherited from the parent process
///
/// And the environment variables would only be taken on Linux and MacOS,
/// they would be ignored on Windows
///
/// Current working directory would be the following while executing the command:
/// - %SystemRoot%\System32 on Windows
/// - /root on Linux
/// - $TMPDIR/sudo_prompt_applet/applet.app/Contents/MacOS on MacOS
///
/// To pass environment variables on Windows,
/// to inherit environment variables from the parent process and
/// to change the working directory will be supported in later versions
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
/// use std::process::Command as StdCommand;
///
/// fn main() {
/// let mut cmd = StdCommand::new("path to the application");
///
/// cmd.arg("some arg");
/// cmd.env("some key", "some value");
///
/// let elevated_cmd = Command::new(cmd);
/// }
/// ```
pub fn new(cmd: StdCommand) -> Self {
Self {
cmd,
icon: None,
name: None,
}
}
/// Consumes the `Take`, returning the wrapped std::process::Command
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
/// use std::process::Command as StdCommand;
///
/// fn main() {
/// let mut cmd = StdCommand::new("path to the application");
/// let elevated_cmd = Command::new(cmd);
/// let cmd = elevated_cmd.into_inner();
/// }
/// ```
pub fn into_inner(self) -> StdCommand {
self.cmd
}
/// Gets a mutable reference to the underlying std::process::Command
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
/// use std::process::Command as StdCommand;
///
/// fn main() {
/// let mut cmd = StdCommand::new("path to the application");
/// let elevated_cmd = Command::new(cmd);
/// let cmd = elevated_cmd.get_ref();
/// }
/// ```
pub fn get_ref(&self) -> &StdCommand {
&self.cmd
}
/// Gets a reference to the underlying std::process::Command
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
/// use std::process::Command as StdCommand;
///
/// fn main() {
/// let mut cmd = StdCommand::new("path to the application");
/// let elevated_cmd = Command::new(cmd);
/// let cmd = elevated_cmd.get_mut();
/// }
/// ```
pub fn get_mut(&mut self) -> &mut StdCommand {
&mut self.cmd
}
/// Set the `icon` for the pop-up graphical OS dialog
///
/// This method is only applicable on `MacOS`
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
/// use std::process::Command as StdCommand;
///
/// fn main() {
/// let mut cmd = StdCommand::new("path to the application");
/// let elevated_cmd = Command::new(cmd);
/// elevated_cmd.icon(include_bytes!("path to the icon").to_vec());
/// }
/// ```
pub fn icon(&mut self, icon: Vec<u8>) -> &mut Self {
self.icon = Some(icon);
self
}
/// Set the name for the pop-up graphical OS dialog
///
/// This method is only applicable on `MacOS`
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
/// use std::process::Command as StdCommand;
///
/// fn main() {
/// let mut cmd = StdCommand::new("path to the application");
/// let elevated_cmd = Command::new(cmd);
/// elevated_cmd.name("some name".to_string());
/// }
/// ```
pub fn name(&mut self, name: String) -> &mut Self {
self.name = Some(name);
self
}
}
impl From<StdCommand> for Command {
/// Converts from a std::process::Command
///
/// It is similiar with the construct method
fn from(cmd: StdCommand) -> Self {
Self {
cmd,
icon: None,
name: None,
}
}
}
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "macos")]
mod macos;
#[cfg(target_os = "windows")]
mod windows;

View File

@@ -0,0 +1,114 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Luis Liu. All rights reserved.
* Licensed under the MIT License. See License in the project root for license information.
*--------------------------------------------------------------------------------------------*/
use super::Command;
use anyhow::Result;
use std::mem;
use std::os::windows::process::ExitStatusExt;
use std::process::{ExitStatus, Output};
use winapi::shared::minwindef::{DWORD, LPVOID};
use winapi::um::processthreadsapi::{GetCurrentProcess, OpenProcessToken};
use winapi::um::securitybaseapi::GetTokenInformation;
use winapi::um::winnt::{TokenElevation, HANDLE, TOKEN_ELEVATION, TOKEN_QUERY};
use windows::core::{w, HSTRING, PCWSTR};
use windows::Win32::Foundation::HWND;
use windows::Win32::UI::Shell::ShellExecuteW;
use windows::Win32::UI::WindowsAndMessaging::SW_HIDE;
/// The implementation of state check and elevated executing varies on each platform
impl Command {
/// Check the state the current program running
///
/// Return `true` if the program is running as root, otherwise false
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
///
/// fn main() {
/// let is_elevated = Command::is_elevated();
///
/// }
/// ```
pub fn is_elevated() -> bool {
// Thanks to https://stackoverflow.com/a/8196291
unsafe {
let mut current_token_ptr: HANDLE = mem::zeroed();
let mut token_elevation: TOKEN_ELEVATION = mem::zeroed();
let token_elevation_type_ptr: *mut TOKEN_ELEVATION = &mut token_elevation;
let mut size: DWORD = 0;
let result = OpenProcessToken(GetCurrentProcess(), TOKEN_QUERY, &mut current_token_ptr);
if result != 0 {
let result = GetTokenInformation(
current_token_ptr,
TokenElevation,
token_elevation_type_ptr as LPVOID,
mem::size_of::<winapi::um::winnt::TOKEN_ELEVATION_TYPE>() as u32,
&mut size,
);
if result != 0 {
return token_elevation.TokenIsElevated != 0;
}
}
}
false
}
/// Prompting the user with a graphical OS dialog for the root password,
/// excuting the command with escalated privileges, and return the output
///
/// On Windows, according to https://learn.microsoft.com/en-us/windows/win32/api/shellapi/nf-shellapi-shellexecutew#return-value,
/// Output.status.code() shoudl be greater than 32 if the function succeeds,
/// otherwise the value indicates the cause of the failure
///
/// On Windows, Output.stdout and Output.stderr will always be empty as of now
///
/// # Examples
///
/// ```no_run
/// use elevated_command::Command;
/// use std::process::Command as StdCommand;
///
/// fn main() {
/// let mut cmd = StdCommand::new("path to the application");
/// let elevated_cmd = Command::new(cmd);
/// let output = elevated_cmd.output().unwrap();
/// }
/// ```
pub fn output(&self) -> Result<Output> {
let args = self
.cmd
.get_args()
.map(|c| c.to_str().unwrap().to_string())
.collect::<Vec<String>>();
let parameters = if args.is_empty() {
HSTRING::new()
} else {
let arg_str = args.join(" ");
HSTRING::from(arg_str)
};
// according to https://stackoverflow.com/a/38034535
// the cwd always point to %SystemRoot%\System32 and cannot be changed by settting lpdirectory param
let r = unsafe {
ShellExecuteW(
HWND(0),
w!("runas"),
&HSTRING::from(self.cmd.get_program()),
&HSTRING::from(parameters),
PCWSTR::null(),
SW_HIDE,
)
};
Ok(Output {
status: ExitStatus::from_raw(r.0 as u32),
stdout: Vec::<u8>::new(),
stderr: Vec::<u8>::new(),
})
}
}

View File

@@ -1,6 +1,8 @@
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
mod elevate;
use std::collections::BTreeMap;
use easytier::{
@@ -128,7 +130,7 @@ fn toggle_window_visibility<R: tauri::Runtime>(app: &tauri::AppHandle<R>) {
#[cfg(not(target_os = "android"))]
fn check_sudo() -> bool {
let is_elevated = elevated_command::Command::is_elevated();
let is_elevated = elevate::Command::is_elevated();
if !is_elevated {
let exe_path = std::env::var("APPIMAGE")
.ok()
@@ -139,7 +141,7 @@ fn check_sudo() -> bool {
if args.contains(&AUTOSTART_ARG.to_owned()) {
stdcmd.arg(AUTOSTART_ARG);
}
elevated_command::Command::new(stdcmd)
elevate::Command::new(stdcmd)
.output()
.expect("Failed to run elevated command");
}

View File

@@ -17,7 +17,7 @@
"createUpdaterArtifacts": false
},
"productName": "easytier-gui",
"version": "2.4.0",
"version": "2.4.1",
"identifier": "com.kkrainbow.easytier",
"plugins": {},
"app": {

View File

@@ -1,6 +1,6 @@
[package]
name = "easytier-web"
version = "2.4.0"
version = "2.4.1"
edition = "2021"
description = "Config server for easytier. easytier-core gets config from this and web frontend use it as restful api server."

View File

@@ -3,7 +3,7 @@ name = "easytier"
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
homepage = "https://github.com/EasyTier/EasyTier"
repository = "https://github.com/EasyTier/EasyTier"
version = "2.4.0"
version = "2.4.1"
edition = "2021"
authors = ["kkrainbow"]
keywords = ["vpn", "p2p", "network", "easytier"]
@@ -115,7 +115,7 @@ byteorder = "1.5.0"
# for proxy
cidr = { version = "0.2.2", features = ["serde"] }
socket2 = "0.5.5"
socket2 = { version = "0.5.10", features = ["all"] }
# for hole punching
stun_codec = "0.3.4"

View File

@@ -6,8 +6,9 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};
use crate::common::token_bucket::TokenBucket;
use crate::common::{config::ConfigLoader, global_ctx::ArcGlobalCtx, token_bucket::TokenBucket};
use crate::proto::acl::*;
use anyhow::Context as _;
use dashmap::DashMap;
use tokio::task::JoinSet;
@@ -470,6 +471,7 @@ impl AclProcessor {
let rules = match chain_type {
ChainType::Inbound => &self.inbound_rules,
ChainType::Outbound => &self.outbound_rules,
ChainType::Forward => &self.forward_rules,
_ => {
return AclResult {
action: Action::Drop,
@@ -992,6 +994,146 @@ impl AclStatKey {
}
}
pub struct AclRuleBuilder {
pub acl: Option<Acl>,
pub tcp_whitelist: Vec<String>,
pub udp_whitelist: Vec<String>,
pub whitelist_priority: Option<u32>,
}
impl AclRuleBuilder {
fn parse_port_list(port_list: &[String]) -> anyhow::Result<Vec<String>> {
let mut ports = Vec::new();
for port_spec in port_list {
if port_spec.contains('-') {
// Handle port range like "8000-9000"
let parts: Vec<&str> = port_spec.split('-').collect();
if parts.len() != 2 {
return Err(anyhow::anyhow!("Invalid port range format: {}", port_spec));
}
let start: u16 = parts[0]
.parse()
.with_context(|| format!("Invalid start port in range: {}", port_spec))?;
let end: u16 = parts[1]
.parse()
.with_context(|| format!("Invalid end port in range: {}", port_spec))?;
if start > end {
return Err(anyhow::anyhow!(
"Start port must be <= end port in range: {}",
port_spec
));
}
// acl can handle port range
ports.push(port_spec.clone());
} else {
// Handle single port
let port: u16 = port_spec
.parse()
.with_context(|| format!("Invalid port number: {}", port_spec))?;
ports.push(port.to_string());
}
}
Ok(ports)
}
fn generate_acl_from_whitelists(&mut self) -> anyhow::Result<()> {
if self.tcp_whitelist.is_empty() && self.udp_whitelist.is_empty() {
return Ok(());
}
// Create inbound chain for whitelist rules
let mut inbound_chain = Chain {
name: "inbound_whitelist".to_string(),
chain_type: ChainType::Inbound as i32,
description: "Auto-generated inbound whitelist from CLI".to_string(),
enabled: true,
rules: vec![],
default_action: Action::Drop as i32, // Default deny
};
let mut rule_priority = self.whitelist_priority.unwrap_or(1000u32);
// Add TCP whitelist rules
if !self.tcp_whitelist.is_empty() {
let tcp_ports = Self::parse_port_list(&self.tcp_whitelist)?;
let tcp_rule = Rule {
name: "tcp_whitelist".to_string(),
description: "Auto-generated TCP whitelist rule".to_string(),
priority: rule_priority,
enabled: true,
protocol: Protocol::Tcp as i32,
ports: tcp_ports,
source_ips: vec![],
destination_ips: vec![],
source_ports: vec![],
action: Action::Allow as i32,
rate_limit: 0,
burst_limit: 0,
stateful: true,
};
inbound_chain.rules.push(tcp_rule);
rule_priority -= 1;
}
// Add UDP whitelist rules
if !self.udp_whitelist.is_empty() {
let udp_ports = Self::parse_port_list(&self.udp_whitelist)?;
let udp_rule = Rule {
name: "udp_whitelist".to_string(),
description: "Auto-generated UDP whitelist rule".to_string(),
priority: rule_priority,
enabled: true,
protocol: Protocol::Udp as i32,
ports: udp_ports,
source_ips: vec![],
destination_ips: vec![],
source_ports: vec![],
action: Action::Allow as i32,
rate_limit: 0,
burst_limit: 0,
stateful: false,
};
inbound_chain.rules.push(udp_rule);
}
if self.acl.is_none() {
self.acl = Some(Acl::default());
}
let acl = self.acl.as_mut().unwrap();
if let Some(ref mut acl_v1) = acl.acl_v1 {
acl_v1.chains.push(inbound_chain);
} else {
acl.acl_v1 = Some(AclV1 {
chains: vec![inbound_chain],
});
}
Ok(())
}
fn do_build(mut self) -> anyhow::Result<Option<Acl>> {
self.generate_acl_from_whitelists()?;
Ok(self.acl.clone())
}
pub fn build(global_ctx: &ArcGlobalCtx) -> anyhow::Result<Option<Acl>> {
let builder = AclRuleBuilder {
acl: global_ctx.config.get_acl(),
tcp_whitelist: global_ctx.config.get_tcp_whitelist(),
udp_whitelist: global_ctx.config.get_udp_whitelist(),
whitelist_priority: None,
};
builder.do_build()
}
}
#[derive(Debug, Clone, Copy)]
pub enum AclStatType {
Total,

View File

@@ -122,6 +122,12 @@ pub trait ConfigLoader: Send + Sync {
fn get_acl(&self) -> Option<Acl>;
fn set_acl(&self, acl: Option<Acl>);
fn get_tcp_whitelist(&self) -> Vec<String>;
fn set_tcp_whitelist(&self, whitelist: Vec<String>);
fn get_udp_whitelist(&self) -> Vec<String>;
fn set_udp_whitelist(&self, whitelist: Vec<String>);
fn dump(&self) -> String;
}
@@ -230,7 +236,7 @@ pub struct VpnPortalConfig {
pub wireguard_listen: SocketAddr,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Hash)]
pub struct PortForwardConfig {
pub bind_addr: SocketAddr,
pub dst_addr: SocketAddr,
@@ -299,6 +305,9 @@ struct Config {
flags_struct: Option<Flags>,
acl: Option<Acl>,
tcp_whitelist: Option<Vec<String>>,
udp_whitelist: Option<Vec<String>>,
}
#[derive(Debug, Clone)]
@@ -665,6 +674,32 @@ impl ConfigLoader for TomlConfigLoader {
self.config.lock().unwrap().acl = acl;
}
fn get_tcp_whitelist(&self) -> Vec<String> {
self.config
.lock()
.unwrap()
.tcp_whitelist
.clone()
.unwrap_or_default()
}
fn set_tcp_whitelist(&self, whitelist: Vec<String>) {
self.config.lock().unwrap().tcp_whitelist = Some(whitelist);
}
fn get_udp_whitelist(&self) -> Vec<String> {
self.config
.lock()
.unwrap()
.udp_whitelist
.clone()
.unwrap_or_default()
}
fn set_udp_whitelist(&self, whitelist: Vec<String>) {
self.config.lock().unwrap().udp_whitelist = Some(whitelist);
}
fn dump(&self) -> String {
let default_flags_json = serde_json::to_string(&gen_default_flags()).unwrap();
let default_flags_hashmap =

View File

@@ -4,11 +4,11 @@ use std::{
};
use anyhow::Context;
use dashmap::{DashMap, DashSet};
use dashmap::DashSet;
use tokio::{
sync::{
broadcast::{error::RecvError, Receiver},
mpsc, Mutex,
mpsc,
},
task::JoinSet,
time::timeout,
@@ -32,7 +32,6 @@ use crate::{
global_ctx::{ArcGlobalCtx, GlobalCtxEvent},
netns::NetNS,
},
connector::set_bind_addr_for_peer_connector,
peers::peer_manager::PeerManager,
proto::cli::{
Connector, ConnectorManageRpc, ConnectorStatus, ListConnectorRequest,
@@ -43,8 +42,7 @@ use crate::{
use super::create_connector_by_url;
type MutexConnector = Arc<Mutex<Box<dyn TunnelConnector>>>;
type ConnectorMap = Arc<DashMap<String, MutexConnector>>;
type ConnectorMap = Arc<DashSet<String>>;
#[derive(Debug, Clone)]
struct ReconnResult {
@@ -72,7 +70,7 @@ pub struct ManualConnectorManager {
impl ManualConnectorManager {
pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc<PeerManager>) -> Self {
let connectors = Arc::new(DashMap::new());
let connectors = Arc::new(DashSet::new());
let tasks = JoinSet::new();
let event_subscriber = global_ctx.subscribe();
@@ -105,14 +103,11 @@ impl ManualConnectorManager {
T: TunnelConnector + 'static,
{
tracing::info!("add_connector: {}", connector.remote_url());
self.data.connectors.insert(
connector.remote_url().into(),
Arc::new(Mutex::new(Box::new(connector))),
);
self.data.connectors.insert(connector.remote_url().into());
}
pub async fn add_connector_by_url(&self, url: &str) -> Result<(), Error> {
self.add_connector(create_connector_by_url(url, &self.global_ctx, IpVersion::Both).await?);
self.data.connectors.insert(url.to_owned());
Ok(())
}
@@ -236,16 +231,16 @@ impl ManualConnectorManager {
for dead_url in dead_urls {
let data_clone = data.clone();
let sender = reconn_result_send.clone();
let (_, connector) = data.connectors.remove(&dead_url).unwrap();
data.connectors.remove(&dead_url).unwrap();
let insert_succ = data.reconnecting.insert(dead_url.clone());
assert!(insert_succ);
tasks.lock().unwrap().spawn(async move {
let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone(), connector.clone()).await;
let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone() ).await;
let _ = sender.send(reconn_ret).await;
data_clone.reconnecting.remove(&dead_url).unwrap();
data_clone.connectors.insert(dead_url.clone(), connector);
data_clone.connectors.insert(dead_url.clone());
});
}
tracing::info!("reconn_interval tick, done");
@@ -323,25 +318,13 @@ impl ManualConnectorManager {
async fn conn_reconnect_with_ip_version(
data: Arc<ConnectorManagerData>,
dead_url: String,
connector: MutexConnector,
ip_version: IpVersion,
) -> Result<ReconnResult, Error> {
let ip_collector = data.global_ctx.get_ip_collector();
let connector =
create_connector_by_url(&dead_url, &data.global_ctx.clone(), ip_version).await?;
connector.lock().await.set_ip_version(ip_version);
if data.global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
connector.lock().await.as_mut(),
ip_version == IpVersion::V4,
&ip_collector,
)
.await;
}
data.global_ctx.issue_event(GlobalCtxEvent::Connecting(
connector.lock().await.remote_url().clone(),
));
data.global_ctx
.issue_event(GlobalCtxEvent::Connecting(connector.remote_url().clone()));
tracing::info!("reconnect try connect... conn: {:?}", connector);
let Some(pm) = data.peer_manager.upgrade() else {
return Err(Error::AnyhowError(anyhow::anyhow!(
@@ -349,9 +332,7 @@ impl ManualConnectorManager {
)));
};
let (peer_id, conn_id) = pm
.try_direct_connect(connector.lock().await.as_mut())
.await?;
let (peer_id, conn_id) = pm.try_direct_connect(connector).await?;
tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url);
Ok(ReconnResult {
dead_url,
@@ -363,7 +344,6 @@ impl ManualConnectorManager {
async fn conn_reconnect(
data: Arc<ConnectorManagerData>,
dead_url: String,
connector: MutexConnector,
) -> Result<ReconnResult, Error> {
tracing::info!("reconnect: {}", dead_url);
@@ -415,12 +395,7 @@ impl ManualConnectorManager {
let ret = timeout(
// allow http connector to wait longer
std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }),
Self::conn_reconnect_with_ip_version(
data.clone(),
dead_url.clone(),
connector.clone(),
ip_version,
),
Self::conn_reconnect_with_ip_version(data.clone(), dead_url.clone(), ip_version),
)
.await;
tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret);

View File

@@ -314,8 +314,12 @@ impl PunchBothEasySymHoleClient {
);
for _ in 0..2 {
match try_connect_with_socket(socket.socket.clone(), remote_mapped_addr.into())
.await
match try_connect_with_socket(
global_ctx.clone(),
socket.socket.clone(),
remote_mapped_addr.into(),
)
.await
{
Ok(tunnel) => {
return Ok(Some(tunnel));

View File

@@ -1,5 +1,5 @@
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
time::Duration,
};
@@ -582,7 +582,33 @@ pub(crate) async fn send_symmetric_hole_punch_packet(
Ok(cur_port_idx % ports.len())
}
async fn check_udp_socket_local_addr(
global_ctx: ArcGlobalCtx,
remote_mapped_addr: SocketAddr,
) -> Result<(), Error> {
let socket = UdpSocket::bind("0.0.0.0:0").await?;
socket.connect(remote_mapped_addr).await?;
if let Ok(local_addr) = socket.local_addr() {
// local_addr should not be equal to virtual ipv4 or virtual ipv6
match local_addr.ip() {
IpAddr::V4(ip) => {
if global_ctx.get_ipv4().map(|ip| ip.address()) == Some(ip) {
return Err(anyhow::anyhow!("local address is virtual ipv4").into());
}
}
IpAddr::V6(ip) => {
if global_ctx.get_ipv6().map(|ip| ip.address()) == Some(ip) {
return Err(anyhow::anyhow!("local address is virtual ipv6").into());
}
}
}
}
Ok(())
}
pub(crate) async fn try_connect_with_socket(
global_ctx: ArcGlobalCtx,
socket: Arc<UdpSocket>,
remote_mapped_addr: SocketAddr,
) -> Result<Box<dyn Tunnel>, Error> {
@@ -596,6 +622,9 @@ pub(crate) async fn try_connect_with_socket(
.parse()
.unwrap(),
);
check_udp_socket_local_addr(global_ctx, remote_mapped_addr).await?;
connector
.try_connect_with_socket(socket, remote_mapped_addr)
.await

View File

@@ -223,8 +223,12 @@ impl PunchConeHoleClient {
tracing::debug!(?socket, ?tid, "punched socket found, try connect with it");
for _ in 0..2 {
match try_connect_with_socket(socket.socket.clone(), remote_mapped_addr.into())
.await
match try_connect_with_socket(
global_ctx.clone(),
socket.socket.clone(),
remote_mapped_addr.into(),
)
.await
{
Ok(tunnel) => {
tracing::info!(?tunnel, "hole punched");

View File

@@ -14,11 +14,15 @@ use tokio::{net::UdpSocket, sync::RwLock};
use tracing::Level;
use crate::{
common::{scoped_task::ScopedTask, stun::StunInfoCollectorTrait, PeerId},
connector::udp_hole_punch::common::{
send_symmetric_hole_punch_packet, try_connect_with_socket, HOLE_PUNCH_PACKET_BODY_LEN,
common::{
global_ctx::ArcGlobalCtx, scoped_task::ScopedTask, stun::StunInfoCollectorTrait, PeerId,
},
connector::udp_hole_punch::{
common::{
send_symmetric_hole_punch_packet, try_connect_with_socket, HOLE_PUNCH_PACKET_BODY_LEN,
},
handle_rpc_result,
},
connector::udp_hole_punch::handle_rpc_result,
defer,
peers::peer_manager::PeerManager,
proto::{
@@ -350,6 +354,7 @@ impl PunchSymToConeHoleClient {
}
async fn check_hole_punch_result<T>(
global_ctx: ArcGlobalCtx,
udp_array: &Arc<UdpSocketArray>,
packet: &[u8],
tid: u32,
@@ -376,7 +381,13 @@ impl PunchSymToConeHoleClient {
};
// if hole punched but tunnel creation failed, need to retry entire process.
match try_connect_with_socket(socket.socket.clone(), remote_mapped_addr.into()).await {
match try_connect_with_socket(
global_ctx.clone(),
socket.socket.clone(),
remote_mapped_addr.into(),
)
.await
{
Ok(tunnel) => {
ret_tunnel.replace(tunnel);
break;
@@ -435,6 +446,7 @@ impl PunchSymToConeHoleClient {
// try direct connect first
if self.try_direct_connect.load(Ordering::Relaxed) {
if let Ok(tunnel) = try_connect_with_socket(
global_ctx.clone(),
Arc::new(UdpSocket::bind("0.0.0.0:0").await?),
remote_mapped_addr.into(),
)
@@ -478,6 +490,7 @@ impl PunchSymToConeHoleClient {
))
.into();
let ret_tunnel = Self::check_hole_punch_result(
global_ctx.clone(),
&udp_array,
&packet,
tid,
@@ -505,6 +518,7 @@ impl PunchSymToConeHoleClient {
))
.into();
let ret_tunnel = Self::check_hole_punch_result(
global_ctx,
&udp_array,
&packet,
tid,

View File

@@ -22,23 +22,25 @@ use tokio::time::timeout;
use easytier::{
common::{
config::PortForwardConfig,
constants::EASYTIER_VERSION,
stun::{StunInfoCollector, StunInfoCollectorTrait},
},
proto::{
cli::{
list_peer_route_pair, AclManageRpc, AclManageRpcClientFactory, ConnectorManageRpc,
ConnectorManageRpcClientFactory, DumpRouteRequest, GetAclStatsRequest,
GetVpnPortalInfoRequest, ListConnectorRequest, ListForeignNetworkRequest,
ListGlobalForeignNetworkRequest, ListMappedListenerRequest, ListPeerRequest,
ListPeerResponse, ListRouteRequest, ListRouteResponse, ManageMappedListenerRequest,
MappedListenerManageAction, MappedListenerManageRpc,
MappedListenerManageRpcClientFactory, NodeInfo, PeerManageRpc,
PeerManageRpcClientFactory, ShowNodeInfoRequest, TcpProxyEntryState,
list_peer_route_pair, AclManageRpc, AclManageRpcClientFactory, AddPortForwardRequest,
ConnectorManageRpc, ConnectorManageRpcClientFactory, DumpRouteRequest,
GetAclStatsRequest, GetVpnPortalInfoRequest, GetWhitelistRequest, ListConnectorRequest,
ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListMappedListenerRequest,
ListPeerRequest, ListPeerResponse, ListPortForwardRequest, ListRouteRequest,
ListRouteResponse, ManageMappedListenerRequest, MappedListenerManageAction,
MappedListenerManageRpc, MappedListenerManageRpcClientFactory, NodeInfo, PeerManageRpc,
PeerManageRpcClientFactory, PortForwardManageRpc, PortForwardManageRpcClientFactory,
RemovePortForwardRequest, SetWhitelistRequest, ShowNodeInfoRequest, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc,
VpnPortalRpcClientFactory,
},
common::NatType,
common::{NatType, SocketType},
peer_rpc::{GetGlobalPeerMapRequest, PeerCenterRpc, PeerCenterRpcClientFactory},
rpc_impl::standalone::StandAloneClient,
rpc_types::controller::BaseController,
@@ -96,6 +98,10 @@ enum SubCommand {
Proxy,
#[command(about = "show ACL rules statistics")]
Acl(AclArgs),
#[command(about = "manage port forwarding")]
PortForward(PortForwardArgs),
#[command(about = "manage TCP/UDP whitelist")]
Whitelist(WhitelistArgs),
#[command(about = t!("core_clap.generate_completions").to_string())]
GenAutocomplete { shell: Shell },
}
@@ -193,6 +199,62 @@ enum AclSubCommand {
Stats,
}
#[derive(Args, Debug)]
struct PortForwardArgs {
#[command(subcommand)]
sub_command: Option<PortForwardSubCommand>,
}
#[derive(Subcommand, Debug)]
enum PortForwardSubCommand {
/// Add port forward rule
Add {
#[arg(help = "Protocol (tcp/udp)")]
protocol: String,
#[arg(help = "Local bind address (e.g., 0.0.0.0:8080)")]
bind_addr: String,
#[arg(help = "Destination address (e.g., 10.1.1.1:80)")]
dst_addr: String,
},
/// Remove port forward rule
Remove {
#[arg(help = "Protocol (tcp/udp)")]
protocol: String,
#[arg(help = "Local bind address (e.g., 0.0.0.0:8080)")]
bind_addr: String,
#[arg(help = "Optional Destination address (e.g., 10.1.1.1:80)")]
dst_addr: Option<String>,
},
/// List port forward rules
List,
}
#[derive(Args, Debug)]
struct WhitelistArgs {
#[command(subcommand)]
sub_command: Option<WhitelistSubCommand>,
}
#[derive(Subcommand, Debug)]
enum WhitelistSubCommand {
/// Set TCP port whitelist
SetTcp {
#[arg(help = "TCP ports (e.g., 80,443,8000-9000)")]
ports: String,
},
/// Set UDP port whitelist
SetUdp {
#[arg(help = "UDP ports (e.g., 53,5000-6000)")]
ports: String,
},
/// Clear TCP whitelist
ClearTcp,
/// Clear UDP whitelist
ClearUdp,
/// Show current whitelist configuration
Show,
}
#[derive(Args, Debug)]
struct ServiceArgs {
#[arg(short, long, default_value = env!("CARGO_PKG_NAME"), help = "service name")]
@@ -340,6 +402,18 @@ impl CommandHandler<'_> {
.with_context(|| "failed to get vpn portal client")?)
}
async fn get_port_forward_manager_client(
&self,
) -> Result<Box<dyn PortForwardManageRpc<Controller = BaseController>>, Error> {
Ok(self
.client
.lock()
.unwrap()
.scoped_client::<PortForwardManageRpcClientFactory<BaseController>>("".to_string())
.await
.with_context(|| "failed to get port forward manager client")?)
}
async fn list_peers(&self) -> Result<ListPeerResponse, Error> {
let client = self.get_peer_manager_client().await?;
let request = ListPeerRequest::default();
@@ -788,6 +862,265 @@ impl CommandHandler<'_> {
}
Ok(url)
}
async fn handle_port_forward_add(
&self,
protocol: &str,
bind_addr: &str,
dst_addr: &str,
) -> Result<(), Error> {
let bind_addr: std::net::SocketAddr = bind_addr
.parse()
.with_context(|| format!("Invalid bind address: {}", bind_addr))?;
let dst_addr: std::net::SocketAddr = dst_addr
.parse()
.with_context(|| format!("Invalid destination address: {}", dst_addr))?;
if protocol != "tcp" && protocol != "udp" {
return Err(anyhow::anyhow!("Protocol must be 'tcp' or 'udp'"));
}
let client = self.get_port_forward_manager_client().await?;
let request = AddPortForwardRequest {
cfg: Some(
PortForwardConfig {
proto: protocol.to_string(),
bind_addr: bind_addr.into(),
dst_addr: dst_addr.into(),
}
.into(),
),
};
client
.add_port_forward(BaseController::default(), request)
.await?;
println!(
"Port forward rule added: {} {} -> {}",
protocol, bind_addr, dst_addr
);
Ok(())
}
async fn handle_port_forward_remove(
&self,
protocol: &str,
bind_addr: &str,
dst_addr: Option<&str>,
) -> Result<(), Error> {
let bind_addr: std::net::SocketAddr = bind_addr
.parse()
.with_context(|| format!("Invalid bind address: {}", bind_addr))?;
if protocol != "tcp" && protocol != "udp" {
return Err(anyhow::anyhow!("Protocol must be 'tcp' or 'udp'"));
}
let client = self.get_port_forward_manager_client().await?;
let request = RemovePortForwardRequest {
cfg: Some(
PortForwardConfig {
proto: protocol.to_string(),
bind_addr: bind_addr.into(),
dst_addr: dst_addr
.map(|s| s.parse::<SocketAddr>().unwrap())
.map(Into::into)
.unwrap_or("0.0.0.0:0".parse::<SocketAddr>().unwrap().into()),
}
.into(),
),
};
client
.remove_port_forward(BaseController::default(), request)
.await?;
println!("Port forward rule removed: {} {}", protocol, bind_addr);
Ok(())
}
async fn handle_port_forward_list(&self) -> Result<(), Error> {
let client = self.get_port_forward_manager_client().await?;
let request = ListPortForwardRequest::default();
let response = client
.list_port_forward(BaseController::default(), request)
.await?;
if self.verbose || *self.output_format == OutputFormat::Json {
println!("{}", serde_json::to_string_pretty(&response)?);
return Ok(());
}
#[derive(tabled::Tabled, serde::Serialize)]
struct PortForwardTableItem {
protocol: String,
bind_addr: String,
dst_addr: String,
}
let items: Vec<PortForwardTableItem> = response
.cfgs
.into_iter()
.map(|rule| PortForwardTableItem {
protocol: format!(
"{:?}",
SocketType::try_from(rule.socket_type).unwrap_or(SocketType::Tcp)
),
bind_addr: rule
.bind_addr
.map(|addr| addr.to_string())
.unwrap_or_default(),
dst_addr: rule
.dst_addr
.map(|addr| addr.to_string())
.unwrap_or_default(),
})
.collect();
print_output(&items, self.output_format)?;
Ok(())
}
async fn handle_whitelist_set_tcp(&self, ports: &str) -> Result<(), Error> {
let tcp_ports = Self::parse_port_list(ports)?;
let client = self.get_acl_manager_client().await?;
// Get current UDP ports to preserve them
let current = client
.get_whitelist(BaseController::default(), GetWhitelistRequest::default())
.await?;
let request = SetWhitelistRequest {
tcp_ports,
udp_ports: current.udp_ports,
};
client
.set_whitelist(BaseController::default(), request)
.await?;
println!("TCP whitelist updated: {}", ports);
Ok(())
}
async fn handle_whitelist_set_udp(&self, ports: &str) -> Result<(), Error> {
let udp_ports = Self::parse_port_list(ports)?;
let client = self.get_acl_manager_client().await?;
// Get current TCP ports to preserve them
let current = client
.get_whitelist(BaseController::default(), GetWhitelistRequest::default())
.await?;
let request = SetWhitelistRequest {
tcp_ports: current.tcp_ports,
udp_ports,
};
client
.set_whitelist(BaseController::default(), request)
.await?;
println!("UDP whitelist updated: {}", ports);
Ok(())
}
async fn handle_whitelist_clear_tcp(&self) -> Result<(), Error> {
let client = self.get_acl_manager_client().await?;
// Get current UDP ports to preserve them
let current = client
.get_whitelist(BaseController::default(), GetWhitelistRequest::default())
.await?;
let request = SetWhitelistRequest {
tcp_ports: vec![],
udp_ports: current.udp_ports,
};
client
.set_whitelist(BaseController::default(), request)
.await?;
println!("TCP whitelist cleared");
Ok(())
}
async fn handle_whitelist_clear_udp(&self) -> Result<(), Error> {
let client = self.get_acl_manager_client().await?;
// Get current TCP ports to preserve them
let current = client
.get_whitelist(BaseController::default(), GetWhitelistRequest::default())
.await?;
let request = SetWhitelistRequest {
tcp_ports: current.tcp_ports,
udp_ports: vec![],
};
client
.set_whitelist(BaseController::default(), request)
.await?;
println!("UDP whitelist cleared");
Ok(())
}
async fn handle_whitelist_show(&self) -> Result<(), Error> {
let client = self.get_acl_manager_client().await?;
let request = GetWhitelistRequest::default();
let response = client
.get_whitelist(BaseController::default(), request)
.await?;
if self.verbose || *self.output_format == OutputFormat::Json {
println!("{}", serde_json::to_string_pretty(&response)?);
return Ok(());
}
println!(
"TCP Whitelist: {}",
if response.tcp_ports.is_empty() {
"None".to_string()
} else {
response.tcp_ports.join(", ")
}
);
println!(
"UDP Whitelist: {}",
if response.udp_ports.is_empty() {
"None".to_string()
} else {
response.udp_ports.join(", ")
}
);
Ok(())
}
fn parse_port_list(ports_str: &str) -> Result<Vec<String>, Error> {
let mut ports = Vec::new();
for port_spec in ports_str.split(',') {
let port_spec = port_spec.trim();
if port_spec.contains('-') {
// Handle port range
let parts: Vec<&str> = port_spec.split('-').collect();
if parts.len() != 2 {
return Err(anyhow::anyhow!("Invalid port range: {}", port_spec));
}
let start: u16 = parts[0]
.parse()
.with_context(|| format!("Invalid start port: {}", parts[0]))?;
let end: u16 = parts[1]
.parse()
.with_context(|| format!("Invalid end port: {}", parts[1]))?;
if start > end {
return Err(anyhow::anyhow!("Invalid port range: start > end"));
}
ports.push(format!("{}-{}", start, end));
} else {
// Handle single port
let port: u16 = port_spec
.parse()
.with_context(|| format!("Invalid port number: {}", port_spec))?;
ports.push(port.to_string());
}
}
Ok(ports)
}
}
#[derive(Debug)]
@@ -1494,6 +1827,46 @@ async fn main() -> Result<(), Error> {
handler.handle_acl_stats().await?;
}
},
SubCommand::PortForward(port_forward_args) => match &port_forward_args.sub_command {
Some(PortForwardSubCommand::Add {
protocol,
bind_addr,
dst_addr,
}) => {
handler
.handle_port_forward_add(protocol, bind_addr, dst_addr)
.await?;
}
Some(PortForwardSubCommand::Remove {
protocol,
bind_addr,
dst_addr,
}) => {
handler
.handle_port_forward_remove(protocol, bind_addr, dst_addr.as_deref())
.await?;
}
Some(PortForwardSubCommand::List) | None => {
handler.handle_port_forward_list().await?;
}
},
SubCommand::Whitelist(whitelist_args) => match &whitelist_args.sub_command {
Some(WhitelistSubCommand::SetTcp { ports }) => {
handler.handle_whitelist_set_tcp(ports).await?;
}
Some(WhitelistSubCommand::SetUdp { ports }) => {
handler.handle_whitelist_set_udp(ports).await?;
}
Some(WhitelistSubCommand::ClearTcp) => {
handler.handle_whitelist_clear_tcp().await?;
}
Some(WhitelistSubCommand::ClearUdp) => {
handler.handle_whitelist_clear_udp().await?;
}
Some(WhitelistSubCommand::Show) | None => {
handler.handle_whitelist_show().await?;
}
},
SubCommand::GenAutocomplete { shell } => {
let mut cmd = Cli::command();
easytier::print_completions(shell, &mut cmd, "easytier-cli");

View File

@@ -29,10 +29,7 @@ use easytier::{
connector::create_connector_by_url,
instance_manager::NetworkInstanceManager,
launcher::{add_proxy_network_to_config, ConfigSource},
proto::{
acl::{Acl, AclV1, Action, Chain, ChainType, Protocol, Rule},
common::{CompressionAlgoPb, NatType},
},
proto::common::{CompressionAlgoPb, NatType},
tunnel::{IpVersion, PROTO_PORT_OFFSET},
utils::{init_logger, setup_panic_handler},
web_client,
@@ -622,117 +619,6 @@ impl NetworkOptions {
false
}
fn parse_port_list(port_list: &[String]) -> anyhow::Result<Vec<String>> {
let mut ports = Vec::new();
for port_spec in port_list {
if port_spec.contains('-') {
// Handle port range like "8000-9000"
let parts: Vec<&str> = port_spec.split('-').collect();
if parts.len() != 2 {
return Err(anyhow::anyhow!("Invalid port range format: {}", port_spec));
}
let start: u16 = parts[0]
.parse()
.with_context(|| format!("Invalid start port in range: {}", port_spec))?;
let end: u16 = parts[1]
.parse()
.with_context(|| format!("Invalid end port in range: {}", port_spec))?;
if start > end {
return Err(anyhow::anyhow!(
"Start port must be <= end port in range: {}",
port_spec
));
}
// Add individual ports in the range
for port in start..=end {
ports.push(port.to_string());
}
} else {
// Handle single port
let port: u16 = port_spec
.parse()
.with_context(|| format!("Invalid port number: {}", port_spec))?;
ports.push(port.to_string());
}
}
Ok(ports)
}
fn generate_acl_from_whitelists(&self) -> anyhow::Result<Option<Acl>> {
if self.tcp_whitelist.is_empty() && self.udp_whitelist.is_empty() {
return Ok(None);
}
let mut acl = Acl {
acl_v1: Some(AclV1 { chains: vec![] }),
};
let acl_v1 = acl.acl_v1.as_mut().unwrap();
// Create inbound chain for whitelist rules
let mut inbound_chain = Chain {
name: "inbound_whitelist".to_string(),
chain_type: ChainType::Inbound as i32,
description: "Auto-generated inbound whitelist from CLI".to_string(),
enabled: true,
rules: vec![],
default_action: Action::Drop as i32, // Default deny
};
let mut rule_priority = 1000u32;
// Add TCP whitelist rules
if !self.tcp_whitelist.is_empty() {
let tcp_ports = Self::parse_port_list(&self.tcp_whitelist)?;
let tcp_rule = Rule {
name: "tcp_whitelist".to_string(),
description: "Auto-generated TCP whitelist rule".to_string(),
priority: rule_priority,
enabled: true,
protocol: Protocol::Tcp as i32,
ports: tcp_ports,
source_ips: vec![],
destination_ips: vec![],
source_ports: vec![],
action: Action::Allow as i32,
rate_limit: 0,
burst_limit: 0,
stateful: true,
};
inbound_chain.rules.push(tcp_rule);
rule_priority -= 1;
}
// Add UDP whitelist rules
if !self.udp_whitelist.is_empty() {
let udp_ports = Self::parse_port_list(&self.udp_whitelist)?;
let udp_rule = Rule {
name: "udp_whitelist".to_string(),
description: "Auto-generated UDP whitelist rule".to_string(),
priority: rule_priority,
enabled: true,
protocol: Protocol::Udp as i32,
ports: udp_ports,
source_ips: vec![],
destination_ips: vec![],
source_ports: vec![],
action: Action::Allow as i32,
rate_limit: 0,
burst_limit: 0,
stateful: false,
};
inbound_chain.rules.push(udp_rule);
}
acl_v1.chains.push(inbound_chain);
Ok(Some(acl))
}
fn merge_into(&self, cfg: &mut TomlConfigLoader) -> anyhow::Result<()> {
if self.hostname.is_some() {
cfg.set_hostname(self.hostname.clone());
@@ -990,10 +876,13 @@ impl NetworkOptions {
cfg.set_exit_nodes(self.exit_nodes.clone());
}
// Handle port whitelists by generating ACL configuration
if let Some(acl) = self.generate_acl_from_whitelists()? {
cfg.set_acl(Some(acl));
}
let mut old_tcp_whitelist = cfg.get_tcp_whitelist();
old_tcp_whitelist.extend(self.tcp_whitelist.clone());
cfg.set_tcp_whitelist(old_tcp_whitelist);
let mut old_udp_whitelist = cfg.get_udp_whitelist();
old_udp_whitelist.extend(self.udp_whitelist.clone());
cfg.set_udp_whitelist(old_udp_whitelist);
Ok(())
}
@@ -1222,6 +1111,14 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> {
tokio::select! {
_ = manager.wait() => {
let infos = manager.collect_network_infos()?;
let errs = infos
.into_values()
.filter_map(|info| info.error_msg)
.collect::<Vec<_>>();
if errs.len() > 0 {
return Err(anyhow::anyhow!("some instances stopped with errors"));
}
}
_ = tokio::signal::ctrl_c() => {
println!("ctrl-c received, exiting...");

View File

@@ -20,7 +20,12 @@ use pnet::packet::{
Packet as _,
};
use prost::Message;
use tokio::{io::copy_bidirectional, select, task::JoinSet};
use tokio::{
io::{copy_bidirectional, AsyncRead, AsyncWrite},
select,
task::JoinSet,
};
use tokio_util::io::InspectReader;
use super::{
tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy},
@@ -28,11 +33,13 @@ use super::{
};
use crate::{
common::{
acl_processor::PacketInfo,
error::Result,
global_ctx::{ArcGlobalCtx, GlobalCtx},
},
peers::{peer_manager::PeerManager, NicPacketFilter, PeerPacketFilter},
peers::{acl_filter::AclFilter, peer_manager::PeerManager, NicPacketFilter, PeerPacketFilter},
proto::{
acl::{Action, ChainType, Protocol},
cli::{
ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc,
@@ -372,6 +379,50 @@ pub struct KcpProxyDst {
tasks: JoinSet<()>,
}
#[derive(Clone)]
pub struct ProxyAclHandler {
pub acl_filter: Arc<AclFilter>,
pub packet_info: PacketInfo,
pub chain_type: ChainType,
}
impl ProxyAclHandler {
pub fn handle_packet(&self, buf: &[u8]) -> Result<()> {
let mut packet_info = self.packet_info.clone();
packet_info.packet_size = buf.len();
let ret = self
.acl_filter
.get_processor()
.process_packet(&packet_info, self.chain_type);
self.acl_filter.handle_acl_result(
&ret,
&packet_info,
self.chain_type,
&self.acl_filter.get_processor(),
);
if !matches!(ret.action, Action::Allow) {
return Err(anyhow::anyhow!("acl denied").into());
}
Ok(())
}
pub async fn copy_bidirection_with_acl(
&self,
src: impl AsyncRead + AsyncWrite + Unpin,
mut dst: impl AsyncRead + AsyncWrite + Unpin,
) -> Result<()> {
let (src_reader, src_writer) = tokio::io::split(src);
let src_reader = InspectReader::new(src_reader, |buf| {
let _ = self.handle_packet(buf);
});
let mut src = tokio::io::join(src_reader, src_writer);
copy_bidirectional(&mut src, &mut dst).await?;
Ok(())
}
}
impl KcpProxyDst {
pub async fn new(peer_manager: Arc<PeerManager>) -> Self {
let mut kcp_endpoint = create_kcp_endpoint();
@@ -396,7 +447,7 @@ impl KcpProxyDst {
#[tracing::instrument(ret)]
async fn handle_one_in_stream(
mut kcp_stream: KcpStream,
kcp_stream: KcpStream,
global_ctx: ArcGlobalCtx,
proxy_entries: Arc<DashMap<ConnId, TcpProxyEntry>>,
cidr_set: Arc<CidrSet>,
@@ -411,6 +462,7 @@ impl KcpProxyDst {
parsed_conn_data
))?
.into();
let src_socket: SocketAddr = parsed_conn_data.src.unwrap_or_default().into();
match dst_socket.ip() {
IpAddr::V4(dst_v4_ip) => {
@@ -437,17 +489,36 @@ impl KcpProxyDst {
proxy_entries.remove(&conn_id);
}
if Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address()))
&& global_ctx.no_tun()
{
let send_to_self =
Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address()));
if send_to_self && global_ctx.no_tun() {
dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap();
}
let acl_handler = ProxyAclHandler {
acl_filter: global_ctx.get_acl_filter().clone(),
packet_info: PacketInfo {
src_ip: src_socket.ip(),
dst_ip: dst_socket.ip(),
src_port: Some(src_socket.port()),
dst_port: Some(dst_socket.port()),
protocol: Protocol::Tcp,
packet_size: conn_data.len(),
},
chain_type: if send_to_self {
ChainType::Inbound
} else {
ChainType::Forward
},
};
acl_handler.handle_packet(&conn_data)?;
tracing::debug!("kcp connect to dst socket: {:?}", dst_socket);
let _g = global_ctx.net_ns.guard();
let connector = NatDstTcpConnector {};
let mut ret = connector
let ret = connector
.connect("0.0.0.0:0".parse().unwrap(), dst_socket)
.await?;
@@ -455,7 +526,10 @@ impl KcpProxyDst {
e.state = TcpProxyEntryState::Connected.into();
}
copy_bidirectional(&mut ret, &mut kcp_stream).await?;
acl_handler
.copy_bidirection_with_acl(kcp_stream, ret)
.await?;
Ok(())
}

View File

@@ -7,19 +7,21 @@ use dashmap::DashMap;
use pnet::packet::ipv4::Ipv4Packet;
use prost::Message as _;
use quinn::{Endpoint, Incoming};
use tokio::io::{copy_bidirectional, AsyncRead, AsyncReadExt, AsyncWrite};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use tokio::net::TcpStream;
use tokio::task::JoinSet;
use tokio::time::timeout;
use crate::common::acl_processor::PacketInfo;
use crate::common::error::Result;
use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx};
use crate::common::join_joinset_background;
use crate::defer;
use crate::gateway::kcp_proxy::TcpProxyForKcpSrcTrait;
use crate::gateway::kcp_proxy::{ProxyAclHandler, TcpProxyForKcpSrcTrait};
use crate::gateway::tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy};
use crate::gateway::CidrSet;
use crate::peers::peer_manager::PeerManager;
use crate::proto::acl::{ChainType, Protocol};
use crate::proto::cli::{
ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc,
@@ -322,12 +324,13 @@ impl QUICProxyDst {
.await;
match ret {
Ok(Ok((mut quic_stream, mut tcp_stream))) => {
let ret = copy_bidirectional(&mut quic_stream, &mut tcp_stream).await;
Ok(Ok((quic_stream, tcp_stream, acl))) => {
let remote_addr = quic_stream.connection.as_ref().map(|c| c.remote_address());
let ret = acl.copy_bidirection_with_acl(quic_stream, tcp_stream).await;
tracing::info!(
"QUIC connection handled, result: {:?}, remote addr: {:?}",
ret,
quic_stream.connection.as_ref().map(|c| c.remote_address())
remote_addr,
);
}
Ok(Err(e)) => {
@@ -345,7 +348,7 @@ impl QUICProxyDst {
cidr_set: Arc<CidrSet>,
proxy_entry_key: SocketAddr,
proxy_entries: Arc<DashMap<SocketAddr, TcpProxyEntry>>,
) -> Result<(QUICStream, TcpStream)> {
) -> Result<(QUICStream, TcpStream, ProxyAclHandler)> {
let conn = incoming.await.with_context(|| "accept failed")?;
let addr = conn.remote_address();
tracing::info!("Accepted QUIC connection from {}", addr);
@@ -376,7 +379,8 @@ impl QUICProxyDst {
dst_socket.set_ip(real_ip);
}
if Some(*dst_socket.ip()) == ctx.get_ipv4().map(|ip| ip.address()) && ctx.no_tun() {
let send_to_self = Some(*dst_socket.ip()) == ctx.get_ipv4().map(|ip| ip.address());
if send_to_self && ctx.no_tun() {
dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap();
}
@@ -391,6 +395,24 @@ impl QUICProxyDst {
},
);
let acl_handler = ProxyAclHandler {
acl_filter: ctx.get_acl_filter().clone(),
packet_info: PacketInfo {
src_ip: addr.ip(),
dst_ip: (*dst_socket.ip()).into(),
src_port: Some(addr.port()),
dst_port: Some(dst_socket.port()),
protocol: Protocol::Tcp,
packet_size: len as usize,
},
chain_type: if send_to_self {
ChainType::Inbound
} else {
ChainType::Forward
},
};
acl_handler.handle_packet(&buf)?;
let connector = NatDstTcpConnector {};
let dst_stream = {
@@ -411,7 +433,7 @@ impl QUICProxyDst {
receiver: r,
};
Ok((quic_stream, dst_stream))
Ok((quic_stream, dst_stream, acl_handler))
}
}

View File

@@ -6,6 +6,7 @@ use std::{
use crossbeam::atomic::AtomicCell;
use kcp_sys::{endpoint::KcpEndpoint, stream::KcpStream};
use tokio_util::sync::{CancellationToken, DropGuard};
use crate::{
common::{
@@ -432,6 +433,8 @@ pub struct Socks5Server {
udp_forward_task: Arc<DashMap<UdpClientKey, ScopedTask<()>>>,
kcp_endpoint: Mutex<Option<Weak<KcpEndpoint>>>,
cancel_tokens: DashMap<PortForwardConfig, DropGuard>,
}
#[async_trait::async_trait]
@@ -531,6 +534,8 @@ impl Socks5Server {
udp_forward_task: Arc::new(DashMap::new()),
kcp_endpoint: Mutex::new(None),
cancel_tokens: DashMap::new(),
})
}
@@ -614,10 +619,9 @@ impl Socks5Server {
need_start = true;
};
for port_forward in self.global_ctx.config.get_port_forwards() {
self.add_port_forward(port_forward).await?;
need_start = true;
}
let cfgs = self.global_ctx.config.get_port_forwards();
self.reload_port_forwards(&cfgs).await?;
need_start = need_start || cfgs.len() > 0;
if need_start {
self.peer_manager
@@ -630,6 +634,26 @@ impl Socks5Server {
Ok(())
}
pub async fn reload_port_forwards(&self, cfgs: &Vec<PortForwardConfig>) -> Result<(), Error> {
// remove entries not in new cfg
self.cancel_tokens.retain(|k, _| {
cfgs.iter().any(|cfg| {
if cfg.dst_addr.ip().is_unspecified() {
k.bind_addr == cfg.bind_addr && k.proto == cfg.proto
} else {
k == cfg
}
})
});
// add new ones
for cfg in cfgs {
if !self.cancel_tokens.contains_key(cfg) {
self.add_port_forward(cfg.clone()).await?;
}
}
Ok(())
}
async fn handle_port_forward_connection(
mut incoming_socket: tokio::net::TcpStream,
connector: Box<dyn AsyncTcpConnector<S = SocksTcpStream> + Send>,
@@ -660,12 +684,10 @@ impl Socks5Server {
pub async fn add_port_forward(&self, cfg: PortForwardConfig) -> Result<(), Error> {
match cfg.proto.to_lowercase().as_str() {
"tcp" => {
self.add_tcp_port_forward(cfg.bind_addr, cfg.dst_addr)
.await?;
self.add_tcp_port_forward(&cfg).await?;
}
"udp" => {
self.add_udp_port_forward(cfg.bind_addr, cfg.dst_addr)
.await?;
self.add_udp_port_forward(&cfg).await?;
}
_ => {
return Err(anyhow::anyhow!(
@@ -680,11 +702,12 @@ impl Socks5Server {
Ok(())
}
pub async fn add_tcp_port_forward(
&self,
bind_addr: SocketAddr,
dst_addr: SocketAddr,
) -> Result<(), Error> {
pub fn remove_port_forward(&self, cfg: PortForwardConfig) {
let _ = self.cancel_tokens.remove(&cfg);
}
pub async fn add_tcp_port_forward(&self, cfg: &PortForwardConfig) -> Result<(), Error> {
let (bind_addr, dst_addr) = (cfg.bind_addr, cfg.dst_addr);
let listener = bind_tcp_socket(bind_addr, self.global_ctx.net_ns.clone())?;
let net = self.net.clone();
@@ -693,14 +716,26 @@ impl Socks5Server {
let forward_tasks = tasks.clone();
let kcp_endpoint = self.kcp_endpoint.lock().await.clone();
let peer_mgr = Arc::downgrade(&self.peer_manager.clone());
let cancel_token = CancellationToken::new();
self.cancel_tokens
.insert(cfg.clone(), cancel_token.clone().drop_guard());
self.tasks.lock().unwrap().spawn(async move {
loop {
let (incoming_socket, addr) = match listener.accept().await {
Ok(result) => result,
Err(err) => {
tracing::error!("port forward accept error = {:?}", err);
continue;
let (incoming_socket, addr) = select! {
biased;
_ = cancel_token.cancelled() => {
tracing::info!("port forward for {:?} cancelled", bind_addr);
break;
}
res = listener.accept() => {
match res {
Ok(result) => result,
Err(err) => {
tracing::error!("port forward accept error = {:?}", err);
continue;
}
}
}
};
@@ -747,11 +782,8 @@ impl Socks5Server {
}
#[tracing::instrument(name = "add_udp_port_forward", skip(self))]
pub async fn add_udp_port_forward(
&self,
bind_addr: SocketAddr,
dst_addr: SocketAddr,
) -> Result<(), Error> {
pub async fn add_udp_port_forward(&self, cfg: &PortForwardConfig) -> Result<(), Error> {
let (bind_addr, dst_addr) = (cfg.bind_addr, cfg.dst_addr);
let socket = Arc::new(bind_udp_socket(bind_addr, self.global_ctx.net_ns.clone())?);
let entries = self.entries.clone();
@@ -759,16 +791,28 @@ impl Socks5Server {
let net = self.net.clone();
let udp_client_map = self.udp_client_map.clone();
let udp_forward_task = self.udp_forward_task.clone();
let cancel_token = CancellationToken::new();
self.cancel_tokens
.insert(cfg.clone(), cancel_token.clone().drop_guard());
self.tasks.lock().unwrap().spawn(async move {
loop {
// we set the max buffer size of smoltcp to 8192, so we need to use a buffer size that is less than 8192 here.
let mut buf = vec![0u8; 8192];
let (len, addr) = match socket.recv_from(&mut buf).await {
Ok(result) => result,
Err(err) => {
tracing::error!("udp port forward recv error = {:?}", err);
continue;
let (len, addr) = select! {
biased;
_ = cancel_token.cancelled() => {
tracing::info!("udp port forward for {:?} cancelled", bind_addr);
break;
}
res = socket.recv_from(&mut buf) => {
match res {
Ok(result) => result,
Err(err) => {
tracing::error!("udp port forward recv error = {:?}", err);
continue;
}
}
}
};

View File

@@ -63,7 +63,13 @@ pub struct NatDstTcpConnector;
impl NatDstConnector for NatDstTcpConnector {
type DstStream = TcpStream;
async fn connect(&self, _src: SocketAddr, nat_dst: SocketAddr) -> Result<Self::DstStream> {
let socket = TcpSocket::new_v4().unwrap();
let socket = match TcpSocket::new_v4() {
Ok(s) => s,
Err(e) => {
eprintln!("create v4 socket failed: {:?}", e);
return Err(e.into());
}
};
if let Err(e) = socket.set_nodelay(true) {
tracing::warn!("set_nodelay failed, ignore it: {:?}", e);
}

View File

@@ -10,6 +10,7 @@ use cidr::{IpCidr, Ipv4Inet};
use tokio::{sync::Mutex, task::JoinSet};
use tokio_util::sync::CancellationToken;
use crate::common::acl_processor::AclRuleBuilder;
use crate::common::config::ConfigLoader;
use crate::common::error::Error;
use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent};
@@ -29,13 +30,15 @@ use crate::peers::peer_manager::{PeerManager, RouteAlgoType};
use crate::peers::rpc_service::PeerManagerRpcService;
use crate::peers::{create_packet_recv_chan, recv_packet_from_chan, PacketRecvChanReceiver};
use crate::proto::cli::VpnPortalRpc;
use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo};
use crate::proto::cli::{
ListMappedListenerRequest, ListMappedListenerResponse, ManageMappedListenerRequest,
ManageMappedListenerResponse, MappedListener, MappedListenerManageAction,
MappedListenerManageRpc,
AddPortForwardRequest, AddPortForwardResponse, ListMappedListenerRequest,
ListMappedListenerResponse, ListPortForwardRequest, ListPortForwardResponse,
ManageMappedListenerRequest, ManageMappedListenerResponse, MappedListener,
MappedListenerManageAction, MappedListenerManageRpc, PortForwardManageRpc,
RemovePortForwardRequest, RemovePortForwardResponse,
};
use crate::proto::common::TunnelInfo;
use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo};
use crate::proto::common::{PortForwardConfigPb, TunnelInfo};
use crate::proto::peer_rpc::PeerCenterRpcServer;
use crate::proto::rpc_impl::standalone::{RpcServerHook, StandAloneServer};
use crate::proto::rpc_types;
@@ -609,9 +612,9 @@ impl Instance {
}
}
if let Some(acl) = self.global_ctx.config.get_acl() {
self.global_ctx.get_acl_filter().reload_rules(Some(&acl));
}
self.global_ctx
.get_acl_filter()
.reload_rules(AclRuleBuilder::build(&self.global_ctx)?.as_ref());
// run after tun device created, so listener can bind to tun device, which may be required by win 10
self.ip_proxy = Some(IpProxy::new(
@@ -790,6 +793,85 @@ impl Instance {
MappedListenerManagerRpcService(self.global_ctx.clone())
}
fn get_port_forward_manager_rpc_service(
&self,
) -> impl PortForwardManageRpc<Controller = BaseController> + Clone {
#[derive(Clone)]
pub struct PortForwardManagerRpcService {
global_ctx: ArcGlobalCtx,
socks5_server: Weak<Socks5Server>,
}
#[async_trait::async_trait]
impl PortForwardManageRpc for PortForwardManagerRpcService {
type Controller = BaseController;
async fn add_port_forward(
&self,
_: BaseController,
request: AddPortForwardRequest,
) -> Result<AddPortForwardResponse, rpc_types::error::Error> {
let Some(socks5_server) = self.socks5_server.upgrade() else {
return Err(anyhow::anyhow!("socks5 server not available").into());
};
if let Some(cfg) = request.cfg {
tracing::info!("Port forward rule added: {:?}", cfg);
let mut current_forwards = self.global_ctx.config.get_port_forwards();
current_forwards.push(cfg.into());
self.global_ctx
.config
.set_port_forwards(current_forwards.clone());
socks5_server
.reload_port_forwards(&current_forwards)
.await
.with_context(|| "Failed to reload port forwards")?;
}
Ok(AddPortForwardResponse {})
}
async fn remove_port_forward(
&self,
_: BaseController,
request: RemovePortForwardRequest,
) -> Result<RemovePortForwardResponse, rpc_types::error::Error> {
let Some(socks5_server) = self.socks5_server.upgrade() else {
return Err(anyhow::anyhow!("socks5 server not available").into());
};
let Some(cfg) = request.cfg else {
return Err(anyhow::anyhow!("port forward config is empty").into());
};
let cfg = cfg.into();
let mut current_forwards = self.global_ctx.config.get_port_forwards();
current_forwards.retain(|e| *e != cfg);
self.global_ctx
.config
.set_port_forwards(current_forwards.clone());
socks5_server
.reload_port_forwards(&current_forwards)
.await
.with_context(|| "Failed to reload port forwards")?;
tracing::info!("Port forward rule removed: {:?}", cfg);
Ok(RemovePortForwardResponse {})
}
async fn list_port_forward(
&self,
_: BaseController,
_request: ListPortForwardRequest,
) -> Result<ListPortForwardResponse, rpc_types::error::Error> {
let forwards = self.global_ctx.config.get_port_forwards();
let cfgs: Vec<PortForwardConfigPb> = forwards.into_iter().map(Into::into).collect();
Ok(ListPortForwardResponse { cfgs })
}
}
PortForwardManagerRpcService {
global_ctx: self.global_ctx.clone(),
socks5_server: Arc::downgrade(&self.socks5_server),
}
}
async fn run_rpc_server(&mut self) -> Result<(), Error> {
let Some(_) = self.global_ctx.config.get_rpc_portal() else {
tracing::info!("rpc server not enabled, because rpc_portal is not set.");
@@ -803,6 +885,7 @@ impl Instance {
let peer_center = self.peer_center.clone();
let vpn_portal_rpc = self.get_vpn_portal_rpc_service();
let mapped_listener_manager_rpc = self.get_mapped_listener_manager_rpc_service();
let port_forward_manager_rpc = self.get_port_forward_manager_rpc_service();
let s = self.rpc_server.as_mut().unwrap();
let peer_mgr_rpc_service = PeerManagerRpcService::new(peer_mgr.clone());
@@ -823,6 +906,10 @@ impl Instance {
MappedListenerManageRpcServer::new(mapped_listener_manager_rpc),
"",
);
s.registry().register(
PortForwardManageRpcServer::new(port_forward_manager_rpc),
"",
);
if let Some(ip_proxy) = self.ip_proxy.as_ref() {
s.registry().register(

View File

@@ -48,7 +48,6 @@ impl NetworkInstanceManager {
}
let instance_stop_notifier = instance.get_stop_notifier();
let instance_config_source = instance.get_config_source();
let instance_event_receiver = match instance.get_config_source() {
ConfigSource::Cli | ConfigSource::File | ConfigSource::Web => {
Some(instance.subscribe_event())
@@ -78,14 +77,8 @@ impl NetworkInstanceManager {
eprintln!("instance {} stopped with error: {}", instance_id, e);
}
}
match instance_config_source {
ConfigSource::Cli | ConfigSource::File => {
instance_map.remove(&instance_id);
}
ConfigSource::Web | ConfigSource::GUI | ConfigSource::FFI => {}
}
stop_check_notifier.notify_one();
instance_stop_tasks.remove(&instance_id);
stop_check_notifier.notify_waiters();
})),
);
Ok(())
@@ -160,7 +153,11 @@ impl NetworkInstanceManager {
}
pub async fn wait(&self) {
while self.instance_map.len() > 0 {
while self
.instance_map
.iter()
.any(|item| item.value().is_easytier_running())
{
self.stop_check_notifier.notified().await;
}
}
@@ -338,6 +335,7 @@ mod tests {
use crate::common::config::*;
#[tokio::test]
#[serial_test::serial]
async fn it_works() {
let manager = NetworkInstanceManager::new();
let cfg_str = r#"
@@ -404,6 +402,7 @@ mod tests {
}
#[test]
#[serial_test::serial]
fn test_no_tokio_runtime() {
let manager = NetworkInstanceManager::new();
let cfg_str = r#"
@@ -466,6 +465,7 @@ mod tests {
}
#[tokio::test]
#[serial_test::serial]
async fn test_single_instance_failed() {
let free_tcp_port =
crate::utils::find_free_tcp_port(10012..65534).expect("no free tcp port found");
@@ -491,7 +491,7 @@ mod tests {
tokio::select! {
_ = manager.wait() => {
assert_eq!(manager.list_network_instance_ids().len(), 0);
assert_eq!(manager.list_network_instance_ids().len(), 1);
}
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
panic!("instance manager with single failed instance({:?}) should not running", config_source);
@@ -522,6 +522,7 @@ mod tests {
}
#[tokio::test]
#[serial_test::serial]
async fn test_multiple_instances_one_failed() {
let free_tcp_port =
crate::utils::find_free_tcp_port(10012..65534).expect("no free tcp port found");
@@ -557,7 +558,7 @@ mod tests {
panic!("instance manager with multiple instances one failed should still running");
}
_ = tokio::time::sleep(std::time::Duration::from_secs(2)) => {
assert_eq!(manager.list_network_instance_ids().len(), 1);
assert_eq!(manager.list_network_instance_ids().len(), 2);
}
}
}

View File

@@ -64,7 +64,7 @@ impl AclFilter {
}
/// Get current processor for processing packets
fn get_processor(&self) -> Arc<AclProcessor> {
pub fn get_processor(&self) -> Arc<AclProcessor> {
self.acl_processor.load_full()
}
@@ -160,7 +160,7 @@ impl AclFilter {
}
/// Process ACL result and log if needed
fn handle_acl_result(
pub fn handle_acl_result(
&self,
result: &AclResult,
packet_info: &PacketInfo,

View File

@@ -247,7 +247,7 @@ impl PeerConn {
.await?
}
async fn send_handshake(&mut self) -> Result<(), Error> {
async fn send_handshake(&mut self, send_secret_digest: bool) -> Result<(), Error> {
let network = self.global_ctx.get_network_identity();
let mut req = HandshakeRequest {
magic: MAGIC,
@@ -257,8 +257,16 @@ impl PeerConn {
network_name: network.network_name.clone(),
..Default::default()
};
req.network_secret_digrest
.extend_from_slice(&network.network_secret_digest.unwrap_or_default());
// only send network secret digest if the network is the same
if send_secret_digest {
req.network_secret_digrest
.extend_from_slice(&network.network_secret_digest.unwrap_or_default());
} else {
// fill zero
req.network_secret_digrest
.extend_from_slice(&[0u8; std::mem::size_of::<NetworkSecretDigest>()]);
}
let hs_req = req.encode_to_vec();
let mut zc_packet = ZCPacket::new_with_payload(hs_req.as_bytes());
@@ -295,7 +303,8 @@ impl PeerConn {
self.info = Some(rsp);
self.is_client = Some(false);
self.send_handshake().await?;
let send_digest = self.get_network_identity() == self.global_ctx.get_network_identity();
self.send_handshake(send_digest).await?;
if self.get_peer_id() == self.my_peer_id {
Err(Error::WaitRespError("peer id conflict".to_owned()))
@@ -310,10 +319,14 @@ impl PeerConn {
tracing::info!("handshake request: {:?}", rsp);
self.info = Some(rsp);
self.is_client = Some(false);
self.send_handshake().await?;
let send_digest = self.get_network_identity() == self.global_ctx.get_network_identity();
self.send_handshake(send_digest).await?;
if self.get_peer_id() == self.my_peer_id {
Err(Error::WaitRespError("peer id conflict".to_owned()))
Err(Error::WaitRespError(
"peer id conflict, are you connecting to yourself?".to_owned(),
))
} else {
Ok(())
}
@@ -321,7 +334,7 @@ impl PeerConn {
#[tracing::instrument]
pub async fn do_handshake_as_client(&mut self) -> Result<(), Error> {
self.send_handshake().await?;
self.send_handshake(true).await?;
tracing::info!("waiting for handshake request from server");
let rsp = self.wait_handshake_loop().await?;
tracing::info!("handshake response: {:?}", rsp);
@@ -329,7 +342,9 @@ impl PeerConn {
self.is_client = Some(true);
if self.get_peer_id() == self.my_peer_id {
Err(Error::WaitRespError("peer id conflict".to_owned()))
Err(Error::WaitRespError(
"peer id conflict, are you connecting to yourself?".to_owned(),
))
} else {
Ok(())
}

View File

@@ -86,7 +86,8 @@ impl PeerRpcManagerTransport for RpcTransport {
.get_route_peer_info(dst_peer_id)
.await
.and_then(|x| x.feature_flag.map(|x| x.is_public_server))
.unwrap_or(true);
// if dst is directly connected, it's must not public server
.unwrap_or(!peers.has_peer(dst_peer_id));
if !is_dst_peer_public_server {
self.encryptor
.encrypt(&mut msg)

View File

@@ -1,13 +1,18 @@
use std::sync::Arc;
use crate::proto::{
cli::{
AclManageRpc, DumpRouteRequest, DumpRouteResponse, GetAclStatsRequest, GetAclStatsResponse,
ListForeignNetworkRequest, ListForeignNetworkResponse, ListGlobalForeignNetworkRequest,
ListGlobalForeignNetworkResponse, ListPeerRequest, ListPeerResponse, ListRouteRequest,
ListRouteResponse, PeerInfo, PeerManageRpc, ShowNodeInfoRequest, ShowNodeInfoResponse,
use crate::{
common::acl_processor::AclRuleBuilder,
proto::{
cli::{
AclManageRpc, DumpRouteRequest, DumpRouteResponse, GetAclStatsRequest,
GetAclStatsResponse, GetWhitelistRequest, GetWhitelistResponse,
ListForeignNetworkRequest, ListForeignNetworkResponse, ListGlobalForeignNetworkRequest,
ListGlobalForeignNetworkResponse, ListPeerRequest, ListPeerResponse, ListRouteRequest,
ListRouteResponse, PeerInfo, PeerManageRpc, SetWhitelistRequest, SetWhitelistResponse,
ShowNodeInfoRequest, ShowNodeInfoResponse,
},
rpc_types::{self, controller::BaseController},
},
rpc_types::{self, controller::BaseController},
};
use super::peer_manager::PeerManager;
@@ -153,4 +158,45 @@ impl AclManageRpc for PeerManagerRpcService {
acl_stats: Some(acl_stats),
})
}
async fn set_whitelist(
&self,
_: BaseController,
request: SetWhitelistRequest,
) -> Result<SetWhitelistResponse, rpc_types::error::Error> {
tracing::info!(
"Setting whitelist - TCP: {:?}, UDP: {:?}",
request.tcp_ports,
request.udp_ports
);
let global_ctx = self.peer_manager.get_global_ctx();
global_ctx.config.set_tcp_whitelist(request.tcp_ports);
global_ctx.config.set_udp_whitelist(request.udp_ports);
global_ctx
.get_acl_filter()
.reload_rules(AclRuleBuilder::build(&global_ctx)?.as_ref());
Ok(SetWhitelistResponse {})
}
async fn get_whitelist(
&self,
_: BaseController,
_request: GetWhitelistRequest,
) -> Result<GetWhitelistResponse, rpc_types::error::Error> {
let global_ctx = self.peer_manager.get_global_ctx();
let tcp_ports = global_ctx.config.get_tcp_whitelist();
let udp_ports = global_ctx.config.get_udp_whitelist();
tracing::info!(
"Getting whitelist - TCP: {:?}, UDP: {:?}",
tcp_ports,
udp_ports
);
Ok(GetWhitelistResponse {
tcp_ports,
udp_ports,
})
}
}

View File

@@ -261,4 +261,44 @@ message GetAclStatsResponse {
service AclManageRpc {
rpc GetAclStats(GetAclStatsRequest) returns (GetAclStatsResponse);
rpc SetWhitelist(SetWhitelistRequest) returns (SetWhitelistResponse);
rpc GetWhitelist(GetWhitelistRequest) returns (GetWhitelistResponse);
}
message SetWhitelistRequest {
repeated string tcp_ports = 1;
repeated string udp_ports = 2;
}
message SetWhitelistResponse {}
message GetWhitelistRequest {}
message GetWhitelistResponse {
repeated string tcp_ports = 1;
repeated string udp_ports = 2;
}
message AddPortForwardRequest {
common.PortForwardConfigPb cfg = 1;
}
message AddPortForwardResponse {}
message RemovePortForwardRequest {
common.PortForwardConfigPb cfg = 1;
}
message RemovePortForwardResponse {}
message ListPortForwardRequest {}
message ListPortForwardResponse {
repeated common.PortForwardConfigPb cfgs = 1;
}
service PortForwardManageRpc {
rpc AddPortForward(AddPortForwardRequest) returns (AddPortForwardResponse);
rpc RemovePortForward(RemovePortForwardRequest) returns (RemovePortForwardResponse);
rpc ListPortForward(ListPortForwardRequest) returns (ListPortForwardResponse);
}

View File

@@ -1329,16 +1329,33 @@ async fn avoid_tunnel_loop_back_to_virtual_network() {
drop_insts(insts).await;
}
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]
pub async fn acl_rule_test_inbound() {
pub async fn acl_rule_test_inbound(
#[values(true, false)] enable_kcp_proxy: bool,
#[values(true, false)] enable_quic_proxy: bool,
) {
use crate::tunnel::{
common::tests::_tunnel_pingpong_netns,
tcp::{TcpTunnelConnector, TcpTunnelListener},
udp::{UdpTunnelConnector, UdpTunnelListener},
};
use rand::Rng;
let insts = init_three_node("udp").await;
let insts = init_three_node_ex(
"udp",
|cfg| {
if cfg.get_inst_name() == "inst1" {
let mut flags = cfg.get_flags();
flags.enable_kcp_proxy = enable_kcp_proxy;
flags.enable_quic_proxy = enable_quic_proxy;
cfg.set_flags(flags);
}
cfg
},
false,
)
.await;
// 构造 ACL 配置
use crate::proto::acl::*;
@@ -1422,7 +1439,7 @@ pub async fn acl_rule_test_inbound() {
.await;
// 6. 8080 应该连接失败(被 ACL 拦截)
let result = tokio::time::timeout(
let result = tokio::spawn(tokio::time::timeout(
std::time::Duration::from_millis(200),
_tunnel_pingpong_netns(
listener_8080,
@@ -1431,10 +1448,13 @@ pub async fn acl_rule_test_inbound() {
NetNS::new(Some("net_a".into())),
buf.clone(),
),
)
))
.await;
assert!(result.is_err(), "TCP 连接 8080 应被 ACL 拦截,不能成功");
assert!(
result.is_err() || result.unwrap().is_err(),
"TCP 连接 8080 应被 ACL 拦截,不能成功"
);
// 7. 从 10.144.144.2 连接 8082 应该连接失败(被 ACL 拦截)
let result = tokio::time::timeout(
@@ -1508,3 +1528,236 @@ pub async fn acl_rule_test_inbound() {
drop_insts(insts).await;
}
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]
pub async fn acl_rule_test_subnet_proxy(
#[values(true, false)] enable_kcp_proxy: bool,
#[values(true, false)] enable_quic_proxy: bool,
) {
use crate::tunnel::{
common::tests::_tunnel_pingpong_netns,
tcp::{TcpTunnelConnector, TcpTunnelListener},
udp::{UdpTunnelConnector, UdpTunnelListener},
};
use rand::Rng;
let insts = init_three_node_ex(
"udp",
|cfg| {
if cfg.get_inst_name() == "inst1" {
let mut flags = cfg.get_flags();
flags.enable_kcp_proxy = enable_kcp_proxy;
flags.enable_quic_proxy = enable_quic_proxy;
cfg.set_flags(flags);
} else if cfg.get_inst_name() == "inst3" {
// 添加子网代理配置
cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None)
.unwrap();
}
cfg
},
false,
)
.await;
// 等待代理路由出现
wait_proxy_route_appear(
&insts[0].get_peer_manager(),
"10.144.144.3/24",
insts[2].peer_id(),
"10.1.2.0/24",
)
.await;
// Test IPv4 connectivity
wait_for_condition(
|| async { ping_test("net_a", "10.1.2.4", None).await },
Duration::from_secs(5),
)
.await;
// 构造 ACL 配置 - 针对子网代理流量
use crate::proto::acl::*;
let mut acl = Acl::default();
let mut acl_v1 = AclV1::default();
let mut chain = Chain::default();
chain.name = "test_subnet_proxy_inbound".to_string();
chain.chain_type = ChainType::Forward as i32;
chain.enabled = true;
// 禁止访问子网代理中的 8080 端口
let mut deny_rule = Rule::default();
deny_rule.name = "deny_subnet_8080".to_string();
deny_rule.priority = 200;
deny_rule.enabled = true;
deny_rule.action = Action::Drop as i32;
deny_rule.protocol = Protocol::Any as i32;
deny_rule.ports = vec!["8080".to_string()];
deny_rule.destination_ips = vec!["10.1.2.0/24".to_string()];
chain.rules.push(deny_rule);
// 禁止来自 inst1 (10.144.144.1) 访问子网代理中的 8081 端口
let mut deny_src_rule = Rule::default();
deny_src_rule.name = "deny_inst1_to_subnet_8081".to_string();
deny_src_rule.priority = 200;
deny_src_rule.enabled = true;
deny_src_rule.action = Action::Drop as i32;
deny_src_rule.protocol = Protocol::Any as i32;
deny_src_rule.ports = vec!["8081".to_string()];
deny_src_rule.source_ips = vec!["10.144.144.1/32".to_string()];
deny_src_rule.destination_ips = vec!["10.1.2.0/24".to_string()];
chain.rules.push(deny_src_rule);
// 允许其他流量
let mut allow_rule = Rule::default();
allow_rule.name = "allow_all".to_string();
allow_rule.priority = 100;
allow_rule.enabled = true;
allow_rule.action = Action::Allow as i32;
allow_rule.protocol = Protocol::Any as i32;
allow_rule.stateful = true;
chain.rules.push(allow_rule);
acl_v1.chains.push(chain);
acl.acl_v1 = Some(acl_v1);
// 在 inst3 上应用 ACL 规则
insts[2]
.get_global_ctx()
.get_acl_filter()
.reload_rules(Some(&acl));
// TCP 测试部分 - 测试子网代理的 ACL 规则
{
// 在 net_d (10.1.2.4) 上监听多个端口
let listener_8080 = TcpTunnelListener::new("tcp://0.0.0.0:8080".parse().unwrap());
let listener_8081 = TcpTunnelListener::new("tcp://0.0.0.0:8081".parse().unwrap());
let listener_8082 = TcpTunnelListener::new("tcp://0.0.0.0:8082".parse().unwrap());
// 从 inst1 (net_a) 连接到子网代理
let connector_8080 = TcpTunnelConnector::new("tcp://10.1.2.4:8080".parse().unwrap());
let connector_8081 = TcpTunnelConnector::new("tcp://10.1.2.4:8081".parse().unwrap());
let connector_8082 = TcpTunnelConnector::new("tcp://10.1.2.4:8082".parse().unwrap());
let mut buf = vec![0; 32];
rand::thread_rng().fill(&mut buf[..]);
// 8082 应该可以连接成功(不被 ACL 拦截)
_tunnel_pingpong_netns(
listener_8082,
connector_8082,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
)
.await;
// 8080 应该连接失败(被 ACL 拦截 - 禁止访问子网代理的 8080
let result = tokio::spawn(tokio::time::timeout(
std::time::Duration::from_millis(200),
_tunnel_pingpong_netns(
listener_8080,
connector_8080,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
),
))
.await;
assert!(
result.is_err() || result.unwrap().is_err(),
"TCP 连接子网代理 8080 应被 ACL 拦截,不能成功"
);
// 8081 应该连接失败(被 ACL 拦截 - 禁止 inst1 访问子网代理的 8081
let result = tokio::spawn(tokio::time::timeout(
std::time::Duration::from_millis(200),
_tunnel_pingpong_netns(
listener_8081,
connector_8081,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
),
))
.await;
assert!(
result.is_err() || result.unwrap().is_err(),
"TCP 连接子网代理 8081 应被 ACL 拦截,不能成功"
);
let stats = insts[2].get_global_ctx().get_acl_filter().get_stats();
println!("ACL stats after TCP tests: {:?}", stats);
}
// UDP 测试部分 - 测试子网代理的 ACL 规则
{
let listener_8080 = UdpTunnelListener::new("udp://0.0.0.0:8080".parse().unwrap());
let listener_8082 = UdpTunnelListener::new("udp://0.0.0.0:8082".parse().unwrap());
let connector_8080 = UdpTunnelConnector::new("udp://10.1.2.4:8080".parse().unwrap());
let connector_8082 = UdpTunnelConnector::new("udp://10.1.2.4:8082".parse().unwrap());
let mut buf = vec![0; 32];
rand::thread_rng().fill(&mut buf[..]);
// 8082 应该可以连接成功
_tunnel_pingpong_netns(
listener_8082,
connector_8082,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
)
.await;
// 8080 应该连接失败(被 ACL 拦截)
let result = tokio::time::timeout(
std::time::Duration::from_millis(200),
_tunnel_pingpong_netns(
listener_8080,
connector_8080,
NetNS::new(Some("net_d".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
),
)
.await;
let stats = insts[2].get_global_ctx().get_acl_filter().get_stats();
println!("ACL stats after UDP tests: {}", stats);
assert!(
result.is_err(),
"UDP 连接子网代理 8080 应被 ACL 拦截,不能成功"
);
}
// 测试 ICMP 到子网代理(应该被拒绝,因为 Any 协议被拒绝)
tokio::spawn(wait_for_condition(
|| async { ping_test("net_a", "10.1.2.4", None).await },
Duration::from_secs(1),
))
.await
.unwrap_err();
// 移除 ACL 规则
insts[2]
.get_global_ctx()
.get_acl_filter()
.reload_rules(None);
// 验证移除 ACL 后ICMP 可以正常工作
wait_for_condition(
|| async { ping_test("net_a", "10.1.2.4", None).await },
Duration::from_secs(5),
)
.await;
drop_insts(insts).await;
}

12
flake.lock generated
View File

@@ -20,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1750741721,
"narHash": "sha256-Z0djmTa1YmnGMfE9jEe05oO4zggjDmxOGKwt844bUhE=",
"lastModified": 1753429684,
"narHash": "sha256-9h7+4/53cSfQ/uA3pSvCaBepmZaz/dLlLVJnbQ+SJjk=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "4b1164c3215f018c4442463a27689d973cffd750",
"rev": "7fd36ee82c0275fb545775cc5e4d30542899511d",
"type": "github"
},
"original": {
@@ -48,11 +48,11 @@
]
},
"locked": {
"lastModified": 1750905536,
"narHash": "sha256-Mo7yXM5IvMGNvJPiNkFsVT2UERmnvjsKgnY6UyDdySQ=",
"lastModified": 1753671061,
"narHash": "sha256-IU4eBWfe9h2QejJYST+EAlhg8a1H6mh9gbcmWgZ2/mQ=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "2fa7c0aabd15fa0ccc1dc7e675a4fcf0272ad9a1",
"rev": "40065d17ee4dbec3ded8ca61236132aede843fab",
"type": "github"
},
"original": {

View File

@@ -10,35 +10,48 @@
};
};
outputs = { self, nixpkgs, flake-utils, rust-overlay }:
outputs = { self, nixpkgs, flake-utils, rust-overlay, ... }:
flake-utils.lib.eachDefaultSystem (system:
let
overlays = [ (import rust-overlay) ];
pkgs = import nixpkgs {
inherit system overlays;
};
lib = nixpkgs.lib;
rust = pkgs.rust-bin.stable.latest.default.override {
rustVersion = "1.87.0";
rust = pkgs.rust-bin.stable.${rustVersion}.default.override{
extensions = [ "rust-src" "rust-analyzer" ];
};
in
{
devShells.default = pkgs.mkShell {
devShells.default = pkgs.mkShell rec {
nativeBuildInputs = with pkgs; [
rust
pkg-config
protobuf
];
clang
pkg-config
# web
nodejs_22
pnpm
];
buildInputs = with pkgs; [
zstd
openssl
libclang
llvmPackages.libclang
# gui
webkitgtk_4_1
libsoup_3
];
RUST_SRC_PATH = "${rust}/lib/rustlib/src/rust/library";
LIBCLANG_PATH = "${pkgs.libclang.lib}/lib";
BINDGEN_EXTRA_CLANG_ARGS = "-I${pkgs.clang}/resource-root/include";
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath (buildInputs ++ nativeBuildInputs);
ZSTD_SYS_USE_PKG_CONFIG = true;
KCP_SYS_EXTRA_HEADER_PATH = "${pkgs.libclang.lib}/lib/clang/19/include:${pkgs.glibc.dev}/include";
};
});
}
);
}