Compare commits

...

64 Commits

Author SHA1 Message Date
Sijie.Sun
d880dfbbca bump version to v2.2.4 (#697) 2025-03-19 17:23:15 +08:00
Sijie.Sun
b46a200f8d connector should set bind addrs correctly (#696) 2025-03-19 10:47:43 +08:00
kevin
81490d0662 enable sni for tls client (#691)
* enable sni for tls client
* update test case
* fix public_ip parse bug
2025-03-19 01:15:34 +08:00
treasury1203
3d1e841cc5 Merge pull request #687 from treasury1203/patch-1
docs: (contributing)
2025-03-17 22:27:34 +08:00
sijie.sun
f52936a103 bump version to v2.2.3 2025-03-17 22:24:19 +08:00
Sijie.Sun
23f69ce6a4 improve direct connector (#685)
* support ipv6 stun
* show interface and public ip in cli node info
* direct conn should keep trying unless already direct connected
* peer should use conn with smallest latency
* deprecate ipv6_listener, use -l instead
2025-03-17 10:46:14 +08:00
sijie.sun
f84ae228fc fix some tailwind style not work 2025-03-16 11:45:18 +08:00
kevin
74c716ccaa fix web bugs 2025-03-15 14:52:09 +08:00
sijie.sun
445b02b2ca do not upload to oss 2025-03-15 00:16:12 +08:00
sijie.sun
bb17ffa9fc fix wireguard not respond after idle for 120s 2025-03-15 00:16:12 +08:00
sijie.sun
389ea709ce fix smoltcp not wakeup closed socket 2025-03-15 00:16:12 +08:00
kevin
c2f535ead4 import/export network config for web (#676)
* import/export network config for web
* add socks5 config for web
2025-03-12 23:19:56 +08:00
Sijie.Sun
0318f55322 add serde default to NetworkConfig (#675)
* add serde default to NetworkConfig

* set base z-index of event-dialog
2025-03-12 10:36:54 +08:00
kevin
1f4340e82f add configurable items for web/gui
enable_exit_node
relay_all_peer_rpc
multi_thread
proxy_forward_by_system
relay_network_whitelist
manual_routes
exit_nodes
2025-03-11 22:30:39 +08:00
loecom
ed08707c98 easytier-web add tcp support
easytier-web add tcp support
2025-03-11 12:48:48 +08:00
sijie.sun
7397abcb94 txt connector should not rely on A record 2025-03-09 21:31:43 +08:00
sijie.sun
98d321f8ac fix kcp traffic not encrypted 2025-03-08 22:09:43 +08:00
sijie.sun
e78b0ef869 test serializedly 2025-03-08 15:59:54 +08:00
sijie.sun
8d654330ac fix http_connector
1. use ipv4 first when connect to http server.
2. allow redirect to url like: http://tcp://p.com:11010
3. dns should also use long timeout
2025-03-08 15:59:54 +08:00
L-Trump
00d61333d3 allow proxy packets to be forwarded by system kernel 2025-03-08 12:56:49 +08:00
sijie.sun
03b55b61e7 support txt/srv record 2025-03-08 12:56:23 +08:00
sijie.sun
745e44cc87 allow using http connector for config server 2025-03-07 22:17:23 +08:00
sijie.sun
24213a874a make http connector timeout longer
http response may be slow, make its timeout longer.
2025-03-07 22:17:23 +08:00
sijie.sun
155f8a2ba2 make prost build smaller 2025-03-06 11:07:05 +08:00
sijie.sun
568dca6f9c fix memory leak 2025-03-06 11:07:05 +08:00
sijie.sun
673c34cf5a http redirector 2025-02-21 11:51:13 +08:00
sijie.sun
2050ed78d0 remove some dep 2025-02-21 11:51:13 +08:00
zhj9709
2632c44195 fix docker stop issue by using tini for graceful shutdown 2025-02-10 22:54:07 +08:00
Sijie.Sun
5449eabf2a Update docker.yml 2025-02-10 12:47:12 +08:00
sijie.sun
dd5b00faf4 bump version to v2.2.2 2025-02-10 08:47:18 +08:00
sijie.sun
0caec3e4da fix label translate 2025-02-09 22:01:26 +08:00
sijie.sun
e48e62cac0 fix tcp proxy not close properly 2025-02-09 22:01:09 +08:00
sijie.sun
06ebda2e2f update kcp-sys to fix unexpected disconnect 2025-02-09 00:30:27 +08:00
sijie.sun
53c449b9fb fix net2net kcp proxy 2025-02-08 23:11:10 +08:00
sijie.sun
51e0fac72c improve user experience
1. add config generator to easytier-web
2. add command to show tcp/kcp proxy entries
2025-02-07 23:59:36 +08:00
sijie.sun
32b1fe0893 netlink shoud remove route only when ifidx is same 2025-02-06 19:23:00 +08:00
sijie.sun
2af3b82e32 bump version to 2.2.1 2025-02-06 16:54:49 +08:00
sijie.sun
eca1231831 fix help msg of kcp 2025-02-06 16:54:49 +08:00
sijie.sun
e833c2a28b improve experience of subnet/kcp proxy
1. add self to windows firewall on windows
2. android always use smoltcp
2025-02-06 16:54:49 +08:00
Sijie.Sun
8b89a037e8 fix tcp incoming failure when kcp proxy is enabled (#601) 2025-02-06 09:08:34 +08:00
Sijie.Sun
1e821a03fe netlink route add should be exclusive (#596) 2025-02-04 23:01:13 +08:00
Sijie.Sun
66051967fe fix self peer route info not exist when starting (#595) 2025-02-04 21:35:14 +08:00
Sijie.Sun
a63778854f use netlink instead of shell cmd to config ip (#593) 2025-02-03 15:13:50 +08:00
Sijie.Sun
4aea0821dd forward original peer info in ospf route (#589)
prost doesn't support unknown field, and these info may be lost when
they go through a old version node.
2025-01-27 20:38:22 +08:00
Sijie.Sun
08546925cc fix tests (#588)
fix proxy_three_node_disconnect_test and hole_punching_symmetric_only_random
2025-01-27 15:17:47 +08:00
Sijie.Sun
d0f26d9303 bump version to 2.2.0 (#586) 2025-01-26 23:45:50 +08:00
Sijie.Sun
2a5d5ea4df make kcp proxy compitible with old version (#585)
* fix kcp not work with smoltcp
* check if dst kcp input is enabled
2025-01-26 16:22:10 +08:00
Sijie.Sun
b69b122c8d add options to gui to enable kcp (#583)
* add test to kcp
* add options to gui to enable kcp
2025-01-26 13:31:20 +08:00
Sijie.Sun
55a39491cb feat/kcp (#580)
* support proxy tcp stream with kcp to improve experience of tcp over udp
* update rust version
* make subnet proxy route metrics lower in windows.
2025-01-26 00:41:15 +08:00
Sijie.Sun
1194ee1c2d fix peer manager stuck when sending large peer rpc (#572) 2025-01-17 06:50:21 +08:00
Sijie.Sun
c23b544c34 tcp accept should retry when encoutering some kinds of error (#565)
* tcp accept should retry when encoutering some kinds of error

bump version to v2.1.2

* persistent temporary machine id
2025-01-14 08:55:48 +08:00
Sijie.Sun
9d76b86f49 fix bugs (#561)
1. if peers disconnected before stop session, may crash at the assert.
2. bind_device flag should take effect on manual connector.
2025-01-12 00:16:38 +08:00
Sijie.Sun
bb0ccca3e5 allow manually specify public address of listeners (#556) 2025-01-10 09:25:14 +08:00
Sijie.Sun
306817ae9a allow listener retry listen (#554) 2025-01-09 00:01:41 +08:00
Sijie.Sun
d2ec60e108 batch recv for udp proxy (#552) 2025-01-07 23:52:18 +08:00
Sijie.Sun
e016aeddeb optimize pingpong conn close condition (#549)
if received some packets from peer, only close conn after enough
rounds of pingpong
2025-01-07 22:42:57 +08:00
Sijie.Sun
a4419a31fd fix peer rpc compatibility issue (#548)
every rpc packet should contains descriptor if sent to old version et.
2025-01-06 23:30:56 +08:00
Sijie.Sun
34e4e907a9 bump version to v2.1.1 (#533) 2024-12-24 10:40:57 -05:00
Sijie.Sun
2f4a097787 fix android (#531) 2024-12-23 19:38:32 -05:00
Sijie.Sun
f3de00be37 support pause a network (#528) 2024-12-23 09:29:59 +08:00
Sijie.Sun
4cf61f0d4a fix web show dup entry for same machine (#526) 2024-12-21 11:51:01 -05:00
Sijie.Sun
4e5915f98e save api host in local storage (#523) 2024-12-21 01:29:54 +08:00
Sijie.Sun
870eca9e9f optimize easytier-web (#522)
1. use default compress level for tower_http. the best level consume
lots of memory
2. add more help message and command line arg.
2024-12-21 01:27:39 +08:00
Sijie.Sun
25ed41caf5 use correct config server url (#519) 2024-12-20 00:21:22 +08:00
147 changed files with 6976 additions and 2075 deletions

View File

@@ -18,7 +18,7 @@ RUN mkdir -p /tmp/output; \
FROM alpine:latest
RUN apk add --no-cache tzdata
RUN apk add --no-cache tzdata tini
WORKDIR /app
COPY --from=builder --chmod=755 /tmp/output/* /usr/local/bin
@@ -36,4 +36,4 @@ EXPOSE 11011/tcp
# wss
EXPOSE 11012/tcp
ENTRYPOINT ["easytier-core"]
ENTRYPOINT ["/sbin/tini", "--", "easytier-core"]

View File

@@ -97,6 +97,7 @@ jobs:
echo "GIT_DESC=$(git log -1 --format=%cd.%h --date=format:%Y-%m-%d_%H:%M:%S)" >> $GITHUB_ENV
- name: Cargo cache
if: ${{ ! endsWith(matrix.TARGET, 'freebsd') }}
uses: actions/cache@v4
with:
path: |
@@ -114,6 +115,9 @@ jobs:
if: ${{ ! endsWith(matrix.TARGET, 'freebsd') }}
run: |
bash ./.github/workflows/install_rust.sh
# this dir is a soft link generated by install_rust.sh
# kcp-sys need this to gen ffi bindings. without this clang may fail to find some libc headers such as bits/libc-header-start.h
export KCP_SYS_EXTRA_HEADER_PATH=/usr/include/musl-cross
if [[ $OS =~ ^ubuntu.*$ && $TARGET =~ ^mips.*$ ]]; then
cargo +nightly build -r --verbose --target $TARGET -Z build-std=std,panic_abort --no-default-features --features mips --package=easytier
else
@@ -142,14 +146,14 @@ jobs:
whoami
env | sort
sudo pkg install -y git protobuf
sudo pkg install -y git protobuf llvm-devel
curl --proto 'https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
source $HOME/.cargo/env
rustup set auto-self-update disable
rustup install 1.77
rustup default 1.77
rustup install 1.84
rustup default 1.84
export CC=clang
export CXX=clang++
@@ -204,18 +208,6 @@ jobs:
path: |
./artifacts/*
- name: Upload OSS
if: ${{ env.OSS_BUCKET != '' }}
uses: Menci/upload-to-oss@main
with:
access-key-id: ${{ secrets.ALIYUN_OSS_ACCESS_ID }}
access-key-secret: ${{ secrets.ALIYUN_OSS_ACCESS_KEY }}
endpoint: ${{ secrets.ALIYUN_OSS_ENDPOINT }}
bucket: ${{ secrets.ALIYUN_OSS_BUCKET }}
local-path: ./artifacts/
remote-path: /easytier-releases/${{env.GIT_DESC}}/easytier-${{ matrix.ARTIFACT_NAME }}
no-delete-remote-files: true
retry: 5
core-result:
if: needs.pre_job.outputs.should_skip != 'true' && always()
runs-on: ubuntu-latest

View File

@@ -11,7 +11,7 @@ on:
image_tag:
description: 'Tag for this image build'
type: string
default: 'v1.2.0'
default: 'v2.2.4'
required: true
mark_latest:
description: 'Mark this image as latest'
@@ -39,6 +39,12 @@ jobs:
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: login github container registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Download artifact
id: download-artifact
uses: dawidd6/action-download-artifact@v6
@@ -58,4 +64,6 @@ jobs:
platforms: linux/amd64,linux/arm64
push: true
file: .github/workflows/Dockerfile
tags: easytier/easytier:${{ inputs.image_tag }}${{ inputs.mark_latest && ',easytier/easytier:latest' || '' }},
tags: |
easytier/easytier:${{ inputs.image_tag }}${{ inputs.mark_latest && ',easytier/easytier:latest' || '' }},
ghcr.io/easytier/easytier:${{ inputs.image_tag }}${{ inputs.mark_latest && ',easytier/easytier:latest' || '' }},

View File

@@ -124,6 +124,20 @@ jobs:
# GitHub repo token to use to avoid rate limiter
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install GUI dependencies (x86 only)
if: ${{ matrix.TARGET == 'x86_64-unknown-linux-musl' }}
run: |
sudo apt install -qq libwebkit2gtk-4.1-dev \
build-essential \
curl \
wget \
file \
libgtk-3-dev \
librsvg2-dev \
libxdo-dev \
libssl-dev \
patchelf
- name: Install GUI cross compile (aarch64 only)
if: ${{ matrix.TARGET == 'aarch64-unknown-linux-musl' }}
run: |
@@ -207,18 +221,6 @@ jobs:
path: |
./artifacts/*
- name: Upload OSS
if: ${{ env.OSS_BUCKET != '' }}
uses: Menci/upload-to-oss@main
with:
access-key-id: ${{ secrets.ALIYUN_OSS_ACCESS_ID }}
access-key-secret: ${{ secrets.ALIYUN_OSS_ACCESS_KEY }}
endpoint: ${{ secrets.ALIYUN_OSS_ENDPOINT }}
bucket: ${{ secrets.ALIYUN_OSS_BUCKET }}
local-path: ./artifacts/
remote-path: /easytier-releases/${{env.GIT_DESC}}/easytier-gui-${{ matrix.ARTIFACT_NAME }}
no-delete-remote-files: true
retry: 5
gui-result:
if: needs.pre_job.outputs.should_skip != 'true' && always()
runs-on: ubuntu-latest

View File

@@ -8,20 +8,7 @@
# dependencies are only needed on ubuntu as that's the only place where
# we make cross-compilation
if [[ $OS =~ ^ubuntu.*$ ]]; then
sudo apt-get update && sudo apt-get install -qq crossbuild-essential-arm64 crossbuild-essential-armhf musl-tools libappindicator3-dev
# for easytier-gui
if [[ $GUI_TARGET != '' && $GUI_TARGET =~ ^x86_64.*$ ]]; then
sudo apt install -qq libwebkit2gtk-4.1-dev \
build-essential \
curl \
wget \
file \
libgtk-3-dev \
librsvg2-dev \
libxdo-dev \
libssl-dev \
patchelf
fi
sudo apt-get update && sudo apt-get install -qq crossbuild-essential-arm64 crossbuild-essential-armhf musl-tools libappindicator3-dev llvm clang
# curl -s musl.cc | grep mipsel
case $TARGET in
mipsel-unknown-linux-musl)
@@ -49,16 +36,17 @@ if [[ $OS =~ ^ubuntu.*$ ]]; then
if [ -n "$MUSL_URI" ]; then
mkdir -p ./musl_gcc
wget -c https://musl.cc/${MUSL_URI}-cross.tgz -P ./musl_gcc/
wget --inet4-only -c https://musl.cc/${MUSL_URI}-cross.tgz -P ./musl_gcc/
tar zxf ./musl_gcc/${MUSL_URI}-cross.tgz -C ./musl_gcc/
sudo ln -s $(pwd)/musl_gcc/${MUSL_URI}-cross/bin/*gcc /usr/bin/
sudo ln -s $(pwd)/musl_gcc/${MUSL_URI}-cross/${MUSL_URI}/include/ /usr/include/musl-cross
fi
fi
# see https://github.com/rust-lang/rustup/issues/3709
rustup set auto-self-update disable
rustup install 1.77
rustup default 1.77
rustup install 1.84
rustup default 1.84
# mips/mipsel cannot add target from rustup, need compile by ourselves
if [[ $OS =~ ^ubuntu.*$ && $TARGET =~ ^mips.*$ ]]; then

View File

@@ -146,18 +146,6 @@ jobs:
path: |
./artifacts/*
- name: Upload OSS
if: ${{ env.OSS_BUCKET != '' }}
uses: Menci/upload-to-oss@main
with:
access-key-id: ${{ secrets.ALIYUN_OSS_ACCESS_ID }}
access-key-secret: ${{ secrets.ALIYUN_OSS_ACCESS_KEY }}
endpoint: ${{ secrets.ALIYUN_OSS_ENDPOINT }}
bucket: ${{ secrets.ALIYUN_OSS_BUCKET }}
local-path: ./artifacts/
remote-path: /easytier-releases/${{env.GIT_DESC}}/easytier-gui-${{ matrix.ARTIFACT_NAME }}
no-delete-remote-files: true
retry: 5
mobile-result:
if: needs.pre_job.outputs.should_skip != 'true' && always()
runs-on: ubuntu-latest

View File

@@ -21,7 +21,7 @@ on:
version:
description: 'Version for this release'
type: string
default: 'v2.1.0'
default: 'v2.2.4'
required: true
make_latest:
description: 'Mark this release as latest'

View File

@@ -62,6 +62,6 @@ jobs:
- name: Run tests
run: |
sudo -E env "PATH=$PATH" cargo test --no-default-features --features=full --verbose
sudo -E env "PATH=$PATH" cargo test --no-default-features --features=full --verbose -- --test-threads=1 --nocapture
sudo chown -R $USER:$USER ./target
sudo chown -R $USER:$USER ~/.cargo

1178
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -5,10 +5,11 @@
[![GitHub issues](https://img.shields.io/github/issues/EasyTier/EasyTier)](https://github.com/EasyTier/EasyTier/issues)
[![GitHub Core Actions](https://github.com/EasyTier/EasyTier/actions/workflows/core.yml/badge.svg)](https://github.com/EasyTier/EasyTier/actions/workflows/core.yml)
[![GitHub GUI Actions](https://github.com/EasyTier/EasyTier/actions/workflows/gui.yml/badge.svg)](https://github.com/EasyTier/EasyTier/actions/workflows/gui.yml)
[![GitHub Test Actions](https://github.com/EasyTier/EasyTier/actions/workflows/test.yml/badge.svg)](https://github.com/EasyTier/EasyTier/actions/workflows/test.yml)
[简体中文](/README_CN.md) | [English](/README.md)
**Please visit the [EasyTier Official Website](https://www.easytier.top/en/) to view the full documentation.**
**Please visit the [EasyTier Official Website](https://easytier.cn/en/) to view the full documentation.**
EasyTier is a simple, safe and decentralized VPN networking solution implemented with the Rust language and Tokio framework.
@@ -31,6 +32,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
- **High Availability**: Supports multi-path and switches to healthy paths when high packet loss or network errors are detected.
- **IPv6 Support**: Supports networking using IPv6.
- **Multiple Protocol Types**: Supports communication between nodes using protocols such as WebSocket and QUIC.
- **Web Management Interface**: Provides a [web-based management](https://easytier.cn/web) interface for easy configuration and monitoring.
## Installation
@@ -52,7 +54,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
4. **Install by Docker Compose**
Please visit the [EasyTier Official Website](https://www.easytier.top/en/) to view the full documentation.
Please visit the [EasyTier Official Website](https://easytier.cn/en/) to view the full documentation.
5. **Install by script (For Linux Only)**
@@ -200,20 +202,20 @@ Subnet proxy information will automatically sync to each node in the virtual net
### Networking without Public IP
EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.top:11010``.
EasyTier supports networking using shared public nodes. The currently deployed shared public node is ``tcp://public.easytier.cn:11010``.
When using shared nodes, each node entering the network needs to provide the same ``--network-name`` and ``--network-secret`` parameters as the unique identifier of the network.
Taking two nodes as an example, Node A executes:
```sh
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
```
Node B executes
```sh
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
```
After the command is successfully executed, Node A can access Node B through the virtual IP 10.144.144.2.
@@ -286,7 +288,7 @@ Run you own public server cluster is exactly same as running an virtual network,
You can also join the official public server cluster with following command:
```
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
```
@@ -296,10 +298,8 @@ You can use ``easytier-core --help`` to view all configuration items
## Roadmap
- [ ] Improve documentation and user guides.
- [ ] Support features such as encryption, TCP hole punching, etc.
- [ ] Support features such TCP hole punching, KCP, FEC etc.
- [ ] Support iOS.
- [ ] Support Web configuration management.
## Community and Contribution

View File

@@ -8,7 +8,7 @@
[简体中文](/README_CN.md) | [English](/README.md)
**请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。**
**请访问 [EasyTier 官网](https://easytier.cn/) 以查看完整的文档。**
一个简单、安全、去中心化的内网穿透 VPN 组网方案,使用 Rust 语言和 Tokio 框架实现。
@@ -31,6 +31,7 @@
- **高可用性**:支持多路径和在检测到高丢包率或网络错误时切换到健康路径。
- **IPV6 支持**:支持利用 IPV6 组网。
- **多协议类型**: 支持使用 WebSocket、QUIC 等协议进行节点间通信。
- **Web 管理界面**:支持通过 [Web 界面](https://easytier.cn)管理节点。
## 安装
@@ -52,7 +53,7 @@
4. **通过Docker Compose安装**
请访问 [EasyTier 官网](https://www.easytier.top/) 以查看完整的文档。
请访问 [EasyTier 官网](https://easytier.cn/) 以查看完整的文档。
5. **使用一键脚本安装 (仅适用于 Linux)**
@@ -199,20 +200,20 @@ sudo easytier-core --ipv4 10.144.144.2 -n 10.1.1.0/24
### 无公网IP组网
EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.top:11010``。
EasyTier 支持共享公网节点进行组网。目前已部署共享的公网节点 ``tcp://public.easytier.cn:11010``。
使用共享节点时,需要每个入网节点提供相同的 ``--network-name`` 和 ``--network-secret`` 参数,作为网络的唯一标识。
以双节点为例,节点 A 执行:
```sh
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
sudo easytier-core -i 10.144.144.1 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
```
节点 B 执行
```sh
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -e tcp://public.easytier.top:11010
sudo easytier-core --ipv4 10.144.144.2 --network-name abc --network-secret abc -p tcp://public.easytier.cn:11010
```
命令执行成功后,节点 A 即可通过虚拟 IP 10.144.144.2 访问节点 B。
@@ -289,7 +290,7 @@ connected_clients:
也可以使用以下命令加入官方公共服务器集群,后续将实现公共服务器集群的节点间负载均衡:
```
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.top:11010
sudo easytier-core --network-name easytier --network-secret easytier -p tcp://public.easytier.cn:11010
```
### 其他配置
@@ -299,9 +300,8 @@ sudo easytier-core --network-name easytier --network-secret easytier -p tcp://pu
## 路线图
- [ ] 完善文档和用户指南。
- [ ] 支持 TCP 打洞等特性。
- [ ] 支持 TCP 打洞、KCP、FEC 等特性。
- [ ] 支持 iOS。
- [ ] 支持 Web 配置管理。
## 社区和贡献

View File

@@ -1,7 +1,7 @@
{
"name": "easytier-gui",
"type": "module",
"version": "2.1.0",
"version": "2.2.4",
"private": true,
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
"scripts": {
@@ -35,16 +35,18 @@
"@primevue/auto-import-resolver": "^4.1.0",
"@tauri-apps/api": "2.1.0",
"@tauri-apps/cli": "2.1.0",
"@types/default-gateway": "^7.2.2",
"@types/node": "^22.7.4",
"@types/uuid": "^10.0.0",
"@vitejs/plugin-vue": "^5.1.4",
"@vue-macros/volar": "0.30.5",
"autoprefixer": "^10.4.20",
"cidr-tools": "^11.0.2",
"default-gateway": "^7.2.2",
"eslint": "^9.12.0",
"eslint-plugin-format": "^0.1.2",
"internal-ip": "^8.0.0",
"postcss": "^8.4.47",
"tailwindcss": "^3.4.13",
"tailwindcss": "=3.4.17",
"typescript": "^5.6.2",
"unplugin-auto-import": "^0.18.3",
"unplugin-vue-components": "^0.27.4",
@@ -58,4 +60,4 @@
"vue-i18n": "^10.0.0",
"vue-tsc": "^2.1.10"
}
}
}

View File

@@ -1,6 +1,6 @@
[package]
name = "easytier-gui"
version = "2.1.0"
version = "2.2.4"
description = "EasyTier GUI"
authors = ["you"]
edition = "2021"
@@ -15,7 +15,8 @@ crate-type = ["staticlib", "cdylib", "rlib"]
tauri-build = { version = "2.0.0-rc", features = [] }
[dependencies]
tauri = { version = "2.1", features = [
# wry 0.47 may crash on android, see https://github.com/EasyTier/EasyTier/issues/527
tauri = { version = "=2.0.6", features = [
"tray-icon",
"image-png",
"image-ico",

View File

@@ -39,7 +39,7 @@
"vpnservice:allow-prepare-vpn",
"vpnservice:allow-start-vpn",
"vpnservice:allow-stop-vpn",
"vpnservice:allow-register-listener",
"vpnservice:allow-registerListener",
"os:default",
"os:allow-os-type",
"os:allow-arch",

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.4 KiB

After

Width:  |  Height:  |  Size: 5.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 14 KiB

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.4 KiB

After

Width:  |  Height:  |  Size: 5.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.3 KiB

After

Width:  |  Height:  |  Size: 5.1 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.9 KiB

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.3 KiB

After

Width:  |  Height:  |  Size: 5.1 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.8 KiB

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 18 KiB

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.8 KiB

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 12 KiB

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

After

Width:  |  Height:  |  Size: 62 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 12 KiB

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 16 KiB

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

After

Width:  |  Height:  |  Size: 85 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 16 KiB

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.6 KiB

After

Width:  |  Height:  |  Size: 2.6 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 59 KiB

After

Width:  |  Height:  |  Size: 59 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 68 KiB

After

Width:  |  Height:  |  Size: 68 KiB

View File

@@ -89,6 +89,7 @@ fn get_os_hostname() -> Result<String, String> {
#[tauri::command]
fn set_logging_level(level: String) -> Result<(), String> {
#[allow(static_mut_refs)]
let sender = unsafe { LOGGER_LEVEL_SENDER.as_ref().unwrap() };
sender.send(level).map_err(|e| e.to_string())?;
Ok(())
@@ -141,7 +142,6 @@ pub fn run() {
process::exit(0);
}
#[cfg(not(target_os = "android"))]
utils::setup_panic_handler();
let mut builder = tauri::Builder::default();
@@ -189,7 +189,10 @@ pub fn run() {
let Ok(Some(logger_reinit)) = utils::init_logger(config, true) else {
return Ok(());
};
unsafe { LOGGER_LEVEL_SENDER.replace(logger_reinit) };
#[allow(static_mut_refs)]
unsafe {
LOGGER_LEVEL_SENDER.replace(logger_reinit)
};
// for tray icon, menu need to be built in js
#[cfg(not(target_os = "android"))]

View File

@@ -17,7 +17,7 @@
"createUpdaterArtifacts": false
},
"productName": "easytier-gui",
"version": "2.1.0",
"version": "2.2.4",
"identifier": "com.kkrainbow.easytier",
"plugins": {},
"app": {

View File

@@ -49,9 +49,9 @@ async function doStartVpn(ipv4Addr: string, cidr: number, routes: string[]) {
return
}
console.log('start vpn')
console.log('start vpn service', ipv4Addr, cidr, routes)
const start_ret = await start_vpn({
ipv4Addr: `${ipv4Addr}`,
ipv4Addr: `${ipv4Addr}/${cidr}`,
routes,
disallowedApplications: ['com.kkrainbow.easytier'],
mtu: 1300,
@@ -113,6 +113,7 @@ function getRoutesForVpn(routes: Route[]): string[] {
}
async function onNetworkInstanceChange() {
console.error('vpn service watch network instance change ids', JSON.stringify(networkStore.networkInstanceIds))
const insts = networkStore.networkInstanceIds
if (!insts) {
await doStopVpn()
@@ -142,7 +143,7 @@ async function onNetworkInstanceChange() {
const routesChanged = JSON.stringify(routes) !== JSON.stringify(curVpnStatus.routes)
if (ipChanged || routesChanged) {
console.log('virtual ip changed', JSON.stringify(curVpnStatus), virtual_ip)
console.info('vpn service virtual ip changed', JSON.stringify(curVpnStatus), virtual_ip)
try {
await doStopVpn()
}
@@ -154,7 +155,7 @@ async function onNetworkInstanceChange() {
await doStartVpn(virtual_ip, 24, routes)
}
catch (e) {
console.error('start vpn failed, clear all network insts.', e)
console.error('start vpn service failed, clear all network insts.', e)
networkStore.clearNetworkInstances()
await retainNetworkInstance(networkStore.networkInstanceIds)
}
@@ -175,6 +176,7 @@ async function watchNetworkInstance() {
}
subscribe_running = false
})
console.error('vpn service watch network instance')
}
export async function initMobileVpnService() {

View File

@@ -50,8 +50,8 @@ async function main() {
darkModeSelector: 'system',
cssLayer: {
name: 'primevue',
order: 'tailwind-base, primevue, tailwind-utilities'
}
order: 'tailwind-base, primevue, tailwind-utilities',
},
},
},
})

View File

@@ -250,7 +250,12 @@ onBeforeMount(async () => {
onMounted(async () => {
if (type() === 'android') {
await initMobileVpnService()
try {
await initMobileVpnService()
console.error("easytier init vpn service done")
} catch (e: any) {
console.error("easytier init vpn service failed", e)
}
}
})

View File

@@ -1,9 +1,11 @@
import { networkInterfaces } from 'node:os'
import path from 'node:path'
import process from 'node:process'
import VueI18n from '@intlify/unplugin-vue-i18n/vite'
import { PrimeVueResolver } from '@primevue/auto-import-resolver'
import Vue from '@vitejs/plugin-vue'
import { internalIpV4Sync } from 'internal-ip'
import { containsCidr, parseCidr } from 'cidr-tools'
import { gateway4sync } from 'default-gateway'
import AutoImport from 'unplugin-auto-import/vite'
import Components from 'unplugin-vue-components/vite'
import VueMacros from 'unplugin-vue-macros/vite'
@@ -13,6 +15,20 @@ import { defineConfig } from 'vite'
import VueDevTools from 'vite-plugin-vue-devtools'
import Layouts from 'vite-plugin-vue-layouts'
function findIp(gateway: string) {
// Look for the matching interface in all local interfaces
console.log('gateway', gateway)
for (const addresses of Object.values(networkInterfaces())) {
if (!addresses)
continue
for (const { cidr } of addresses) {
if (cidr && containsCidr(cidr, gateway)) {
return parseCidr(cidr).ip
}
}
}
}
const host = process.env.TAURI_DEV_HOST
// https://vitejs.dev/config/
@@ -99,10 +115,10 @@ export default defineConfig(async () => ({
},
hmr: host
? {
protocol: 'ws',
host: internalIpV4Sync(),
port: 1430,
}
protocol: 'ws',
host: findIp(gateway4sync().gateway),
port: 1430,
}
: undefined,
},
}))

View File

@@ -8,7 +8,7 @@ repository = "https://github.com/EasyTier/EasyTier"
authors = ["kkrainbow"]
keywords = ["vpn", "p2p", "network", "easytier"]
categories = ["network-programming", "command-line-utilities"]
rust-version = "1.77.0"
rust-version = "1.84.0"
license-file = "LICENSE"
readme = "README.md"

View File

@@ -1,7 +1,8 @@
[package]
name = "easytier-web"
version = "0.1.0"
version = "2.2.4"
edition = "2021"
description = "Config server for easytier. easytier-core gets config from this and web frontend use it as restful api server."
[dependencies]
easytier = { path = "../easytier" }
@@ -23,7 +24,11 @@ tower-sessions = { version = "0.13.0", default-features = false, features = [
] }
tower-http = { version = "0.6", features = ["cors", "compression-full"] }
sqlx = { version = "0.8", features = ["sqlite"] }
sea-orm = { version = "1.1", features = [ "sqlx-sqlite", "runtime-tokio-rustls", "macros" ] }
sea-orm = { version = "1.1", features = [
"sqlx-sqlite",
"runtime-tokio-rustls",
"macros",
] }
sea-orm-migration = { version = "1.1" }
@@ -31,11 +36,13 @@ sea-orm-migration = { version = "1.1" }
rust-embed = { version = "8.5.0", features = ["debug-embed"] }
base64 = "0.22"
rand = "0.8"
image = {version="0.24", default-features = false, features = ["png"]}
image = { version = "0.24", default-features = false, features = ["png"] }
rusttype = "0.9.3"
imageproc = "0.23.0"
rust-i18n = "3"
sys-locale = "0.3"
clap = { version = "4.4.8", features = [
"string",
"unicode",
@@ -50,3 +57,5 @@ uuid = { version = "1.5.0", features = [
"macro-diagnostics",
"serde",
] }
chrono = { version = "0.4.37", features = ["serde"] }

View File

@@ -40,10 +40,10 @@
"postcss": "^8.4.47",
"postcss-import": "^16.1.0",
"postcss-nested": "^7.0.2",
"tailwindcss": "^3.4.14",
"tailwindcss": "=3.4.17",
"typescript": "~5.6.3",
"vite": "^5.4.10",
"vite-plugin-dts": "^4.3.0",
"vue-tsc": "^2.1.10"
}
}
}

View File

@@ -120,13 +120,50 @@ function searchListenerSuggestions(e: { query: string }) {
listenerSuggestions.value = ret
}
const exitNodesSuggestions = ref([''])
function searchExitNodesSuggestions(e: { query: string }) {
const ret = []
ret.push(e.query)
exitNodesSuggestions.value = ret
}
const whitelistSuggestions = ref([''])
function searchWhitelistSuggestions(e: { query: string }) {
const ret = []
ret.push(e.query)
whitelistSuggestions.value = ret
}
interface BoolFlag {
field: keyof NetworkConfig
help: string
}
const bool_flags: BoolFlag[] = [
{ field: 'latency_first', help: 'latency_first_help' },
{ field: 'use_smoltcp', help: 'use_smoltcp_help' },
{ field: 'enable_kcp_proxy', help: 'enable_kcp_proxy_help' },
{ field: 'disable_kcp_input', help: 'disable_kcp_input_help' },
{ field: 'disable_p2p', help: 'disable_p2p_help' },
{ field: 'bind_device', help: 'bind_device_help' },
{ field: 'no_tun', help: 'no_tun_help' },
{ field: 'enable_exit_node', help: 'enable_exit_node_help' },
{ field: 'relay_all_peer_rpc', help: 'relay_all_peer_rpc_help' },
{ field: 'multi_thread', help: 'multi_thread_help' },
{ field: 'proxy_forward_by_system', help: 'proxy_forward_by_system_help' },
{ field: 'disable_encryption', help: 'disable_encryption_help' },
]
</script>
<template>
<div class="frontend-lib">
<div class="flex flex-col h-full">
<div class="flex flex-col">
<div class="w-10/12 self-center ">
<div class="w-11/12 self-center ">
<Panel :header="t('basic_settings')">
<div class="flex flex-col gap-y-2">
<div class="flex flex-row gap-x-9 flex-wrap">
@@ -188,11 +225,18 @@ function searchListenerSuggestions(e: { query: string }) {
<Panel :header="t('advanced_settings')" toggleable collapsed>
<div class="flex flex-col gap-y-2">
<div class="flex flex-row gap-x-9 flex-wrap">
<div class="flex flex-col gap-2 basis-5/12 grow">
<div class="flex items-center">
<Checkbox v-model="curNetwork.latency_first" input-id="use_latency_first" :binary="true" />
<label for="use_latency_first" class="ml-2"> {{ t('use_latency_first') }} </label>
<label> {{ t('flags_switch') }} </label>
<div class="flex flex-row flex-wrap">
<div class="basis-[20rem] flex items-center" v-for="flag in bool_flags">
<Checkbox v-model="curNetwork[flag.field]" :input-id="flag.field" :binary="true" />
<label :for="flag.field" class="ml-2"> {{ t(flag.field) }} </label>
<span class="pi pi-question-circle ml-2 self-center" v-tooltip="t(flag.help)"></span>
</div>
</div>
</div>
</div>
@@ -220,17 +264,20 @@ function searchListenerSuggestions(e: { query: string }) {
<ToggleButton v-model="curNetwork.enable_vpn_portal" on-icon="pi pi-check" off-icon="pi pi-times"
:on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
<div v-if="curNetwork.enable_vpn_portal" class="items-center flex flex-row gap-x-4">
<div class="min-w-64">
<InputGroup>
<InputText v-model="curNetwork.vpn_portal_client_network_addr"
:placeholder="t('vpn_portal_client_network')" />
<InputGroupAddon>
<span>/{{ curNetwork.vpn_portal_client_network_len }}</span>
</InputGroupAddon>
</InputGroup>
<InputNumber v-model="curNetwork.vpn_portal_listen_port" :allow-empty="false" :format="false"
:min="0" :max="65535" class="w-8/12" fluid />
<div class="flex flex-row gap-x-9 flex-wrap w-full">
<div class="flex flex-col gap-2 basis-8/12 grow">
<InputGroup>
<InputText v-model="curNetwork.vpn_portal_client_network_addr"
:placeholder="t('vpn_portal_client_network')" />
<InputGroupAddon>
<span>/{{ curNetwork.vpn_portal_client_network_len }}</span>
</InputGroupAddon>
</InputGroup>
</div>
<div class="flex flex-col gap-2 basis-3/12 grow">
<InputNumber v-model="curNetwork.vpn_portal_listen_port" :allow-empty="false" :format="false"
:min="0" :max="65535" fluid />
</div>
</div>
</div>
</div>
@@ -261,6 +308,73 @@ function searchListenerSuggestions(e: { query: string }) {
:placeholder="t('dev_name_placeholder')" />
</div>
</div>
<div class="flex flex-row gap-x-9 flex-wrap">
<div class="flex flex-col gap-2 basis-5/12 grow">
<div class="flex">
<label for="relay_network_whitelist">{{ t('relay_network_whitelist') }}</label>
<span class="pi pi-question-circle ml-2 self-center"
v-tooltip="t('relay_network_whitelist_help')"></span>
</div>
<ToggleButton v-model="curNetwork.enable_relay_network_whitelist" on-icon="pi pi-check" off-icon="pi pi-times"
:on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
<div v-if="curNetwork.enable_relay_network_whitelist" class="items-center flex flex-row gap-x-4">
<div class="min-w-64 w-full">
<AutoComplete id="relay_network_whitelist" v-model="curNetwork.relay_network_whitelist"
:placeholder="t('relay_network_whitelist')" class="w-full" multiple fluid
:suggestions="whitelistSuggestions" @complete="searchWhitelistSuggestions" />
</div>
</div>
</div>
</div>
<div class="flex flex-row gap-x-9 flex-wrap ">
<div class="flex flex-col gap-2 grow">
<div class="flex">
<label for="routes">{{ t('manual_routes') }}</label>
<span class="pi pi-question-circle ml-2 self-center" v-tooltip="t('manual_routes_help')"></span>
</div>
<ToggleButton v-model="curNetwork.enable_manual_routes" on-icon="pi pi-check" off-icon="pi pi-times"
:on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
<div v-if="curNetwork.enable_manual_routes" class="items-center flex flex-row gap-x-4">
<div class="min-w-64 w-full">
<AutoComplete id="routes" v-model="curNetwork.routes"
:placeholder="t('chips_placeholder', ['192.168.0.0/16'])" class="w-full" multiple fluid
:suggestions="inetSuggestions" @complete="searchInetSuggestions" />
</div>
</div>
</div>
</div>
<div class="flex flex-row gap-x-9 flex-wrap ">
<div class="flex flex-col gap-2 grow">
<div class="flex">
<label for="socks5_port">{{ t('socks5') }}</label>
<span class="pi pi-question-circle ml-2 self-center" v-tooltip="t('socks5_help')"></span>
</div>
<ToggleButton v-model="curNetwork.enable_socks5" on-icon="pi pi-check" off-icon="pi pi-times"
:on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
<div v-if="curNetwork.enable_socks5" class="items-center flex flex-row gap-x-4">
<div class="min-w-64 w-full">
<InputNumber id="socks5_port" v-model="curNetwork.socks5_port" aria-describedby="rpc_port-help"
:format="false" :allow-empty="false" :min="0" :max="65535" class="w-full"/>
</div>
</div>
</div>
</div>
<div class="flex flex-row gap-x-9 flex-wrap w-full">
<div class="flex flex-col gap-2 grow p-fluid">
<div class="flex">
<label for="exit_nodes">{{ t('exit_nodes') }}</label>
<span class="pi pi-question-circle ml-2 self-center" v-tooltip="t('exit_nodes_help')"></span>
</div>
<AutoComplete id="exit_nodes" v-model="curNetwork.exit_nodes"
:placeholder="t('chips_placeholder', ['192.168.8.8'])" class="w-full" multiple fluid
:suggestions="exitNodesSuggestions" @complete="searchExitNodesSuggestions" />
</div>
</div>
</div>
</Panel>

View File

@@ -5,7 +5,7 @@ import { NetworkInstance, type NodeInfo, type PeerRoutePair } from '../types/net
import { useI18n } from 'vue-i18n';
import { computed, onMounted, onUnmounted, ref } from 'vue';
import { ipv4InetToString, ipv4ToString, ipv6ToString } from '../modules/utils';
import { DataTable, Column, Tag, Chip, Button, Dialog, ScrollPanel, Timeline, Divider, Card, } from 'primevue';
import { DataTable, Column, Tag, Chip, Button, Dialog, ScrollPanel, Timeline, Divider, Panel, } from 'primevue';
const props = defineProps<{
curNetworkInst: NetworkInstance | null,
@@ -303,14 +303,15 @@ function showEventLogs() {
<template>
<div class="frontend-lib">
<Dialog v-model:visible="dialogVisible" modal :header="t(dialogHeader)" class="w-2/3 h-auto">
<Dialog v-model:visible="dialogVisible" modal :header="t(dialogHeader)" class="w-full h-auto max-h-full"
:baseZIndex="2000">
<ScrollPanel v-if="dialogHeader === 'vpn_portal_config'">
<pre>{{ dialogContent }}</pre>
</ScrollPanel>
<Timeline v-else :value="dialogContent">
<template #opposite="slotProps">
<small class="text-surface-500 dark:text-surface-400">{{ useTimeAgo(Date.parse(slotProps.item.time))
}}</small>
}}</small>
</template>
<template #content="slotProps">
<HumanEvent :event="slotProps.item.event" />
@@ -318,107 +319,101 @@ function showEventLogs() {
</Timeline>
</Dialog>
<Card v-if="curNetworkInst?.error_msg">
<template #title>
<Panel v-if="curNetworkInst?.error_msg">
<template #header>
Run Network Error
</template>
<template #content>
<div class="flex flex-col gap-y-5">
<div class="text-red-500">
{{ curNetworkInst.error_msg }}
</div>
<div class="flex flex-col gap-y-5">
<div class="text-red-500">
{{ curNetworkInst.error_msg }}
</div>
</template>
</Card>
</div>
</Panel>
<template v-else>
<Card>
<template #title>
<Panel>
<template #header>
{{ t('my_node_info') }}
</template>
<template #content>
<div class="flex w-full flex-col gap-y-5">
<div class="m-0 flex flex-row justify-center gap-x-5">
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid green">
<div class="font-bold">
{{ t('peer_count') }}
</div>
<div class="text-5xl mt-1">
{{ peerCount }}
</div>
<div class="flex w-full flex-col gap-y-5">
<div class="m-0 flex flex-row justify-center gap-x-5">
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid green">
<div class="font-bold">
{{ t('peer_count') }}
</div>
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid purple">
<div class="font-bold">
{{ t('upload') }}
</div>
<div class="text-xl mt-2">
{{ txRate }}/s
</div>
</div>
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid fuchsia">
<div class="font-bold">
{{ t('download') }}
</div>
<div class="text-xl mt-2">
{{ rxRate }}/s
</div>
<div class="text-5xl mt-1">
{{ peerCount }}
</div>
</div>
<div class="flex flex-row items-center flex-wrap w-full max-h-40 overflow-scroll">
<Chip v-for="(chip, i) in myNodeInfoChips" :key="i" :label="chip.label" :icon="chip.icon"
class="mr-2 mt-2 text-sm" />
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid purple">
<div class="font-bold">
{{ t('upload') }}
</div>
<div class="text-xl mt-2">
{{ txRate }}/s
</div>
</div>
<div v-if="myNodeInfo" class="m-0 flex flex-row justify-center gap-x-5 text-sm">
<Button severity="info" :label="t('show_vpn_portal_config')" @click="showVpnPortalConfig" />
<Button severity="info" :label="t('show_event_log')" @click="showEventLogs" />
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid fuchsia">
<div class="font-bold">
{{ t('download') }}
</div>
<div class="text-xl mt-2">
{{ rxRate }}/s
</div>
</div>
</div>
</template>
</Card>
<div class="flex flex-row items-center flex-wrap w-full max-h-40 overflow-scroll">
<Chip v-for="(chip, i) in myNodeInfoChips" :key="i" :label="chip.label" :icon="chip.icon"
class="mr-2 mt-2 text-sm" />
</div>
<div v-if="myNodeInfo" class="m-0 flex flex-row justify-center gap-x-5 text-sm">
<Button severity="info" :label="t('show_vpn_portal_config')" @click="showVpnPortalConfig" />
<Button severity="info" :label="t('show_event_log')" @click="showEventLogs" />
</div>
</div>
</Panel>
<Divider />
<Card>
<template #title>
<Panel>
<template #header>
{{ t('peer_info') }}
</template>
<template #content>
<DataTable :value="peerRouteInfos" column-resize-mode="fit" table-class="w-full">
<Column :field="ipFormat" :header="t('virtual_ipv4')" />
<Column :header="t('hostname')">
<template #body="slotProps">
<div v-if="!slotProps.data.route.cost || !slotProps.data.route.feature_flag.is_public_server"
v-tooltip="slotProps.data.route.hostname">
{{
slotProps.data.route.hostname }}
</div>
<div v-else v-tooltip="slotProps.data.route.hostname" class="space-x-1">
<Tag v-if="slotProps.data.route.feature_flag.is_public_server" severity="info" value="Info">
{{ t('status.server') }}
</Tag>
<Tag v-if="slotProps.data.route.feature_flag.avoid_relay_data" severity="warn" value="Warn">
{{ t('status.relay') }}
</Tag>
</div>
</template>
</Column>
<Column :field="routeCost" :header="t('route_cost')" />
<Column :field="latencyMs" :header="t('latency')" />
<Column :field="txBytes" :header="t('upload_bytes')" />
<Column :field="rxBytes" :header="t('download_bytes')" />
<Column :field="lossRate" :header="t('loss_rate')" />
<Column :header="t('status.version')">
<template #body="slotProps">
<span>{{ version(slotProps.data) }}</span>
</template>
</Column>
</DataTable>
</template>
</Card>
<DataTable :value="peerRouteInfos" column-resize-mode="fit" table-class="w-full">
<Column :field="ipFormat" :header="t('virtual_ipv4')" />
<Column :header="t('hostname')">
<template #body="slotProps">
<div v-if="!slotProps.data.route.cost || !slotProps.data.route.feature_flag.is_public_server"
v-tooltip="slotProps.data.route.hostname">
{{
slotProps.data.route.hostname }}
</div>
<div v-else v-tooltip="slotProps.data.route.hostname" class="space-x-1">
<Tag v-if="slotProps.data.route.feature_flag.is_public_server" severity="info" value="Info">
{{ t('status.server') }}
</Tag>
<Tag v-if="slotProps.data.route.feature_flag.avoid_relay_data" severity="warn" value="Warn">
{{ t('status.relay') }}
</Tag>
</div>
</template>
</Column>
<Column :field="routeCost" :header="t('route_cost')" />
<Column :field="latencyMs" :header="t('latency')" />
<Column :field="txBytes" :header="t('upload_bytes')" />
<Column :field="rxBytes" :header="t('download_bytes')" />
<Column :field="lossRate" :header="t('loss_rate')" />
<Column :header="t('status.version')">
<template #body="slotProps">
<span>{{ version(slotProps.data) }}</span>
</template>
</Column>
</DataTable>
</Panel>
</template>
</div>
</template>

View File

@@ -69,6 +69,62 @@ upload_bytes: 上传
download_bytes: 下载
loss_rate: 丢包率
flags_switch: 功能开关
latency_first: 开启延迟优先模式
latency_first_help: 忽略中转跳数,选择总延迟最低的路径
use_smoltcp: 使用用户态协议栈
use_smoltcp_help: 使用用户态 TCP/IP 协议栈,避免操作系统防火墙问题导致无法子网代理 / KCP代理。
enable_kcp_proxy: 启用 KCP 代理
enable_kcp_proxy_help: 将 TCP 流量转为 KCP 流量,降低传输延迟,提升传输速度。
disable_kcp_input: 禁用 KCP 输入
disable_kcp_input_help: 禁用 KCP 入站流量,其他开启 KCP 代理的节点仍然使用 TCP 连接到本节点。
disable_p2p: 禁用 P2P
disable_p2p_help: 禁用 P2P 模式,所有流量通过手动指定的服务器中转。
bind_device: 仅使用物理网卡
bind_device_help: 仅使用物理网卡,避免 EasyTier 通过其他虚拟网建立连接。
no_tun: 无 TUN 模式
no_tun_help: 不使用 TUN 网卡,适合无管理员权限时使用。本节点仅允许被访问。访问其他节点需要使用 SOCK5
enable_exit_node: 启用出口节点
enable_exit_node_help: 允许此节点成为出口节点
relay_all_peer_rpc: 转发RPC包
relay_all_peer_rpc_help: |
允许转发所有对等节点的RPC数据包即使对等节点不在转发网络白名单中。
这可以帮助白名单外网络中的对等节点建立P2P连接。
multi_thread: 启用多线程
multi_thread_help: 使用多线程运行时
proxy_forward_by_system: 系统转发
proxy_forward_by_system_help: 通过系统内核转发子网代理数据包禁用内置NAT
disable_encryption: 禁用加密
disable_encryption_help: 禁用对等节点通信的加密默认为false必须与对等节点相同
relay_network_whitelist: 网络白名单
relay_network_whitelist_help: |
仅转发白名单网络的流量,支持通配符字符串。多个网络名称间可以使用英文空格间隔。
如果该参数为空,则禁用转发。默认允许所有网络。
例如:'*'(所有网络),'def*'以def为前缀的网络'net1 net2'只允许net1和net2
manual_routes: 自定义路由
manual_routes_help: 手动分配路由CIDR将禁用子网代理和从对等节点传播的wireguard路由。例如192.168.0.0/16
socks5: socks5服务器
socks5_help: |
启用 socks5 服务器,允许 socks5 客户端访问虚拟网络. 格式: <端口>例如1080
exit_nodes: 出口节点列表
exit_nodes_help: 转发所有流量的出口节点虚拟IPv4地址优先级由列表顺序决定
status:
version: 内核版本
local: 本机
@@ -113,3 +169,4 @@ event:
VpnPortalClientDisconnected: VPN门户客户端已断开连接
DhcpIpv4Changed: DHCP IPv4地址更改
DhcpIpv4Conflicted: DHCP IPv4地址冲突

View File

@@ -68,6 +68,63 @@ upload_bytes: Upload
download_bytes: Download
loss_rate: Loss Rate
flags_switch: Feature Switch
latency_first: Enable Latency-First Mode
latency_first_help: Ignore hop count and select the path with the lowest total latency
use_smoltcp: Use User-Space Protocol Stack
use_smoltcp_help: Use a user-space TCP/IP stack to avoid issues with operating system firewalls blocking subnet or KCP proxy functionality.
enable_kcp_proxy: Enable KCP Proxy
enable_kcp_proxy_help: Convert TCP traffic to KCP traffic to reduce latency and boost transmission speed.
disable_kcp_input: Disable KCP Input
disable_kcp_input_help: Disable inbound KCP traffic, while nodes with KCP proxy enabled continue to connect using TCP.
disable_p2p: Disable P2P
disable_p2p_help: Disable P2P mode; route all traffic through a manually specified relay server.
bind_device: Bind to Physical Device Only
bind_device_help: Use only the physical network interface to prevent EasyTier from connecting via virtual networks.
no_tun: No TUN Mode
no_tun_help: Do not use a TUN interface, suitable for environments without administrator privileges. This node is only accessible; accessing other nodes requires SOCKS5.
enable_exit_node: Enable Exit Node
enable_exit_node_help: Allow this node to be an exit node
relay_all_peer_rpc: Relay RPC Packets
relay_all_peer_rpc_help: |
Relay all peer rpc packets, even if the peer is not in the relay network whitelist.
This can help peers not in relay network whitelist to establish p2p connection.
multi_thread: Multi Thread
multi_thread_help: Use multi-thread runtime
proxy_forward_by_system: System Forward
proxy_forward_by_system_help: Forward packet to proxy networks via system kernel, disable internal nat for network proxy
disable_encryption: Disable Encryption
disable_encryption_help: Disable encryption for peers communication, default is false, must be same with peers
relay_network_whitelist: Network Whitelist
relay_network_whitelist_help: |
Only forward traffic from the whitelist networks, supporting wildcard strings, multiple network names can be separated by spaces.
If this parameter is empty, forwarding is disabled. By default, all networks are allowed.
e.g.: '*' (all networks), 'def*' (networks with the prefix 'def'), 'net1 net2' (only allow net1 and net2)
manual_routes: Manual Route
manual_routes_help: |
Assign routes cidr manually, will disable subnet proxy and wireguard routes propagated from peers. e.g.:192.168.0.0/16
socks5: Socks5 Server
socks5_help: |
Enable socks5 server, allow socks5 client to access virtual network. format: <port>, e.g.: 1080
exit_nodes: Exit Nodes
exit_nodes_help: Exit nodes to forward all traffic to, a virtual ipv4 address, priority is determined by the order of the list
status:
version: Version
local: Local

View File

@@ -1,5 +1,7 @@
import axios, { AxiosError, AxiosInstance, AxiosResponse, InternalAxiosRequestConfig } from 'axios';
import { Md5 } from 'ts-md5'
import { UUID } from './utils';
import { NetworkConfig } from '../types/network';
export interface ValidateConfigResponse {
toml_config: string;
@@ -31,6 +33,20 @@ export interface Summary {
device_count: number;
}
export interface ListNetworkInstanceIdResponse {
running_inst_ids: Array<UUID>,
disabled_inst_ids: Array<UUID>,
}
export interface GenerateConfigRequest {
config: NetworkConfig;
}
export interface GenerateConfigResponse {
toml_config?: string;
error?: string;
}
export class ApiClient {
private client: AxiosInstance;
private authFailedCb: Function | undefined;
@@ -141,6 +157,17 @@ export class ApiClient {
return response.machines;
}
public async list_deivce_instance_ids(machine_id: string): Promise<ListNetworkInstanceIdResponse> {
const response = await this.client.get<any, ListNetworkInstanceIdResponse>('/machines/' + machine_id + '/networks');
return response;
}
public async update_device_instance_state(machine_id: string, inst_id: string, disabled: boolean): Promise<undefined> {
await this.client.put<string>('/machines/' + machine_id + '/networks/' + inst_id, {
disabled: disabled,
});
}
public async get_network_info(machine_id: string, inst_id: string): Promise<any> {
const response = await this.client.get<any, Record<string, any>>('/machines/' + machine_id + '/networks/info/' + inst_id);
return response.info.map;
@@ -176,6 +203,18 @@ export class ApiClient {
public captcha_url() {
return this.client.defaults.baseURL + '/auth/captcha';
}
public async generate_config(config: GenerateConfigRequest): Promise<GenerateConfigResponse> {
try {
const response = await this.client.post<any, GenerateConfigResponse>('/generate-config', config);
return response;
} catch (error) {
if (error instanceof AxiosError) {
return { error: error.response?.data };
}
return { error: 'Unknown error: ' + error };
}
}
}
export default ApiClient;

View File

@@ -1,8 +1,6 @@
@import 'primeicons/primeicons.css';
@import 'floating-vue/dist/style.css';
.frontend-lib {
@layer tailwind-base, primevue, tailwind-utilities;
@layer tailwind-base {
@@ -51,4 +49,6 @@
background-color: #0000005d;
}
.v-popper__inner {
white-space: pre-wrap;
}

View File

@@ -35,6 +35,29 @@ export interface NetworkConfig {
latency_first: boolean
dev_name: string
use_smoltcp?: boolean
enable_kcp_proxy?: boolean
disable_kcp_input?: boolean
disable_p2p?: boolean
bind_device?: boolean
no_tun?: boolean
enable_exit_node?: boolean
relay_all_peer_rpc?: boolean
multi_thread?: boolean
proxy_forward_by_system?: boolean
disable_encryption?: boolean
enable_relay_network_whitelist?: boolean
relay_network_whitelist: string[]
enable_manual_routes: boolean
routes: string[]
exit_nodes: string[]
enable_socks5?: boolean
socks5_port: number
}
export function DEFAULT_NETWORK_CONFIG(): NetworkConfig {
@@ -67,8 +90,27 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig {
'wg://0.0.0.0:11011',
],
rpc_port: 0,
latency_first: true,
latency_first: false,
dev_name: '',
use_smoltcp: false,
enable_kcp_proxy: false,
disable_kcp_input: false,
disable_p2p: false,
bind_device: true,
no_tun: false,
enable_exit_node: false,
relay_all_peer_rpc: false,
multi_thread: true,
proxy_forward_by_system: false,
disable_encryption: false,
enable_relay_network_whitelist: false,
relay_network_whitelist: [],
enable_manual_routes: false,
routes: [],
exit_nodes: [],
enable_socks5: false,
socks5_port: 1080,
}
}

View File

@@ -23,10 +23,10 @@
"@vitejs/plugin-vue": "^5.1.4",
"autoprefixer": "^10.4.20",
"postcss": "^8.4.47",
"tailwindcss": "^3.4.14",
"tailwindcss": "=3.4.17",
"typescript": "~5.6.2",
"vite": "^5.4.10",
"vite-plugin-singlefile": "^2.0.3",
"vue-tsc": "^2.1.10"
}
}
}

View File

@@ -0,0 +1,39 @@
<script setup lang="ts">
import { NetworkTypes } from 'easytier-frontend-lib';
import { ref } from 'vue';
import { Api } from 'easytier-frontend-lib'
const defaultApiHost = 'https://config-server.easytier.cn'
const api = new Api.ApiClient(defaultApiHost);
const newNetworkConfig = ref<NetworkTypes.NetworkConfig>(NetworkTypes.DEFAULT_NETWORK_CONFIG());
const toml_config = ref<string>("Press 'Run Network' to generate TOML configuration");
const generateConfig = (config: NetworkTypes.NetworkConfig) => {
api.generate_config({
config: config
}).then((res) => {
if (res.error) {
toml_config.value = res.error;
} else if (res.toml_config) {
toml_config.value = res.toml_config;
} else {
toml_config.value = "Api server returned an unexpected response";
}
});
};
</script>
<template>
<div class="flex items-center justify-center m-5">
<div class="sm:block md:flex w-full">
<div class="sm:w-full md:w-1/2 p-4">
<Config :cur-network="newNetworkConfig" @run-network="generateConfig" />
</div>
<div class="sm:w-full md:w-1/2 p-4 bg-gray-100">
<pre class="whitespace-pre-wrap">{{ toml_config }}</pre>
</div>
</div>
</div>
</template>

View File

@@ -27,7 +27,7 @@ const loadDevices = async () => {
public_ip: device.client_url,
running_network_instances: device.info?.running_network_instances.map((instance: any) => Utils.UuidToStr(instance)),
running_network_count: device.info?.running_network_instances.length,
report_time: device.info?.report_time,
report_time: new Date(device.info?.report_time).toLocaleString(),
easytier_version: device.info?.easytier_version,
machine_id: Utils.UuidToStr(device.info?.machine_id),
});
@@ -102,7 +102,7 @@ const selectedDeviceHostname = computed<string | undefined>(() => {
</DataTable>
<Drawer v-model:visible="deviceManageVisible" :header="`Manage ${selectedDeviceHostname}`" position="right"
class="w-1/2 min-w-96">
:baseZIndex=1000 class="w-3/5 min-w-96">
<RouterView v-slot="{ Component }">
<component :is="Component" :api="api" :deviceList="deviceList" @update="loadDevices" />
</RouterView>

View File

@@ -1,7 +1,7 @@
<script setup lang="ts">
import { Toolbar, IftaLabel, Select, Button, ConfirmPopup, Dialog, useConfirm, useToast } from 'primevue';
import {Toolbar, IftaLabel, Select, Button, ConfirmPopup, Dialog, useConfirm, useToast, Divider} from 'primevue';
import { NetworkTypes, Status, Utils, Api, } from 'easytier-frontend-lib';
import { computed, onMounted, onUnmounted, ref } from 'vue';
import { watch, computed, onMounted, onUnmounted, ref } from 'vue';
import { useRoute, useRouter } from 'vue-router';
const props = defineProps<{
@@ -27,15 +27,24 @@ const deviceInfo = computed<Utils.DeviceInfo | undefined | null>(() => {
return deviceId.value ? props.deviceList?.find((device) => device.machine_id === deviceId.value) : null;
});
const configFile = ref();
const curNetworkInfo = ref<NetworkTypes.NetworkInstance | null>(null);
const isEditing = ref(false);
const showCreateNetworkDialog = ref(false);
const newNetworkConfig = ref<NetworkTypes.NetworkConfig>(NetworkTypes.DEFAULT_NETWORK_CONFIG());
const listInstanceIdResponse = ref<Api.ListNetworkInstanceIdResponse | undefined>(undefined);
const instanceIdList = computed(() => {
let insts = deviceInfo.value?.running_network_instances || [];
let options = insts.map((instance: string) => {
let insts = new Set(deviceInfo.value?.running_network_instances || []);
let t = listInstanceIdResponse.value;
if (t) {
t.running_inst_ids.forEach((u) => insts.add(Utils.UuidToStr(u)));
t.disabled_inst_ids.forEach((u) => insts.add(Utils.UuidToStr(u)));
}
let options = Array.from(insts).map((instance: string) => {
return { uuid: instance };
});
return options;
@@ -51,6 +60,53 @@ const selectedInstanceId = computed({
}
});
const needShowNetworkStatus = computed(() => {
if (!selectedInstanceId.value) {
// nothing selected
return false;
}
if (networkIsDisabled.value) {
// network is disabled
return false;
}
return true;
})
const networkIsDisabled = computed(() => {
if (!selectedInstanceId.value) {
return false;
}
return listInstanceIdResponse.value?.disabled_inst_ids.map(Utils.UuidToStr).includes(selectedInstanceId.value?.uuid);
});
watch(selectedInstanceId, async (newVal, oldVal) => {
if (newVal?.uuid !== oldVal?.uuid && networkIsDisabled.value) {
await loadDisabledNetworkConfig();
}
});
const disabledNetworkConfig = ref<NetworkTypes.NetworkConfig | undefined>(undefined);
const loadDisabledNetworkConfig = async () => {
disabledNetworkConfig.value = undefined;
if (!deviceId.value || !selectedInstanceId.value) {
return;
}
let ret = await props.api?.get_network_config(deviceId.value, selectedInstanceId.value.uuid);
disabledNetworkConfig.value = ret;
}
const updateNetworkState = async (disabled: boolean) => {
if (!deviceId.value || !selectedInstanceId.value) {
return;
}
await props.api?.update_device_instance_state(deviceId.value, selectedInstanceId.value.uuid, disabled);
await loadNetworkInstanceIds();
}
const confirm = useConfirm();
const confirmDeleteNetwork = (event: any) => {
confirm.require({
@@ -128,6 +184,15 @@ const editNetwork = async () => {
}
}
const loadNetworkInstanceIds = async () => {
if (!deviceId.value) {
return;
}
listInstanceIdResponse.value = await props.api?.list_deivce_instance_ids(deviceId.value);
console.debug("loadNetworkInstanceIds", listInstanceIdResponse.value);
}
const loadDeviceInfo = async () => {
if (!deviceId.value || !instanceId.value) {
return;
@@ -144,9 +209,68 @@ const loadDeviceInfo = async () => {
} as NetworkTypes.NetworkInstance;
}
const exportConfig = async () => {
if (!deviceId.value || !instanceId.value) {
toast.add({ severity: 'error', summary: 'Error', detail: 'No network instance selected', life: 2000 });
return;
}
try {
let ret = await props.api?.get_network_config(deviceId.value, instanceId.value);
delete ret.instance_id;
exportJsonFile(JSON.stringify(ret, null, 2),instanceId.value +'.json');
} catch (e: any) {
console.error(e);
toast.add({ severity: 'error', summary: 'Error', detail: 'Failed to export network config, error: ' + JSON.stringify(e.response.data), life: 2000 });
return;
}
}
const importConfig = () => {
configFile.value.click();
}
const handleFileUpload = (event: Event) => {
const files = (event.target as HTMLInputElement).files;
const file = files ? files[0] : null;
if (file) {
const reader = new FileReader();
reader.onload = (e) => {
try {
let str = e.target?.result?.toString();
if(str){
const config = JSON.parse(str);
if(config === null || typeof config !== "object"){
throw new Error();
}
Object.assign(newNetworkConfig.value, config);
toast.add({ severity: 'success', summary: 'Import Success', detail: "Config file import success", life: 2000 });
}
} catch (error) {
toast.add({ severity: 'error', summary: 'Error', detail: 'Config file parse error.', life: 2000 });
}
configFile.value.value = null;
}
reader.readAsText(file);
}
}
const exportJsonFile = (context: string, name: string) => {
let url = window.URL.createObjectURL(new Blob([context], { type: 'application/json' }));
let link = document.createElement('a');
link.style.display = 'none';
link.href = url;
link.setAttribute('download', name);
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
window.URL.revokeObjectURL(url);
}
let periodFunc = new Utils.PeriodicTask(async () => {
try {
await loadDeviceInfo();
await Promise.all([loadNetworkInstanceIds(), loadDeviceInfo()]);
} catch (e) {
console.debug(e);
}
@@ -163,9 +287,16 @@ onUnmounted(() => {
</script>
<template>
<input type="file" @change="handleFileUpload" class="hidden" accept="application/json" ref="configFile"/>
<ConfirmPopup></ConfirmPopup>
<Dialog v-model:visible="showCreateNetworkDialog" modal :header="!isEditing ? 'Create New Network' : 'Edit Network'"
:style="{ width: '55rem' }">
<div class="flex flex-col">
<div class="w-11/12 self-center ">
<Button @click="importConfig" icon="pi pi-file-import" label="Import" iconPos="right" />
<Divider />
</div>
</div>
<Config :cur-network="newNetworkConfig" @run-network="createNewNetwork"></Config>
</Dialog>
@@ -182,14 +313,33 @@ onUnmounted(() => {
<div class="gap-x-3 flex">
<Button @click="confirmDeleteNetwork($event)" icon="pi pi-minus" severity="danger" label="Delete"
iconPos="right" />
<Button @click="exportConfig" icon="pi pi-file-export" severity="help" label="Export" iconPos="right" />
<Button @click="editNetwork" icon="pi pi-pen-to-square" label="Edit" iconPos="right" severity="info" />
<Button @click="newNetwork" icon="pi pi-plus" label="Create" iconPos="right" />
</div>
</template>
</Toolbar>
<Status v-bind:cur-network-inst="curNetworkInfo" v-if="!!selectedInstanceId">
</Status>
<Divider />
<!-- For running network, show the status -->
<div v-if="needShowNetworkStatus">
<Status v-bind:cur-network-inst="curNetworkInfo" v-if="needShowNetworkStatus">
</Status>
<Divider />
<div class="text-center">
<Button @click="updateNetworkState(true)" label="Disable Network" severity="warn" />
</div>
</div>
<!-- For disabled network, show the config -->
<div v-if="networkIsDisabled">
<Config :cur-network="disabledNetworkConfig" @run-network="updateNetworkState(false)"
v-if="disabledNetworkConfig" />
<div v-else>
<div class="text-center text-xl"> Network is disabled, Loading config... </div>
</div>
</div>
<div class="grid grid-cols-1 gap-4 place-content-center h-full" v-if="!selectedInstanceId">
<div class="text-center text-xl"> Select or create a network instance to manage </div>

View File

@@ -1,5 +1,5 @@
<script setup lang="ts">
import { computed, ref } from 'vue';
import { computed, onMounted, ref } from 'vue';
import { Card, InputText, Password, Button, AutoComplete } from 'primevue';
import { useRouter } from 'vue-router';
import { useToast } from 'primevue/usetoast';
@@ -20,8 +20,60 @@ const registerPassword = ref('');
const captcha = ref('');
const captchaSrc = computed(() => api.value.captcha_url());
interface ApiHost {
value: string;
usedAt: number;
}
const isValidHttpUrl = (s: string): boolean => {
let url;
try {
url = new URL(s);
} catch (_) {
return false;
}
return url.protocol === "http:" || url.protocol === "https:";
}
const cleanAndLoadApiHosts = (): Array<ApiHost> => {
const maxHosts = 10;
const apiHosts = localStorage.getItem('apiHosts');
if (apiHosts) {
const hosts: Array<ApiHost> = JSON.parse(apiHosts);
// sort by usedAt
hosts.sort((a, b) => b.usedAt - a.usedAt);
// only keep the first 10
if (hosts.length > maxHosts) {
hosts.splice(maxHosts);
}
localStorage.setItem('apiHosts', JSON.stringify(hosts));
return hosts;
} else {
return [];
}
};
const saveApiHost = (host: string) => {
console.log('Save API Host:', host);
if (!isValidHttpUrl(host)) {
console.error('Invalid API Host:', host);
return;
}
let hosts = cleanAndLoadApiHosts();
const newHost: ApiHost = { value: host, usedAt: Date.now() };
hosts = hosts.filter((h) => h.value !== host);
hosts.push(newHost);
localStorage.setItem('apiHosts', JSON.stringify(hosts));
};
const onSubmit = async () => {
// Add your login logic here
saveApiHost(apiHost.value);
const credential: Api.Credential = { username: username.value, password: password.value, };
let ret = await api.value?.login(credential);
if (ret.success) {
@@ -36,6 +88,7 @@ const onSubmit = async () => {
};
const onRegister = async () => {
saveApiHost(apiHost.value);
const credential: Api.Credential = { username: registerUsername.value, password: registerPassword.value };
const registerReq: Api.RegisterData = { credentials: credential, captcha: captcha.value };
let ret = await api.value?.register(registerReq);
@@ -47,17 +100,36 @@ const onRegister = async () => {
}
};
const defaultApiHost = 'http://easytier.cn:11211'
const apiHost = ref<string>(defaultApiHost)
const getInitialApiHost = (): string => {
const hosts = cleanAndLoadApiHosts();
if (hosts.length > 0) {
return hosts[0].value;
} else {
return defaultApiHost;
}
};
const defaultApiHost = 'https://config-server.easytier.cn'
const apiHost = ref<string>(getInitialApiHost())
const apiHostSuggestions = ref<Array<string>>([])
const apiHostSearch = async (event: { query: string }) => {
apiHostSuggestions.value = [];
let hosts = cleanAndLoadApiHosts();
if (event.query) {
apiHostSuggestions.value.push(event.query);
}
apiHostSuggestions.value.push(defaultApiHost);
hosts.forEach((host) => {
apiHostSuggestions.value.push(host.value);
});
}
onMounted(() => {
let hosts = cleanAndLoadApiHosts();
if (hosts.length === 0) {
saveApiHost(defaultApiHost);
}
});
</script>
<template>
@@ -87,7 +159,7 @@ const apiHostSearch = async (event: { query: string }) => {
</div>
<div class="flex items-center justify-between">
<Button label="Register" type="button" class="w-full"
@click="$router.replace({ name: 'register' })" severity="secondary" />
@click="saveApiHost(apiHost); $router.replace({ name: 'register' })" severity="secondary" />
</div>
</form>
@@ -111,7 +183,7 @@ const apiHostSearch = async (event: { query: string }) => {
</div>
<div class="flex items-center justify-between">
<Button label="Back to Login" type="button" class="w-full"
@click="$router.replace({ name: 'login' })" severity="secondary" />
@click="saveApiHost(apiHost); $router.replace({ name: 'login' })" severity="secondary" />
</div>
</form>
</template>

View File

@@ -1,6 +1,6 @@
import { createApp } from 'vue'
import './style.css'
import 'easytier-frontend-lib/style.css'
import './style.css'
import App from './App.vue'
import EasytierFrontendLib from 'easytier-frontend-lib'
import PrimeVue from 'primevue/config'
@@ -15,6 +15,7 @@ import DeviceManagement from './components/DeviceManagement.vue'
import Dashboard from './components/Dashboard.vue'
import DialogService from 'primevue/dialogservice';
import ToastService from 'primevue/toastservice';
import ConfigGenerator from './components/ConfigGenerator.vue'
const routes = [
{
@@ -66,6 +67,10 @@ const routes = [
}
}
},
{
path: '/config_generator',
component: ConfigGenerator,
}
]
const router = createRouter({

View File

@@ -0,0 +1,24 @@
_version: 2
cli:
db:
en: "path to the sqlite3 database file, used to save all the data"
zh-CN: "sqlite3 数据库文件路径, 用于保存所有数据"
console_log_level:
en: "The log level for the console logger. Possible values: trace, debug, info, warn, error"
zh-CN: "控制台日志级别。可能的值trace, debug, info, warn, error"
file_log_level:
en: "The log level for the file logger. Possible values: trace, debug, info, warn, error"
zh-CN: "文件日志级别。可能的值trace, debug, info, warn, error"
file_log_dir:
en: "The directory to save the log files, default is the current directory"
zh-CN: "保存日志文件的目录,默认为当前目录"
config_server_port:
en: "The port to listen for the config server, used by the easytier-core to connect to"
zh-CN: "配置服务器的监听端口,用于被 easytier-core 连接"
config_server_protocol:
en: "The protocol to listen for the config server, used by the easytier-core to connect to"
zh-CN: "配置服务器的监听协议,用于被 easytier-core 连接, 可能的值udp, tcp"
api_server_port:
en: "The port to listen for the restful server, acting as ApiHost and used by the web frontend"
zh-CN: "restful 服务器的监听端口,作为 ApiHost 并被 web 前端使用"

View File

@@ -93,7 +93,7 @@ impl ClientManager {
.map(|item| item.value().clone())
}
pub fn list_machine_by_token(&self, token: String) -> Vec<url::Url> {
pub async fn list_machine_by_token(&self, token: String) -> Vec<url::Url> {
self.storage.list_token_clients(&token)
}

View File

@@ -1,4 +1,4 @@
use std::{fmt::Debug, sync::Arc};
use std::{fmt::Debug, str::FromStr as _, sync::Arc};
use easytier::{
common::scoped_task::ScopedTask,
@@ -15,6 +15,8 @@ use easytier::{
};
use tokio::sync::{broadcast, RwLock};
use crate::db::ListNetworkProps;
use super::storage::{Storage, StorageToken, WeakRefStorage};
#[derive(Debug)]
@@ -87,10 +89,20 @@ impl WebServerService for SessionRpcService {
.map(Into::into)
.unwrap_or(uuid::Uuid::new_v4()),
});
if let Ok(storage) = Storage::try_from(data.storage.clone()) {
storage.add_client(data.storage_token.as_ref().unwrap().clone());
}
}
if let Ok(storage) = Storage::try_from(data.storage.clone()) {
let Ok(report_time) = chrono::DateTime::<chrono::Local>::from_str(&req.report_time)
else {
tracing::error!("Failed to parse report time: {:?}", req.report_time);
return Ok(HeartbeatResponse {});
};
storage.update_client(
data.storage_token.as_ref().unwrap().clone(),
report_time.timestamp(),
);
}
let _ = data.notifier.send(req);
Ok(HeartbeatResponse {})
}
@@ -196,7 +208,11 @@ impl Session {
let local_configs = match storage
.db
.list_network_configs(user_id, Some(req.machine_id.unwrap().into()), true)
.list_network_configs(
user_id,
Some(req.machine_id.unwrap().into()),
ListNetworkProps::EnabledOnly,
)
.await
{
Ok(configs) => configs,

View File

@@ -1,6 +1,6 @@
use std::sync::{Arc, Weak};
use dashmap::{DashMap, DashSet};
use dashmap::DashMap;
use crate::db::Db;
@@ -12,11 +12,19 @@ pub struct StorageToken {
pub machine_id: uuid::Uuid,
}
#[derive(Debug, Clone)]
struct ClientInfo {
client_url: url::Url,
machine_id: uuid::Uuid,
token: String,
report_time: i64,
}
#[derive(Debug)]
pub struct StorageInner {
// some map for indexing
pub token_clients_map: DashMap<String, DashSet<url::Url>>,
pub machine_client_url_map: DashMap<uuid::Uuid, DashSet<url::Url>>,
token_clients_map: DashMap<String, DashMap<uuid::Uuid, ClientInfo>>,
machine_client_url_map: DashMap<uuid::Uuid, ClientInfo>,
pub db: Db,
}
@@ -41,33 +49,57 @@ impl Storage {
}))
}
pub fn add_client(&self, stoken: StorageToken) {
fn remove_mid_to_client_info_map(
map: &DashMap<uuid::Uuid, ClientInfo>,
machine_id: &uuid::Uuid,
client_url: &url::Url,
) {
map.remove_if(&machine_id, |_, v| v.client_url == *client_url);
}
fn update_mid_to_client_info_map(
map: &DashMap<uuid::Uuid, ClientInfo>,
client_info: &ClientInfo,
) {
map.entry(client_info.machine_id)
.and_modify(|e| {
if e.report_time < client_info.report_time {
assert_eq!(e.machine_id, client_info.machine_id);
*e = client_info.clone();
}
})
.or_insert(client_info.clone());
}
pub fn update_client(&self, stoken: StorageToken, report_time: i64) {
let inner = self
.0
.token_clients_map
.entry(stoken.token)
.or_insert_with(DashSet::new);
inner.insert(stoken.client_url.clone());
.entry(stoken.token.clone())
.or_insert_with(DashMap::new);
self.0
.machine_client_url_map
.entry(stoken.machine_id)
.or_insert_with(DashSet::new)
.insert(stoken.client_url.clone());
let client_info = ClientInfo {
client_url: stoken.client_url.clone(),
machine_id: stoken.machine_id,
token: stoken.token.clone(),
report_time,
};
Self::update_mid_to_client_info_map(&inner, &client_info);
Self::update_mid_to_client_info_map(&self.0.machine_client_url_map, &client_info);
}
pub fn remove_client(&self, stoken: &StorageToken) {
self.0.token_clients_map.remove_if(&stoken.token, |_, set| {
set.remove(&stoken.client_url);
Self::remove_mid_to_client_info_map(set, &stoken.machine_id, &stoken.client_url);
set.is_empty()
});
self.0
.machine_client_url_map
.remove_if(&stoken.machine_id, |_, set| {
set.remove(&stoken.client_url);
set.is_empty()
});
Self::remove_mid_to_client_info_map(
&self.0.machine_client_url_map,
&stoken.machine_id,
&stoken.client_url,
);
}
pub fn weak_ref(&self) -> WeakRefStorage {
@@ -78,15 +110,19 @@ impl Storage {
self.0
.machine_client_url_map
.get(&machine_id)
.map(|url| url.iter().next().map(|url| url.clone()))
.flatten()
.map(|info| info.client_url.clone())
}
pub fn list_token_clients(&self, token: &str) -> Vec<url::Url> {
self.0
.token_clients_map
.get(token)
.map(|set| set.iter().map(|url| url.clone()).collect())
.map(|info_map| {
info_map
.iter()
.map(|info| info.value().client_url.clone())
.collect()
})
.unwrap_or_default()
}

View File

@@ -4,7 +4,7 @@ pub mod entity;
use entity::user_running_network_configs;
use sea_orm::{
sea_query::OnConflict, ColumnTrait as _, DatabaseConnection, DbErr, EntityTrait as _,
prelude::Expr, sea_query::OnConflict, ColumnTrait as _, DatabaseConnection, DbErr, EntityTrait,
QueryFilter as _, SqlxSqliteConnector, TransactionTrait as _,
};
use sea_orm_migration::MigratorTrait as _;
@@ -14,6 +14,12 @@ use crate::migrator;
type UserIdInDb = i32;
pub enum ListNetworkProps {
All,
EnabledOnly,
DisabledOnly,
}
#[derive(Debug, Clone)]
pub struct Db {
db_path: String,
@@ -115,17 +121,51 @@ impl Db {
Ok(())
}
pub async fn update_network_config_state(
&self,
user_id: UserIdInDb,
network_inst_id: uuid::Uuid,
disabled: bool,
) -> Result<entity::user_running_network_configs::Model, DbErr> {
use entity::user_running_network_configs as urnc;
urnc::Entity::update_many()
.filter(urnc::Column::UserId.eq(user_id))
.filter(urnc::Column::NetworkInstanceId.eq(network_inst_id.to_string()))
.col_expr(urnc::Column::Disabled, Expr::value(disabled))
.col_expr(
urnc::Column::UpdateTime,
Expr::value(chrono::Local::now().fixed_offset()),
)
.exec(self.orm_db())
.await?;
urnc::Entity::find()
.filter(urnc::Column::UserId.eq(user_id))
.filter(urnc::Column::NetworkInstanceId.eq(network_inst_id.to_string()))
.one(self.orm_db())
.await?
.ok_or(DbErr::RecordNotFound(format!(
"Network config not found for user {} and network instance {}",
user_id, network_inst_id
)))
}
pub async fn list_network_configs(
&self,
user_id: UserIdInDb,
device_id: Option<uuid::Uuid>,
only_enabled: bool,
props: ListNetworkProps,
) -> Result<Vec<user_running_network_configs::Model>, DbErr> {
use entity::user_running_network_configs as urnc;
let configs = urnc::Entity::find().filter(urnc::Column::UserId.eq(user_id));
let configs = if only_enabled {
configs.filter(urnc::Column::Disabled.eq(false))
let configs = if matches!(
props,
ListNetworkProps::EnabledOnly | ListNetworkProps::DisabledOnly
) {
configs
.filter(urnc::Column::Disabled.eq(matches!(props, ListNetworkProps::DisabledOnly)))
} else {
configs
};
@@ -140,6 +180,24 @@ impl Db {
Ok(configs)
}
pub async fn get_network_config(
&self,
user_id: UserIdInDb,
device_id: &uuid::Uuid,
network_inst_id: &String,
) -> Result<Option<user_running_network_configs::Model>, DbErr> {
use entity::user_running_network_configs as urnc;
let config = urnc::Entity::find()
.filter(urnc::Column::UserId.eq(user_id))
.filter(urnc::Column::DeviceId.eq(device_id.to_string()))
.filter(urnc::Column::NetworkInstanceId.eq(network_inst_id))
.one(self.orm_db())
.await?;
Ok(config)
}
pub async fn get_user_id<T: ToString>(
&self,
user_name: T,
@@ -167,7 +225,7 @@ impl Db {
mod tests {
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter as _};
use crate::db::{entity::user_running_network_configs, Db};
use crate::db::{entity::user_running_network_configs, Db, ListNetworkProps};
#[tokio::test]
async fn test_user_network_config_management() {
@@ -209,7 +267,7 @@ mod tests {
assert_ne!(result.update_time, result2.update_time);
assert_eq!(
db.list_network_configs(user_id, Some(device_id), true)
db.list_network_configs(user_id, Some(device_id), ListNetworkProps::All)
.await
.unwrap()
.len(),

View File

@@ -1,11 +1,19 @@
#![allow(dead_code)]
#[macro_use]
extern crate rust_i18n;
use std::sync::Arc;
use clap::{command, Parser};
use easytier::{
common::config::{ConfigLoader, ConsoleLoggerConfig, TomlConfigLoader},
tunnel::udp::UdpTunnelListener,
utils::init_logger,
common::{
config::{ConfigLoader, ConsoleLoggerConfig, FileLoggerConfig, TomlConfigLoader},
constants::EASYTIER_VERSION,
error::Error,
},
tunnel::{tcp::TcpTunnelListener, udp::UdpTunnelListener, TunnelListener},
utils::{init_logger, setup_panic_handler},
};
mod client_manager;
@@ -13,26 +21,105 @@ mod db;
mod migrator;
mod restful;
rust_i18n::i18n!("locales", fallback = "en");
#[derive(Parser, Debug)]
#[command(name = "easytier-core", author, version = EASYTIER_VERSION , about, long_about = None)]
struct Cli {
#[arg(short, long, default_value = "et.db", help = t!("cli.db").to_string())]
db: String,
#[arg(
long,
help = t!("cli.console_log_level").to_string(),
)]
console_log_level: Option<String>,
#[arg(
long,
help = t!("cli.file_log_level").to_string(),
)]
file_log_level: Option<String>,
#[arg(
long,
help = t!("cli.file_log_dir").to_string(),
)]
file_log_dir: Option<String>,
#[arg(
long,
short='c',
default_value = "22020",
help = t!("cli.config_server_port").to_string(),
)]
config_server_port: u16,
#[arg(
long,
short='p',
default_value = "udp",
help = t!("cli.config_server_protocol").to_string(),
)]
config_server_protocol: String,
#[arg(
long,
short='a',
default_value = "11211",
help = t!("cli.api_server_port").to_string(),
)]
api_server_port: u16,
}
pub fn get_listener_by_url(
l: &url::Url,
) -> Result<Box<dyn TunnelListener>, Error> {
Ok(match l.scheme() {
"tcp" => Box::new(TcpTunnelListener::new(l.clone())),
"udp" => Box::new(UdpTunnelListener::new(l.clone())),
_ => {
return Err(Error::InvalidUrl(l.to_string()));
}
})
}
#[tokio::main]
async fn main() {
let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US"));
rust_i18n::set_locale(&locale);
setup_panic_handler();
let cli = Cli::parse();
let config = TomlConfigLoader::default();
config.set_console_logger_config(ConsoleLoggerConfig {
level: Some("trace".to_string()),
level: cli.console_log_level,
});
config.set_file_logger_config(FileLoggerConfig {
dir: cli.file_log_dir,
level: cli.file_log_level,
file: None,
});
init_logger(config, false).unwrap();
// let db = db::Db::new(":memory:").await.unwrap();
let db = db::Db::new("et.db").await.unwrap();
let db = db::Db::new(cli.db).await.unwrap();
let listener = UdpTunnelListener::new("udp://0.0.0.0:22020".parse().unwrap());
let listener = get_listener_by_url(
&format!("{}://0.0.0.0:{}", cli.config_server_protocol, cli.config_server_port).parse().unwrap(),
)
.unwrap();
let mut mgr = client_manager::ClientManager::new(db.clone());
mgr.serve(listener).await.unwrap();
let mgr = Arc::new(mgr);
let mut restful_server =
restful::RestfulServer::new("0.0.0.0:11211".parse().unwrap(), mgr.clone(), db)
.await
.unwrap();
let mut restful_server = restful::RestfulServer::new(
format!("0.0.0.0:{}", cli.api_server_port).parse().unwrap(),
mgr.clone(),
db,
)
.await
.unwrap();
restful_server.start().await.unwrap();
tokio::signal::ctrl_c().await.unwrap();
}

View File

@@ -6,11 +6,14 @@ mod users;
use std::{net::SocketAddr, sync::Arc};
use axum::http::StatusCode;
use axum::routing::post;
use axum::{extract::State, routing::get, Json, Router};
use axum_login::tower_sessions::{ExpiredDeletion, SessionManagerLayer};
use axum_login::{login_required, AuthManagerLayerBuilder, AuthzBackend};
use axum_messages::MessagesManagerLayer;
use easytier::common::config::ConfigLoader;
use easytier::common::scoped_task::ScopedTask;
use easytier::launcher::NetworkConfig;
use easytier::proto::rpc_types;
use network::NetworkApi;
use sea_orm::DbErr;
@@ -48,6 +51,17 @@ struct GetSummaryJsonResp {
device_count: u32,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct GenerateConfigRequest {
config: NetworkConfig,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct GenerateConfigResponse {
error: Option<String>,
toml_config: Option<String>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct Error {
message: String,
@@ -121,7 +135,9 @@ impl RestfulServer {
return Err((StatusCode::UNAUTHORIZED, other_error("No such user").into()));
};
let machines = client_mgr.list_machine_by_token(user.tokens[0].clone());
let machines = client_mgr
.list_machine_by_token(user.tokens[0].clone())
.await;
Ok(GetSummaryJsonResp {
device_count: machines.len() as u32,
@@ -129,6 +145,24 @@ impl RestfulServer {
.into())
}
async fn handle_generate_config(
Json(req): Json<GenerateConfigRequest>,
) -> Result<Json<GenerateConfigResponse>, HttpHandleError> {
let config = req.config.gen_config();
match config {
Ok(c) => Ok(GenerateConfigResponse {
error: None,
toml_config: Some(c.dump()),
}
.into()),
Err(e) => Ok(GenerateConfigResponse {
error: Some(format!("{:?}", e)),
toml_config: None,
}
.into()),
}
}
pub async fn start(&mut self) -> Result<(), anyhow::Error> {
let listener = TcpListener::bind(self.bind_addr).await?;
@@ -167,7 +201,7 @@ impl RestfulServer {
.deflate(true)
.gzip(true)
.zstd(true)
.quality(tower_http::compression::CompressionLevel::Best);
.quality(tower_http::compression::CompressionLevel::Default);
let app = Router::new()
.route("/api/v1/summary", get(Self::handle_get_summary))
@@ -176,6 +210,10 @@ impl RestfulServer {
.route_layer(login_required!(Backend))
.merge(auth::router())
.with_state(self.client_mgr.clone())
.route(
"/api/v1/generate-config",
post(Self::handle_generate_config),
)
.layer(MessagesManagerLayer)
.layer(auth_layer)
.layer(tower_http::cors::CorsLayer::very_permissive())

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use axum::extract::{Path, Query};
use axum::extract::Path;
use axum::http::StatusCode;
use axum::routing::{delete, post};
use axum::{extract::State, routing::get, Json, Router};
@@ -13,6 +13,7 @@ use easytier::proto::web::*;
use crate::client_manager::session::Session;
use crate::client_manager::ClientManager;
use crate::db::ListNetworkProps;
use super::users::AuthSession;
use super::{
@@ -46,13 +47,21 @@ struct ColletNetworkInfoJsonReq {
inst_ids: Option<Vec<uuid::Uuid>>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct UpdateNetworkStateJsonReq {
disabled: bool,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct RemoveNetworkJsonReq {
inst_ids: Vec<uuid::Uuid>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct ListNetworkInstanceIdsJsonResp(Vec<uuid::Uuid>);
struct ListNetworkInstanceIdsJsonResp {
running_inst_ids: Vec<easytier::proto::common::Uuid>,
disabled_inst_ids: Vec<easytier::proto::common::Uuid>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct ListMachineItem {
@@ -190,7 +199,7 @@ impl NetworkApi {
auth_session: AuthSession,
State(client_mgr): AppState,
Path(machine_id): Path<uuid::Uuid>,
Query(payload): Query<ColletNetworkInfoJsonReq>,
Json(payload): Json<ColletNetworkInfoJsonReq>,
) -> Result<Json<CollectNetworkInfoResponse>, HttpHandleError> {
let result =
Self::get_session_by_machine_id(&auth_session, &client_mgr, &machine_id).await?;
@@ -226,10 +235,28 @@ impl NetworkApi {
.list_network_instance(BaseController::default(), ListNetworkInstanceRequest {})
.await
.map_err(convert_rpc_error)?;
Ok(
ListNetworkInstanceIdsJsonResp(ret.inst_ids.into_iter().map(Into::into).collect())
.into(),
)
let running_inst_ids = ret.inst_ids.clone().into_iter().map(Into::into).collect();
// collect networks that are disabled
let disabled_inst_ids = client_mgr
.db()
.list_network_configs(
auth_session.user.unwrap().id(),
Some(machine_id),
ListNetworkProps::DisabledOnly,
)
.await
.map_err(convert_db_error)?
.iter()
.filter_map(|x| x.network_instance_id.clone().try_into().ok())
.collect::<Vec<_>>();
Ok(ListNetworkInstanceIdsJsonResp {
running_inst_ids,
disabled_inst_ids,
}
.into())
}
async fn handle_remove_network_instance(
@@ -270,7 +297,7 @@ impl NetworkApi {
let client_urls = DashSet::new();
for token in tokens {
let urls = client_mgr.list_machine_by_token(token);
let urls = client_mgr.list_machine_by_token(token).await;
for url in urls {
client_urls.insert(url);
}
@@ -289,6 +316,54 @@ impl NetworkApi {
Ok(Json(ListMachineJsonResp { machines }))
}
async fn handle_update_network_state(
auth_session: AuthSession,
State(client_mgr): AppState,
Path((machine_id, inst_id)): Path<(uuid::Uuid, Option<uuid::Uuid>)>,
Json(payload): Json<UpdateNetworkStateJsonReq>,
) -> Result<(), HttpHandleError> {
let Some(inst_id) = inst_id else {
// not implement disable all
return Err((
StatusCode::NOT_IMPLEMENTED,
other_error(format!("Not implemented")).into(),
))
.into();
};
let sess = Self::get_session_by_machine_id(&auth_session, &client_mgr, &machine_id).await?;
let cfg = client_mgr
.db()
.update_network_config_state(auth_session.user.unwrap().id(), inst_id, payload.disabled)
.await
.map_err(convert_db_error)?;
let c = sess.scoped_rpc_client();
if payload.disabled {
c.delete_network_instance(
BaseController::default(),
DeleteNetworkInstanceRequest {
inst_ids: vec![inst_id.into()],
},
)
.await
.map_err(convert_rpc_error)?;
} else {
c.run_network_instance(
BaseController::default(),
RunNetworkInstanceRequest {
inst_id: Some(inst_id.into()),
config: Some(serde_json::from_str(&cfg.network_config).unwrap()),
},
)
.await
.map_err(convert_rpc_error)?;
}
Ok(())
}
async fn handle_get_network_config(
auth_session: AuthSession,
State(client_mgr): AppState,
@@ -298,25 +373,24 @@ impl NetworkApi {
let db_row = client_mgr
.db()
.list_network_configs(auth_session.user.unwrap().id(), Some(machine_id), false)
.get_network_config(auth_session.user.unwrap().id(), &machine_id, &inst_id)
.await
.map_err(convert_db_error)?
.iter()
.find(|x| x.network_instance_id == inst_id)
.map(|x| x.network_config.clone())
.ok_or((
StatusCode::NOT_FOUND,
other_error(format!("No such network instance: {}", inst_id)).into(),
))?;
Ok(serde_json::from_str::<NetworkConfig>(&db_row)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
other_error(format!("Failed to parse network config: {:?}", e)).into(),
)
})?
.into())
Ok(
serde_json::from_str::<NetworkConfig>(&db_row.network_config)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
other_error(format!("Failed to parse network config: {:?}", e)).into(),
)
})?
.into(),
)
}
pub fn build_route(&mut self) -> Router<AppStateInner> {
@@ -332,7 +406,7 @@ impl NetworkApi {
)
.route(
"/api/v1/machines/:machine-id/networks/:inst-id",
delete(Self::handle_remove_network_instance),
delete(Self::handle_remove_network_instance).put(Self::handle_update_network_state),
)
.route(
"/api/v1/machines/:machine-id/networks/info",

View File

@@ -3,12 +3,12 @@ name = "easytier"
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
homepage = "https://github.com/EasyTier/EasyTier"
repository = "https://github.com/EasyTier/EasyTier"
version = "2.1.0"
version = "2.2.4"
edition = "2021"
authors = ["kkrainbow"]
keywords = ["vpn", "p2p", "network", "easytier"]
categories = ["network-programming", "command-line-utilities"]
rust-version = "1.77.0"
rust-version = "1.84.0"
license-file = "LICENSE"
readme = "README.md"
@@ -62,7 +62,6 @@ timedmap = "=1.0.1"
zerocopy = { version = "0.7.32", features = ["derive", "simd"] }
bytes = "1.5.0"
pin-project-lite = "0.2.13"
atomicbox = "0.4.0"
tachyonix = "0.3.0"
quinn = { version = "0.11.0", optional = true, features = ["ring"] }
@@ -89,7 +88,7 @@ tun = { package = "tun-easytier", version = "1.1.1", features = [
"async",
], optional = true }
# for net ns
nix = { version = "0.27", features = ["sched", "socket", "ioctl"] }
nix = { version = "0.29.0", features = ["sched", "socket", "ioctl", "net"] }
uuid = { version = "1.5.0", features = [
"v4",
@@ -99,7 +98,6 @@ uuid = { version = "1.5.0", features = [
] }
# for ring tunnel
crossbeam-queue = "0.3"
once_cell = "1.18.0"
# for rpc
@@ -126,7 +124,7 @@ serde = { version = "1.0", features = ["derive"] }
pnet = { version = "0.35.0", features = ["serde"] }
serde_json = "1"
clap = { version = "4.4.8", features = [
clap = { version = "4.5.30", features = [
"string",
"unicode",
"derive",
@@ -138,7 +136,7 @@ async-recursion = "1.0.5"
network-interface = "2.0"
# for ospf route
petgraph = "0.6.5"
petgraph = "0.7.1"
# for wireguard
boringtun = { package = "boringtun-easytier", version = "0.6.1", optional = true }
@@ -154,21 +152,18 @@ humansize = "2.1.3"
base64 = "0.22"
derivative = "2.2.0"
mimalloc-rust = { version = "0.2.1", optional = true }
# for mips
indexmap = { version = "~1.9.3", optional = false, features = ["std"] }
# mips
atomic-shim = "0.2.0"
smoltcp = { version = "0.11.0", optional = true, default-features = false, features = [
smoltcp = { version = "0.12.0", optional = true, default-features = false, features = [
"std",
"medium-ip",
"proto-ipv4",
"proto-ipv6",
"socket-tcp",
"socket-tcp-cubic",
"async",
] }
parking_lot = { version = "0.12.0", optional = true }
@@ -185,16 +180,39 @@ service-manager = {git = "https://github.com/chipsenkbeil/service-manager-rs.git
async-compression = { version = "0.4.17", default-features = false, features = ["zstd", "tokio"] }
kcp-sys = { git = "https://github.com/EasyTier/kcp-sys" }
prost-reflect = { version = "0.14.5", default-features = false, features = [
"derive",
] }
# for http connector
http_req = { git = "https://github.com/EasyTier/http_req.git", default-features = false, features = ["rust-tls"] }
# for dns connector
hickory-resolver = "0.24.4"
bounded_join_set = "0.3.0"
[target.'cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "freebsd"))'.dependencies]
machine-uid = "0.5.3"
[target.'cfg(any(target_os = "linux"))'.dependencies]
netlink-sys = "0.8.7"
netlink-packet-route = "0.21.0"
netlink-packet-core = { version = "0.7.0" }
netlink-packet-utils = "0.5.2"
[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.52", features = [
"Win32_Networking_WinSock",
"Win32_NetworkManagement_IpHelper",
windows = { version = "0.52.0", features = [
"Win32_Foundation",
"Win32_NetworkManagement_WindowsFirewall",
"Win32_System_Com",
"Win32_Networking",
"Win32_System_Ole",
"Win32_Networking_WinSock",
"Win32_System_IO",
] }
]}
encoding = "0.2"
winreg = "0.52"
windows-service = "0.7.0"
@@ -205,9 +223,10 @@ globwalk = "0.8.1"
regex = "1"
prost-build = "0.13.2"
rpc_build = { package = "easytier-rpc-build", version = "0.1.0", features = ["internal-namespace"] }
prost-reflect-build = { version = "0.14.0" }
[target.'cfg(windows)'.build-dependencies]
reqwest = { version = "0.11", features = ["blocking"] }
reqwest = { version = "0.12.12", features = ["blocking"] }
zip = "0.6.6"

View File

@@ -128,20 +128,21 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(target_os = "windows")]
WindowsBuild::check_for_win();
let proto_files_reflect = ["src/proto/peer_rpc.proto", "src/proto/common.proto"];
let proto_files = [
"src/proto/peer_rpc.proto",
"src/proto/common.proto",
"src/proto/error.proto",
"src/proto/tests.proto",
"src/proto/cli.proto",
"src/proto/web.proto",
];
for proto_file in &proto_files {
for proto_file in proto_files.iter().chain(proto_files_reflect.iter()) {
println!("cargo:rerun-if-changed={}", proto_file);
}
prost_build::Config::new()
let mut config = prost_build::Config::new();
config
.protoc_arg("--experimental_allow_proto3_optional")
.type_attribute(".common", "#[derive(serde::Serialize, serde::Deserialize)]")
.type_attribute(".error", "#[derive(serde::Serialize, serde::Deserialize)]")
@@ -155,10 +156,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("peer_rpc.PeerInfoForGlobalMap", "#[derive(Hash)]")
.type_attribute("peer_rpc.ForeignNetworkRouteInfoKey", "#[derive(Hash, Eq)]")
.type_attribute("common.RpcDescriptor", "#[derive(Hash, Eq)]")
.field_attribute(".web.NetworkConfig", "#[serde(default)]")
.service_generator(Box::new(rpc_build::ServiceGenerator::new()))
.btree_map(["."])
.compile_protos(&proto_files, &["src/proto/"])
.unwrap();
.btree_map(["."]);
config.compile_protos(&proto_files, &["src/proto/"])?;
prost_reflect_build::Builder::new()
.file_descriptor_set_bytes("crate::proto::DESCRIPTOR_POOL_BYTES")
.compile_protos_with_config(config, &proto_files_reflect, &["src/proto/"])?;
check_locale();
Ok(())

View File

@@ -5,11 +5,11 @@ core_clap:
en: |+
config server address, allow format:
full url: --config-server udp://127.0.0.1:22020/admin
only user name: --config-server admin
only user name: --config-server admin, will use official server
zh-CN: |+
配置服务器地址。允许格式:
完整URL--config-server udp://127.0.0.1:22020/admin
仅用户名:--config-server admin
仅用户名:--config-server admin,将使用官方的服务器
config_file:
en: "path to the config file, NOTE: if this is set, all other options will be ignored"
zh-CN: "配置文件路径,注意:如果设置了这个选项,其他所有选项都将被忽略"
@@ -96,12 +96,15 @@ core_clap:
enable_exit_node:
en: "allow this node to be an exit node"
zh-CN: "允许此节点成为出口节点"
proxy_forward_by_system:
en: "forward packet to proxy networks via system kernel, disable internal nat for network proxy"
zh-CN: "通过系统内核转发子网代理数据包禁用内置NAT"
no_tun:
en: "do not create TUN device, can use subnet proxy to access node"
zh-CN: "不创建TUN设备可以使用子网代理访问节点"
use_smoltcp:
en: "enable smoltcp stack for subnet proxy"
zh-CN: "为子网代理启用smoltcp堆栈"
en: "enable smoltcp stack for subnet proxy and kcp proxy"
zh-CN: "为子网代理和 KCP 代理启用smoltcp堆栈"
manual_routes:
en: "assign routes cidr manually, will disable subnet proxy and wireguard routes propagated from peers. e.g.: 192.168.0.0/16"
zh-CN: "手动分配路由CIDR将禁用子网代理和从对等节点传播的wireguard路由。例如192.168.0.0/16"
@@ -134,6 +137,18 @@ core_clap:
compression:
en: "compression algorithm to use, support none, zstd. default is none"
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none"
mapped_listeners:
en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple."
zh-CN: "手动指定监听器的公网地址其他节点可以使用该地址连接到本节点。例如tcp://123.123.123.123:11223可以指定多个。"
bind_device:
en: "bind the connector socket to physical devices to avoid routing issues. e.g.: subnet proxy segment conflicts with a node's segment, after binding the physical device, it can communicate with the node normally."
zh-CN: "将连接器的套接字绑定到物理设备以避免路由问题。比如子网代理网段与某节点的网段冲突,绑定物理设备后可以与该节点正常通信。"
enable_kcp_proxy:
en: "proxy tcp streams with kcp, improving the latency and throughput on the network with udp packet loss."
zh-CN: "使用 KCP 代理 TCP 流,提高在 UDP 丢包网络上的延迟和吞吐量。"
disable_kcp_input:
en: "do not allow other nodes to use kcp to proxy tcp streams to this node. when a node with kcp proxy enabled accesses this node, the original tcp connection is preserved."
zh-CN: "不允许其他节点使用 KCP 代理 TCP 流到此节点。开启 KCP 代理的节点访问此节点时,依然使用原始 TCP 连接。"
core_app:
panic_backtrace_save:

View File

@@ -1,26 +1,27 @@
use std::{
ffi::c_void,
io::{self, ErrorKind},
mem,
net::SocketAddr,
os::windows::io::AsRawSocket,
ptr,
};
use std::{io, net::SocketAddr, os::windows::io::AsRawSocket};
use anyhow::Context;
use network_interface::NetworkInterfaceConfig;
use windows_sys::{
core::PCSTR,
use windows::{
core::BSTR,
Win32::{
Foundation::{BOOL, FALSE},
NetworkManagement::WindowsFirewall::{
INetFwPolicy2, INetFwRule, NET_FW_ACTION_ALLOW, NET_FW_PROFILE2_PRIVATE,
NET_FW_PROFILE2_PUBLIC, NET_FW_RULE_DIR_IN, NET_FW_RULE_DIR_OUT,
},
Networking::WinSock::{
htonl, setsockopt, WSAGetLastError, WSAIoctl, IPPROTO_IP, IPPROTO_IPV6,
IPV6_UNICAST_IF, IP_UNICAST_IF, SIO_UDP_CONNRESET, SOCKET, SOCKET_ERROR,
},
System::Com::{
CoCreateInstance, CoInitializeEx, CoUninitialize, CLSCTX_ALL, COINIT_MULTITHREADED,
},
},
};
pub fn disable_connection_reset<S: AsRawSocket>(socket: &S) -> io::Result<()> {
let handle = socket.as_raw_socket() as SOCKET;
let handle = SOCKET(socket.as_raw_socket() as usize);
unsafe {
// Ignoring UdpSocket's WSAECONNRESET error
@@ -39,21 +40,18 @@ pub fn disable_connection_reset<S: AsRawSocket>(socket: &S) -> io::Result<()> {
let ret = WSAIoctl(
handle,
SIO_UDP_CONNRESET,
&enable as *const _ as *const c_void,
mem::size_of_val(&enable) as u32,
ptr::null_mut(),
Some(&enable as *const _ as *const std::ffi::c_void),
std::mem::size_of_val(&enable) as u32,
None,
0,
&mut bytes_returned as *mut _,
ptr::null_mut(),
None,
None,
);
if ret == SOCKET_ERROR {
use std::io::Error;
// Error occurs
let err_code = WSAGetLastError();
return Err(Error::from_raw_os_error(err_code));
return Err(std::io::Error::from_raw_os_error(err_code.0));
}
}
@@ -63,7 +61,7 @@ pub fn disable_connection_reset<S: AsRawSocket>(socket: &S) -> io::Result<()> {
pub fn interface_count() -> io::Result<usize> {
let ifaces = network_interface::NetworkInterface::show().map_err(|e| {
io::Error::new(
ErrorKind::NotFound,
io::ErrorKind::NotFound,
format!("Failed to get interfaces. error: {}", e),
)
})?;
@@ -73,7 +71,7 @@ pub fn interface_count() -> io::Result<usize> {
pub fn find_interface_index(iface_name: &str) -> io::Result<u32> {
let ifaces = network_interface::NetworkInterface::show().map_err(|e| {
io::Error::new(
ErrorKind::NotFound,
io::ErrorKind::NotFound,
format!("Failed to get interfaces. {}, error: {}", iface_name, e),
)
})?;
@@ -82,7 +80,7 @@ pub fn find_interface_index(iface_name: &str) -> io::Result<u32> {
}
tracing::error!("Failed to find interface index for {}", iface_name);
Err(io::Error::new(
ErrorKind::NotFound,
io::ErrorKind::NotFound,
format!("{}", iface_name),
))
}
@@ -92,7 +90,7 @@ pub fn set_ip_unicast_if<S: AsRawSocket>(
addr: &SocketAddr,
iface: &str,
) -> io::Result<()> {
let handle = socket.as_raw_socket() as SOCKET;
let handle = SOCKET(socket.as_raw_socket() as usize);
let if_index = find_interface_index(iface)?;
@@ -100,30 +98,23 @@ pub fn set_ip_unicast_if<S: AsRawSocket>(
// https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options
let ret = match addr {
SocketAddr::V4(..) => {
// Interface index is in network byte order for IPPROTO_IP.
let if_index = htonl(if_index);
setsockopt(
handle,
IPPROTO_IP as i32,
IP_UNICAST_IF as i32,
&if_index as *const _ as PCSTR,
mem::size_of_val(&if_index) as i32,
)
let if_index_bytes = if_index.to_ne_bytes();
setsockopt(handle, IPPROTO_IP.0, IP_UNICAST_IF, Some(&if_index_bytes))
}
SocketAddr::V6(..) => {
// Interface index is in host byte order for IPPROTO_IPV6.
let if_index_bytes = if_index.to_ne_bytes();
setsockopt(
handle,
IPPROTO_IPV6 as i32,
IPV6_UNICAST_IF as i32,
&if_index as *const _ as PCSTR,
mem::size_of_val(&if_index) as i32,
IPPROTO_IPV6.0,
IPV6_UNICAST_IF,
Some(&if_index_bytes),
)
}
};
if ret == SOCKET_ERROR {
let err = io::Error::from_raw_os_error(WSAGetLastError());
let err = std::io::Error::from_raw_os_error(WSAGetLastError().0);
tracing::error!(
"set IP_UNICAST_IF / IPV6_UNICAST_IF interface: {}, index: {}, error: {}",
iface,
@@ -152,4 +143,95 @@ pub fn setup_socket_for_win<S: AsRawSocket>(
}
Ok(())
}
}
struct ComInitializer;
impl ComInitializer {
fn new() -> windows::core::Result<Self> {
unsafe { CoInitializeEx(None, COINIT_MULTITHREADED)? };
Ok(Self)
}
}
impl Drop for ComInitializer {
fn drop(&mut self) {
unsafe {
CoUninitialize();
}
}
}
pub fn do_add_self_to_firewall_allowlist(inbound: bool) -> anyhow::Result<()> {
let _com = ComInitializer::new()?;
// 创建防火墙策略实例
let policy: INetFwPolicy2 = unsafe {
CoCreateInstance(
&windows::Win32::NetworkManagement::WindowsFirewall::NetFwPolicy2,
None,
CLSCTX_ALL,
)
}?;
// 创建防火墙规则实例
let rule: INetFwRule = unsafe {
CoCreateInstance(
&windows::Win32::NetworkManagement::WindowsFirewall::NetFwRule,
None,
CLSCTX_ALL,
)
}?;
// 设置规则属性
let exe_path = std::env::current_exe()
.with_context(|| "Failed to get current executable path when adding firewall rule")?
.to_string_lossy()
.replace(r"\\?\", "");
let name = BSTR::from(format!(
"EasyTier {} ({})",
exe_path,
if inbound { "Inbound" } else { "Outbound" }
));
let desc = BSTR::from("Allow EasyTier to do subnet proxy and kcp proxy");
let app_path = BSTR::from(&exe_path);
unsafe {
rule.SetName(&name)?;
rule.SetDescription(&desc)?;
rule.SetApplicationName(&app_path)?;
rule.SetAction(NET_FW_ACTION_ALLOW)?;
if inbound {
rule.SetDirection(NET_FW_RULE_DIR_IN)?; // 允许入站连接
} else {
rule.SetDirection(NET_FW_RULE_DIR_OUT)?; // 允许出站连接
}
rule.SetEnabled(windows::Win32::Foundation::VARIANT_TRUE)?;
rule.SetProfiles(NET_FW_PROFILE2_PRIVATE.0 | NET_FW_PROFILE2_PUBLIC.0)?;
rule.SetGrouping(&BSTR::from("EasyTier"))?;
// 获取规则集合并添加新规则
let rules = policy.Rules()?;
rules.Remove(&name)?; // 先删除同名规则
rules.Add(&rule)?;
}
Ok(())
}
pub fn add_self_to_firewall_allowlist() -> anyhow::Result<()> {
do_add_self_to_firewall_allowlist(true)?;
do_add_self_to_firewall_allowlist(false)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_add_self_to_firewall_allowlist() {
let res = add_self_to_firewall_allowlist();
assert!(res.is_ok());
}
}

View File

@@ -20,15 +20,19 @@ pub fn gen_default_flags() -> Flags {
mtu: 1380,
latency_first: false,
enable_exit_node: false,
proxy_forward_by_system: false,
no_tun: false,
use_smoltcp: false,
relay_network_whitelist: "*".to_string(),
disable_p2p: false,
relay_all_peer_rpc: false,
disable_udp_hole_punching: false,
ipv6_listener: "udp://[::]:0".to_string(),
multi_thread: false,
multi_thread: true,
data_compress_algo: CompressionAlgoPb::None.into(),
bind_device: true,
enable_kcp_proxy: false,
disable_kcp_input: false,
disable_relay_kcp: true,
}
}
@@ -72,6 +76,9 @@ pub trait ConfigLoader: Send + Sync {
fn get_listeners(&self) -> Vec<url::Url>;
fn set_listeners(&self, listeners: Vec<url::Url>);
fn get_mapped_listeners(&self) -> Vec<url::Url>;
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>);
fn get_rpc_portal(&self) -> Option<SocketAddr>;
fn set_rpc_portal(&self, addr: SocketAddr);
@@ -183,6 +190,7 @@ struct Config {
dhcp: Option<bool>,
network_identity: Option<NetworkIdentity>,
listeners: Option<Vec<url::Url>>,
mapped_listeners: Option<Vec<url::Url>>,
exit_nodes: Option<Vec<Ipv4Addr>>,
peer: Option<Vec<PeerConfig>>,
@@ -472,6 +480,19 @@ impl ConfigLoader for TomlConfigLoader {
self.config.lock().unwrap().listeners = Some(listeners);
}
fn get_mapped_listeners(&self) -> Vec<url::Url> {
self.config
.lock()
.unwrap()
.mapped_listeners
.clone()
.unwrap_or_default()
}
fn set_mapped_listeners(&self, listeners: Option<Vec<url::Url>>) {
self.config.lock().unwrap().mapped_listeners = listeners;
}
fn get_rpc_portal(&self) -> Option<SocketAddr> {
self.config.lock().unwrap().rpc_portal
}

View File

@@ -68,6 +68,7 @@ pub struct GlobalCtx {
running_listeners: Mutex<Vec<url::Url>>,
enable_exit_node: bool,
proxy_forward_by_system: bool,
no_tun: bool,
feature_flags: AtomicCell<PeerFeatureFlag>,
@@ -99,8 +100,13 @@ impl GlobalCtx {
let stun_info_collection = Arc::new(StunInfoCollector::new_with_default_servers());
let enable_exit_node = config_fs.get_flags().enable_exit_node;
let proxy_forward_by_system = config_fs.get_flags().proxy_forward_by_system;
let no_tun = config_fs.get_flags().no_tun;
let mut feature_flags = PeerFeatureFlag::default();
feature_flags.kcp_input = !config_fs.get_flags().disable_kcp_input;
feature_flags.no_relay_kcp = config_fs.get_flags().disable_relay_kcp;
GlobalCtx {
inst_name: config_fs.get_inst_name(),
id,
@@ -121,9 +127,10 @@ impl GlobalCtx {
running_listeners: Mutex::new(Vec::new()),
enable_exit_node,
proxy_forward_by_system,
no_tun,
feature_flags: AtomicCell::new(PeerFeatureFlag::default()),
feature_flags: AtomicCell::new(feature_flags),
}
}
@@ -230,7 +237,10 @@ impl GlobalCtx {
}
pub fn add_running_listener(&self, url: url::Url) {
self.running_listeners.lock().unwrap().push(url);
let mut l = self.running_listeners.lock().unwrap();
if !l.contains(&url) {
l.push(url);
}
}
pub fn get_vpn_portal_cidr(&self) -> Option<cidr::Ipv4Cidr> {
@@ -266,6 +276,10 @@ impl GlobalCtx {
self.enable_exit_node
}
pub fn proxy_forward_by_system(&self) -> bool {
self.proxy_forward_by_system
}
pub fn no_tun(&self) -> bool {
self.no_tun
}

View File

@@ -1,417 +0,0 @@
use std::net::Ipv4Addr;
use async_trait::async_trait;
use tokio::process::Command;
use super::error::Error;
#[async_trait]
pub trait IfConfiguerTrait: Send + Sync {
async fn add_ipv4_route(
&self,
_name: &str,
_address: Ipv4Addr,
_cidr_prefix: u8,
) -> Result<(), Error> {
Ok(())
}
async fn remove_ipv4_route(
&self,
_name: &str,
_address: Ipv4Addr,
_cidr_prefix: u8,
) -> Result<(), Error> {
Ok(())
}
async fn add_ipv4_ip(
&self,
_name: &str,
_address: Ipv4Addr,
_cidr_prefix: u8,
) -> Result<(), Error> {
Ok(())
}
async fn set_link_status(&self, _name: &str, _up: bool) -> Result<(), Error> {
Ok(())
}
async fn remove_ip(&self, _name: &str, _ip: Option<Ipv4Addr>) -> Result<(), Error> {
Ok(())
}
async fn wait_interface_show(&self, _name: &str) -> Result<(), Error> {
return Ok(());
}
async fn set_mtu(&self, _name: &str, _mtu: u32) -> Result<(), Error> {
Ok(())
}
}
fn cidr_to_subnet_mask(prefix_length: u8) -> Ipv4Addr {
if prefix_length > 32 {
panic!("Invalid CIDR prefix length");
}
let subnet_mask: u32 = (!0u32)
.checked_shl(32 - u32::from(prefix_length))
.unwrap_or(0);
Ipv4Addr::new(
((subnet_mask >> 24) & 0xFF) as u8,
((subnet_mask >> 16) & 0xFF) as u8,
((subnet_mask >> 8) & 0xFF) as u8,
(subnet_mask & 0xFF) as u8,
)
}
async fn run_shell_cmd(cmd: &str) -> Result<(), Error> {
let cmd_out: std::process::Output;
let stdout: String;
let stderr: String;
#[cfg(target_os = "windows")]
{
const CREATE_NO_WINDOW: u32 = 0x08000000;
cmd_out = Command::new("cmd")
.stdin(std::process::Stdio::null())
.arg("/C")
.arg(cmd)
.creation_flags(CREATE_NO_WINDOW)
.output()
.await?;
stdout = crate::utils::utf8_or_gbk_to_string(cmd_out.stdout.as_slice());
stderr = crate::utils::utf8_or_gbk_to_string(cmd_out.stderr.as_slice());
};
#[cfg(not(target_os = "windows"))]
{
cmd_out = Command::new("sh").arg("-c").arg(cmd).output().await?;
stdout = String::from_utf8_lossy(cmd_out.stdout.as_slice()).to_string();
stderr = String::from_utf8_lossy(cmd_out.stderr.as_slice()).to_string();
};
let ec = cmd_out.status.code();
let succ = cmd_out.status.success();
tracing::info!(?cmd, ?ec, ?succ, ?stdout, ?stderr, "run shell cmd");
if !cmd_out.status.success() {
return Err(Error::ShellCommandError(stdout + &stderr));
}
Ok(())
}
pub struct MacIfConfiger {}
#[async_trait]
impl IfConfiguerTrait for MacIfConfiger {
async fn add_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"route -n add {} -netmask {} -interface {} -hopcount 7",
address,
cidr_to_subnet_mask(cidr_prefix),
name
)
.as_str(),
)
.await
}
async fn remove_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"route -n delete {} -netmask {} -interface {}",
address,
cidr_to_subnet_mask(cidr_prefix),
name
)
.as_str(),
)
.await
}
async fn add_ipv4_ip(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"ifconfig {} {:?}/{:?} 10.8.8.8 up",
name, address, cidr_prefix,
)
.as_str(),
)
.await
}
async fn set_link_status(&self, name: &str, up: bool) -> Result<(), Error> {
run_shell_cmd(format!("ifconfig {} {}", name, if up { "up" } else { "down" }).as_str())
.await
}
async fn remove_ip(&self, name: &str, ip: Option<Ipv4Addr>) -> Result<(), Error> {
if ip.is_none() {
run_shell_cmd(format!("ifconfig {} inet delete", name).as_str()).await
} else {
run_shell_cmd(
format!("ifconfig {} inet {} delete", name, ip.unwrap().to_string()).as_str(),
)
.await
}
}
async fn set_mtu(&self, name: &str, mtu: u32) -> Result<(), Error> {
run_shell_cmd(format!("ifconfig {} mtu {}", name, mtu).as_str()).await
}
}
pub struct LinuxIfConfiger {}
#[async_trait]
impl IfConfiguerTrait for LinuxIfConfiger {
async fn add_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"ip route add {}/{} dev {} metric 65535",
address, cidr_prefix, name
)
.as_str(),
)
.await
}
async fn remove_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(format!("ip route del {}/{} dev {}", address, cidr_prefix, name).as_str())
.await
}
async fn add_ipv4_ip(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(format!("ip addr add {:?}/{:?} dev {}", address, cidr_prefix, name).as_str())
.await
}
async fn set_link_status(&self, name: &str, up: bool) -> Result<(), Error> {
run_shell_cmd(format!("ip link set {} {}", name, if up { "up" } else { "down" }).as_str())
.await
}
async fn remove_ip(&self, name: &str, ip: Option<Ipv4Addr>) -> Result<(), Error> {
if ip.is_none() {
run_shell_cmd(format!("ip addr flush dev {}", name).as_str()).await
} else {
run_shell_cmd(
format!("ip addr del {:?} dev {}", ip.unwrap().to_string(), name).as_str(),
)
.await
}
}
async fn set_mtu(&self, name: &str, mtu: u32) -> Result<(), Error> {
run_shell_cmd(format!("ip link set dev {} mtu {}", name, mtu).as_str()).await
}
}
#[cfg(target_os = "windows")]
pub struct WindowsIfConfiger {}
#[cfg(target_os = "windows")]
impl WindowsIfConfiger {
pub fn get_interface_index(name: &str) -> Option<u32> {
crate::arch::windows::find_interface_index(name).ok()
}
async fn list_ipv4(name: &str) -> Result<Vec<Ipv4Addr>, Error> {
use anyhow::Context;
use network_interface::NetworkInterfaceConfig;
use std::net::IpAddr;
let ret = network_interface::NetworkInterface::show().with_context(|| "show interface")?;
let addrs = ret
.iter()
.filter_map(|x| {
if x.name != name {
return None;
}
Some(x.addr.clone())
})
.flat_map(|x| x)
.map(|x| x.ip())
.filter_map(|x| {
if let IpAddr::V4(ipv4) = x {
Some(ipv4)
} else {
None
}
})
.collect::<Vec<_>>();
Ok(addrs)
}
async fn remove_one_ipv4(name: &str, ip: Ipv4Addr) -> Result<(), Error> {
run_shell_cmd(
format!(
"netsh interface ipv4 delete address {} address={}",
name,
ip.to_string()
)
.as_str(),
)
.await
}
}
#[cfg(target_os = "windows")]
#[async_trait]
impl IfConfiguerTrait for WindowsIfConfiger {
async fn add_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
let Some(idx) = Self::get_interface_index(name) else {
return Err(Error::NotFound);
};
run_shell_cmd(
format!(
"route ADD {} MASK {} 10.1.1.1 IF {} METRIC 255",
address,
cidr_to_subnet_mask(cidr_prefix),
idx
)
.as_str(),
)
.await
}
async fn remove_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
let Some(idx) = Self::get_interface_index(name) else {
return Err(Error::NotFound);
};
run_shell_cmd(
format!(
"route DELETE {} MASK {} IF {}",
address,
cidr_to_subnet_mask(cidr_prefix),
idx
)
.as_str(),
)
.await
}
async fn add_ipv4_ip(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"netsh interface ipv4 add address {} address={} mask={}",
name,
address,
cidr_to_subnet_mask(cidr_prefix)
)
.as_str(),
)
.await
}
async fn set_link_status(&self, name: &str, up: bool) -> Result<(), Error> {
run_shell_cmd(
format!(
"netsh interface set interface {} {}",
name,
if up { "enable" } else { "disable" }
)
.as_str(),
)
.await
}
async fn remove_ip(&self, name: &str, ip: Option<Ipv4Addr>) -> Result<(), Error> {
if ip.is_none() {
for ip in Self::list_ipv4(name).await?.iter() {
Self::remove_one_ipv4(name, *ip).await?;
}
Ok(())
} else {
Self::remove_one_ipv4(name, ip.unwrap()).await
}
}
async fn wait_interface_show(&self, name: &str) -> Result<(), Error> {
Ok(
tokio::time::timeout(std::time::Duration::from_secs(10), async move {
loop {
if let Some(idx) = Self::get_interface_index(name) {
tracing::info!(?name, ?idx, "Interface found");
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Ok::<(), Error>(())
})
.await??,
)
}
async fn set_mtu(&self, name: &str, mtu: u32) -> Result<(), Error> {
let _ = run_shell_cmd(
format!("netsh interface ipv6 set subinterface {} mtu={}", name, mtu).as_str(),
)
.await;
run_shell_cmd(
format!("netsh interface ipv4 set subinterface {} mtu={}", name, mtu).as_str(),
)
.await
}
}
pub struct DummyIfConfiger {}
#[async_trait]
impl IfConfiguerTrait for DummyIfConfiger {}
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
pub type IfConfiger = MacIfConfiger;
#[cfg(target_os = "linux")]
pub type IfConfiger = LinuxIfConfiger;
#[cfg(target_os = "windows")]
pub type IfConfiger = WindowsIfConfiger;
#[cfg(not(any(
target_os = "macos",
target_os = "linux",
target_os = "windows",
target_os = "freebsd"
)))]
pub type IfConfiger = DummyIfConfiger;

View File

@@ -0,0 +1,81 @@
use std::net::Ipv4Addr;
use async_trait::async_trait;
use super::{cidr_to_subnet_mask, run_shell_cmd, Error, IfConfiguerTrait};
pub struct MacIfConfiger {}
#[async_trait]
impl IfConfiguerTrait for MacIfConfiger {
async fn add_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"route -n add {} -netmask {} -interface {} -hopcount 7",
address,
cidr_to_subnet_mask(cidr_prefix),
name
)
.as_str(),
)
.await
}
async fn remove_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"route -n delete {} -netmask {} -interface {}",
address,
cidr_to_subnet_mask(cidr_prefix),
name
)
.as_str(),
)
.await
}
async fn add_ipv4_ip(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"ifconfig {} {:?}/{:?} 10.8.8.8 up",
name, address, cidr_prefix,
)
.as_str(),
)
.await
}
async fn set_link_status(&self, name: &str, up: bool) -> Result<(), Error> {
run_shell_cmd(format!("ifconfig {} {}", name, if up { "up" } else { "down" }).as_str())
.await
}
async fn remove_ip(&self, name: &str, ip: Option<Ipv4Addr>) -> Result<(), Error> {
if ip.is_none() {
run_shell_cmd(format!("ifconfig {} inet delete", name).as_str()).await
} else {
run_shell_cmd(
format!("ifconfig {} inet {} delete", name, ip.unwrap().to_string()).as_str(),
)
.await
}
}
async fn set_mtu(&self, name: &str, mtu: u32) -> Result<(), Error> {
run_shell_cmd(format!("ifconfig {} mtu {}", name, mtu).as_str()).await
}
}

View File

@@ -0,0 +1,127 @@
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
mod darwin;
#[cfg(any(target_os = "linux"))]
mod netlink;
#[cfg(target_os = "windows")]
mod windows;
mod route;
use std::net::Ipv4Addr;
use async_trait::async_trait;
use tokio::process::Command;
use super::error::Error;
#[async_trait]
pub trait IfConfiguerTrait: Send + Sync {
async fn add_ipv4_route(
&self,
_name: &str,
_address: Ipv4Addr,
_cidr_prefix: u8,
) -> Result<(), Error> {
Ok(())
}
async fn remove_ipv4_route(
&self,
_name: &str,
_address: Ipv4Addr,
_cidr_prefix: u8,
) -> Result<(), Error> {
Ok(())
}
async fn add_ipv4_ip(
&self,
_name: &str,
_address: Ipv4Addr,
_cidr_prefix: u8,
) -> Result<(), Error> {
Ok(())
}
async fn set_link_status(&self, _name: &str, _up: bool) -> Result<(), Error> {
Ok(())
}
async fn remove_ip(&self, _name: &str, _ip: Option<Ipv4Addr>) -> Result<(), Error> {
Ok(())
}
async fn wait_interface_show(&self, _name: &str) -> Result<(), Error> {
return Ok(());
}
async fn set_mtu(&self, _name: &str, _mtu: u32) -> Result<(), Error> {
Ok(())
}
}
fn cidr_to_subnet_mask(prefix_length: u8) -> Ipv4Addr {
if prefix_length > 32 {
panic!("Invalid CIDR prefix length");
}
let subnet_mask: u32 = (!0u32)
.checked_shl(32 - u32::from(prefix_length))
.unwrap_or(0);
Ipv4Addr::new(
((subnet_mask >> 24) & 0xFF) as u8,
((subnet_mask >> 16) & 0xFF) as u8,
((subnet_mask >> 8) & 0xFF) as u8,
(subnet_mask & 0xFF) as u8,
)
}
async fn run_shell_cmd(cmd: &str) -> Result<(), Error> {
let cmd_out: std::process::Output;
let stdout: String;
let stderr: String;
#[cfg(target_os = "windows")]
{
const CREATE_NO_WINDOW: u32 = 0x08000000;
cmd_out = Command::new("cmd")
.stdin(std::process::Stdio::null())
.arg("/C")
.arg(cmd)
.creation_flags(CREATE_NO_WINDOW)
.output()
.await?;
stdout = crate::utils::utf8_or_gbk_to_string(cmd_out.stdout.as_slice());
stderr = crate::utils::utf8_or_gbk_to_string(cmd_out.stderr.as_slice());
};
#[cfg(not(target_os = "windows"))]
{
cmd_out = Command::new("sh").arg("-c").arg(cmd).output().await?;
stdout = String::from_utf8_lossy(cmd_out.stdout.as_slice()).to_string();
stderr = String::from_utf8_lossy(cmd_out.stderr.as_slice()).to_string();
};
let ec = cmd_out.status.code();
let succ = cmd_out.status.success();
tracing::info!(?cmd, ?ec, ?succ, ?stdout, ?stderr, "run shell cmd");
if !cmd_out.status.success() {
return Err(Error::ShellCommandError(stdout + &stderr));
}
Ok(())
}
pub struct DummyIfConfiger {}
#[async_trait]
impl IfConfiguerTrait for DummyIfConfiger {}
#[cfg(any(target_os = "linux"))]
pub type IfConfiger = netlink::NetlinkIfConfiger;
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
pub type IfConfiger = darwin::MacIfConfiger;
#[cfg(target_os = "windows")]
pub type IfConfiger = windows::WindowsIfConfiger;
#[cfg(not(any(
target_os = "macos",
target_os = "linux",
target_os = "windows",
target_os = "freebsd",
)))]
pub type IfConfiger = DummyIfConfiger;

View File

@@ -0,0 +1,577 @@
use std::{
ffi::CString,
fmt::Debug,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
num::NonZero,
os::fd::AsRawFd,
};
use anyhow::Context;
use async_trait::async_trait;
use cidr::IpInet;
use netlink_packet_core::{
NetlinkDeserializable, NetlinkHeader, NetlinkMessage, NetlinkPayload, NetlinkSerializable,
NLM_F_ACK, NLM_F_CREATE, NLM_F_DUMP, NLM_F_EXCL, NLM_F_REQUEST,
};
use netlink_packet_route::{
address::{AddressAttribute, AddressMessage},
route::{
RouteAddress, RouteAttribute, RouteHeader, RouteMessage, RouteProtocol, RouteScope,
RouteType,
},
AddressFamily, RouteNetlinkMessage,
};
use netlink_sys::{protocols::NETLINK_ROUTE, Socket, SocketAddr};
use nix::{
ifaddrs::getifaddrs,
libc::{self, ifreq, ioctl, Ioctl, SIOCGIFFLAGS, SIOCGIFMTU, SIOCSIFFLAGS, SIOCSIFMTU},
net::if_::InterfaceFlags,
sys::socket::SockaddrLike as _,
};
use pnet::ipnetwork::ip_mask_to_prefix;
use super::{route::Route, Error, IfConfiguerTrait};
pub(crate) fn dummy_socket() -> Result<std::net::UdpSocket, Error> {
Ok(std::net::UdpSocket::bind("0:0")?)
}
fn build_ifreq(name: &str) -> ifreq {
let c_str = CString::new(name).unwrap();
let mut ifr: ifreq = unsafe { std::mem::zeroed() };
let name_bytes = c_str.as_bytes_with_nul();
for (i, &b) in name_bytes.iter().enumerate() {
ifr.ifr_name[i] = b as libc::c_char;
}
ifr
}
fn send_netlink_req<T: NetlinkDeserializable + NetlinkSerializable + Debug>(
req: T,
flags: u16,
) -> Result<Socket, Error> {
let mut socket = Socket::new(NETLINK_ROUTE)?;
socket.bind_auto()?;
socket.connect(&SocketAddr::new(0, 0))?;
let mut req: NetlinkMessage<T> =
NetlinkMessage::new(NetlinkHeader::default(), NetlinkPayload::InnerMessage(req));
req.header.flags = flags;
req.finalize();
let mut buf = vec![0; req.header.length as _];
req.serialize(&mut buf);
tracing::debug!("net link request >>> {:?}", req);
socket.send(&buf, 0)?;
Ok(socket)
}
fn send_netlink_req_and_wait_one_resp<T: NetlinkDeserializable + NetlinkSerializable + Debug>(
req: T,
is_remove: bool,
) -> Result<(), Error> {
let socket = send_netlink_req(
req,
NLM_F_ACK | NLM_F_CREATE | NLM_F_REQUEST | if !is_remove { NLM_F_EXCL } else { 0 },
)?;
let resp = socket.recv_from_full()?;
let ret = NetlinkMessage::<T>::deserialize(&resp.0)
.with_context(|| "Failed to deserialize netlink message")?;
tracing::debug!("net link response <<< {:?}", ret);
match ret.payload {
NetlinkPayload::Error(e) => {
if e.code == NonZero::new(0) {
return Ok(());
} else {
return Err(e.to_io().into());
}
}
p => {
tracing::error!("Unexpected netlink response: {:?}", p);
return Err(anyhow::anyhow!("Unexpected netlink response").into());
}
}
}
fn addr_to_ip(addr: RouteAddress) -> Option<IpAddr> {
match addr {
RouteAddress::Inet(addr) => Some(addr.into()),
RouteAddress::Inet6(addr) => Some(addr.into()),
_ => None,
}
}
impl From<RouteMessage> for Route {
fn from(msg: RouteMessage) -> Self {
let mut gateway = None;
let mut source = None;
let mut source_hint = None;
let mut destination = None;
let mut ifindex = None;
let mut metric = None;
for attr in msg.attributes {
match attr {
RouteAttribute::Source(addr) => {
source = addr_to_ip(addr);
}
RouteAttribute::PrefSource(addr) => {
source_hint = addr_to_ip(addr);
}
RouteAttribute::Destination(addr) => {
destination = addr_to_ip(addr);
}
RouteAttribute::Gateway(addr) => {
gateway = addr_to_ip(addr);
}
RouteAttribute::Oif(i) => {
ifindex = Some(i);
}
RouteAttribute::Priority(priority) => {
metric = Some(priority);
}
_ => {}
}
}
// rtnetlink gives None instead of 0.0.0.0 for the default route, but we'll convert to 0 here to make it match the other platforms
let destination = destination.unwrap_or_else(|| match msg.header.address_family {
AddressFamily::Inet => Ipv4Addr::UNSPECIFIED.into(),
AddressFamily::Inet6 => Ipv6Addr::UNSPECIFIED.into(),
_ => panic!("invalid destination family"),
});
Self {
destination,
prefix: msg.header.destination_prefix_length,
source,
source_prefix: msg.header.source_prefix_length,
source_hint,
gateway,
ifindex,
table: msg.header.table,
metric,
}
}
}
pub struct NetlinkIfConfiger {}
impl NetlinkIfConfiger {
fn get_interface_index(name: &str) -> Result<u32, Error> {
let name = CString::new(name).with_context(|| "failed to convert interface name")?;
match unsafe { libc::if_nametoindex(name.as_ptr()) } {
0 => Err(std::io::Error::last_os_error().into()),
n => Ok(n),
}
}
fn get_prefix_len(name: &str, ip: Ipv4Addr) -> Result<u8, Error> {
let addrs = Self::list_addresses(name)?;
for addr in addrs {
if addr.address() == IpAddr::V4(ip) {
return Ok(addr.network_length());
}
}
Err(Error::NotFound)
}
fn remove_one_ip(name: &str, ip: Ipv4Addr, prefix_len: u8) -> Result<(), Error> {
let mut message = AddressMessage::default();
message.header.prefix_len = prefix_len;
message.header.index = NetlinkIfConfiger::get_interface_index(name)?;
message.header.family = AddressFamily::Inet;
message
.attributes
.push(AddressAttribute::Address(std::net::IpAddr::V4(ip)));
send_netlink_req_and_wait_one_resp::<RouteNetlinkMessage>(
RouteNetlinkMessage::DelAddress(message),
true,
)
}
pub(crate) fn mtu_op<T: TryInto<Ioctl>>(
name: &str,
op: T,
value: libc::c_int,
) -> Result<u32, Error>
where
<T as TryInto<Ioctl>>::Error: Debug,
{
let dummy_socket = dummy_socket()?;
let mut ifr: ifreq = build_ifreq(name);
unsafe {
ifr.ifr_ifru.ifru_mtu = value;
// 使用ioctl获取MTU
if ioctl(dummy_socket.as_raw_fd(), op.try_into().unwrap(), &ifr) != 0 {
return Err(std::io::Error::last_os_error().into());
}
}
Ok(unsafe { ifr.ifr_ifru.ifru_mtu as u32 })
}
fn mtu(name: &str) -> Result<u32, Error> {
Self::mtu_op(name, SIOCGIFMTU, 0)
}
pub fn list_addresses(name: &str) -> Result<Vec<IpInet>, Error> {
let mut result = vec![];
for interface in getifaddrs()
.with_context(|| "failed to call getifaddrs")?
.filter(|x| x.interface_name == name)
{
let (Some(address), Some(netmask)) = (interface.address, interface.netmask) else {
continue;
};
use nix::sys::socket::AddressFamily::{Inet, Inet6};
let (address, netmask) = match (address.family(), netmask.family()) {
(Some(Inet), Some(Inet)) => (
IpAddr::V4(address.as_sockaddr_in().unwrap().ip().into()),
IpAddr::V4(netmask.as_sockaddr_in().unwrap().ip().into()),
),
(Some(Inet6), Some(Inet6)) => (
IpAddr::V6(address.as_sockaddr_in6().unwrap().ip()),
IpAddr::V6(netmask.as_sockaddr_in6().unwrap().ip()),
),
(_, _) => continue,
};
let prefix = ip_mask_to_prefix(netmask).unwrap();
result.push(IpInet::new(address, prefix).unwrap());
}
Ok(result)
}
pub(crate) fn set_flags_op<T: TryInto<Ioctl>>(
name: &str,
op: T,
flags: InterfaceFlags,
) -> Result<InterfaceFlags, Error>
where
<T as TryInto<Ioctl>>::Error: Debug,
{
let mut req = build_ifreq(name);
req.ifr_ifru.ifru_flags = flags.bits() as _;
let socket = dummy_socket()?;
unsafe {
if ioctl(socket.as_raw_fd(), op.try_into().unwrap(), &req) != 0 {
return Err(std::io::Error::last_os_error().into());
}
Ok(InterfaceFlags::from_bits_truncate(
req.ifr_ifru.ifru_flags as _,
))
}
}
pub(crate) fn set_flags(name: &str, flags: InterfaceFlags) -> Result<InterfaceFlags, Error> {
Self::set_flags_op(name, SIOCSIFFLAGS, flags)
}
pub(crate) fn get_flags(name: &str) -> Result<InterfaceFlags, Error> {
Self::set_flags_op(name, SIOCGIFFLAGS, InterfaceFlags::empty())
}
fn list_routes() -> Result<Vec<RouteMessage>, Error> {
let mut message = RouteMessage::default();
message.header.table = RouteHeader::RT_TABLE_UNSPEC;
message.header.protocol = RouteProtocol::Unspec;
message.header.scope = RouteScope::Universe;
message.header.kind = RouteType::Unicast;
message.header.address_family = AddressFamily::Inet;
message.header.destination_prefix_length = 0;
message.header.source_prefix_length = 0;
let s = send_netlink_req(
RouteNetlinkMessage::GetRoute(message),
NLM_F_REQUEST | NLM_F_DUMP,
)?;
let mut ret_vec = vec![];
let mut resp = Vec::<u8>::new();
loop {
if resp.len() == 0 {
let (new_resp, _) = s.recv_from_full()?;
resp = new_resp;
}
let ret = NetlinkMessage::<RouteNetlinkMessage>::deserialize(&resp)
.with_context(|| "Failed to deserialize netlink message")?;
resp = resp.split_off(ret.buffer_len());
tracing::debug!("net link response <<< {:?}", ret);
match ret.payload {
NetlinkPayload::Error(e) => {
if e.code == NonZero::new(0) {
continue;
} else {
return Err(e.to_io().into());
}
}
NetlinkPayload::InnerMessage(RouteNetlinkMessage::NewRoute(m)) => {
tracing::debug!("net link response <<< {:?}", m);
ret_vec.push(m);
}
NetlinkPayload::Done(_) => {
break;
}
p => {
tracing::error!("Unexpected netlink response: {:?}", p);
return Err(anyhow::anyhow!("Unexpected netlink response").into());
}
}
}
Ok(ret_vec)
}
}
#[async_trait]
impl IfConfiguerTrait for NetlinkIfConfiger {
async fn add_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
let mut message = RouteMessage::default();
message.header.table = RouteHeader::RT_TABLE_MAIN;
message.header.protocol = RouteProtocol::Static;
message.header.scope = RouteScope::Universe;
message.header.kind = RouteType::Unicast;
message.header.address_family = AddressFamily::Inet;
// metric
message.attributes.push(RouteAttribute::Priority(65535));
// output interface
message
.attributes
.push(RouteAttribute::Oif(NetlinkIfConfiger::get_interface_index(
name,
)?));
// source address
message.header.destination_prefix_length = cidr_prefix;
message
.attributes
.push(RouteAttribute::Destination(RouteAddress::Inet(address)));
send_netlink_req_and_wait_one_resp(RouteNetlinkMessage::NewRoute(message), false)
}
async fn remove_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
let routes = Self::list_routes()?;
let ifidx = NetlinkIfConfiger::get_interface_index(name)?;
for msg in routes {
let other_route: Route = msg.clone().into();
if other_route.destination == std::net::IpAddr::V4(address)
&& other_route.prefix == cidr_prefix
&& other_route.ifindex == Some(ifidx)
{
send_netlink_req_and_wait_one_resp(RouteNetlinkMessage::DelRoute(msg), true)?;
return Ok(());
}
}
Ok(())
}
async fn add_ipv4_ip(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
let mut message = AddressMessage::default();
message.header.prefix_len = cidr_prefix;
message.header.index = NetlinkIfConfiger::get_interface_index(name)?;
message.header.family = AddressFamily::Inet;
message
.attributes
.push(AddressAttribute::Address(std::net::IpAddr::V4(address)));
// for IPv4 the IFA_LOCAL address can be set to the same value as
// IFA_ADDRESS
message
.attributes
.push(AddressAttribute::Local(std::net::IpAddr::V4(address)));
// set the IFA_BROADCAST address as well
if cidr_prefix == 32 {
message
.attributes
.push(AddressAttribute::Broadcast(address));
} else {
let ip_addr = u32::from(address);
let brd = Ipv4Addr::from((0xffff_ffff_u32) >> u32::from(cidr_prefix) | ip_addr);
message.attributes.push(AddressAttribute::Broadcast(brd));
};
send_netlink_req_and_wait_one_resp::<RouteNetlinkMessage>(
RouteNetlinkMessage::NewAddress(message),
false,
)
}
async fn set_link_status(&self, name: &str, up: bool) -> Result<(), Error> {
let mut flags = Self::get_flags(name)?;
flags.set(InterfaceFlags::IFF_UP, up);
Self::set_flags(name, flags)?;
Ok(())
}
async fn remove_ip(&self, name: &str, ip: Option<Ipv4Addr>) -> Result<(), Error> {
if ip.is_none() {
let addrs = Self::list_addresses(name)?;
for addr in addrs {
if let IpAddr::V4(ipv4) = addr.address() {
Self::remove_one_ip(name, ipv4, addr.network_length())?;
}
}
} else {
let ip = ip.unwrap();
let prefix_len = Self::get_prefix_len(name, ip)?;
Self::remove_one_ip(name, ip, prefix_len)?;
}
Ok(())
}
async fn set_mtu(&self, name: &str, mtu: u32) -> Result<(), Error> {
Self::mtu_op(name, SIOCSIFMTU, mtu as libc::c_int)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
const DUMMY_IFACE_NAME: &str = "dummy";
fn run_cmd(cmd: &str) -> String {
let output = std::process::Command::new("sh")
.arg("-c")
.arg(cmd)
.output()
.expect("failed to execute process");
String::from_utf8(output.stdout).unwrap()
}
struct PrepareEnv {}
impl PrepareEnv {
fn new() -> Self {
let _ = run_cmd(&format!("sudo ip link add {} type dummy", DUMMY_IFACE_NAME));
PrepareEnv {}
}
}
impl Drop for PrepareEnv {
fn drop(&mut self) {
let _ = run_cmd(&format!("sudo ip link del {}", DUMMY_IFACE_NAME));
}
}
#[serial_test::serial]
#[tokio::test]
async fn addr_test() {
let _prepare_env = PrepareEnv::new();
let ifcfg = NetlinkIfConfiger {};
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
ifcfg
.add_ipv4_ip(DUMMY_IFACE_NAME, "10.44.44.4".parse().unwrap(), 24)
.await
.unwrap();
let addrs = NetlinkIfConfiger::list_addresses(DUMMY_IFACE_NAME).unwrap();
assert_eq!(addrs.len(), 1);
assert_eq!(
addrs[0].address(),
IpAddr::V4("10.44.44.4".parse().unwrap())
);
assert_eq!(addrs[0].network_length(), 24);
NetlinkIfConfiger::remove_one_ip(DUMMY_IFACE_NAME, "10.44.44.4".parse().unwrap(), 24)
.unwrap();
let addrs = NetlinkIfConfiger::list_addresses(DUMMY_IFACE_NAME).unwrap();
assert_eq!(addrs.len(), 0);
let old_mtu = NetlinkIfConfiger::mtu(DUMMY_IFACE_NAME).unwrap();
assert_ne!(old_mtu, 0);
let new_mtu = old_mtu + 1;
ifcfg.set_mtu(DUMMY_IFACE_NAME, new_mtu).await.unwrap();
let mtu = NetlinkIfConfiger::mtu(DUMMY_IFACE_NAME).unwrap();
assert_eq!(mtu, new_mtu);
ifcfg
.set_link_status(DUMMY_IFACE_NAME, false)
.await
.unwrap();
ifcfg.set_link_status(DUMMY_IFACE_NAME, true).await.unwrap();
}
#[serial_test::serial]
#[tokio::test]
async fn route_test() {
let _prepare_env = PrepareEnv::new();
let ret = NetlinkIfConfiger::list_routes().unwrap();
let ifcfg = NetlinkIfConfiger {};
println!("{:?}", ret);
ifcfg.set_link_status(DUMMY_IFACE_NAME, true).await.unwrap();
ifcfg
.add_ipv4_route(DUMMY_IFACE_NAME, "10.5.5.0".parse().unwrap(), 24)
.await
.unwrap();
let routes = NetlinkIfConfiger::list_routes()
.unwrap()
.into_iter()
.map(Route::from)
.map(|x| x.destination)
.collect::<Vec<_>>();
assert!(routes.contains(&IpAddr::V4("10.5.5.0".parse().unwrap())));
ifcfg
.remove_ipv4_route(DUMMY_IFACE_NAME, "10.5.5.0".parse().unwrap(), 24)
.await
.unwrap();
let routes = NetlinkIfConfiger::list_routes()
.unwrap()
.into_iter()
.map(Route::from)
.map(|x| x.destination)
.collect::<Vec<_>>();
assert!(!routes.contains(&IpAddr::V4("10.5.5.0".parse().unwrap())));
}
}

View File

@@ -0,0 +1,133 @@
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Route {
/// Network address of the destination. `0.0.0.0` with a prefix of `0` is considered a default route.
pub destination: IpAddr,
/// Length of network prefix in the destination address.
pub prefix: u8,
/// The address of the next hop of this route.
///
/// On macOS, this must be `Some` if ifindex is `None`
pub gateway: Option<IpAddr>,
/// The index of the local interface through which the next hop of this route may be reached.
///
/// On macOS, this must be `Some` if gateway is `None`
pub ifindex: Option<u32>,
#[cfg(target_os = "linux")]
/// The routing table this route belongs to.
pub table: u8,
/// Network address of the source.
#[cfg(target_os = "linux")]
pub source: Option<IpAddr>,
/// Prefix length of the source address.
#[cfg(target_os = "linux")]
pub source_prefix: u8,
/// Source address hint. Does not influence routing.
#[cfg(target_os = "linux")]
pub source_hint: Option<IpAddr>,
#[cfg(any(target_os = "windows", target_os = "linux"))]
/// The route metric offset value for this route.
pub metric: Option<u32>,
#[cfg(target_os = "windows")]
/// Luid of the local interface through which the next hop of this route may be reached.
///
/// If luid is specified, ifindex is optional.
pub luid: Option<u64>,
}
impl Route {
/// Create a route that matches a given destination network.
///
/// Either the gateway or interface should be set before attempting to add to a routing table.
pub fn new(destination: IpAddr, prefix: u8) -> Self {
Self {
destination,
prefix,
gateway: None,
ifindex: None,
#[cfg(target_os = "linux")]
// default to main table
table: 254,
#[cfg(target_os = "linux")]
source: None,
#[cfg(target_os = "linux")]
source_prefix: 0,
#[cfg(target_os = "linux")]
source_hint: None,
#[cfg(any(target_os = "windows", target_os = "linux"))]
metric: None,
#[cfg(target_os = "windows")]
luid: None,
}
}
/// Set the next next hop gateway for this route.
pub fn with_gateway(mut self, gateway: IpAddr) -> Self {
self.gateway = Some(gateway);
self
}
/// Set the index of the local interface through which the next hop of this route should be reached.
pub fn with_ifindex(mut self, ifindex: u32) -> Self {
self.ifindex = Some(ifindex);
self
}
/// Set table the route will be installed in.
#[cfg(target_os = "linux")]
pub fn with_table(mut self, table: u8) -> Self {
self.table = table;
self
}
/// Set source.
#[cfg(target_os = "linux")]
pub fn with_source(mut self, source: IpAddr, prefix: u8) -> Self {
self.source = Some(source);
self.source_prefix = prefix;
self
}
/// Set source hint.
#[cfg(target_os = "linux")]
pub fn with_source_hint(mut self, hint: IpAddr) -> Self {
self.source_hint = Some(hint);
self
}
/// Set route metric.
#[cfg(any(target_os = "windows", target_os = "linux"))]
pub fn with_metric(mut self, metric: u32) -> Self {
self.metric = Some(metric);
self
}
/// Set luid of the local interface through which the next hop of this route should be reached.
#[cfg(target_os = "windows")]
pub fn with_luid(mut self, luid: u64) -> Self {
self.luid = Some(luid);
self
}
/// Get the netmask covering the network portion of the destination address.
pub fn mask(&self) -> IpAddr {
match self.destination {
IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::from(
u32::MAX.checked_shl(32 - self.prefix as u32).unwrap_or(0),
)),
IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::from(
u128::MAX.checked_shl(128 - self.prefix as u32).unwrap_or(0),
)),
}
}
}

View File

@@ -0,0 +1,166 @@
use std::net::Ipv4Addr;
use async_trait::async_trait;
use super::{cidr_to_subnet_mask, run_shell_cmd, Error, IfConfiguerTrait};
pub struct WindowsIfConfiger {}
impl WindowsIfConfiger {
pub fn get_interface_index(name: &str) -> Option<u32> {
crate::arch::windows::find_interface_index(name).ok()
}
async fn list_ipv4(name: &str) -> Result<Vec<Ipv4Addr>, Error> {
use anyhow::Context;
use network_interface::NetworkInterfaceConfig;
use std::net::IpAddr;
let ret = network_interface::NetworkInterface::show().with_context(|| "show interface")?;
let addrs = ret
.iter()
.filter_map(|x| {
if x.name != name {
return None;
}
Some(x.addr.clone())
})
.flat_map(|x| x)
.map(|x| x.ip())
.filter_map(|x| {
if let IpAddr::V4(ipv4) = x {
Some(ipv4)
} else {
None
}
})
.collect::<Vec<_>>();
Ok(addrs)
}
async fn remove_one_ipv4(name: &str, ip: Ipv4Addr) -> Result<(), Error> {
run_shell_cmd(
format!(
"netsh interface ipv4 delete address {} address={}",
name,
ip.to_string()
)
.as_str(),
)
.await
}
}
#[cfg(target_os = "windows")]
#[async_trait]
impl IfConfiguerTrait for WindowsIfConfiger {
async fn add_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
let Some(idx) = Self::get_interface_index(name) else {
return Err(Error::NotFound);
};
run_shell_cmd(
format!(
"route ADD {} MASK {} 10.1.1.1 IF {} METRIC 9000",
address,
cidr_to_subnet_mask(cidr_prefix),
idx
)
.as_str(),
)
.await
}
async fn remove_ipv4_route(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
let Some(idx) = Self::get_interface_index(name) else {
return Err(Error::NotFound);
};
run_shell_cmd(
format!(
"route DELETE {} MASK {} IF {}",
address,
cidr_to_subnet_mask(cidr_prefix),
idx
)
.as_str(),
)
.await
}
async fn add_ipv4_ip(
&self,
name: &str,
address: Ipv4Addr,
cidr_prefix: u8,
) -> Result<(), Error> {
run_shell_cmd(
format!(
"netsh interface ipv4 add address {} address={} mask={}",
name,
address,
cidr_to_subnet_mask(cidr_prefix)
)
.as_str(),
)
.await
}
async fn set_link_status(&self, name: &str, up: bool) -> Result<(), Error> {
run_shell_cmd(
format!(
"netsh interface set interface {} {}",
name,
if up { "enable" } else { "disable" }
)
.as_str(),
)
.await
}
async fn remove_ip(&self, name: &str, ip: Option<Ipv4Addr>) -> Result<(), Error> {
if ip.is_none() {
for ip in Self::list_ipv4(name).await?.iter() {
Self::remove_one_ipv4(name, *ip).await?;
}
Ok(())
} else {
Self::remove_one_ipv4(name, ip.unwrap()).await
}
}
async fn wait_interface_show(&self, name: &str) -> Result<(), Error> {
Ok(
tokio::time::timeout(std::time::Duration::from_secs(10), async move {
loop {
if let Some(idx) = Self::get_interface_index(name) {
tracing::info!(?name, ?idx, "Interface found");
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Ok::<(), Error>(())
})
.await??,
)
}
async fn set_mtu(&self, name: &str, mtu: u32) -> Result<(), Error> {
let _ = run_shell_cmd(
format!("netsh interface ipv6 set subinterface {} mtu={}", name, mtu).as_str(),
)
.await;
run_shell_cmd(
format!("netsh interface ipv4 set subinterface {} mtu={}", name, mtu).as_str(),
)
.await
}
}

View File

@@ -1,9 +1,10 @@
use std::{
fmt::Debug,
future,
io::Write as _,
sync::{Arc, Mutex},
};
use tokio::task::JoinSet;
use tokio::{task::JoinSet, time::timeout};
use tracing::Instrument;
pub mod compressor;
@@ -46,16 +47,13 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
origin: String,
) {
let js = Arc::downgrade(&js);
let o = origin.clone();
tokio::spawn(
async move {
loop {
while js.strong_count() > 0 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if js.weak_count() == 0 {
tracing::info!("joinset task exit");
break;
}
future::poll_fn(|cx| {
let fut = future::poll_fn(|cx| {
let Some(js) = js.upgrade() else {
return std::task::Poll::Ready(());
};
@@ -63,15 +61,24 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
let mut js = js.lock().unwrap();
while !js.is_empty() {
let ret = js.poll_join_next(cx);
if ret.is_pending() {
return std::task::Poll::Pending;
match ret {
std::task::Poll::Ready(Some(_)) => {
continue;
}
std::task::Poll::Ready(None) => {
break;
}
std::task::Poll::Pending => {
return std::task::Poll::Pending;
}
}
}
std::task::Poll::Ready(())
})
.await;
});
let _ = timeout(std::time::Duration::from_secs(5), fut).await;
}
tracing::debug!(?o, "joinset task exit");
}
.instrument(tracing::info_span!(
"join_joinset_background",
@@ -81,7 +88,17 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
}
pub fn get_machine_id() -> uuid::Uuid {
// TODO: load from local file
// a path same as the binary
let machine_id_file = std::env::current_exe()
.map(|x| x.with_file_name("et_machine_id"))
.unwrap_or_else(|_| std::path::PathBuf::from("et_machine_id"));
// try load from local file
if let Ok(mid) = std::fs::read_to_string(&machine_id_file) {
if let Ok(mid) = uuid::Uuid::parse_str(mid.trim()) {
return mid;
}
}
#[cfg(any(
target_os = "linux",
@@ -95,7 +112,7 @@ pub fn get_machine_id() -> uuid::Uuid {
crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b);
uuid::Uuid::from_bytes(b)
})
.unwrap_or(uuid::Uuid::new_v4());
.ok();
#[cfg(not(any(
target_os = "linux",
@@ -103,9 +120,18 @@ pub fn get_machine_id() -> uuid::Uuid {
target_os = "windows",
target_os = "freebsd"
)))]
let gen_mid = None;
if gen_mid.is_some() {
return gen_mid.unwrap();
}
let gen_mid = uuid::Uuid::new_v4();
// TODO: save to local file
// try save to local file
if let Ok(mut file) = std::fs::File::create(machine_id_file) {
let _ = file.write_all(gen_mid.to_string().as_bytes());
}
gen_mid
}
@@ -147,5 +173,6 @@ mod tests {
drop(js);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
assert_eq!(weak_js.weak_count(), 0);
assert_eq!(weak_js.strong_count(), 0);
}
}

View File

@@ -1,5 +1,5 @@
use std::collections::BTreeSet;
use std::net::{IpAddr, SocketAddr};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -8,6 +8,8 @@ use crate::proto::common::{NatType, StunInfo};
use anyhow::Context;
use chrono::Local;
use crossbeam::atomic::AtomicCell;
use hickory_resolver::config::{NameServerConfig, Protocol, ResolverConfig, ResolverOpts};
use hickory_resolver::TokioAsyncResolver;
use rand::seq::IteratorRandom;
use tokio::net::{lookup_host, UdpSocket};
use tokio::sync::{broadcast, Mutex};
@@ -22,21 +24,68 @@ use crate::common::error::Error;
use super::stun_codec_ext::*;
pub fn get_default_resolver_config() -> ResolverConfig {
let mut default_resolve_config = ResolverConfig::new();
default_resolve_config.add_name_server(NameServerConfig::new(
"223.5.5.5:53".parse().unwrap(),
Protocol::Udp,
));
default_resolve_config.add_name_server(NameServerConfig::new(
"180.184.1.1:53".parse().unwrap(),
Protocol::Udp,
));
default_resolve_config
}
pub async fn resolve_txt_record(
domain_name: &str,
resolver: &TokioAsyncResolver,
) -> Result<String, Error> {
let response = resolver.txt_lookup(domain_name).await.with_context(|| {
format!(
"txt_lookup failed, domain_name: {}",
domain_name.to_string()
)
})?;
let txt_record = response.iter().next().with_context(|| {
format!(
"no txt record found, domain_name: {}",
domain_name.to_string()
)
})?;
let txt_data = String::from_utf8_lossy(&txt_record.txt_data()[0]);
tracing::info!(?txt_data, ?domain_name, "get txt record");
Ok(txt_data.to_string())
}
struct HostResolverIter {
hostnames: Vec<String>,
ips: Vec<SocketAddr>,
max_ip_per_domain: u32,
use_ipv6: bool,
}
impl HostResolverIter {
fn new(hostnames: Vec<String>, max_ip_per_domain: u32) -> Self {
fn new(hostnames: Vec<String>, max_ip_per_domain: u32, use_ipv6: bool) -> Self {
Self {
hostnames,
ips: vec![],
max_ip_per_domain,
use_ipv6,
}
}
async fn get_txt_record(domain_name: &str) -> Result<Vec<String>, Error> {
let resolver = TokioAsyncResolver::tokio_from_system_conf().unwrap_or(
TokioAsyncResolver::tokio(get_default_resolver_config(), ResolverOpts::default()),
);
let txt_data = resolve_txt_record(domain_name, &resolver).await?;
Ok(txt_data.split(" ").map(|x| x.to_string()).collect())
}
#[async_recursion::async_recursion]
async fn next(&mut self) -> Option<SocketAddr> {
if self.ips.is_empty() {
@@ -51,10 +100,35 @@ impl HostResolverIter {
format!("{}:3478", host)
};
if host.starts_with("txt:") {
let domain_name = host.trim_start_matches("txt:");
match Self::get_txt_record(domain_name).await {
Ok(hosts) => {
tracing::info!(
?domain_name,
?hosts,
"get txt record success when resolve stun server"
);
// insert hosts to the head of hostnames
self.hostnames.splice(0..0, hosts.into_iter());
}
Err(e) => {
tracing::warn!(
?domain_name,
?e,
"get txt record failed when resolve stun server"
);
}
}
return self.next().await;
}
let use_ipv6 = self.use_ipv6;
match lookup_host(&host).await {
Ok(ips) => {
self.ips = ips
.filter(|x| x.is_ipv4())
.filter(|x| if use_ipv6 { x.is_ipv6() } else { x.is_ipv4() })
.choose_multiple(&mut rand::thread_rng(), self.max_ip_per_domain as usize);
if self.ips.is_empty() {
@@ -400,7 +474,7 @@ impl UdpNatTypeDetectResult {
// find resp with distinct stun server
self.stun_resps
.iter()
.map(|x| x.stun_server_addr)
.map(|x| x.recv_from_addr)
.collect::<BTreeSet<_>>()
.len()
}
@@ -555,8 +629,11 @@ impl UdpNatTypeDetector {
udp: Arc<UdpSocket>,
) -> Result<UdpNatTypeDetectResult, Error> {
let mut stun_servers = vec![];
let mut host_resolver =
HostResolverIter::new(self.stun_server_hosts.clone(), self.max_ip_per_domain);
let mut host_resolver = HostResolverIter::new(
self.stun_server_hosts.clone(),
self.max_ip_per_domain,
false,
);
while let Some(addr) = host_resolver.next().await {
stun_servers.push(addr);
}
@@ -602,7 +679,9 @@ pub trait StunInfoCollectorTrait: Send + Sync {
pub struct StunInfoCollector {
stun_servers: Arc<RwLock<Vec<String>>>,
stun_servers_v6: Arc<RwLock<Vec<String>>>,
udp_nat_test_result: Arc<RwLock<Option<UdpNatTypeDetectResult>>>,
public_ipv6: Arc<AtomicCell<Option<Ipv6Addr>>>,
nat_test_result_time: Arc<AtomicCell<chrono::DateTime<Local>>>,
redetect_notify: Arc<tokio::sync::Notify>,
tasks: std::sync::Mutex<JoinSet<()>>,
@@ -621,7 +700,12 @@ impl StunInfoCollectorTrait for StunInfoCollector {
udp_nat_type: result.nat_type() as i32,
tcp_nat_type: 0,
last_update_time: self.nat_test_result_time.load().timestamp(),
public_ip: result.public_ips().iter().map(|x| x.to_string()).collect(),
public_ip: result
.public_ips()
.iter()
.map(|x| x.to_string())
.chain(self.public_ipv6.load().map(|x| x.to_string()))
.collect(),
min_port: result.min_port() as u32,
max_port: result.max_port() as u32,
}
@@ -640,7 +724,7 @@ impl StunInfoCollectorTrait for StunInfoCollector {
if stun_servers.is_empty() {
let mut host_resolver =
HostResolverIter::new(self.stun_servers.read().unwrap().clone(), 2);
HostResolverIter::new(self.stun_servers.read().unwrap().clone(), 2, false);
while let Some(addr) = host_resolver.next().await {
stun_servers.push(addr);
if stun_servers.len() >= 2 {
@@ -680,7 +764,9 @@ impl StunInfoCollector {
pub fn new(stun_servers: Vec<String>) -> Self {
Self {
stun_servers: Arc::new(RwLock::new(stun_servers)),
stun_servers_v6: Arc::new(RwLock::new(Self::get_default_servers_v6())),
udp_nat_test_result: Arc::new(RwLock::new(None)),
public_ipv6: Arc::new(AtomicCell::new(None)),
nat_test_result_time: Arc::new(AtomicCell::new(Local::now())),
redetect_notify: Arc::new(tokio::sync::Notify::new()),
tasks: std::sync::Mutex::new(JoinSet::new()),
@@ -696,28 +782,42 @@ impl StunInfoCollector {
// NOTICE: we may need to choose stun stun server based on geo location
// stun server cross nation may return a external ip address with high latency and loss rate
vec![
"txt:stun.easytier.cn",
"stun.miwifi.com",
"stun.chat.bilibili.com",
"stun.hitv.com",
"stun.cdnbye.com",
"stun.douyucdn.cn:18000",
"fwa.lifesizecloud.com",
"global.turn.twilio.com",
"turn.cloudflare.com",
"stun.isp.net.au",
"stun.nextcloud.com",
"stun.freeswitch.org",
"stun.voip.blackberry.com",
"stunserver.stunprotocol.org",
"stun.sipnet.com",
"stun.radiojar.com",
"stun.sonetel.com",
]
.iter()
.map(|x| x.to_string())
.collect()
}
pub fn get_default_servers_v6() -> Vec<String> {
vec!["txt:stun-v6.easytier.cn"]
.iter()
.map(|x| x.to_string())
.collect()
}
async fn get_public_ipv6(servers: &Vec<String>) -> Option<Ipv6Addr> {
let mut ips = HostResolverIter::new(servers.to_vec(), 10, true);
while let Some(ip) = ips.next().await {
let udp = Arc::new(UdpSocket::bind(format!("[::]:0")).await.unwrap());
let ret = StunClientBuilder::new(udp.clone())
.new_stun_client(ip)
.bind_request(false, false)
.await;
tracing::debug!(?ret, "finish ipv6 udp nat type detect");
match ret.map(|x| x.mapped_socket_addr.map(|x| x.ip())) {
Ok(Some(IpAddr::V6(v6))) => {
return Some(v6);
}
_ => {}
}
}
None
}
fn start_stun_routine(&self) {
if self.started.load(std::sync::atomic::Ordering::Relaxed) {
return;
@@ -784,6 +884,30 @@ impl StunInfoCollector {
}
}
});
// for ipv6
let stun_servers = self.stun_servers_v6.clone();
let stored_ipv6 = self.public_ipv6.clone();
let redetect_notify = self.redetect_notify.clone();
self.tasks.lock().unwrap().spawn(async move {
loop {
let servers = stun_servers.read().unwrap().clone();
Self::get_public_ipv6(&servers)
.await
.map(|x| stored_ipv6.store(Some(x)));
let sleep_sec = if stored_ipv6.load().is_none() {
60
} else {
360
};
tokio::select! {
_ = redetect_notify.notified() => {}
_ = tokio::time::sleep(Duration::from_secs(sleep_sec)) => {}
}
}
});
}
pub fn update_stun_info(&self) {
@@ -862,6 +986,48 @@ mod tests {
let detector = UdpNatTypeDetector::new(stun_servers, 1);
let ret = detector.detect_nat_type(0).await;
println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type());
assert_eq!(ret.unwrap().nat_type(), NatType::PortRestricted);
assert_eq!(ret.unwrap().nat_type(), NatType::Restricted);
}
#[tokio::test]
async fn test_txt_public_stun_server() {
let stun_servers = vec!["txt:stun.easytier.cn".to_string()];
let detector = UdpNatTypeDetector::new(stun_servers, 1);
let ret = detector.detect_nat_type(0).await;
println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type());
assert!(!ret.unwrap().stun_resps.is_empty());
}
#[tokio::test]
async fn test_v4_stun() {
let mut udp_server = UdpTunnelListener::new("udp://0.0.0.0:55355".parse().unwrap());
let mut tasks = JoinSet::new();
tasks.spawn(async move {
udp_server.listen().await.unwrap();
loop {
udp_server.accept().await.unwrap();
}
});
let stun_servers = vec!["127.0.0.1:55355".to_string()];
let detector = UdpNatTypeDetector::new(stun_servers, 1);
let ret = detector.detect_nat_type(0).await;
println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type());
assert_eq!(ret.unwrap().nat_type(), NatType::Restricted);
}
#[tokio::test]
async fn test_v6_stun() {
let mut udp_server = UdpTunnelListener::new("udp://[::]:55355".parse().unwrap());
let mut tasks = JoinSet::new();
tasks.spawn(async move {
udp_server.listen().await.unwrap();
loop {
udp_server.accept().await.unwrap();
}
});
let stun_servers = vec!["::1:55355".to_string()];
let ret = StunInfoCollector::get_public_ipv6(&stun_servers).await;
println!("{:#?}", ret);
}
}

View File

@@ -1,6 +1,15 @@
// try connect peers directly, with either its public ip or lan ip
use std::{net::SocketAddr, sync::Arc, time::Duration};
use std::{
collections::HashSet,
net::{Ipv6Addr, SocketAddr},
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
@@ -15,6 +24,7 @@ use crate::{
},
rpc_types::controller::BaseController,
},
tunnel::IpVersion,
};
use crate::proto::cli::PeerConnInfo;
@@ -29,6 +39,8 @@ use super::create_connector_by_url;
pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1;
pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
static TESTING: AtomicBool = AtomicBool::new(false);
#[async_trait::async_trait]
pub trait PeerManagerForDirectConnector {
async fn list_peers(&self) -> Vec<PeerId>;
@@ -70,7 +82,6 @@ struct DstListenerUrlBlackListItem(PeerId, url::Url);
struct DirectConnectorManagerData {
global_ctx: ArcGlobalCtx,
peer_manager: Arc<PeerManager>,
dst_blacklist: timedmap::TimedMap<DstBlackListItem, ()>,
dst_listener_blacklist: timedmap::TimedMap<DstListenerUrlBlackListItem, ()>,
}
@@ -79,7 +90,6 @@ impl DirectConnectorManagerData {
Self {
global_ctx,
peer_manager,
dst_blacklist: timedmap::TimedMap::new(),
dst_listener_blacklist: timedmap::TimedMap::new(),
}
}
@@ -141,7 +151,9 @@ impl DirectConnectorManager {
let peers = data.peer_manager.list_peers().await;
let mut tasks = JoinSet::new();
for peer_id in peers {
if peer_id == my_peer_id {
if peer_id == my_peer_id
|| data.peer_manager.has_directly_connected_conn(peer_id)
{
continue;
}
tasks.spawn(Self::do_try_direct_connect(data.clone(), peer_id));
@@ -164,25 +176,14 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
addr: String,
) -> Result<(), Error> {
data.dst_blacklist.cleanup();
if data
.dst_blacklist
.contains(&DstBlackListItem(dst_peer_id.clone(), addr.clone()))
{
tracing::debug!("try_connect_to_ip failed, addr in blacklist: {}", addr);
return Err(Error::UrlInBlacklist);
}
let connector = create_connector_by_url(&addr, &data.global_ctx).await?;
let connector = create_connector_by_url(&addr, &data.global_ctx, IpVersion::Both).await?;
let (peer_id, conn_id) = timeout(
std::time::Duration::from_secs(5),
data.peer_manager.try_connect(connector),
std::time::Duration::from_secs(3),
data.peer_manager.try_direct_connect(connector),
)
.await??;
// let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?;
if peer_id != dst_peer_id {
if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) {
tracing::info!(
"connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}",
addr,
@@ -195,6 +196,7 @@ impl DirectConnectorManager {
.await?;
return Err(Error::InvalidUrl(addr));
}
Ok(())
}
@@ -205,7 +207,7 @@ impl DirectConnectorManager {
addr: String,
) -> Result<(), Error> {
let mut rand_gen = rand::rngs::OsRng::default();
let backoff_ms = vec![1000, 2000, 4000];
let backoff_ms = vec![1000, 2000];
let mut backoff_idx = 0;
loop {
@@ -228,12 +230,6 @@ impl DirectConnectorManager {
backoff_idx += 1;
continue;
} else {
data.dst_blacklist.insert(
DstBlackListItem(dst_peer_id.clone(), addr.clone()),
(),
std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC),
);
return ret;
}
}
@@ -264,102 +260,95 @@ impl DirectConnectorManager {
tracing::debug!(?available_listeners, "got available listeners");
let mut listener = available_listeners.get(0).ok_or(anyhow::anyhow!(
"peer {} have no valid listener",
dst_peer_id
))?;
if available_listeners.is_empty() {
return Err(anyhow::anyhow!("peer {} have no valid listener", dst_peer_id).into());
}
// if have default listener, use it first
listener = available_listeners
let listener = available_listeners
.iter()
.find(|l| l.scheme() == data.global_ctx.get_flags().default_protocol)
.unwrap_or(listener);
.unwrap_or(available_listeners.get(0).unwrap());
let mut tasks = JoinSet::new();
let mut tasks = bounded_join_set::JoinSet::new(2);
let listener_host = listener.socket_addrs(|| None).unwrap().pop();
let listener_host = listener.socket_addrs(|| None)?.pop();
match listener_host {
Some(SocketAddr::V4(_)) => {
ip_list.interface_ipv4s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
if let Some(public_ipv4) = ip_list.public_ipv4 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(public_ipv4.to_string().as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv4,
?listener,
?dst_peer_id,
"failed to set host for public ipv4"
);
}
Some(SocketAddr::V4(s_addr)) => {
if s_addr.ip().is_unspecified() {
ip_list
.interface_ipv4s
.iter()
.chain(ip_list.public_ipv4.iter())
.for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
listener.to_string(),
));
}
}
Some(SocketAddr::V6(_)) => {
ip_list.interface_ipv6s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv6"
);
}
});
if let Some(public_ipv6) = ip_list.public_ipv6 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv6,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
Some(SocketAddr::V6(s_addr)) => {
if s_addr.ip().is_unspecified() {
// for ipv6, only try public ip
ip_list
.interface_ipv6s
.iter()
.chain(ip_list.public_ipv6.iter())
.filter_map(|x| Ipv6Addr::from_str(&x.to_string()).ok())
.filter(|x| {
TESTING.load(Ordering::Relaxed)
|| (!x.is_loopback()
&& !x.is_unspecified()
&& !x.is_unique_local()
&& !x.is_unicast_link_local()
&& !x.is_multicast())
})
.collect::<HashSet<_>>()
.iter()
.for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
});
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
listener.to_string(),
));
}
}
p => {
@@ -405,14 +394,6 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
) -> Result<(), Error> {
let peer_manager = data.peer_manager.clone();
// check if we have direct connection with dst_peer_id
if let Some(c) = peer_manager.list_peer_conns(dst_peer_id).await {
// currently if we have any type of direct connection (udp or tcp), we will not try to connect
if !c.is_empty() {
return Ok(());
}
}
tracing::debug!("try direct connect to peer: {}", dst_peer_id);
let rpc_stub = peer_manager
@@ -441,8 +422,7 @@ mod tests {
use crate::{
connector::direct::{
DirectConnectorManager, DirectConnectorManagerData, DstBlackListItem,
DstListenerUrlBlackListItem,
DirectConnectorManager, DirectConnectorManagerData, DstListenerUrlBlackListItem,
},
instance::listeners::ListenerManager,
peers::tests::{
@@ -452,15 +432,56 @@ mod tests {
proto::peer_rpc::GetIpListResponse,
};
use super::TESTING;
#[tokio::test]
async fn direct_connector_mapped_listener() {
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
let p_c = create_mock_peer_manager().await;
let p_x = create_mock_peer_manager().await;
connect_peer_manager(p_a.clone(), p_b.clone()).await;
connect_peer_manager(p_b.clone(), p_c.clone()).await;
connect_peer_manager(p_c.clone(), p_x.clone()).await;
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
wait_route_appear(p_a.clone(), p_x.clone()).await.unwrap();
let mut f = p_a.get_global_ctx().get_flags();
f.bind_device = false;
p_a.get_global_ctx().config.set_flags(f);
p_c.get_global_ctx()
.config
.set_mapped_listeners(Some(vec!["tcp://127.0.0.1:11334".parse().unwrap()]));
p_x.get_global_ctx()
.config
.set_listeners(vec!["tcp://0.0.0.0:11334".parse().unwrap()]);
let mut lis_x = ListenerManager::new(p_x.get_global_ctx(), p_x.clone());
lis_x.prepare_listeners().await.unwrap();
lis_x.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let mut dm_a = DirectConnectorManager::new(p_a.get_global_ctx(), p_a.clone());
let mut dm_c = DirectConnectorManager::new(p_c.get_global_ctx(), p_c.clone());
dm_a.run_as_client();
dm_c.run_as_server();
// p_c's mapped listener is p_x's listener, so p_a should connect to p_x directly
wait_route_appear_with_cost(p_a.clone(), p_x.my_peer_id(), Some(1))
.await
.unwrap();
}
#[rstest::rstest]
#[tokio::test]
async fn direct_connector_basic_test(
#[values("tcp", "udp", "wg")] proto: &str,
#[values("true", "false")] ipv6: bool,
) {
if ipv6 && proto != "udp" {
return;
}
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
@@ -476,14 +497,18 @@ mod tests {
dm_a.run_as_client();
dm_c.run_as_server();
let port = if proto == "wg" { 11040 } else { 11041 };
if !ipv6 {
let port = if proto == "wg" { 11040 } else { 11041 };
p_c.get_global_ctx().config.set_listeners(vec![format!(
"{}://0.0.0.0:{}",
proto, port
)
.parse()
.unwrap()]);
} else {
p_c.get_global_ctx()
.config
.set_listeners(vec![format!("{}://[::]:{}", proto, port).parse().unwrap()]);
}
let mut f = p_c.get_global_ctx().config.get_flags();
f.enable_ipv6 = ipv6;
@@ -524,9 +549,5 @@ mod tests {
1,
"tcp://127.0.0.1:10222".parse().unwrap()
)));
assert!(data
.dst_blacklist
.contains(&DstBlackListItem(1, ip_list.listeners[0].to_string())));
}
}

View File

@@ -0,0 +1,277 @@
use std::{net::SocketAddr, sync::Arc};
use crate::{
common::{
error::Error,
global_ctx::ArcGlobalCtx,
stun::{get_default_resolver_config, resolve_txt_record},
},
tunnel::{IpVersion, Tunnel, TunnelConnector, TunnelError, PROTO_PORT_OFFSET},
};
use anyhow::Context;
use dashmap::DashSet;
use hickory_resolver::{
config::{ResolverConfig, ResolverOpts},
proto::rr::rdata::SRV,
TokioAsyncResolver,
};
use rand::{seq::SliceRandom, Rng as _};
use crate::proto::common::TunnelInfo;
use super::{create_connector_by_url, http_connector::TunnelWithInfo};
fn weighted_choice<T>(options: &[(T, u64)]) -> Option<&T> {
let total_weight = options.iter().map(|(_, weight)| *weight).sum();
let mut rng = rand::thread_rng();
let rand_value = rng.gen_range(0..total_weight);
let mut accumulated_weight = 0;
for (item, weight) in options {
accumulated_weight += *weight;
if rand_value < accumulated_weight {
return Some(item);
}
}
None
}
#[derive(Debug)]
pub struct DNSTunnelConnector {
addr: url::Url,
bind_addrs: Vec<SocketAddr>,
global_ctx: ArcGlobalCtx,
ip_version: IpVersion,
default_resolve_config: ResolverConfig,
default_resolve_opts: ResolverOpts,
}
impl DNSTunnelConnector {
pub fn new(addr: url::Url, global_ctx: ArcGlobalCtx) -> Self {
Self {
addr,
bind_addrs: Vec::new(),
global_ctx,
ip_version: IpVersion::Both,
default_resolve_config: get_default_resolver_config(),
default_resolve_opts: ResolverOpts::default(),
}
}
#[tracing::instrument(ret, err)]
pub async fn handle_txt_record(
&self,
domain_name: &str,
) -> Result<Box<dyn TunnelConnector>, Error> {
let resolver =
TokioAsyncResolver::tokio_from_system_conf().unwrap_or(TokioAsyncResolver::tokio(
self.default_resolve_config.clone(),
self.default_resolve_opts.clone(),
));
let txt_data = resolve_txt_record(domain_name, &resolver)
.await
.with_context(|| format!("resolve txt record failed, domain_name: {}", domain_name))?;
let candidate_urls = txt_data
.split(" ")
.map(|s| s.to_string())
.filter_map(|s| url::Url::parse(s.as_str()).ok())
.collect::<Vec<_>>();
// shuffle candidate_urls and get the first one
let url = candidate_urls
.choose(&mut rand::thread_rng())
.with_context(|| {
format!(
"no valid url found, txt_data: {}, expecting an url list splitted by space",
txt_data
)
})?;
let connector =
create_connector_by_url(url.as_str(), &self.global_ctx, self.ip_version).await?;
Ok(connector)
}
fn handle_one_srv_record(record: &SRV, protocol: &str) -> Result<(url::Url, u64), Error> {
// port must be non-zero
if record.port() == 0 {
return Err(anyhow::anyhow!("port must be non-zero").into());
}
let connector_dst = record.target().to_utf8();
let dst_url = format!("{}://{}:{}", protocol, connector_dst, record.port());
Ok((
dst_url.parse().with_context(|| {
format!(
"parse dst_url failed, protocol: {}, connector_dst: {}, port: {}, dst_url: {}",
protocol,
connector_dst,
record.port(),
dst_url
)
})?,
record.priority() as _,
))
}
#[tracing::instrument(ret, err)]
pub async fn handle_srv_record(
&self,
domain_name: &str,
) -> Result<Box<dyn TunnelConnector>, Error> {
tracing::info!("handle_srv_record: {}", domain_name);
let resolver =
TokioAsyncResolver::tokio_from_system_conf().unwrap_or(TokioAsyncResolver::tokio(
self.default_resolve_config.clone(),
self.default_resolve_opts.clone(),
));
let srv_domains = PROTO_PORT_OFFSET
.iter()
.map(|(p, _)| (format!("_easytier._{}.{}", p, domain_name), *p)) // _easytier._udp.{domain_name}
.collect::<Vec<_>>();
tracing::info!("build srv_domains: {:?}", srv_domains);
let responses = Arc::new(DashSet::new());
let srv_lookup_tasks = srv_domains
.iter()
.map(|(srv_domain, protocol)| {
let resolver = resolver.clone();
let responses = responses.clone();
async move {
let response = resolver.srv_lookup(srv_domain).await.with_context(|| {
format!("srv_lookup failed, srv_domain: {}", srv_domain.to_string())
})?;
tracing::info!(?response, ?srv_domain, "srv_lookup response");
for record in response.iter() {
let parsed_record = Self::handle_one_srv_record(record, &protocol);
tracing::info!(?parsed_record, ?srv_domain, "parsed_record");
if parsed_record.is_err() {
eprintln!(
"got invalid srv record {:?}",
parsed_record.as_ref().unwrap_err()
);
continue;
}
responses.insert(parsed_record.unwrap());
}
Ok::<_, Error>(())
}
})
.collect::<Vec<_>>();
let _ = futures::future::join_all(srv_lookup_tasks).await;
let srv_records = responses.iter().map(|r| r.clone()).collect::<Vec<_>>();
if srv_records.is_empty() {
return Err(anyhow::anyhow!("no srv record found").into());
}
let url = weighted_choice(srv_records.as_slice()).with_context(|| {
format!(
"failed to choose a srv record, domain_name: {}, srv_records: {:?}",
domain_name.to_string(),
srv_records
)
})?;
let connector =
create_connector_by_url(url.as_str(), &self.global_ctx, self.ip_version).await?;
Ok(connector)
}
}
#[async_trait::async_trait]
impl super::TunnelConnector for DNSTunnelConnector {
async fn connect(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
let mut conn = if self.addr.scheme() == "txt" {
self.handle_txt_record(self.addr.host_str().as_ref().unwrap())
.await
.with_context(|| "get txt record url failed")?
} else if self.addr.scheme() == "srv" {
self.handle_srv_record(self.addr.host_str().as_ref().unwrap())
.await
.with_context(|| "get srv record url failed")?
} else {
return Err(anyhow::anyhow!(
"unsupported dns scheme: {}, expecting txt or srv",
self.addr.scheme()
)
.into());
};
let t = conn.connect().await?;
let info = t.info().unwrap_or_default();
Ok(Box::new(TunnelWithInfo::new(
t,
TunnelInfo {
local_addr: info.local_addr.clone(),
remote_addr: Some(self.addr.clone().into()),
tunnel_type: format!(
"{}-{}",
self.addr.scheme(),
info.remote_addr.unwrap_or_default()
),
},
)))
}
fn remote_url(&self) -> url::Url {
self.addr.clone()
}
fn set_bind_addrs(&mut self, addrs: Vec<SocketAddr>) {
self.bind_addrs = addrs;
}
fn set_ip_version(&mut self, ip_version: IpVersion) {
self.ip_version = ip_version;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::global_ctx::tests::get_mock_global_ctx;
#[tokio::test]
async fn test_txt() {
let url = "txt://txt.easytier.cn";
let global_ctx = get_mock_global_ctx();
let mut connector = DNSTunnelConnector::new(url.parse().unwrap(), global_ctx);
connector.set_ip_version(IpVersion::V4);
for _ in 0..5 {
match connector.connect().await {
Ok(ret) => {
println!("{:?}", ret.info());
return;
}
Err(e) => {
println!("{:?}", e);
}
}
}
}
#[tokio::test]
async fn test_srv() {
let url = "srv://easytier.cn";
let global_ctx = get_mock_global_ctx();
let mut connector = DNSTunnelConnector::new(url.parse().unwrap(), global_ctx);
connector.set_ip_version(IpVersion::V4);
for _ in 0..5 {
match connector.connect().await {
Ok(ret) => {
println!("{:?}", ret.info());
return;
}
Err(e) => {
println!("{:?}", e);
}
}
}
}
}

View File

@@ -0,0 +1,320 @@
use std::{
net::SocketAddr,
pin::Pin,
sync::{Arc, RwLock},
};
use anyhow::Context;
use http_req::request::{RedirectPolicy, Request};
use rand::seq::SliceRandom as _;
use url::Url;
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx},
tunnel::{IpVersion, Tunnel, TunnelConnector, TunnelError, ZCPacketSink, ZCPacketStream},
};
use crate::proto::common::TunnelInfo;
use super::create_connector_by_url;
pub struct TunnelWithInfo {
inner: Box<dyn Tunnel>,
info: TunnelInfo,
}
impl TunnelWithInfo {
pub fn new(inner: Box<dyn Tunnel>, info: TunnelInfo) -> Self {
Self { inner, info }
}
}
impl Tunnel for TunnelWithInfo {
fn split(&self) -> (Pin<Box<dyn ZCPacketStream>>, Pin<Box<dyn ZCPacketSink>>) {
self.inner.split()
}
fn info(&self) -> Option<TunnelInfo> {
Some(self.info.clone())
}
}
#[derive(Debug, PartialEq, Copy, Clone)]
enum HttpRedirectType {
Unknown,
// redirected url is in the path of new url
RedirectToQuery,
// redirected url is the entire new url
RedirectToUrl,
// redirected url is in the body of response
BodyUrls,
}
#[derive(Debug)]
pub struct HttpTunnelConnector {
addr: url::Url,
bind_addrs: Vec<SocketAddr>,
ip_version: IpVersion,
global_ctx: ArcGlobalCtx,
redirect_type: HttpRedirectType,
}
impl HttpTunnelConnector {
pub fn new(addr: url::Url, global_ctx: ArcGlobalCtx) -> Self {
Self {
addr,
bind_addrs: Vec::new(),
ip_version: IpVersion::Both,
global_ctx,
redirect_type: HttpRedirectType::Unknown,
}
}
#[tracing::instrument(ret)]
async fn handle_302_redirect(
&mut self,
new_url: url::Url,
url_str: &str,
) -> Result<Box<dyn TunnelConnector>, Error> {
// the url should be in following format:
// 1: http(s)://easytier.cn/?url=tcp://10.147.22.22:11010 (scheme is http, domain is ignored, path is splitted into proto type and addr)
// 2: http(s)://tcp://10.137.22.22:11010 (connector url is appended to the scheme)
// 3: tcp://10.137.22.22:11010 (scheme is protocol type, the url is used to construct a connector directly)
tracing::info!("redirect to {}", new_url);
let url = url::Url::parse(new_url.as_str())
.with_context(|| format!("parsing redirect url failed. url: {}", new_url))?;
if url.scheme() == "http" || url.scheme() == "https" {
let mut query = new_url
.query_pairs()
.filter_map(|x| url::Url::parse(&x.1).ok())
.collect::<Vec<_>>();
query.shuffle(&mut rand::thread_rng());
if !query.is_empty() {
tracing::info!("try to create connector by url: {}", query[0]);
self.redirect_type = HttpRedirectType::RedirectToQuery;
return create_connector_by_url(
&query[0].to_string(),
&self.global_ctx,
self.ip_version,
)
.await;
} else if let Some(new_url) = url_str
.strip_prefix(format!("{}://", url.scheme()).as_str())
.and_then(|x| Url::parse(x).ok())
{
// stripe the scheme and create connector by url
self.redirect_type = HttpRedirectType::RedirectToUrl;
return create_connector_by_url(
new_url.as_str(),
&self.global_ctx,
self.ip_version,
)
.await;
}
return Err(Error::InvalidUrl(format!(
"no valid connector url found in url: {}",
url
)));
} else {
self.redirect_type = HttpRedirectType::RedirectToUrl;
return create_connector_by_url(new_url.as_str(), &self.global_ctx, self.ip_version)
.await;
}
}
#[tracing::instrument]
async fn handle_200_success(
&mut self,
body: &String,
) -> Result<Box<dyn TunnelConnector>, Error> {
// resp body should be line of connector urls, like:
// tcp://10.1.1.1:11010
// udp://10.1.1.1:11010
let mut lines = body
.lines()
.map(|line| line.trim())
.filter(|line| !line.is_empty())
.collect::<Vec<&str>>();
tracing::info!("get {} lines of connector urls", lines.len());
// shuffle the lines and pick the usable one
lines.shuffle(&mut rand::thread_rng());
for line in lines {
let url = url::Url::parse(line);
if url.is_err() {
tracing::warn!("invalid url: {}, skip it", line);
continue;
}
self.redirect_type = HttpRedirectType::BodyUrls;
return create_connector_by_url(line, &self.global_ctx, self.ip_version).await;
}
Err(Error::InvalidUrl(format!(
"no valid connector url found, response body: {}",
body
)))
}
#[tracing::instrument(ret)]
pub async fn get_redirected_connector(
&mut self,
original_url: &str,
) -> Result<Box<dyn TunnelConnector>, Error> {
self.redirect_type = HttpRedirectType::Unknown;
tracing::info!("get_redirected_url: {}", original_url);
// Container for body of a response.
let body = Arc::new(RwLock::new(Vec::new()));
let original_url_clone = original_url.to_string();
let body_clone = body.clone();
let res = tokio::task::spawn_blocking(move || {
let uri = http_req::uri::Uri::try_from(original_url_clone.as_ref())
.with_context(|| format!("parsing url failed. url: {}", original_url_clone))?;
tracing::info!("sending http request to {}", uri);
Request::new(&uri)
.redirect_policy(RedirectPolicy::Limit(0))
.timeout(std::time::Duration::from_secs(20))
.send(&mut *body_clone.write().unwrap())
.with_context(|| format!("sending http request failed. url: {}", uri))
})
.await
.map_err(|e| Error::InvalidUrl(format!("task join error: {}", e)))??;
let body = String::from_utf8_lossy(&body.read().unwrap()).to_string();
if res.status_code().is_redirect() {
let redirect_url = res
.headers()
.get("Location")
.ok_or_else(|| Error::InvalidUrl("no redirect address found".to_string()))?;
let new_url = url::Url::parse(redirect_url.as_str())
.with_context(|| format!("parsing redirect url failed. url: {}", redirect_url))?;
return self.handle_302_redirect(new_url, &redirect_url).await;
} else if res.status_code().is_success() {
return self.handle_200_success(&body).await;
} else {
return Err(Error::InvalidUrl(format!(
"unexpected response, resp: {:?}, body: {}",
res, body,
)));
}
}
}
#[async_trait::async_trait]
impl super::TunnelConnector for HttpTunnelConnector {
async fn connect(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
let mut conn = self
.get_redirected_connector(self.addr.to_string().as_str())
.await
.with_context(|| "get redirected url failed")?;
conn.set_ip_version(self.ip_version);
let t = conn.connect().await?;
let info = t.info().unwrap_or_default();
Ok(Box::new(TunnelWithInfo::new(
t,
TunnelInfo {
local_addr: info.local_addr.clone(),
remote_addr: Some(self.addr.clone().into()),
tunnel_type: format!(
"{:?}-{}",
self.redirect_type,
info.remote_addr.unwrap_or_default()
),
},
)))
}
fn remote_url(&self) -> url::Url {
self.addr.clone()
}
fn set_bind_addrs(&mut self, addrs: Vec<SocketAddr>) {
self.bind_addrs = addrs;
}
fn set_ip_version(&mut self, ip_version: IpVersion) {
self.ip_version = ip_version;
}
}
#[cfg(test)]
mod tests {
use tokio::{io::AsyncWriteExt as _, net::TcpListener};
use crate::{
common::global_ctx::tests::get_mock_global_ctx,
tunnel::{tcp::TcpTunnelListener, TunnelConnector, TunnelListener},
};
use super::*;
async fn run_http_redirect_server(port: u16, test_type: HttpRedirectType) -> Result<(), Error> {
let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
let (mut stream, _) = listener.accept().await?;
match test_type {
HttpRedirectType::RedirectToQuery => {
let resp = "HTTP/1.1 301 Moved Permanently\r\nLocation: http://test.com/?url=tcp://127.0.0.1:25888\r\n\r\n";
stream.write_all(resp.as_bytes()).await?;
}
HttpRedirectType::RedirectToUrl => {
let resp =
"HTTP/1.1 301 Moved Permanently\r\nLocation: tcp://127.0.0.1:25888\r\n\r\n";
stream.write_all(resp.as_bytes()).await?;
}
HttpRedirectType::BodyUrls => {
let resp = "HTTP/1.1 200 OK\r\n\r\ntcp://127.0.0.1:25888";
stream.write_all(resp.as_bytes()).await?;
}
HttpRedirectType::Unknown => {
panic!("unexpected test type");
}
}
Ok(())
}
#[rstest::rstest]
#[serial_test::serial(http_redirect_test)]
#[tokio::test]
async fn http_redirect_test(
// 1. 301 redirect
// 2. 200 success with valid connector urls
#[values(
HttpRedirectType::RedirectToQuery,
HttpRedirectType::RedirectToUrl,
HttpRedirectType::BodyUrls
)]
test_type: HttpRedirectType,
) {
let http_task = tokio::spawn(run_http_redirect_server(35888, test_type));
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let test_url: url::Url = "http://127.0.0.1:35888".parse().unwrap();
let global_ctx = get_mock_global_ctx();
let mut flags = global_ctx.config.get_flags();
flags.bind_device = false;
global_ctx.config.set_flags(flags);
let mut connector = HttpTunnelConnector::new(test_url.clone(), global_ctx.clone());
let mut listener = TcpTunnelListener::new("tcp://0.0.0.0:25888".parse().unwrap());
listener.listen().await.unwrap();
let task = tokio::spawn(async move {
let _conn = listener.accept().await.unwrap();
});
let t = connector.connect().await.unwrap();
assert_eq!(connector.redirect_type, test_type);
let info = t.info().unwrap();
let remote_addr = info.remote_addr.unwrap();
assert_eq!(remote_addr, test_url.into());
tokio::join!(task).0.unwrap();
tokio::join!(http_task).0.unwrap().unwrap();
}
}

View File

@@ -106,7 +106,7 @@ impl ManualConnectorManager {
}
pub async fn add_connector_by_url(&self, url: &str) -> Result<(), Error> {
self.add_connector(create_connector_by_url(url, &self.global_ctx).await?);
self.add_connector(create_connector_by_url(url, &self.global_ctx, IpVersion::Both).await?);
Ok(())
}
@@ -293,32 +293,26 @@ impl ManualConnectorManager {
ip_version: IpVersion,
) -> Result<ReconnResult, Error> {
let ip_collector = data.global_ctx.get_ip_collector();
let net_ns = data.net_ns.clone();
connector.lock().await.set_ip_version(ip_version);
set_bind_addr_for_peer_connector(
connector.lock().await.as_mut(),
ip_version == IpVersion::V4,
&ip_collector,
)
.await;
if data.global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
connector.lock().await.as_mut(),
ip_version == IpVersion::V4,
&ip_collector,
)
.await;
}
data.global_ctx.issue_event(GlobalCtxEvent::Connecting(
connector.lock().await.remote_url().clone(),
));
let _g = net_ns.guard();
tracing::info!("reconnect try connect... conn: {:?}", connector);
let tunnel = connector.lock().await.connect().await?;
tracing::info!("reconnect get tunnel succ: {:?}", tunnel);
assert_eq!(
dead_url,
tunnel.info().unwrap().remote_addr.unwrap().to_string(),
"info: {:?}",
tunnel.info()
);
let (peer_id, conn_id) = data.peer_manager.add_client_tunnel(tunnel).await?;
let (peer_id, conn_id) = data
.peer_manager
.try_direct_connect(connector.lock().await.as_mut())
.await?;
tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url);
Ok(ReconnResult {
dead_url,
@@ -337,7 +331,7 @@ impl ManualConnectorManager {
let mut ip_versions = vec![];
let u = url::Url::parse(&dead_url)
.with_context(|| format!("failed to parse connector url {:?}", dead_url))?;
if u.scheme() == "ring" {
if u.scheme() == "ring" || u.scheme() == "txt" || u.scheme() == "srv" {
ip_versions.push(IpVersion::Both);
} else {
let addrs = u.socket_addrs(|| Some(1000))?;
@@ -363,8 +357,12 @@ impl ManualConnectorManager {
"cannot get ip from url"
)));
for ip_version in ip_versions {
let use_long_timeout = dead_url.starts_with("http")
|| dead_url.starts_with("srv")
|| dead_url.starts_with("txt");
let ret = timeout(
std::time::Duration::from_secs(1),
// allow http connector to wait longer
std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }),
Self::conn_reconnect_with_ip_version(
data.clone(),
dead_url.clone(),

View File

@@ -3,6 +3,8 @@ use std::{
sync::Arc,
};
use http_connector::HttpTunnelConnector;
#[cfg(feature = "quic")]
use crate::tunnel::quic::QUICTunnelConnector;
#[cfg(feature = "wireguard")]
@@ -11,7 +13,7 @@ use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, network::IPCollector},
tunnel::{
check_scheme_and_get_socket_addr, ring::RingTunnelConnector, tcp::TcpTunnelConnector,
udp::UdpTunnelConnector, TunnelConnector,
udp::UdpTunnelConnector, IpVersion, TunnelConnector,
},
};
@@ -19,6 +21,9 @@ pub mod direct;
pub mod manual;
pub mod udp_hole_punch;
pub mod dns_connector;
pub mod http_connector;
async fn set_bind_addr_for_peer_connector(
connector: &mut (impl TunnelConnector + ?Sized),
is_ipv4: bool,
@@ -50,80 +55,102 @@ async fn set_bind_addr_for_peer_connector(
pub async fn create_connector_by_url(
url: &str,
global_ctx: &ArcGlobalCtx,
ip_version: IpVersion,
) -> Result<Box<dyn TunnelConnector + 'static>, Error> {
let url = url::Url::parse(url).map_err(|_| Error::InvalidUrl(url.to_owned()))?;
match url.scheme() {
let mut connector: Box<dyn TunnelConnector + 'static> = match url.scheme() {
"tcp" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "tcp")?;
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "tcp", ip_version)?;
let mut connector = TcpTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
return Ok(Box::new(connector));
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
Box::new(connector)
}
"udp" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "udp")?;
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "udp", ip_version)?;
let mut connector = UdpTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
return Ok(Box::new(connector));
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
Box::new(connector)
}
"http" | "https" => {
let connector = HttpTunnelConnector::new(url, global_ctx.clone());
Box::new(connector)
}
"ring" => {
check_scheme_and_get_socket_addr::<uuid::Uuid>(&url, "ring")?;
check_scheme_and_get_socket_addr::<uuid::Uuid>(&url, "ring", IpVersion::Both)?;
let connector = RingTunnelConnector::new(url);
return Ok(Box::new(connector));
Box::new(connector)
}
#[cfg(feature = "quic")]
"quic" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "quic")?;
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "quic", ip_version)?;
let mut connector = QUICTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
return Ok(Box::new(connector));
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
Box::new(connector)
}
#[cfg(feature = "wireguard")]
"wg" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "wg")?;
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "wg", ip_version)?;
let nid = global_ctx.get_network_identity();
let wg_config = WgConfig::new_from_network_identity(
&nid.network_name,
&nid.network_secret.unwrap_or_default(),
);
let mut connector = WgTunnelConnector::new(url, wg_config);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
return Ok(Box::new(connector));
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
Box::new(connector)
}
#[cfg(feature = "websocket")]
"ws" | "wss" => {
use crate::tunnel::{FromUrl, IpVersion};
let dst_addr = SocketAddr::from_url(url.clone(), IpVersion::Both)?;
use crate::tunnel::FromUrl;
let dst_addr = SocketAddr::from_url(url.clone(), ip_version)?;
let mut connector = crate::tunnel::websocket::WSTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
return Ok(Box::new(connector));
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
Box::new(connector)
}
"txt" | "srv" => {
let connector = dns_connector::DNSTunnelConnector::new(url, global_ctx.clone());
Box::new(connector)
}
_ => {
return Err(Error::InvalidUrl(url.into()));
}
}
};
connector.set_ip_version(ip_version);
Ok(connector)
}

View File

@@ -388,7 +388,7 @@ impl UdpHolePunchListener {
tracing::warn!(?conn, "udp hole punching listener got peer connection");
let peer_mgr = peer_mgr.clone();
tokio::spawn(async move {
if let Err(e) = peer_mgr.add_tunnel_as_server(conn).await {
if let Err(e) = peer_mgr.add_tunnel_as_server(conn, false).await {
tracing::error!(
?e,
"failed to add tunnel as server in hole punch listener"

View File

@@ -284,6 +284,7 @@ impl PunchSymToConeHoleClient {
BaseController {
timeout_ms: 4000,
trace_id: 0,
..Default::default()
},
req,
)
@@ -314,6 +315,7 @@ impl PunchSymToConeHoleClient {
BaseController {
timeout_ms: 4000,
trace_id: 0,
..Default::default()
},
req,
)
@@ -432,7 +434,7 @@ impl PunchSymToConeHoleClient {
let public_ips: Vec<Ipv4Addr> = stun_info
.public_ip
.iter()
.map(|x| x.parse().unwrap())
.filter_map(|x| x.parse().ok())
.collect();
if public_ips.is_empty() {
return Err(anyhow::anyhow!("failed to get public ips"));
@@ -529,6 +531,7 @@ pub mod tests {
};
#[tokio::test]
#[serial_test::serial]
#[serial_test::serial(hole_punch)]
async fn hole_punching_symmetric_only_random() {
RUN_TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
@@ -577,13 +580,15 @@ pub mod tests {
)
.await;
println!("start punching {:?}", p_a.list_routes().await);
wait_for_condition(
|| async {
wait_route_appear_with_cost(p_a.clone(), p_c.my_peer_id(), Some(1))
.await
.is_ok()
},
Duration::from_secs(5),
Duration::from_secs(10),
)
.await;
println!("{:?}", p_a.list_routes().await);

View File

@@ -1,8 +1,14 @@
use std::{
ffi::OsString, fmt::Write, net::SocketAddr, path::PathBuf, sync::Mutex, time::Duration, vec,
ffi::OsString,
fmt::Write,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Mutex,
time::Duration,
vec,
};
use anyhow::{Context, Ok};
use anyhow::Context;
use clap::{command, Args, Parser, Subcommand};
use humansize::format_size;
use service_manager::*;
@@ -20,7 +26,8 @@ use easytier::{
DumpRouteRequest, GetVpnPortalInfoRequest, ListConnectorRequest,
ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListPeerRequest,
ListPeerResponse, ListRouteRequest, ListRouteResponse, NodeInfo, PeerManageRpc,
PeerManageRpcClientFactory, ShowNodeInfoRequest, VpnPortalRpc,
PeerManageRpcClientFactory, ShowNodeInfoRequest, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc,
VpnPortalRpcClientFactory,
},
common::NatType,
@@ -50,14 +57,24 @@ struct Cli {
#[derive(Subcommand, Debug)]
enum SubCommand {
#[command(about = "show peers info")]
Peer(PeerArgs),
#[command(about = "manage connectors")]
Connector(ConnectorArgs),
#[command(about = "do stun test")]
Stun,
#[command(about = "show route info")]
Route(RouteArgs),
#[command(about = "show global peers info")]
PeerCenter,
#[command(about = "show vpn portal (wireguard) info")]
VpnPortal,
#[command(about = "inspect self easytier-core status")]
Node(NodeArgs),
#[command(about = "manage easytier-core as a system service")]
Service(ServiceArgs),
#[command(about = "show tcp/kcp proxy status")]
Proxy,
}
#[derive(Args, Debug)]
@@ -114,7 +131,9 @@ enum ConnectorSubCommand {
#[derive(Subcommand, Debug)]
enum NodeSubCommand {
#[command(about = "show node info")]
Info,
#[command(about = "show node config")]
Config,
}
@@ -135,10 +154,15 @@ struct ServiceArgs {
#[derive(Subcommand, Debug)]
enum ServiceSubCommand {
#[command(about = "register easytier-core as a system service")]
Install(InstallArgs),
#[command(about = "unregister easytier-core system service")]
Uninstall,
#[command(about = "check easytier-core system service status")]
Status,
#[command(about = "start easytier-core system service")]
Start,
#[command(about = "stop easytier-core system service")]
Stop,
}
@@ -153,13 +177,17 @@ struct InstallArgs {
#[arg(long, default_value = "false")]
disable_autostart: bool,
#[arg(long)]
#[arg(long, help = "path to easytier-core binary")]
core_path: Option<PathBuf>,
#[arg(long)]
service_work_dir: Option<PathBuf>,
#[arg(trailing_var_arg = true, allow_hyphen_values = true)]
#[arg(
trailing_var_arg = true,
allow_hyphen_values = true,
help = "args to pass to easytier-core"
)]
core_args: Option<Vec<OsString>>,
}
@@ -221,6 +249,19 @@ impl CommandHandler {
.with_context(|| "failed to get vpn portal client")?)
}
async fn get_tcp_proxy_client(
&self,
transport_type: &str,
) -> Result<Box<dyn TcpProxyRpc<Controller = BaseController>>, Error> {
Ok(self
.client
.lock()
.unwrap()
.scoped_client::<TcpProxyRpcClientFactory<BaseController>>(transport_type.to_string())
.await
.with_context(|| "failed to get vpn portal client")?)
}
async fn list_peers(&self) -> Result<ListPeerResponse, Error> {
let client = self.get_peer_manager_client().await?;
let request = ListPeerRequest::default();
@@ -276,7 +317,11 @@ impl CommandHandler {
ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(),
hostname: route.hostname.clone(),
cost: cost_to_str(route.cost),
lat_ms: float_to_str(p.get_latency_ms().unwrap_or(0.0), 3),
lat_ms: if route.cost == 1 {
float_to_str(p.get_latency_ms().unwrap_or(0.0), 3)
} else {
route.path_latency_latency_first().to_string()
},
loss_rate: float_to_str(p.get_loss_rate().unwrap_or(0.0), 3),
rx_bytes: format_size(p.get_rx_bytes().unwrap_or(0), humansize::DECIMAL),
tx_bytes: format_size(p.get_tx_bytes().unwrap_or(0), humansize::DECIMAL),
@@ -647,12 +692,22 @@ impl Service {
environment: None,
};
if self.status()? != ServiceStatus::NotInstalled {
return Err(anyhow::anyhow!("Service is already installed"));
return Err(anyhow::anyhow!(
"Service is already installed! Service Name: {}",
self.lable
));
}
self.service_manager
.install(ctx)
.map_err(|e| anyhow::anyhow!("failed to install service: {}", e))
.install(ctx.clone())
.map_err(|e| anyhow::anyhow!("failed to install service: {:?}", e))?;
println!(
"Service installed successfully! Service Name: {}",
self.lable
);
Ok(())
}
pub fn uninstall(&self) -> Result<(), Error> {
@@ -769,7 +824,8 @@ impl Service {
writeln!(unit_content, "Type=simple")?;
writeln!(unit_content, "WorkingDirectory={work_dir}")?;
writeln!(unit_content, "ExecStart={target_app} {args}")?;
writeln!(unit_content, "Restart=Always")?;
writeln!(unit_content, "Restart=always")?;
writeln!(unit_content, "RestartSec=1")?;
writeln!(unit_content, "LimitNOFILE=infinity")?;
writeln!(unit_content)?;
writeln!(unit_content, "[Install]")?;
@@ -990,6 +1046,7 @@ async fn main() -> Result<(), Error> {
match sub_cmd.sub_command {
Some(NodeSubCommand::Info) | None => {
let stun_info = node_info.stun_info.clone().unwrap_or_default();
let ip_list = node_info.ip_list.clone().unwrap_or_default();
let mut builder = tabled::builder::Builder::default();
builder.push_record(vec!["Virtual IP", node_info.ipv4_addr.as_str()]);
@@ -999,11 +1056,32 @@ async fn main() -> Result<(), Error> {
node_info.proxy_cidrs.join(", ").as_str(),
]);
builder.push_record(vec!["Peer ID", node_info.peer_id.to_string().as_str()]);
builder.push_record(vec!["Public IP", stun_info.public_ip.join(", ").as_str()]);
stun_info.public_ip.iter().for_each(|ip| {
let Ok(ip) = ip.parse::<IpAddr>() else {
return;
};
if ip.is_ipv4() {
builder.push_record(vec!["Public IPv4", ip.to_string().as_str()]);
} else {
builder.push_record(vec!["Public IPv6", ip.to_string().as_str()]);
}
});
builder.push_record(vec![
"UDP Stun Type",
format!("{:?}", stun_info.udp_nat_type()).as_str(),
]);
ip_list.interface_ipv4s.iter().for_each(|ip| {
builder.push_record(vec![
"Interface IPv4",
format!("{}", ip.to_string()).as_str(),
]);
});
ip_list.interface_ipv6s.iter().for_each(|ip| {
builder.push_record(vec![
"Interface IPv6",
format!("{}", ip.to_string()).as_str(),
]);
});
for (idx, l) in node_info.listeners.iter().enumerate() {
if l.starts_with("ring") {
continue;
@@ -1088,6 +1166,57 @@ async fn main() -> Result<(), Error> {
}
}
}
SubCommand::Proxy => {
let mut entries = vec![];
let client = handler.get_tcp_proxy_client("tcp").await?;
let ret = client
.list_tcp_proxy_entry(BaseController::default(), Default::default())
.await;
entries.extend(ret.unwrap_or_default().entries);
let client = handler.get_tcp_proxy_client("kcp_src").await?;
let ret = client
.list_tcp_proxy_entry(BaseController::default(), Default::default())
.await;
entries.extend(ret.unwrap_or_default().entries);
let client = handler.get_tcp_proxy_client("kcp_dst").await?;
let ret = client
.list_tcp_proxy_entry(BaseController::default(), Default::default())
.await;
entries.extend(ret.unwrap_or_default().entries);
#[derive(tabled::Tabled)]
struct TableItem {
src: String,
dst: String,
start_time: String,
state: String,
transport_type: String,
}
let table_rows = entries
.iter()
.map(|e| TableItem {
src: SocketAddr::from(e.src.unwrap_or_default()).to_string(),
dst: SocketAddr::from(e.dst.unwrap_or_default()).to_string(),
start_time: chrono::DateTime::<chrono::Utc>::from_timestamp_millis(
(e.start_time * 1000) as i64,
)
.unwrap()
.with_timezone(&chrono::Local)
.format("%Y-%m-%d %H:%M:%S")
.to_string(),
state: format!("{:?}", TcpProxyEntryState::try_from(e.state).unwrap()),
transport_type: format!(
"{:?}",
TcpProxyEntryTransportType::try_from(e.transport_type).unwrap()
),
})
.collect::<Vec<_>>();
println!("{}", tabled::Table::new(table_rows).with(Style::modern()));
}
}
Ok(())

View File

@@ -6,6 +6,7 @@ extern crate rust_i18n;
use std::{
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
sync::Arc,
};
use anyhow::Context;
@@ -19,12 +20,17 @@ use easytier::{
TomlConfigLoader, VpnPortalConfig,
},
constants::EASYTIER_VERSION,
global_ctx::{EventBusSubscriber, GlobalCtxEvent},
global_ctx::{EventBusSubscriber, GlobalCtx, GlobalCtxEvent},
scoped_task::ScopedTask,
stun::MockStunInfoCollector,
},
connector::{create_connector_by_url, dns_connector::DNSTunnelConnector},
launcher,
proto::{self, common::CompressionAlgoPb},
tunnel::udp::UdpTunnelConnector,
proto::{
self,
common::{CompressionAlgoPb, NatType},
},
tunnel::{IpVersion, PROTO_PORT_OFFSET},
utils::{init_logger, setup_panic_handler},
web_client,
};
@@ -123,6 +129,13 @@ struct Cli {
)]
listeners: Vec<String>,
#[arg(
long,
help = t!("core_clap.mapped_listeners").to_string(),
num_args = 0..
)]
mapped_listeners: Vec<String>,
#[arg(
long,
help = t!("core_clap.no_listener").to_string(),
@@ -185,7 +198,7 @@ struct Cli {
#[arg(
long,
help = t!("core_clap.multi_thread").to_string(),
default_value = "false"
default_value = "true"
)]
multi_thread: bool,
@@ -229,6 +242,13 @@ struct Cli {
)]
enable_exit_node: bool,
#[arg(
long,
help = t!("core_clap.proxy_forward_by_system").to_string(),
default_value = "false"
)]
proxy_forward_by_system: bool,
#[arg(
long,
help = t!("core_clap.no_tun").to_string(),
@@ -288,26 +308,38 @@ struct Cli {
)]
socks5: Option<u16>,
#[arg(
long,
help = t!("core_clap.ipv6_listener").to_string()
)]
ipv6_listener: Option<String>,
#[arg(
long,
help = t!("core_clap.compression").to_string(),
default_value = "none",
)]
compression: String,
#[arg(
long,
help = t!("core_clap.bind_device").to_string()
)]
bind_device: Option<bool>,
#[arg(
long,
help = t!("core_clap.enable_kcp_proxy").to_string(),
default_value = "false"
)]
enable_kcp_proxy: bool,
#[arg(
long,
help = t!("core_clap.disable_kcp_input").to_string(),
default_value = "false"
)]
disable_kcp_input: bool,
}
rust_i18n::i18n!("locales", fallback = "en");
impl Cli {
fn parse_listeners(no_listener: bool, listeners: Vec<String>) -> anyhow::Result<Vec<String>> {
let proto_port_offset = vec![("tcp", 0), ("udp", 0), ("wg", 1), ("ws", 1), ("wss", 2)];
if no_listener || listeners.is_empty() {
return Ok(vec![]);
}
@@ -316,8 +348,8 @@ impl Cli {
let mut listeners: Vec<String> = Vec::new();
if origin_listners.len() == 1 {
if let Ok(port) = origin_listners[0].parse::<u16>() {
for (proto, offset) in proto_port_offset {
listeners.push(format!("{}://0.0.0.0:{}", proto, port + offset));
for (proto, offset) in PROTO_PORT_OFFSET {
listeners.push(format!("{}://0.0.0.0:{}", proto, port + *offset));
}
return Ok(listeners);
}
@@ -332,7 +364,7 @@ impl Cli {
panic!("failed to parse listener: {}", l);
}
} else {
let Some((proto, offset)) = proto_port_offset
let Some((proto, offset)) = PROTO_PORT_OFFSET
.iter()
.find(|(proto, _)| *proto == proto_port[0])
else {
@@ -422,6 +454,23 @@ impl TryFrom<&Cli> for TomlConfigLoader {
.collect(),
);
cfg.set_mapped_listeners(Some(
cli.mapped_listeners
.iter()
.map(|s| {
s.parse()
.with_context(|| format!("mapped listener is not a valid url: {}", s))
.unwrap()
})
.map(|s: url::Url| {
if s.port().is_none() {
panic!("mapped listener port is missing: {}", s);
}
s
})
.collect(),
));
for n in cli.proxy_networks.iter() {
cfg.add_proxy_cidr(
n.parse()
@@ -512,18 +561,15 @@ impl TryFrom<&Cli> for TomlConfigLoader {
f.mtu = mtu as u32;
}
f.enable_exit_node = cli.enable_exit_node;
f.proxy_forward_by_system = cli.proxy_forward_by_system;
f.no_tun = cli.no_tun || cfg!(not(feature = "tun"));
f.use_smoltcp = cli.use_smoltcp;
if let Some(wl) = cli.relay_network_whitelist.as_ref() {
f.relay_network_whitelist = wl.join(" ");
}
f.disable_p2p = cli.disable_p2p;
f.disable_udp_hole_punching = cli.disable_udp_hole_punching;
f.relay_all_peer_rpc = cli.relay_all_peer_rpc;
if let Some(ipv6_listener) = cli.ipv6_listener.as_ref() {
f.ipv6_listener = ipv6_listener
.parse()
.with_context(|| format!("failed to parse ipv6 listener: {}", ipv6_listener))?
}
f.multi_thread = cli.multi_thread;
f.data_compress_algo = match cli.compression.as_str() {
"none" => CompressionAlgoPb::None,
@@ -534,6 +580,11 @@ impl TryFrom<&Cli> for TomlConfigLoader {
),
}
.into();
if let Some(bind_device) = cli.bind_device {
f.bind_device = bind_device;
}
f.enable_kcp_proxy = cli.enable_kcp_proxy;
f.disable_kcp_input = cli.disable_kcp_input;
cfg.set_flags(f);
cfg.set_exit_nodes(cli.exit_nodes.clone());
@@ -783,9 +834,12 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> {
let config_server_url_s = cli.config_server.clone().unwrap();
let config_server_url = match url::Url::parse(&config_server_url_s) {
Ok(u) => u,
Err(_) => format!("udp://easytier.cn:22020/{}", config_server_url_s)
.parse()
.unwrap(),
Err(_) => format!(
"udp://config-server.easytier.cn:22020/{}",
config_server_url_s
)
.parse()
.unwrap(),
};
let mut c_url = config_server_url.clone();
@@ -801,12 +855,26 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> {
c_url, token,
);
println!("Official config website: https://easytier.cn/web");
if token.is_empty() {
panic!("empty token");
}
let _wc = web_client::WebClient::new(UdpTunnelConnector::new(c_url), token.to_string());
let config = TomlConfigLoader::default();
let global_ctx = Arc::new(GlobalCtx::new(config));
global_ctx.replace_stun_info_collector(Box::new(MockStunInfoCollector {
udp_nat_type: NatType::Unknown,
}));
let mut flags = global_ctx.get_flags();
flags.bind_device = false;
global_ctx.set_flags(flags);
let _wc = web_client::WebClient::new(
create_connector_by_url(c_url.as_str(), &global_ctx, IpVersion::Both).await?,
token.to_string(),
);
tokio::signal::ctrl_c().await.unwrap();
DNSTunnelConnector::new("".parse().unwrap(), global_ctx);
return Ok(());
}
@@ -823,7 +891,7 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> {
Ok(())
}
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() {
let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US"));
rust_i18n::set_locale(&locale);

View File

@@ -6,6 +6,7 @@ use std::{
time::Duration,
};
use anyhow::Context;
use pnet::packet::{
icmp::{self, echo_reply::MutableEchoReplyPacket, IcmpCode, IcmpTypes, MutableIcmpPacket},
ip::IpNextHeaderProtocols,
@@ -212,7 +213,7 @@ impl IcmpProxy {
Err(e) => {
tracing::warn!("create icmp socket failed: {:?}", e);
if !self.global_ctx.no_tun() {
return Err(e);
return Err(anyhow::anyhow!("create icmp socket failed: {:?}", e).into());
}
}
}
@@ -281,10 +282,15 @@ impl IcmpProxy {
dst_ip: Ipv4Addr,
icmp_packet: &icmp::echo_request::EchoRequestPacket,
) -> Result<(), Error> {
self.socket.lock().unwrap().as_ref().unwrap().send_to(
icmp_packet.packet(),
&SocketAddrV4::new(dst_ip.into(), 0).into(),
)?;
self.socket
.lock()
.unwrap()
.as_ref()
.with_context(|| "icmp socket not created")?
.send_to(
icmp_packet.packet(),
&SocketAddrV4::new(dst_ip.into(), 0).into(),
)?;
Ok(())
}

View File

@@ -0,0 +1,441 @@
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, Weak},
time::Duration,
};
use anyhow::Context;
use bytes::Bytes;
use dashmap::DashMap;
use kcp_sys::{
endpoint::{ConnId, KcpEndpoint, KcpPacketReceiver},
ffi_safe::KcpConfig,
packet_def::KcpPacket,
stream::KcpStream,
};
use pnet::packet::{
ip::IpNextHeaderProtocols,
ipv4::Ipv4Packet,
tcp::{TcpFlags, TcpPacket},
Packet as _,
};
use prost::Message;
use tokio::{io::copy_bidirectional, task::JoinSet};
use super::{
tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy},
CidrSet,
};
use crate::{
common::{
error::Result,
global_ctx::{ArcGlobalCtx, GlobalCtx},
},
peers::{peer_manager::PeerManager, NicPacketFilter, PeerPacketFilter},
proto::{
cli::{
ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc,
},
peer_rpc::KcpConnData,
rpc_types::{self, controller::BaseController},
},
tunnel::packet_def::{PacketType, PeerManagerHeader, ZCPacket},
};
fn create_kcp_endpoint() -> KcpEndpoint {
let mut kcp_endpoint = KcpEndpoint::new();
kcp_endpoint.set_kcp_config_factory(Box::new(|conv| {
let mut cfg = KcpConfig::new_turbo(conv);
cfg.interval = Some(5);
cfg
}));
kcp_endpoint
}
struct KcpEndpointFilter {
kcp_endpoint: Arc<KcpEndpoint>,
is_src: bool,
}
#[async_trait::async_trait]
impl PeerPacketFilter for KcpEndpointFilter {
async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option<ZCPacket> {
let t = packet.peer_manager_header().unwrap().packet_type;
if t == PacketType::KcpSrc as u8 && !self.is_src {
} else if t == PacketType::KcpDst as u8 && self.is_src {
} else {
return Some(packet);
}
let _ = self
.kcp_endpoint
.input_sender_ref()
.send(KcpPacket::from(packet.payload_bytes()))
.await;
None
}
}
#[tracing::instrument]
async fn handle_kcp_output(
peer_mgr: Arc<PeerManager>,
mut output_receiver: KcpPacketReceiver,
is_src: bool,
) {
while let Some(packet) = output_receiver.recv().await {
let dst_peer_id = if is_src {
packet.header().dst_session_id()
} else {
packet.header().src_session_id()
};
let packet_type = if is_src {
PacketType::KcpSrc as u8
} else {
PacketType::KcpDst as u8
};
let mut packet = ZCPacket::new_with_payload(&packet.inner().freeze());
packet.fill_peer_manager_hdr(peer_mgr.my_peer_id(), dst_peer_id, packet_type as u8);
if let Err(e) = peer_mgr.send_msg(packet, dst_peer_id).await {
tracing::error!("failed to send kcp packet to peer: {:?}", e);
}
}
}
#[derive(Debug, Clone)]
pub struct NatDstKcpConnector {
kcp_endpoint: Arc<KcpEndpoint>,
peer_mgr: Arc<PeerManager>,
}
#[async_trait::async_trait]
impl NatDstConnector for NatDstKcpConnector {
type DstStream = KcpStream;
async fn connect(&self, src: SocketAddr, nat_dst: SocketAddr) -> Result<Self::DstStream> {
let conn_data = KcpConnData {
src: Some(src.into()),
dst: Some(nat_dst.into()),
};
let (dst_peers, _) = match nat_dst {
SocketAddr::V4(addr) => {
let ip = addr.ip();
self.peer_mgr.get_msg_dst_peer(&ip).await
}
SocketAddr::V6(_) => return Err(anyhow::anyhow!("ipv6 is not supported").into()),
};
tracing::trace!("kcp nat dst: {:?}, dst peers: {:?}", nat_dst, dst_peers);
if dst_peers.len() != 1 {
return Err(anyhow::anyhow!("no dst peer found for nat dst: {}", nat_dst).into());
}
let ret = self
.kcp_endpoint
.connect(
Duration::from_secs(10),
self.peer_mgr.my_peer_id(),
dst_peers[0],
Bytes::from(conn_data.encode_to_vec()),
)
.await
.with_context(|| format!("failed to connect to nat dst: {}", nat_dst.to_string()))?;
let stream = KcpStream::new(&self.kcp_endpoint, ret)
.ok_or(anyhow::anyhow!("failed to create kcp stream"))?;
Ok(stream)
}
fn check_packet_from_peer_fast(&self, _cidr_set: &CidrSet, _global_ctx: &GlobalCtx) -> bool {
true
}
fn check_packet_from_peer(
&self,
_cidr_set: &CidrSet,
_global_ctx: &GlobalCtx,
hdr: &PeerManagerHeader,
_ipv4: &Ipv4Packet,
) -> bool {
return hdr.from_peer_id == hdr.to_peer_id;
}
fn transport_type(&self) -> TcpProxyEntryTransportType {
TcpProxyEntryTransportType::Kcp
}
}
#[derive(Clone)]
struct TcpProxyForKcpSrc(Arc<TcpProxy<NatDstKcpConnector>>);
pub struct KcpProxySrc {
kcp_endpoint: Arc<KcpEndpoint>,
peer_manager: Arc<PeerManager>,
tcp_proxy: TcpProxyForKcpSrc,
tasks: JoinSet<()>,
}
impl TcpProxyForKcpSrc {
async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool {
let peer_map: Arc<crate::peers::peer_map::PeerMap> =
self.0.get_peer_manager().get_peer_map();
let Some(dst_peer_id) = peer_map.get_peer_id_by_ipv4(dst_ip).await else {
return false;
};
let Some(feature_flag) = peer_map.get_peer_feature_flag(dst_peer_id).await else {
return false;
};
feature_flag.kcp_input
}
}
#[async_trait::async_trait]
impl NicPacketFilter for TcpProxyForKcpSrc {
async fn try_process_packet_from_nic(&self, zc_packet: &mut ZCPacket) -> bool {
let ret = self.0.try_process_packet_from_nic(zc_packet).await;
if ret {
return true;
}
let data = zc_packet.payload();
let ip_packet = Ipv4Packet::new(data).unwrap();
if ip_packet.get_version() != 4
|| ip_packet.get_next_level_protocol() != IpNextHeaderProtocols::Tcp
{
return false;
}
// if no connection is established, only allow SYN packet
let tcp_packet = TcpPacket::new(ip_packet.payload()).unwrap();
let is_syn = tcp_packet.get_flags() & TcpFlags::SYN != 0
&& tcp_packet.get_flags() & TcpFlags::ACK == 0;
if is_syn {
// only check dst feature flag when SYN packet
if !self
.check_dst_allow_kcp_input(&ip_packet.get_destination())
.await
{
return false;
}
} else {
// if not syn packet, only allow established connection
if !self.0.is_tcp_proxy_connection(SocketAddr::new(
IpAddr::V4(ip_packet.get_source()),
tcp_packet.get_source(),
)) {
return false;
}
}
if let Some(my_ipv4) = self.0.get_global_ctx().get_ipv4() {
// this is a net-to-net packet, only allow it when smoltcp is enabled
// because the syn-ack packet will not be through and handled by the tun device when
// the source ip is in the local network
if ip_packet.get_source() != my_ipv4.address() && !self.0.is_smoltcp_enabled() {
return false;
}
};
zc_packet.mut_peer_manager_header().unwrap().to_peer_id = self.0.get_my_peer_id().into();
true
}
}
impl KcpProxySrc {
pub async fn new(peer_manager: Arc<PeerManager>) -> Self {
let mut kcp_endpoint = create_kcp_endpoint();
kcp_endpoint.run().await;
let output_receiver = kcp_endpoint.output_receiver().unwrap();
let mut tasks = JoinSet::new();
tasks.spawn(handle_kcp_output(
peer_manager.clone(),
output_receiver,
true,
));
let kcp_endpoint = Arc::new(kcp_endpoint);
let tcp_proxy = TcpProxy::new(
peer_manager.clone(),
NatDstKcpConnector {
kcp_endpoint: kcp_endpoint.clone(),
peer_mgr: peer_manager.clone(),
},
);
Self {
kcp_endpoint,
peer_manager,
tcp_proxy: TcpProxyForKcpSrc(tcp_proxy),
tasks,
}
}
pub async fn start(&self) {
self.peer_manager
.add_nic_packet_process_pipeline(Box::new(self.tcp_proxy.clone()))
.await;
self.peer_manager
.add_packet_process_pipeline(Box::new(self.tcp_proxy.0.clone()))
.await;
self.peer_manager
.add_packet_process_pipeline(Box::new(KcpEndpointFilter {
kcp_endpoint: self.kcp_endpoint.clone(),
is_src: true,
}))
.await;
self.tcp_proxy.0.start(false).await.unwrap();
}
pub fn get_tcp_proxy(&self) -> Arc<TcpProxy<NatDstKcpConnector>> {
self.tcp_proxy.0.clone()
}
}
pub struct KcpProxyDst {
kcp_endpoint: Arc<KcpEndpoint>,
peer_manager: Arc<PeerManager>,
proxy_entries: Arc<DashMap<ConnId, TcpProxyEntry>>,
tasks: JoinSet<()>,
}
impl KcpProxyDst {
pub async fn new(peer_manager: Arc<PeerManager>) -> Self {
let mut kcp_endpoint = create_kcp_endpoint();
kcp_endpoint.run().await;
let mut tasks = JoinSet::new();
let output_receiver = kcp_endpoint.output_receiver().unwrap();
tasks.spawn(handle_kcp_output(
peer_manager.clone(),
output_receiver,
false,
));
Self {
kcp_endpoint: Arc::new(kcp_endpoint),
peer_manager,
proxy_entries: Arc::new(DashMap::new()),
tasks,
}
}
#[tracing::instrument(ret)]
async fn handle_one_in_stream(
mut kcp_stream: KcpStream,
global_ctx: ArcGlobalCtx,
proxy_entries: Arc<DashMap<ConnId, TcpProxyEntry>>,
) -> Result<()> {
let mut conn_data = kcp_stream.conn_data().clone();
let parsed_conn_data = KcpConnData::decode(&mut conn_data)
.with_context(|| format!("failed to decode kcp conn data: {:?}", conn_data))?;
let mut dst_socket: SocketAddr = parsed_conn_data
.dst
.ok_or(anyhow::anyhow!(
"failed to get dst socket from kcp conn data: {:?}",
parsed_conn_data
))?
.into();
let conn_id = kcp_stream.conn_id();
proxy_entries.insert(
conn_id,
TcpProxyEntry {
src: parsed_conn_data.src,
dst: parsed_conn_data.dst,
start_time: chrono::Local::now().timestamp() as u64,
state: TcpProxyEntryState::ConnectingDst.into(),
transport_type: TcpProxyEntryTransportType::Kcp.into(),
},
);
crate::defer! {
proxy_entries.remove(&conn_id);
}
if Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address()))
&& global_ctx.no_tun()
{
dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap();
}
tracing::debug!("kcp connect to dst socket: {:?}", dst_socket);
let _g = global_ctx.net_ns.guard();
let connector = NatDstTcpConnector {};
let mut ret = connector
.connect("0.0.0.0:0".parse().unwrap(), dst_socket)
.await?;
if let Some(mut e) = proxy_entries.get_mut(&kcp_stream.conn_id()) {
e.state = TcpProxyEntryState::Connected.into();
}
copy_bidirectional(&mut ret, &mut kcp_stream).await?;
Ok(())
}
async fn run_accept_task(&mut self) {
let kcp_endpoint = self.kcp_endpoint.clone();
let global_ctx = self.peer_manager.get_global_ctx().clone();
let proxy_entries = self.proxy_entries.clone();
self.tasks.spawn(async move {
while let Ok(conn) = kcp_endpoint.accept().await {
let stream = KcpStream::new(&kcp_endpoint, conn)
.ok_or(anyhow::anyhow!("failed to create kcp stream"))
.unwrap();
let global_ctx = global_ctx.clone();
let proxy_entries = proxy_entries.clone();
tokio::spawn(async move {
let _ = Self::handle_one_in_stream(stream, global_ctx, proxy_entries).await;
});
}
});
}
pub async fn start(&mut self) {
self.run_accept_task().await;
self.peer_manager
.add_packet_process_pipeline(Box::new(KcpEndpointFilter {
kcp_endpoint: self.kcp_endpoint.clone(),
is_src: false,
}))
.await;
}
}
#[derive(Clone)]
pub struct KcpProxyDstRpcService(Weak<DashMap<ConnId, TcpProxyEntry>>);
impl KcpProxyDstRpcService {
pub fn new(kcp_proxy_dst: &KcpProxyDst) -> Self {
Self(Arc::downgrade(&kcp_proxy_dst.proxy_entries))
}
}
#[async_trait::async_trait]
impl TcpProxyRpc for KcpProxyDstRpcService {
type Controller = BaseController;
async fn list_tcp_proxy_entry(
&self,
_: BaseController,
_request: ListTcpProxyEntryRequest, // Accept request of type HelloRequest
) -> std::result::Result<ListTcpProxyEntryResponse, rpc_types::error::Error> {
let mut reply = ListTcpProxyEntryResponse::default();
if let Some(tcp_proxy) = self.0.upgrade() {
for item in tcp_proxy.iter() {
reply.entries.push(item.value().clone());
}
}
Ok(reply)
}
}

View File

@@ -15,8 +15,10 @@ pub mod fast_socks5;
#[cfg(feature = "socks5")]
pub mod socks5;
pub mod kcp_proxy;
#[derive(Debug)]
struct CidrSet {
pub(crate) struct CidrSet {
global_ctx: ArcGlobalCtx,
cidr_set: Arc<Mutex<Vec<cidr::IpCidr>>>,
tasks: JoinSet<()>,

View File

@@ -1,3 +1,4 @@
use anyhow::Context;
use cidr::Ipv4Inet;
use core::panic;
use crossbeam::atomic::AtomicCell;
@@ -7,14 +8,17 @@ use pnet::packet::ipv4::{Ipv4Packet, MutableIpv4Packet};
use pnet::packet::tcp::{ipv4_checksum, MutableTcpPacket, TcpPacket};
use pnet::packet::MutablePacket;
use pnet::packet::Packet;
use socket2::{SockRef, TcpKeepalive};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::atomic::{AtomicBool, AtomicU16};
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::io::copy_bidirectional;
use tokio::io::{copy_bidirectional, AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::select;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinSet;
use tokio::time::timeout;
use tracing::Instrument;
use crate::common::error::Result;
@@ -23,31 +27,107 @@ use crate::common::join_joinset_background;
use crate::peers::peer_manager::PeerManager;
use crate::peers::{NicPacketFilter, PeerPacketFilter};
use crate::tunnel::packet_def::{PacketType, ZCPacket};
use crate::proto::cli::{
ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc,
};
use crate::proto::rpc_types;
use crate::proto::rpc_types::controller::BaseController;
use crate::tunnel::packet_def::{PacketType, PeerManagerHeader, ZCPacket};
use super::CidrSet;
#[cfg(feature = "smoltcp")]
use super::tokio_smoltcp::{self, channel_device, Net, NetConfig};
#[derive(Debug, Clone, Copy, PartialEq)]
enum NatDstEntryState {
// receive syn packet but not start connecting to dst
SynReceived,
// connecting to dst
ConnectingDst,
// connected to dst
Connected,
// connection closed
Closed,
#[async_trait::async_trait]
pub(crate) trait NatDstConnector: Send + Sync + Clone + 'static {
type DstStream: AsyncRead + AsyncWrite + Unpin + Send;
async fn connect(&self, src: SocketAddr, dst: SocketAddr) -> Result<Self::DstStream>;
fn check_packet_from_peer_fast(&self, cidr_set: &CidrSet, global_ctx: &GlobalCtx) -> bool;
fn check_packet_from_peer(
&self,
cidr_set: &CidrSet,
global_ctx: &GlobalCtx,
hdr: &PeerManagerHeader,
ipv4: &Ipv4Packet,
) -> bool;
fn transport_type(&self) -> TcpProxyEntryTransportType;
}
#[derive(Debug, Clone)]
pub struct NatDstTcpConnector;
#[async_trait::async_trait]
impl NatDstConnector for NatDstTcpConnector {
type DstStream = TcpStream;
async fn connect(&self, _src: SocketAddr, nat_dst: SocketAddr) -> Result<Self::DstStream> {
let socket = TcpSocket::new_v4().unwrap();
if let Err(e) = socket.set_nodelay(true) {
tracing::warn!("set_nodelay failed, ignore it: {:?}", e);
}
const TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
const TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(2);
const TCP_KEEPALIVE_RETRIES: u32 = 2;
let stream = timeout(Duration::from_secs(10), socket.connect(nat_dst))
.await?
.with_context(|| format!("connect to nat dst failed: {:?}", nat_dst))?;
let ka = TcpKeepalive::new()
.with_time(TCP_KEEPALIVE_TIME)
.with_interval(TCP_KEEPALIVE_INTERVAL);
#[cfg(not(target_os = "windows"))]
let ka = ka.with_retries(TCP_KEEPALIVE_RETRIES);
let sf = SockRef::from(&stream);
sf.set_tcp_keepalive(&ka)?;
Ok(stream)
}
fn check_packet_from_peer_fast(&self, cidr_set: &CidrSet, global_ctx: &GlobalCtx) -> bool {
!cidr_set.is_empty() || global_ctx.enable_exit_node() || global_ctx.no_tun()
}
fn check_packet_from_peer(
&self,
cidr_set: &CidrSet,
global_ctx: &GlobalCtx,
hdr: &PeerManagerHeader,
ipv4: &Ipv4Packet,
) -> bool {
let is_exit_node = hdr.is_exit_node();
if !cidr_set.contains_v4(ipv4.get_destination())
&& !is_exit_node
&& !(global_ctx.no_tun()
&& Some(ipv4.get_destination())
== global_ctx.get_ipv4().as_ref().map(Ipv4Inet::address))
{
return false;
}
true
}
fn transport_type(&self) -> TcpProxyEntryTransportType {
TcpProxyEntryTransportType::Tcp
}
}
type NatDstEntryState = TcpProxyEntryState;
#[derive(Debug)]
pub struct NatDstEntry {
id: uuid::Uuid,
src: SocketAddr,
dst: SocketAddr,
start_time: Instant,
start_time_local: chrono::DateTime<chrono::Local>,
tasks: Mutex<JoinSet<()>>,
state: AtomicCell<NatDstEntryState>,
}
@@ -59,10 +139,21 @@ impl NatDstEntry {
src,
dst,
start_time: Instant::now(),
start_time_local: chrono::Local::now(),
tasks: Mutex::new(JoinSet::new()),
state: AtomicCell::new(NatDstEntryState::SynReceived),
}
}
fn into_pb(&self, transport_type: TcpProxyEntryTransportType) -> TcpProxyEntry {
TcpProxyEntry {
src: Some(self.src.clone().into()),
dst: Some(self.dst.clone().into()),
start_time: self.start_time_local.timestamp() as u64,
state: self.state.load().into(),
transport_type: transport_type.into(),
}
}
}
enum ProxyTcpStream {
@@ -83,7 +174,24 @@ impl ProxyTcpStream {
}
}
pub async fn copy_bidirectional(&mut self, dst: &mut TcpStream) -> Result<()> {
pub async fn shutdown(&mut self) -> Result<()> {
match self {
Self::KernelTcpStream(stream) => {
stream.shutdown().await?;
Ok(())
}
#[cfg(feature = "smoltcp")]
Self::SmolTcpStream(stream) => {
stream.shutdown().await?;
Ok(())
}
}
}
pub async fn copy_bidirectional<D: AsyncRead + AsyncWrite + Unpin>(
&mut self,
dst: &mut D,
) -> Result<()> {
match self {
Self::KernelTcpStream(stream) => {
copy_bidirectional(stream, dst).await?;
@@ -122,11 +230,29 @@ impl SmolTcpListener {
.unwrap();
let tx = tx.clone();
tasks.spawn(async move {
let mut not_listening_count = 0;
loop {
tx.send(tcp.accept().await.map_err(|e| {
anyhow::anyhow!("smol tcp listener accept failed: {:?}", e).into()
}))
.unwrap();
select! {
_ = tokio::time::sleep(Duration::from_secs(2)) => {
if tcp.is_listening() {
not_listening_count = 0;
continue;
}
not_listening_count += 1;
if not_listening_count >= 2 {
tracing::error!("smol tcp listener not listening");
tcp.relisten();
}
}
accept_ret = tcp.accept() => {
tx.send(accept_ret.map_err(|e| {
anyhow::anyhow!("smol tcp listener accept failed: {:?}", e).into()
}))
.unwrap();
not_listening_count = 0;
}
}
}
});
}
@@ -176,7 +302,7 @@ type ConnSockMap = Arc<DashMap<uuid::Uuid, ArcNatDstEntry>>;
type AddrConnSockMap = Arc<DashMap<SocketAddr, ArcNatDstEntry>>;
#[derive(Debug)]
pub struct TcpProxy {
pub struct TcpProxy<C: NatDstConnector> {
global_ctx: Arc<GlobalCtx>,
peer_manager: Arc<PeerManager>,
local_port: AtomicU16,
@@ -194,10 +320,12 @@ pub struct TcpProxy {
#[cfg(feature = "smoltcp")]
smoltcp_net: Arc<Mutex<Option<Net>>>,
enable_smoltcp: Arc<AtomicBool>,
connector: C,
}
#[async_trait::async_trait]
impl PeerPacketFilter for TcpProxy {
impl<C: NatDstConnector> PeerPacketFilter for TcpProxy<C> {
async fn try_process_packet_from_peer(&self, mut packet: ZCPacket) -> Option<ZCPacket> {
if let Some(_) = self.try_handle_peer_packet(&mut packet).await {
if self
@@ -221,10 +349,10 @@ impl PeerPacketFilter for TcpProxy {
}
#[async_trait::async_trait]
impl NicPacketFilter for TcpProxy {
async fn try_process_packet_from_nic(&self, zc_packet: &mut ZCPacket) {
impl<C: NatDstConnector> NicPacketFilter for TcpProxy<C> {
async fn try_process_packet_from_nic(&self, zc_packet: &mut ZCPacket) -> bool {
let Some(my_ipv4) = self.get_local_ip() else {
return;
return false;
};
let data = zc_packet.payload();
@@ -233,25 +361,33 @@ impl NicPacketFilter for TcpProxy {
|| ip_packet.get_source() != my_ipv4
|| ip_packet.get_next_level_protocol() != IpNextHeaderProtocols::Tcp
{
return;
return false;
}
let tcp_packet = TcpPacket::new(ip_packet.payload()).unwrap();
if tcp_packet.get_source() != self.get_local_port() {
return;
return false;
}
let dst_addr = SocketAddr::V4(SocketAddrV4::new(
let mut dst_addr = SocketAddr::V4(SocketAddrV4::new(
ip_packet.get_destination(),
tcp_packet.get_destination(),
));
let mut need_transform_dst = false;
// for kcp proxy, the src ip of nat entry will be converted from my ip to fake ip
// here we need to convert it back
if !self.is_smoltcp_enabled() && dst_addr.ip() == Self::get_fake_local_ipv4(my_ipv4) {
dst_addr.set_ip(IpAddr::V4(my_ipv4));
need_transform_dst = true;
}
tracing::trace!(dst_addr = ?dst_addr, "tcp packet try find entry");
let entry = if let Some(entry) = self.addr_conn_map.get(&dst_addr) {
entry
} else {
let Some(syn_entry) = self.syn_map.get(&dst_addr) else {
return;
return false;
};
syn_entry
};
@@ -267,9 +403,15 @@ impl NicPacketFilter for TcpProxy {
.mut_peer_manager_header()
.unwrap()
.set_no_proxy(true);
if need_transform_dst {
zc_packet.mut_peer_manager_header().unwrap().to_peer_id = self.get_my_peer_id().into();
}
let mut ip_packet = MutableIpv4Packet::new(zc_packet.mut_payload()).unwrap();
ip_packet.set_source(ip);
if need_transform_dst {
ip_packet.set_destination(my_ipv4);
}
let dst = ip_packet.get_destination();
let mut tcp_packet = MutableTcpPacket::new(ip_packet.payload_mut()).unwrap();
@@ -280,12 +422,15 @@ impl NicPacketFilter for TcpProxy {
Self::update_ip_packet_checksum(&mut ip_packet);
tracing::trace!(dst_addr = ?dst_addr, nat_entry = ?nat_entry, packet = ?ip_packet, "tcp packet after modified");
true
}
}
impl TcpProxy {
pub fn new(global_ctx: Arc<GlobalCtx>, peer_manager: Arc<PeerManager>) -> Arc<Self> {
impl<C: NatDstConnector> TcpProxy<C> {
pub fn new(peer_manager: Arc<PeerManager>, connector: C) -> Arc<Self> {
let (smoltcp_stack_sender, smoltcp_stack_receiver) = mpsc::channel::<ZCPacket>(1000);
let global_ctx = peer_manager.get_global_ctx();
Arc::new(Self {
global_ctx: global_ctx.clone(),
@@ -307,6 +452,8 @@ impl TcpProxy {
smoltcp_net: Arc::new(Mutex::new(None)),
enable_smoltcp: Arc::new(AtomicBool::new(true)),
connector,
})
}
@@ -326,15 +473,17 @@ impl TcpProxy {
ip_packet.set_checksum(pnet::packet::ipv4::checksum(&ip_packet.to_immutable()));
}
pub async fn start(self: &Arc<Self>) -> Result<()> {
pub async fn start(self: &Arc<Self>, add_pipeline: bool) -> Result<()> {
self.run_syn_map_cleaner().await?;
self.run_listener().await?;
self.peer_manager
.add_packet_process_pipeline(Box::new(self.clone()))
.await;
self.peer_manager
.add_nic_packet_process_pipeline(Box::new(self.clone()))
.await;
if add_pipeline {
self.peer_manager
.add_packet_process_pipeline(Box::new(self.clone()))
.await;
self.peer_manager
.add_nic_packet_process_pipeline(Box::new(self.clone()))
.await;
}
join_joinset_background(self.tasks.clone(), "TcpProxy".to_owned());
Ok(())
@@ -364,7 +513,10 @@ impl TcpProxy {
async fn get_proxy_listener(&self) -> Result<ProxyTcpListener> {
#[cfg(feature = "smoltcp")]
if self.global_ctx.get_flags().use_smoltcp || self.global_ctx.no_tun() {
if self.global_ctx.get_flags().use_smoltcp
|| self.global_ctx.no_tun()
|| cfg!(target_os = "android")
{
// use smoltcp network stack
self.local_port
.store(8899, std::sync::atomic::Ordering::Relaxed);
@@ -458,11 +610,32 @@ impl TcpProxy {
let syn_map = self.syn_map.clone();
let conn_map = self.conn_map.clone();
let addr_conn_map = self.addr_conn_map.clone();
let connector = self.connector.clone();
let accept_task = async move {
let conn_map = conn_map.clone();
while let Ok((tcp_stream, socket_addr)) = tcp_listener.accept().await {
loop {
let accept_ret = tcp_listener.accept().await;
let Ok((tcp_stream, mut socket_addr)) = accept_ret else {
tracing::error!("nat tcp listener accept failed: {:?}", accept_ret.err());
continue;
};
let my_ip = global_ctx
.get_ipv4()
.as_ref()
.map(Ipv4Inet::address)
.unwrap_or(Ipv4Addr::UNSPECIFIED);
if socket_addr.ip() == Self::get_fake_local_ipv4(my_ip) {
socket_addr.set_ip(IpAddr::V4(my_ip));
}
let Some(entry) = syn_map.get(&socket_addr) else {
tracing::error!("tcp connection from unknown source: {:?}", socket_addr);
tracing::error!(
?my_ip,
?socket_addr,
"tcp connection from unknown source, ignore it"
);
continue;
};
tracing::info!(
@@ -483,6 +656,7 @@ impl TcpProxy {
assert!(old_nat_val.is_none());
tasks.lock().unwrap().spawn(Self::connect_to_nat_dst(
connector.clone(),
global_ctx.clone(),
tcp_stream,
conn_map.clone(),
@@ -490,8 +664,6 @@ impl TcpProxy {
entry_clone,
));
}
tracing::error!("nat tcp listener exited");
panic!("nat tcp listener exited");
};
self.tasks
.lock()
@@ -511,6 +683,7 @@ impl TcpProxy {
}
async fn connect_to_nat_dst(
connector: C,
global_ctx: ArcGlobalCtx,
src_tcp_stream: ProxyTcpStream,
conn_map: ConnSockMap,
@@ -521,12 +694,6 @@ impl TcpProxy {
tracing::warn!("set_nodelay failed, ignore it: {:?}", e);
}
let _guard = global_ctx.net_ns.guard();
let socket = TcpSocket::new_v4().unwrap();
if let Err(e) = socket.set_nodelay(true) {
tracing::warn!("set_nodelay failed, ignore it: {:?}", e);
}
let nat_dst = if Some(nat_entry.dst.ip())
== global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address()))
{
@@ -537,12 +704,8 @@ impl TcpProxy {
nat_entry.dst
};
let Ok(Ok(dst_tcp_stream)) = tokio::time::timeout(
Duration::from_secs(10),
TcpSocket::new_v4().unwrap().connect(nat_dst),
)
.await
else {
let _guard = global_ctx.net_ns.guard();
let Ok(dst_tcp_stream) = connector.connect(nat_entry.src, nat_dst).await else {
tracing::error!("connect to dst failed: {:?}", nat_entry);
nat_entry.state.store(NatDstEntryState::Closed);
Self::remove_entry_from_all_conn_map(conn_map, addr_conn_map, nat_entry);
@@ -567,7 +730,7 @@ impl TcpProxy {
async fn handle_nat_connection(
mut src_tcp_stream: ProxyTcpStream,
mut dst_tcp_stream: TcpStream,
mut dst_tcp_stream: C::DstStream,
conn_map: ConnSockMap,
addr_conn_map: AddrConnSockMap,
nat_entry: ArcNatDstEntry,
@@ -576,7 +739,21 @@ impl TcpProxy {
nat_entry.tasks.lock().await.spawn(async move {
let ret = src_tcp_stream.copy_bidirectional(&mut dst_tcp_stream).await;
tracing::info!(nat_entry = ?nat_entry_clone, ret = ?ret, "nat tcp connection closed");
nat_entry_clone.state.store(NatDstEntryState::ClosingSrc);
let ret = timeout(Duration::from_secs(10), src_tcp_stream.shutdown()).await;
tracing::info!(nat_entry = ?nat_entry_clone, ret = ?ret, "src tcp stream shutdown");
nat_entry_clone.state.store(NatDstEntryState::ClosingDst);
let ret = timeout(Duration::from_secs(10), dst_tcp_stream.shutdown()).await;
tracing::info!(nat_entry = ?nat_entry_clone, ret = ?ret, "dst tcp stream shutdown");
drop(src_tcp_stream);
drop(dst_tcp_stream);
nat_entry_clone.state.store(NatDstEntryState::Closed);
// sleep later so the fin packet can be processed
tokio::time::sleep(Duration::from_secs(10)).await;
Self::remove_entry_from_all_conn_map(conn_map, addr_conn_map, nat_entry_clone);
});
@@ -586,11 +763,12 @@ impl TcpProxy {
self.local_port.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn get_my_peer_id(&self) -> u32 {
self.peer_manager.my_peer_id()
}
pub fn get_local_ip(&self) -> Option<Ipv4Addr> {
if self
.enable_smoltcp
.load(std::sync::atomic::Ordering::Relaxed)
{
if self.is_smoltcp_enabled() {
Some(Ipv4Addr::new(192, 88, 99, 254))
} else {
self.global_ctx
@@ -600,17 +778,30 @@ impl TcpProxy {
}
}
pub fn get_global_ctx(&self) -> &ArcGlobalCtx {
&self.global_ctx
}
pub fn is_smoltcp_enabled(&self) -> bool {
self.enable_smoltcp
.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn get_fake_local_ipv4(local_ip: Ipv4Addr) -> Ipv4Addr {
let octets = local_ip.octets();
Ipv4Addr::new(octets[0], octets[1], octets[2], 0)
}
async fn try_handle_peer_packet(&self, packet: &mut ZCPacket) -> Option<()> {
if self.cidr_set.is_empty()
&& !self.global_ctx.enable_exit_node()
&& !self.global_ctx.no_tun()
if !self
.connector
.check_packet_from_peer_fast(&self.cidr_set, &self.global_ctx)
{
return None;
}
let ipv4_addr = self.get_local_ip()?;
let hdr = packet.peer_manager_header().unwrap();
let is_exit_node = hdr.is_exit_node();
let hdr = packet.peer_manager_header().unwrap().clone();
if hdr.packet_type != PacketType::Data as u8 || hdr.is_no_proxy() {
return None;
@@ -623,11 +814,9 @@ impl TcpProxy {
return None;
}
if !self.cidr_set.contains_v4(ipv4.get_destination())
&& !is_exit_node
&& !(self.global_ctx.no_tun()
&& Some(ipv4.get_destination())
== self.global_ctx.get_ipv4().as_ref().map(Ipv4Inet::address))
if !self
.connector
.check_packet_from_peer(&self.cidr_set, &self.global_ctx, &hdr, &ipv4)
{
return None;
}
@@ -658,6 +847,10 @@ impl TcpProxy {
}
let mut ip_packet = MutableIpv4Packet::new(payload_bytes).unwrap();
if !self.is_smoltcp_enabled() && source_ip == ipv4_addr {
// modify the source so the response packet can be handled by tun device
ip_packet.set_source(Self::get_fake_local_ipv4(ipv4_addr));
}
ip_packet.set_destination(ipv4_addr);
let source = ip_packet.get_source();
@@ -672,4 +865,53 @@ impl TcpProxy {
Some(())
}
pub fn get_peer_manager(&self) -> &Arc<PeerManager> {
&self.peer_manager
}
pub fn is_tcp_proxy_connection(&self, src: SocketAddr) -> bool {
self.syn_map.contains_key(&src) || self.addr_conn_map.contains_key(&src)
}
pub fn list_proxy_entries(&self) -> Vec<TcpProxyEntry> {
let mut entries: Vec<TcpProxyEntry> = Vec::new();
let transport_type = self.connector.transport_type();
for entry in self.syn_map.iter() {
entries.push(entry.value().as_ref().into_pb(transport_type));
}
for entry in self.conn_map.iter() {
entries.push(entry.value().as_ref().into_pb(transport_type));
}
entries
}
}
#[derive(Clone)]
pub struct TcpProxyRpcService<C: NatDstConnector> {
tcp_proxy: Weak<TcpProxy<C>>,
}
#[async_trait::async_trait]
impl<C: NatDstConnector> TcpProxyRpc for TcpProxyRpcService<C> {
type Controller = BaseController;
async fn list_tcp_proxy_entry(
&self,
_: BaseController,
_request: ListTcpProxyEntryRequest, // Accept request of type HelloRequest
) -> std::result::Result<ListTcpProxyEntryResponse, rpc_types::error::Error> {
let mut reply = ListTcpProxyEntryResponse::default();
if let Some(tcp_proxy) = self.tcp_proxy.upgrade() {
reply.entries = tcp_proxy.list_proxy_entries();
}
Ok(reply)
}
}
impl<C: NatDstConnector> TcpProxyRpcService<C> {
pub fn new(tcp_proxy: Arc<TcpProxy<C>>) -> Self {
Self {
tcp_proxy: Arc::downgrade(&tcp_proxy),
}
}
}

View File

@@ -43,7 +43,7 @@ pub struct BufferRxToken(Packet);
impl RxToken for BufferRxToken {
fn consume<R, F>(mut self, f: F) -> R
where
F: FnOnce(&mut [u8]) -> R,
F: FnOnce(&[u8]) -> R,
{
let p = &mut self.0;
let result = f(p);

View File

@@ -91,6 +91,19 @@ async fn run(
&mut device,
&mut socket_allocator.sockets().lock(),
);
// wake up all closed sockets (smoltcp seems have a bug that it doesn't wake up closed sockets)
for (_, socket) in socket_allocator.sockets().lock().iter_mut() {
match socket {
Socket::Tcp(tcp) => {
if tcp.state() == smoltcp::socket::tcp::State::Closed {
tcp.abort();
}
}
#[allow(unreachable_patterns)]
_ => {}
}
}
}
Ok(())

View File

@@ -67,6 +67,19 @@ impl TcpListener {
pub fn local_addr(&self) -> io::Result<SocketAddr> {
Ok(self.local_addr)
}
pub fn relisten(&mut self) {
let mut socket = self.reactor.get_socket::<tcp::Socket>(*self.handle);
let local_endpoint = socket.local_endpoint().unwrap();
socket.abort();
socket.listen(local_endpoint).unwrap();
self.reactor.notify();
}
pub fn is_listening(&self) -> bool {
let socket = self.reactor.get_socket::<tcp::Socket>(*self.handle);
socket.is_listening()
}
}
pub struct Incoming(TcpListener);
@@ -210,6 +223,9 @@ impl AsyncWrite for TcpStream {
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let mut socket = self.reactor.get_socket::<tcp::Socket>(*self.handle);
if !socket.may_send() {
return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
}
if socket.send_queue() == 0 {
return Poll::Ready(Ok(()));
}

View File

@@ -2,6 +2,7 @@ use parking_lot::Mutex;
use smoltcp::{
iface::{SocketHandle as InnerSocketHandle, SocketSet},
socket::tcp,
time::Duration,
};
use std::{
ops::{Deref, DerefMut},
@@ -53,6 +54,8 @@ impl SocketAlloctor {
let tx_buffer = tcp::SocketBuffer::new(vec![0; self.buffer_size.tcp_tx_size]);
let mut tcp = tcp::Socket::new(rx_buffer, tx_buffer);
tcp.set_nagle_enabled(false);
tcp.set_keep_alive(Some(Duration::from_secs(10)));
tcp.set_timeout(Some(Duration::from_secs(60)));
tcp
}

View File

@@ -4,6 +4,7 @@ use std::{
time::Duration,
};
use bytes::{BufMut, BytesMut};
use cidr::Ipv4Inet;
use crossbeam::atomic::AtomicCell;
use dashmap::DashMap;
@@ -24,11 +25,11 @@ use tokio::{
use tracing::Level;
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
common::{error::Error, global_ctx::ArcGlobalCtx, scoped_task::ScopedTask, PeerId},
gateway::ip_reassembler::compose_ipv4_packet,
peers::{peer_manager::PeerManager, PeerPacketFilter},
tunnel::{
common::setup_sokcet2,
common::{reserve_buf, setup_sokcet2},
packet_def::{PacketType, ZCPacket},
},
};
@@ -139,59 +140,81 @@ impl UdpNatEntry {
mut packet_sender: Sender<ZCPacket>,
virtual_ipv4: Ipv4Addr,
) {
let mut buf = [0u8; 65536];
let mut udp_body: &mut [u8] = unsafe { std::mem::transmute(&mut buf[20 + 8..]) };
let mut ip_id = 1;
let (s, mut r) = tachyonix::channel(128);
loop {
let (len, src_socket) = match timeout(
Duration::from_secs(120),
self.socket.recv_from(&mut udp_body),
)
.await
{
Ok(Ok(x)) => x,
Ok(Err(err)) => {
tracing::error!(?err, "udp nat recv failed");
let self_clone = self.clone();
let recv_task = ScopedTask::from(tokio::spawn(async move {
let mut cur_buf = BytesMut::new();
loop {
if self_clone
.stopped
.load(std::sync::atomic::Ordering::Relaxed)
{
break;
}
Err(err) => {
tracing::error!(?err, "udp nat recv timeout");
break;
reserve_buf(&mut cur_buf, 64 * 1024 + 28, 128 * 1024 + 28);
assert_eq!(cur_buf.len(), 0);
unsafe {
cur_buf.advance_mut(28);
}
};
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
let (len, src_socket) = match timeout(
Duration::from_secs(120),
self_clone.socket.recv_buf_from(&mut cur_buf),
)
.await
{
Ok(Ok(x)) => x,
Ok(Err(err)) => {
tracing::error!(?err, "udp nat recv failed");
break;
}
Err(err) => {
tracing::error!(?err, "udp nat recv timeout");
break;
}
};
if self.stopped.load(std::sync::atomic::Ordering::Relaxed) {
break;
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
let ret_buf = cur_buf.split();
s.send((ret_buf, len, src_socket)).await.unwrap();
}
}));
let SocketAddr::V4(mut src_v4) = src_socket else {
continue;
};
let self_clone = self.clone();
let send_task = ScopedTask::from(tokio::spawn(async move {
let mut ip_id = 1;
while let Ok((mut packet, len, src_socket)) = r.recv().await {
let SocketAddr::V4(mut src_v4) = src_socket else {
continue;
};
self.mark_active();
self_clone.mark_active();
if src_v4.ip().is_loopback() {
src_v4.set_ip(virtual_ipv4);
if src_v4.ip().is_loopback() {
src_v4.set_ip(virtual_ipv4);
}
let Ok(_) = Self::compose_ipv4_packet(
&self_clone,
&mut packet_sender,
&mut packet,
&src_v4,
len,
1280,
ip_id,
)
.await
else {
break;
};
ip_id = ip_id.wrapping_add(1);
}
}));
let Ok(_) = Self::compose_ipv4_packet(
&self,
&mut packet_sender,
&mut buf,
&src_v4,
len,
1200,
ip_id,
)
.await
else {
break;
};
ip_id = ip_id.wrapping_add(1);
}
let _ = tokio::join!(recv_task, send_task);
self.stop();
}

View File

@@ -17,13 +17,14 @@ use crate::connector::direct::DirectConnectorManager;
use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManager};
use crate::connector::udp_hole_punch::UdpHolePunchConnector;
use crate::gateway::icmp_proxy::IcmpProxy;
use crate::gateway::tcp_proxy::TcpProxy;
use crate::gateway::kcp_proxy::{KcpProxyDst, KcpProxyDstRpcService, KcpProxySrc};
use crate::gateway::tcp_proxy::{NatDstTcpConnector, TcpProxy, TcpProxyRpcService};
use crate::gateway::udp_proxy::UdpProxy;
use crate::peer_center::instance::PeerCenterInstance;
use crate::peers::peer_conn::PeerConnId;
use crate::peers::peer_manager::{PeerManager, RouteAlgoType};
use crate::peers::rpc_service::PeerManagerRpcService;
use crate::peers::PacketRecvChanReceiver;
use crate::peers::{create_packet_recv_chan, recv_packet_from_chan, PacketRecvChanReceiver};
use crate::proto::cli::VpnPortalRpc;
use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo};
use crate::proto::peer_rpc::PeerCenterRpcServer;
@@ -40,7 +41,7 @@ use crate::gateway::socks5::Socks5Server;
#[derive(Clone)]
struct IpProxy {
tcp_proxy: Arc<TcpProxy>,
tcp_proxy: Arc<TcpProxy<NatDstTcpConnector>>,
icmp_proxy: Arc<IcmpProxy>,
udp_proxy: Arc<UdpProxy>,
global_ctx: ArcGlobalCtx,
@@ -49,7 +50,7 @@ struct IpProxy {
impl IpProxy {
fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc<PeerManager>) -> Result<Self, Error> {
let tcp_proxy = TcpProxy::new(global_ctx.clone(), peer_manager.clone());
let tcp_proxy = TcpProxy::new(peer_manager.clone(), NatDstTcpConnector {});
let icmp_proxy = IcmpProxy::new(global_ctx.clone(), peer_manager.clone())
.with_context(|| "create icmp proxy failed")?;
let udp_proxy = UdpProxy::new(global_ctx.clone(), peer_manager.clone())
@@ -64,7 +65,9 @@ impl IpProxy {
}
async fn start(&self) -> Result<(), Error> {
if (self.global_ctx.get_proxy_cidrs().is_empty() || self.started.load(Ordering::Relaxed))
if (self.global_ctx.get_proxy_cidrs().is_empty()
|| self.global_ctx.proxy_forward_by_system()
|| self.started.load(Ordering::Relaxed))
&& !self.global_ctx.enable_exit_node()
&& !self.global_ctx.no_tun()
{
@@ -72,7 +75,7 @@ impl IpProxy {
}
self.started.store(true, Ordering::Relaxed);
self.tcp_proxy.start().await?;
self.tcp_proxy.start(true).await?;
self.icmp_proxy.start().await?;
self.udp_proxy.start().await?;
Ok(())
@@ -116,6 +119,9 @@ pub struct Instance {
ip_proxy: Option<IpProxy>,
kcp_proxy_src: Option<KcpProxySrc>,
kcp_proxy_dst: Option<KcpProxyDst>,
peer_center: Arc<PeerCenterInstance>,
vpn_portal: Arc<Mutex<Box<dyn VpnPortal>>>,
@@ -137,7 +143,7 @@ impl Instance {
global_ctx.config.dump()
);
let (peer_packet_sender, peer_packet_receiver) = tokio::sync::mpsc::channel(100);
let (peer_packet_sender, peer_packet_receiver) = create_packet_recv_chan();
let id = global_ctx.get_id();
@@ -193,6 +199,8 @@ impl Instance {
udp_hole_puncher: Arc::new(Mutex::new(udp_hole_puncher)),
ip_proxy: None,
kcp_proxy_src: None,
kcp_proxy_dst: None,
peer_center,
@@ -230,7 +238,7 @@ impl Instance {
let mut tasks = JoinSet::new();
tasks.spawn(async move {
let mut packet_recv = packet_recv.lock().await;
while let Some(packet) = packet_recv.recv().await {
while let Ok(packet) = recv_packet_from_chan(&mut packet_recv).await {
tracing::trace!("packet consumed by mock nic ctx: {:?}", packet);
}
});
@@ -374,7 +382,17 @@ impl Instance {
self.check_dhcp_ip_conflict();
}
self.run_rpc_server().await?;
if self.global_ctx.get_flags().enable_kcp_proxy {
let src_proxy = KcpProxySrc::new(self.get_peer_manager()).await;
src_proxy.start().await;
self.kcp_proxy_src = Some(src_proxy);
}
if !self.global_ctx.get_flags().disable_kcp_input {
let mut dst_proxy = KcpProxyDst::new(self.get_peer_manager()).await;
dst_proxy.start().await;
self.kcp_proxy_dst = Some(dst_proxy);
}
// run after tun device created, so listener can bind to tun device, which may be required by win 10
self.ip_proxy = Some(IpProxy::new(
@@ -401,6 +419,8 @@ impl Instance {
#[cfg(feature = "socks5")]
self.socks5_server.run().await?;
self.run_rpc_server().await?;
Ok(())
}
@@ -523,6 +543,26 @@ impl Instance {
s.registry()
.register(VpnPortalRpcServer::new(vpn_portal_rpc), "");
if let Some(ip_proxy) = self.ip_proxy.as_ref() {
s.registry().register(
TcpProxyRpcServer::new(TcpProxyRpcService::new(ip_proxy.tcp_proxy.clone())),
"tcp",
);
}
if let Some(kcp_proxy) = self.kcp_proxy_src.as_ref() {
s.registry().register(
TcpProxyRpcServer::new(TcpProxyRpcService::new(kcp_proxy.get_tcp_proxy())),
"kcp_src",
);
}
if let Some(kcp_proxy) = self.kcp_proxy_dst.as_ref() {
s.registry().register(
TcpProxyRpcServer::new(KcpProxyDstRpcService::new(kcp_proxy)),
"kcp_dst",
);
}
let _g = self.global_ctx.net_ns.guard();
Ok(s.serve().await.with_context(|| "rpc server start failed")?)
}

View File

@@ -2,7 +2,7 @@ use std::{fmt::Debug, sync::Arc};
use anyhow::Context;
use async_trait::async_trait;
use tokio::{sync::Mutex, task::JoinSet};
use tokio::task::JoinSet;
#[cfg(feature = "quic")]
use crate::tunnel::quic::QUICTunnelListener;
@@ -50,6 +50,10 @@ pub fn get_listener_by_url(
})
}
pub fn is_url_host_ipv6(l: &url::Url) -> bool {
l.host_str().map_or(false, |h| h.contains(':'))
}
#[async_trait]
pub trait TunnelHandlerForListener {
async fn handle_tunnel(&self, tunnel: Box<dyn Tunnel>) -> Result<(), Error>;
@@ -59,20 +63,24 @@ pub trait TunnelHandlerForListener {
impl TunnelHandlerForListener for PeerManager {
#[tracing::instrument]
async fn handle_tunnel(&self, tunnel: Box<dyn Tunnel>) -> Result<(), Error> {
self.add_tunnel_as_server(tunnel).await
self.add_tunnel_as_server(tunnel, true).await
}
}
#[derive(Debug, Clone)]
struct Listener {
inner: Arc<Mutex<dyn TunnelListener>>,
pub trait ListenerCreatorTrait: Fn() -> Box<dyn TunnelListener> + Send + Sync {}
impl<T: Send + Sync> ListenerCreatorTrait for T where T: Fn() -> Box<dyn TunnelListener> + Send {}
pub type ListenerCreator = Box<dyn ListenerCreatorTrait>;
#[derive(Clone)]
struct ListenerFactory {
creator_fn: Arc<ListenerCreator>,
must_succ: bool,
}
pub struct ListenerManager<H> {
global_ctx: ArcGlobalCtx,
net_ns: NetNS,
listeners: Vec<Listener>,
listeners: Vec<ListenerFactory>,
peer_manager: Arc<H>,
tasks: JoinSet<()>,
@@ -90,118 +98,145 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
}
pub async fn prepare_listeners(&mut self) -> Result<(), Error> {
let self_id = self.global_ctx.get_id();
self.add_listener(
RingTunnelListener::new(
format!("ring://{}", self.global_ctx.get_id())
.parse()
.unwrap(),
),
move || {
Box::new(RingTunnelListener::new(
format!("ring://{}", self_id).parse().unwrap(),
))
},
true,
)
.await?;
for l in self.global_ctx.config.get_listener_uris().iter() {
let Ok(lis) = get_listener_by_url(l, self.global_ctx.clone()) else {
let l = l.clone();
let Ok(_) = get_listener_by_url(&l, self.global_ctx.clone()) else {
let msg = format!("failed to get listener by url: {}, maybe not supported", l);
self.global_ctx
.issue_event(GlobalCtxEvent::ListenerAddFailed(l.clone(), msg));
continue;
};
self.add_listener(lis, true).await?;
}
let ctx = self.global_ctx.clone();
if self.global_ctx.config.get_flags().enable_ipv6 {
let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone();
let _ = self
.add_listener(
UdpTunnelListener::new(ipv6_listener.parse().unwrap()),
let listener = l.clone();
self.add_listener(
move || get_listener_by_url(&listener, ctx.clone()).unwrap(),
true,
)
.await?;
if self.global_ctx.config.get_flags().enable_ipv6 && !is_url_host_ipv6(&l) {
let mut ipv6_listener = l.clone();
ipv6_listener
.set_host(Some("[::]".to_string().as_str()))
.with_context(|| format!("failed to set ipv6 host for listener: {}", l))?;
let ctx = self.global_ctx.clone();
self.add_listener(
move || get_listener_by_url(&ipv6_listener, ctx.clone()).unwrap(),
false,
)
.await?;
}
}
Ok(())
}
pub async fn add_listener<L>(&mut self, listener: L, must_succ: bool) -> Result<(), Error>
where
L: TunnelListener + 'static,
{
let listener = Arc::new(Mutex::new(listener));
self.listeners.push(Listener {
inner: listener,
pub async fn add_listener<C: ListenerCreatorTrait + 'static>(
&mut self,
creator: C,
must_succ: bool,
) -> Result<(), Error> {
self.listeners.push(ListenerFactory {
creator_fn: Arc::new(Box::new(creator)),
must_succ,
});
Ok(())
}
#[tracing::instrument]
#[tracing::instrument(skip(creator))]
async fn run_listener(
listener: Arc<Mutex<dyn TunnelListener>>,
creator: Arc<ListenerCreator>,
peer_manager: Arc<H>,
global_ctx: ArcGlobalCtx,
) {
let mut l = listener.lock().await;
global_ctx.add_running_listener(l.local_url());
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
loop {
let ret = match l.accept().await {
Ok(ret) => ret,
let mut l = (creator)();
let _g = global_ctx.net_ns.guard();
match l.listen().await {
Ok(_) => {
global_ctx.add_running_listener(l.local_url());
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
}
Err(e) => {
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed(
tracing::error!(?e, ?l, "listener listen error");
global_ctx.issue_event(GlobalCtxEvent::ListenerAddFailed(
l.local_url(),
e.to_string(),
format!("error: {:?}, retry listen later...", e),
));
tracing::error!(?e, ?l, "listener accept error");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
};
}
loop {
let ret = match l.accept().await {
Ok(ret) => ret,
Err(e) => {
global_ctx.issue_event(GlobalCtxEvent::ListenerAcceptFailed(
l.local_url(),
format!("error: {:?}, retry listen later...", e),
));
tracing::error!(?e, ?l, "listener accept error");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
};
let tunnel_info = ret.info().unwrap();
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
tunnel_info
.local_addr
.clone()
.unwrap_or_default()
.to_string(),
tunnel_info
.remote_addr
.clone()
.unwrap_or_default()
.to_string(),
));
tracing::info!(ret = ?ret, "conn accepted");
let peer_manager = peer_manager.clone();
let global_ctx = global_ctx.clone();
tokio::spawn(async move {
let server_ret = peer_manager.handle_tunnel(ret).await;
if let Err(e) = &server_ret {
global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
tunnel_info.local_addr.unwrap_or_default().to_string(),
tunnel_info.remote_addr.unwrap_or_default().to_string(),
e.to_string(),
));
tracing::error!(error = ?e, "handle conn error");
}
});
let tunnel_info = ret.info().unwrap();
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
tunnel_info
.local_addr
.clone()
.unwrap_or_default()
.to_string(),
tunnel_info
.remote_addr
.clone()
.unwrap_or_default()
.to_string(),
));
tracing::info!(ret = ?ret, "conn accepted");
let peer_manager = peer_manager.clone();
let global_ctx = global_ctx.clone();
tokio::spawn(async move {
let server_ret = peer_manager.handle_tunnel(ret).await;
if let Err(e) = &server_ret {
global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
tunnel_info.local_addr.unwrap_or_default().to_string(),
tunnel_info.remote_addr.unwrap_or_default().to_string(),
e.to_string(),
));
tracing::error!(error = ?e, "handle conn error");
}
});
}
}
}
pub async fn run(&mut self) -> Result<(), Error> {
for listener in &self.listeners {
let _guard = self.net_ns.guard();
let addr = listener.inner.lock().await.local_url();
tracing::warn!("run listener: {:?}", listener);
listener
.inner
.lock()
.await
.listen()
.await
.with_context(|| format!("failed to add listener {}", addr))?;
if listener.must_succ {
// try listen once
let mut l = (listener.creator_fn)();
let _g = self.net_ns.guard();
l.listen()
.await
.with_context(|| format!("failed to listen on {}", l.local_url()))?;
}
self.tasks.spawn(Self::run_listener(
listener.inner.clone(),
listener.creator_fn.clone(),
self.peer_manager.clone(),
self.global_ctx.clone(),
));
@@ -213,12 +248,14 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicI32, Ordering};
use futures::{SinkExt, StreamExt};
use tokio::time::timeout;
use crate::{
common::global_ctx::tests::get_mock_global_ctx,
tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector},
tunnel::{packet_def::ZCPacket, ring::RingTunnelConnector, TunnelConnector, TunnelError},
};
use super::*;
@@ -245,12 +282,18 @@ mod tests {
let ring_id = format!("ring://{}", uuid::Uuid::new_v4());
let ring_id_clone = ring_id.clone();
listener_mgr
.add_listener(RingTunnelListener::new(ring_id.parse().unwrap()), true)
.add_listener(
move || Box::new(RingTunnelListener::new(ring_id_clone.parse().unwrap())),
true,
)
.await
.unwrap();
listener_mgr.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let connect_once = |ring_id| async move {
let tunnel = RingTunnelConnector::new(ring_id).connect().await.unwrap();
let (mut recv, _send) = tunnel.split();
@@ -269,4 +312,60 @@ mod tests {
.await
.unwrap();
}
#[tokio::test]
async fn retry_listen() {
let counter = Arc::new(AtomicI32::new(0));
let drop_counter = Arc::new(AtomicI32::new(0));
struct MockListener {
counter: Arc<AtomicI32>,
drop_counter: Arc<AtomicI32>,
}
#[async_trait::async_trait]
impl TunnelListener for MockListener {
fn local_url(&self) -> url::Url {
"mock://".parse().unwrap()
}
async fn listen(&mut self) -> Result<(), TunnelError> {
self.counter.fetch_add(1, Ordering::Relaxed);
Ok(())
}
async fn accept(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Err(TunnelError::BufferFull)
}
}
impl Drop for MockListener {
fn drop(&mut self) {
self.drop_counter.fetch_add(1, Ordering::Relaxed);
}
}
let handler = Arc::new(MockListenerHandler {});
let mut listener_mgr = ListenerManager::new(get_mock_global_ctx(), handler.clone());
let counter_clone = counter.clone();
let drop_counter_clone = drop_counter.clone();
listener_mgr
.add_listener(
move || {
Box::new(MockListener {
counter: counter_clone.clone(),
drop_counter: drop_counter_clone.clone(),
})
},
true,
)
.await
.unwrap();
listener_mgr.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
assert!(counter.load(Ordering::Relaxed) >= 2);
assert!(drop_counter.load(Ordering::Relaxed) >= 1);
}
}

View File

@@ -12,7 +12,7 @@ use crate::{
global_ctx::{ArcGlobalCtx, GlobalCtxEvent},
ifcfg::{IfConfiger, IfConfiguerTrait},
},
peers::{peer_manager::PeerManager, PacketRecvChanReceiver},
peers::{peer_manager::PeerManager, recv_packet_from_chan, PacketRecvChanReceiver},
tunnel::{
common::{reserve_buf, FramedWriter, TunnelWrapper, ZCPacketToBytes},
packet_def::{ZCPacket, ZCPacketType, TAIL_RESERVED_SIZE},
@@ -349,6 +349,15 @@ impl VirtualNic {
{
let dev_name = self.global_ctx.get_flags().dev_name;
match crate::arch::windows::add_self_to_firewall_allowlist() {
Ok(_) => tracing::info!("add_self_to_firewall_allowlist successful!"),
Err(e) => {
println!("Failed to add Easytier to firewall allowlist, Subnet proxy and KCP proxy may not work properly. error: {}", e);
println!("You can add firewall rules manually, or use --use-smoltcp to run with user-space TCP/IP stack.");
println!("");
}
}
match checkreg(&dev_name) {
Ok(_) => tracing::trace!("delete successful!"),
Err(e) => tracing::error!("An error occurred: {}", e),
@@ -610,7 +619,7 @@ impl NicCtx {
self.tasks.spawn(async move {
// unlock until coroutine finished
let mut channel = channel.lock().await;
while let Some(packet) = channel.recv().await {
while let Ok(packet) = recv_packet_from_chan(&mut channel).await {
tracing::trace!(
"[USER_PACKET] forward packet from peers to nic. packet: {:?}",
packet

View File

@@ -256,7 +256,7 @@ impl EasyTierLauncher {
fetch_node_info,
));
if let Err(e) = ret {
error_msg.write().unwrap().replace(e.to_string());
error_msg.write().unwrap().replace(format!("{:?}", e));
}
instance_alive.store(false, std::sync::atomic::Ordering::Relaxed);
notifier.notify_one();
@@ -517,6 +517,39 @@ impl NetworkConfig {
})?,
});
}
if self.enable_manual_routes.unwrap_or_default() {
let mut routes = Vec::<cidr::Ipv4Cidr>::with_capacity(self.routes.len());
for route in self.routes.iter() {
routes.push(
route.parse()
.with_context(|| format!("failed to parse route: {}", route))?,
);
}
cfg.set_routes(Some(routes));
}
if self.exit_nodes.len() > 0 {
let mut exit_nodes = Vec::<std::net::Ipv4Addr>::with_capacity(self.exit_nodes.len());
for node in self.exit_nodes.iter() {
exit_nodes.push(
node.parse()
.with_context(|| format!("failed to parse exit node: {}", node))?,
);
}
cfg.set_exit_nodes(exit_nodes);
}
if self.enable_socks5.unwrap_or_default() {
if let Some(socks5_port) = self.socks5_port {
cfg.set_socks5_portal(Some(
format!("socks5://0.0.0.0:{}", socks5_port)
.parse()
.unwrap(),
));
}
}
let mut flags = gen_default_flags();
if let Some(latency_first) = self.latency_first {
flags.latency_first = latency_first;
@@ -525,6 +558,60 @@ impl NetworkConfig {
if let Some(dev_name) = self.dev_name.clone() {
flags.dev_name = dev_name;
}
if let Some(use_smoltcp) = self.use_smoltcp {
flags.use_smoltcp = use_smoltcp;
}
if let Some(enable_kcp_proxy) = self.enable_kcp_proxy {
flags.enable_kcp_proxy = enable_kcp_proxy;
}
if let Some(disable_kcp_input) = self.disable_kcp_input {
flags.disable_kcp_input = disable_kcp_input;
}
if let Some(disable_p2p) = self.disable_p2p {
flags.disable_p2p = disable_p2p;
}
if let Some(bind_device) = self.bind_device {
flags.bind_device = bind_device;
}
if let Some(no_tun) = self.no_tun {
flags.no_tun = no_tun;
}
if let Some(enable_exit_node) = self.enable_exit_node {
flags.enable_exit_node = enable_exit_node;
}
if let Some(relay_all_peer_rpc) = self.relay_all_peer_rpc {
flags.relay_all_peer_rpc = relay_all_peer_rpc;
}
if let Some(multi_thread) = self.multi_thread {
flags.multi_thread = multi_thread;
}
if let Some(proxy_forward_by_system) = self.proxy_forward_by_system {
flags.proxy_forward_by_system = proxy_forward_by_system;
}
if let Some(disable_encryption) = self.disable_encryption {
flags.enable_encryption = !disable_encryption;
}
if self.enable_relay_network_whitelist.unwrap_or_default() {
if self.relay_network_whitelist.len() > 0 {
flags.relay_network_whitelist = self.relay_network_whitelist.join(" ")
} else {
flags.relay_network_whitelist = "".to_string()
}
}
cfg.set_flags(flags);
Ok(cfg)
}

View File

@@ -1,13 +1,13 @@
#![allow(dead_code)]
mod arch;
mod connector;
mod gateway;
mod instance;
mod peer_center;
mod vpn_portal;
pub mod common;
pub mod connector;
pub mod launcher;
pub mod peers;
pub mod proto;

Some files were not shown because too many files have changed in this diff Show More