Compare commits

..

25 Commits

Author SHA1 Message Date
sijie.sun
6f1ecd147b bump version to v2.2.3 2025-03-17 21:37:00 +08:00
Sijie.Sun
23f69ce6a4 improve direct connector (#685)
* support ipv6 stun
* show interface and public ip in cli node info
* direct conn should keep trying unless already direct connected
* peer should use conn with smallest latency
* deprecate ipv6_listener, use -l instead
2025-03-17 10:46:14 +08:00
sijie.sun
f84ae228fc fix some tailwind style not work 2025-03-16 11:45:18 +08:00
kevin
74c716ccaa fix web bugs 2025-03-15 14:52:09 +08:00
sijie.sun
445b02b2ca do not upload to oss 2025-03-15 00:16:12 +08:00
sijie.sun
bb17ffa9fc fix wireguard not respond after idle for 120s 2025-03-15 00:16:12 +08:00
sijie.sun
389ea709ce fix smoltcp not wakeup closed socket 2025-03-15 00:16:12 +08:00
kevin
c2f535ead4 import/export network config for web (#676)
* import/export network config for web
* add socks5 config for web
2025-03-12 23:19:56 +08:00
Sijie.Sun
0318f55322 add serde default to NetworkConfig (#675)
* add serde default to NetworkConfig

* set base z-index of event-dialog
2025-03-12 10:36:54 +08:00
kevin
1f4340e82f add configurable items for web/gui
enable_exit_node
relay_all_peer_rpc
multi_thread
proxy_forward_by_system
relay_network_whitelist
manual_routes
exit_nodes
2025-03-11 22:30:39 +08:00
loecom
ed08707c98 easytier-web add tcp support
easytier-web add tcp support
2025-03-11 12:48:48 +08:00
sijie.sun
7397abcb94 txt connector should not rely on A record 2025-03-09 21:31:43 +08:00
sijie.sun
98d321f8ac fix kcp traffic not encrypted 2025-03-08 22:09:43 +08:00
sijie.sun
e78b0ef869 test serializedly 2025-03-08 15:59:54 +08:00
sijie.sun
8d654330ac fix http_connector
1. use ipv4 first when connect to http server.
2. allow redirect to url like: http://tcp://p.com:11010
3. dns should also use long timeout
2025-03-08 15:59:54 +08:00
L-Trump
00d61333d3 allow proxy packets to be forwarded by system kernel 2025-03-08 12:56:49 +08:00
sijie.sun
03b55b61e7 support txt/srv record 2025-03-08 12:56:23 +08:00
sijie.sun
745e44cc87 allow using http connector for config server 2025-03-07 22:17:23 +08:00
sijie.sun
24213a874a make http connector timeout longer
http response may be slow, make its timeout longer.
2025-03-07 22:17:23 +08:00
sijie.sun
155f8a2ba2 make prost build smaller 2025-03-06 11:07:05 +08:00
sijie.sun
568dca6f9c fix memory leak 2025-03-06 11:07:05 +08:00
sijie.sun
673c34cf5a http redirector 2025-02-21 11:51:13 +08:00
sijie.sun
2050ed78d0 remove some dep 2025-02-21 11:51:13 +08:00
zhj9709
2632c44195 fix docker stop issue by using tini for graceful shutdown 2025-02-10 22:54:07 +08:00
Sijie.Sun
5449eabf2a Update docker.yml 2025-02-10 12:47:12 +08:00
71 changed files with 2476 additions and 865 deletions

View File

@@ -18,7 +18,7 @@ RUN mkdir -p /tmp/output; \
FROM alpine:latest
RUN apk add --no-cache tzdata
RUN apk add --no-cache tzdata tini
WORKDIR /app
COPY --from=builder --chmod=755 /tmp/output/* /usr/local/bin
@@ -36,4 +36,4 @@ EXPOSE 11011/tcp
# wss
EXPOSE 11012/tcp
ENTRYPOINT ["easytier-core"]
ENTRYPOINT ["/sbin/tini", "--", "easytier-core"]

View File

@@ -208,18 +208,6 @@ jobs:
path: |
./artifacts/*
- name: Upload OSS
if: ${{ env.OSS_BUCKET != '' }}
uses: Menci/upload-to-oss@main
with:
access-key-id: ${{ secrets.ALIYUN_OSS_ACCESS_ID }}
access-key-secret: ${{ secrets.ALIYUN_OSS_ACCESS_KEY }}
endpoint: ${{ secrets.ALIYUN_OSS_ENDPOINT }}
bucket: ${{ secrets.ALIYUN_OSS_BUCKET }}
local-path: ./artifacts/
remote-path: /easytier-releases/${{env.GIT_DESC}}/easytier-${{ matrix.ARTIFACT_NAME }}
no-delete-remote-files: true
retry: 5
core-result:
if: needs.pre_job.outputs.should_skip != 'true' && always()
runs-on: ubuntu-latest

View File

@@ -11,7 +11,7 @@ on:
image_tag:
description: 'Tag for this image build'
type: string
default: 'v2.2.2'
default: 'v2.2.3'
required: true
mark_latest:
description: 'Mark this image as latest'
@@ -66,4 +66,4 @@ jobs:
file: .github/workflows/Dockerfile
tags: |
easytier/easytier:${{ inputs.image_tag }}${{ inputs.mark_latest && ',easytier/easytier:latest' || '' }},
ghcr.io/${{ github.actor }}/easytier:${{ inputs.image_tag }}${{ inputs.mark_latest && ',easytier/easytier:latest' || '' }},
ghcr.io/easytier/easytier:${{ inputs.image_tag }}${{ inputs.mark_latest && ',easytier/easytier:latest' || '' }},

View File

@@ -221,18 +221,6 @@ jobs:
path: |
./artifacts/*
- name: Upload OSS
if: ${{ env.OSS_BUCKET != '' }}
uses: Menci/upload-to-oss@main
with:
access-key-id: ${{ secrets.ALIYUN_OSS_ACCESS_ID }}
access-key-secret: ${{ secrets.ALIYUN_OSS_ACCESS_KEY }}
endpoint: ${{ secrets.ALIYUN_OSS_ENDPOINT }}
bucket: ${{ secrets.ALIYUN_OSS_BUCKET }}
local-path: ./artifacts/
remote-path: /easytier-releases/${{env.GIT_DESC}}/easytier-gui-${{ matrix.ARTIFACT_NAME }}
no-delete-remote-files: true
retry: 5
gui-result:
if: needs.pre_job.outputs.should_skip != 'true' && always()
runs-on: ubuntu-latest

View File

@@ -146,18 +146,6 @@ jobs:
path: |
./artifacts/*
- name: Upload OSS
if: ${{ env.OSS_BUCKET != '' }}
uses: Menci/upload-to-oss@main
with:
access-key-id: ${{ secrets.ALIYUN_OSS_ACCESS_ID }}
access-key-secret: ${{ secrets.ALIYUN_OSS_ACCESS_KEY }}
endpoint: ${{ secrets.ALIYUN_OSS_ENDPOINT }}
bucket: ${{ secrets.ALIYUN_OSS_BUCKET }}
local-path: ./artifacts/
remote-path: /easytier-releases/${{env.GIT_DESC}}/easytier-gui-${{ matrix.ARTIFACT_NAME }}
no-delete-remote-files: true
retry: 5
mobile-result:
if: needs.pre_job.outputs.should_skip != 'true' && always()
runs-on: ubuntu-latest

View File

@@ -21,7 +21,7 @@ on:
version:
description: 'Version for this release'
type: string
default: 'v2.2.2'
default: 'v2.2.3'
required: true
make_latest:
description: 'Mark this release as latest'

View File

@@ -62,6 +62,6 @@ jobs:
- name: Run tests
run: |
sudo -E env "PATH=$PATH" cargo test --no-default-features --features=full --verbose
sudo -E env "PATH=$PATH" cargo test --no-default-features --features=full --verbose -- --test-threads=1 --nocapture
sudo chown -R $USER:$USER ./target
sudo chown -R $USER:$USER ~/.cargo

939
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -8,7 +8,7 @@
[简体中文](/README_CN.md) | [English](/README.md)
**Please visit the [EasyTier Official Website](https://www.easytier.top/en/) to view the full documentation.**
**Please visit the [EasyTier Official Website](https://easytier.cn/en/) to view the full documentation.**
EasyTier is a simple, safe and decentralized VPN networking solution implemented with the Rust language and Tokio framework.
@@ -53,7 +53,7 @@ EasyTier is a simple, safe and decentralized VPN networking solution implemented
4. **Install by Docker Compose**
Please visit the [EasyTier Official Website](https://www.easytier.cn/en/) to view the full documentation.
Please visit the [EasyTier Official Website](https://easytier.cn/en/) to view the full documentation.
5. **Install by script (For Linux Only)**

View File

@@ -8,7 +8,7 @@
[简体中文](/README_CN.md) | [English](/README.md)
**请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。**
**请访问 [EasyTier 官网](https://easytier.cn/) 以查看完整的文档。**
一个简单、安全、去中心化的内网穿透 VPN 组网方案,使用 Rust 语言和 Tokio 框架实现。
@@ -53,7 +53,7 @@
4. **通过Docker Compose安装**
请访问 [EasyTier 官网](https://www.easytier.cn/) 以查看完整的文档。
请访问 [EasyTier 官网](https://easytier.cn/) 以查看完整的文档。
5. **使用一键脚本安装 (仅适用于 Linux)**

View File

@@ -1,7 +1,7 @@
{
"name": "easytier-gui",
"type": "module",
"version": "2.2.2",
"version": "2.2.3",
"private": true,
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
"scripts": {
@@ -46,7 +46,7 @@
"eslint": "^9.12.0",
"eslint-plugin-format": "^0.1.2",
"postcss": "^8.4.47",
"tailwindcss": "^3.4.13",
"tailwindcss": "=3.4.17",
"typescript": "^5.6.2",
"unplugin-auto-import": "^0.18.3",
"unplugin-vue-components": "^0.27.4",

View File

@@ -1,6 +1,6 @@
[package]
name = "easytier-gui"
version = "2.2.2"
version = "2.2.3"
description = "EasyTier GUI"
authors = ["you"]
edition = "2021"

View File

@@ -17,7 +17,7 @@
"createUpdaterArtifacts": false
},
"productName": "easytier-gui",
"version": "2.2.2",
"version": "2.2.3",
"identifier": "com.kkrainbow.easytier",
"plugins": {},
"app": {

View File

@@ -50,8 +50,8 @@ async function main() {
darkModeSelector: 'system',
cssLayer: {
name: 'primevue',
order: 'tailwind-base, primevue, tailwind-utilities'
}
order: 'tailwind-base, primevue, tailwind-utilities',
},
},
},
})

View File

@@ -1,6 +1,6 @@
[package]
name = "easytier-web"
version = "2.2.2"
version = "2.2.3"
edition = "2021"
description = "Config server for easytier. easytier-core gets config from this and web frontend use it as restful api server."
@@ -24,7 +24,11 @@ tower-sessions = { version = "0.13.0", default-features = false, features = [
] }
tower-http = { version = "0.6", features = ["cors", "compression-full"] }
sqlx = { version = "0.8", features = ["sqlite"] }
sea-orm = { version = "1.1", features = [ "sqlx-sqlite", "runtime-tokio-rustls", "macros" ] }
sea-orm = { version = "1.1", features = [
"sqlx-sqlite",
"runtime-tokio-rustls",
"macros",
] }
sea-orm-migration = { version = "1.1" }
@@ -32,7 +36,7 @@ sea-orm-migration = { version = "1.1" }
rust-embed = { version = "8.5.0", features = ["debug-embed"] }
base64 = "0.22"
rand = "0.8"
image = {version="0.24", default-features = false, features = ["png"]}
image = { version = "0.24", default-features = false, features = ["png"] }
rusttype = "0.9.3"
imageproc = "0.23.0"

View File

@@ -40,10 +40,10 @@
"postcss": "^8.4.47",
"postcss-import": "^16.1.0",
"postcss-nested": "^7.0.2",
"tailwindcss": "^3.4.14",
"tailwindcss": "=3.4.17",
"typescript": "~5.6.3",
"vite": "^5.4.10",
"vite-plugin-dts": "^4.3.0",
"vue-tsc": "^2.1.10"
}
}
}

View File

@@ -120,6 +120,23 @@ function searchListenerSuggestions(e: { query: string }) {
listenerSuggestions.value = ret
}
const exitNodesSuggestions = ref([''])
function searchExitNodesSuggestions(e: { query: string }) {
const ret = []
ret.push(e.query)
exitNodesSuggestions.value = ret
}
const whitelistSuggestions = ref([''])
function searchWhitelistSuggestions(e: { query: string }) {
const ret = []
ret.push(e.query)
whitelistSuggestions.value = ret
}
interface BoolFlag {
field: keyof NetworkConfig
help: string
@@ -133,6 +150,11 @@ const bool_flags: BoolFlag[] = [
{ field: 'disable_p2p', help: 'disable_p2p_help' },
{ field: 'bind_device', help: 'bind_device_help' },
{ field: 'no_tun', help: 'no_tun_help' },
{ field: 'enable_exit_node', help: 'enable_exit_node_help' },
{ field: 'relay_all_peer_rpc', help: 'relay_all_peer_rpc_help' },
{ field: 'multi_thread', help: 'multi_thread_help' },
{ field: 'proxy_forward_by_system', help: 'proxy_forward_by_system_help' },
{ field: 'disable_encryption', help: 'disable_encryption_help' },
]
</script>
@@ -141,7 +163,7 @@ const bool_flags: BoolFlag[] = [
<div class="frontend-lib">
<div class="flex flex-col h-full">
<div class="flex flex-col">
<div class="w-10/12 self-center ">
<div class="w-11/12 self-center ">
<Panel :header="t('basic_settings')">
<div class="flex flex-col gap-y-2">
<div class="flex flex-row gap-x-9 flex-wrap">
@@ -209,7 +231,7 @@ const bool_flags: BoolFlag[] = [
<label> {{ t('flags_switch') }} </label>
<div class="flex flex-row flex-wrap">
<div class="basis-64 flex" v-for="flag in bool_flags">
<div class="basis-[20rem] flex items-center" v-for="flag in bool_flags">
<Checkbox v-model="curNetwork[flag.field]" :input-id="flag.field" :binary="true" />
<label :for="flag.field" class="ml-2"> {{ t(flag.field) }} </label>
<span class="pi pi-question-circle ml-2 self-center" v-tooltip="t(flag.help)"></span>
@@ -242,17 +264,20 @@ const bool_flags: BoolFlag[] = [
<ToggleButton v-model="curNetwork.enable_vpn_portal" on-icon="pi pi-check" off-icon="pi pi-times"
:on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
<div v-if="curNetwork.enable_vpn_portal" class="items-center flex flex-row gap-x-4">
<div class="min-w-64">
<InputGroup>
<InputText v-model="curNetwork.vpn_portal_client_network_addr"
:placeholder="t('vpn_portal_client_network')" />
<InputGroupAddon>
<span>/{{ curNetwork.vpn_portal_client_network_len }}</span>
</InputGroupAddon>
</InputGroup>
<InputNumber v-model="curNetwork.vpn_portal_listen_port" :allow-empty="false" :format="false"
:min="0" :max="65535" class="w-8/12" fluid />
<div class="flex flex-row gap-x-9 flex-wrap w-full">
<div class="flex flex-col gap-2 basis-8/12 grow">
<InputGroup>
<InputText v-model="curNetwork.vpn_portal_client_network_addr"
:placeholder="t('vpn_portal_client_network')" />
<InputGroupAddon>
<span>/{{ curNetwork.vpn_portal_client_network_len }}</span>
</InputGroupAddon>
</InputGroup>
</div>
<div class="flex flex-col gap-2 basis-3/12 grow">
<InputNumber v-model="curNetwork.vpn_portal_listen_port" :allow-empty="false" :format="false"
:min="0" :max="65535" fluid />
</div>
</div>
</div>
</div>
@@ -283,6 +308,73 @@ const bool_flags: BoolFlag[] = [
:placeholder="t('dev_name_placeholder')" />
</div>
</div>
<div class="flex flex-row gap-x-9 flex-wrap">
<div class="flex flex-col gap-2 basis-5/12 grow">
<div class="flex">
<label for="relay_network_whitelist">{{ t('relay_network_whitelist') }}</label>
<span class="pi pi-question-circle ml-2 self-center"
v-tooltip="t('relay_network_whitelist_help')"></span>
</div>
<ToggleButton v-model="curNetwork.enable_relay_network_whitelist" on-icon="pi pi-check" off-icon="pi pi-times"
:on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
<div v-if="curNetwork.enable_relay_network_whitelist" class="items-center flex flex-row gap-x-4">
<div class="min-w-64 w-full">
<AutoComplete id="relay_network_whitelist" v-model="curNetwork.relay_network_whitelist"
:placeholder="t('relay_network_whitelist')" class="w-full" multiple fluid
:suggestions="whitelistSuggestions" @complete="searchWhitelistSuggestions" />
</div>
</div>
</div>
</div>
<div class="flex flex-row gap-x-9 flex-wrap ">
<div class="flex flex-col gap-2 grow">
<div class="flex">
<label for="routes">{{ t('manual_routes') }}</label>
<span class="pi pi-question-circle ml-2 self-center" v-tooltip="t('manual_routes_help')"></span>
</div>
<ToggleButton v-model="curNetwork.enable_manual_routes" on-icon="pi pi-check" off-icon="pi pi-times"
:on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
<div v-if="curNetwork.enable_manual_routes" class="items-center flex flex-row gap-x-4">
<div class="min-w-64 w-full">
<AutoComplete id="routes" v-model="curNetwork.routes"
:placeholder="t('chips_placeholder', ['192.168.0.0/16'])" class="w-full" multiple fluid
:suggestions="inetSuggestions" @complete="searchInetSuggestions" />
</div>
</div>
</div>
</div>
<div class="flex flex-row gap-x-9 flex-wrap ">
<div class="flex flex-col gap-2 grow">
<div class="flex">
<label for="socks5_port">{{ t('socks5') }}</label>
<span class="pi pi-question-circle ml-2 self-center" v-tooltip="t('socks5_help')"></span>
</div>
<ToggleButton v-model="curNetwork.enable_socks5" on-icon="pi pi-check" off-icon="pi pi-times"
:on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
<div v-if="curNetwork.enable_socks5" class="items-center flex flex-row gap-x-4">
<div class="min-w-64 w-full">
<InputNumber id="socks5_port" v-model="curNetwork.socks5_port" aria-describedby="rpc_port-help"
:format="false" :allow-empty="false" :min="0" :max="65535" class="w-full"/>
</div>
</div>
</div>
</div>
<div class="flex flex-row gap-x-9 flex-wrap w-full">
<div class="flex flex-col gap-2 grow p-fluid">
<div class="flex">
<label for="exit_nodes">{{ t('exit_nodes') }}</label>
<span class="pi pi-question-circle ml-2 self-center" v-tooltip="t('exit_nodes_help')"></span>
</div>
<AutoComplete id="exit_nodes" v-model="curNetwork.exit_nodes"
:placeholder="t('chips_placeholder', ['192.168.8.8'])" class="w-full" multiple fluid
:suggestions="exitNodesSuggestions" @complete="searchExitNodesSuggestions" />
</div>
</div>
</div>
</Panel>

View File

@@ -5,7 +5,7 @@ import { NetworkInstance, type NodeInfo, type PeerRoutePair } from '../types/net
import { useI18n } from 'vue-i18n';
import { computed, onMounted, onUnmounted, ref } from 'vue';
import { ipv4InetToString, ipv4ToString, ipv6ToString } from '../modules/utils';
import { DataTable, Column, Tag, Chip, Button, Dialog, ScrollPanel, Timeline, Divider, Card, } from 'primevue';
import { DataTable, Column, Tag, Chip, Button, Dialog, ScrollPanel, Timeline, Divider, Panel, } from 'primevue';
const props = defineProps<{
curNetworkInst: NetworkInstance | null,
@@ -303,14 +303,15 @@ function showEventLogs() {
<template>
<div class="frontend-lib">
<Dialog v-model:visible="dialogVisible" modal :header="t(dialogHeader)" class="w-2/3 h-auto max-w-full">
<ScrollPanel v-if="dialogHeader === 'vpn_portal_config'">
<Dialog v-model:visible="dialogVisible" modal :header="t(dialogHeader)" class="w-2/3 h-auto max-h-full"
:baseZIndex="2000">
<ScrollPanel v-if="dialogHeader === 'vpn_portal_config'" class="w-2/3">
<pre>{{ dialogContent }}</pre>
</ScrollPanel>
<Timeline v-else :value="dialogContent">
<template #opposite="slotProps">
<small class="text-surface-500 dark:text-surface-400">{{ useTimeAgo(Date.parse(slotProps.item.time))
}}</small>
}}</small>
</template>
<template #content="slotProps">
<HumanEvent :event="slotProps.item.event" />
@@ -318,107 +319,101 @@ function showEventLogs() {
</Timeline>
</Dialog>
<Card v-if="curNetworkInst?.error_msg">
<template #title>
<Panel v-if="curNetworkInst?.error_msg">
<template #header>
Run Network Error
</template>
<template #content>
<div class="flex flex-col gap-y-5">
<div class="text-red-500">
{{ curNetworkInst.error_msg }}
</div>
<div class="flex flex-col gap-y-5">
<div class="text-red-500">
{{ curNetworkInst.error_msg }}
</div>
</template>
</Card>
</div>
</Panel>
<template v-else>
<Card>
<template #title>
<Panel>
<template #header>
{{ t('my_node_info') }}
</template>
<template #content>
<div class="flex w-full flex-col gap-y-5">
<div class="m-0 flex flex-row justify-center gap-x-5">
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid green">
<div class="font-bold">
{{ t('peer_count') }}
</div>
<div class="text-5xl mt-1">
{{ peerCount }}
</div>
<div class="flex w-full flex-col gap-y-5">
<div class="m-0 flex flex-row justify-center gap-x-5">
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid green">
<div class="font-bold">
{{ t('peer_count') }}
</div>
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid purple">
<div class="font-bold">
{{ t('upload') }}
</div>
<div class="text-xl mt-2">
{{ txRate }}/s
</div>
</div>
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid fuchsia">
<div class="font-bold">
{{ t('download') }}
</div>
<div class="text-xl mt-2">
{{ rxRate }}/s
</div>
<div class="text-5xl mt-1">
{{ peerCount }}
</div>
</div>
<div class="flex flex-row items-center flex-wrap w-full max-h-40 overflow-scroll">
<Chip v-for="(chip, i) in myNodeInfoChips" :key="i" :label="chip.label" :icon="chip.icon"
class="mr-2 mt-2 text-sm" />
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid purple">
<div class="font-bold">
{{ t('upload') }}
</div>
<div class="text-xl mt-2">
{{ txRate }}/s
</div>
</div>
<div v-if="myNodeInfo" class="m-0 flex flex-row justify-center gap-x-5 text-sm">
<Button severity="info" :label="t('show_vpn_portal_config')" @click="showVpnPortalConfig" />
<Button severity="info" :label="t('show_event_log')" @click="showEventLogs" />
<div class="rounded-full w-32 h-32 flex flex-col items-center pt-6" style="border: 1px solid fuchsia">
<div class="font-bold">
{{ t('download') }}
</div>
<div class="text-xl mt-2">
{{ rxRate }}/s
</div>
</div>
</div>
</template>
</Card>
<div class="flex flex-row items-center flex-wrap w-full max-h-40 overflow-scroll">
<Chip v-for="(chip, i) in myNodeInfoChips" :key="i" :label="chip.label" :icon="chip.icon"
class="mr-2 mt-2 text-sm" />
</div>
<div v-if="myNodeInfo" class="m-0 flex flex-row justify-center gap-x-5 text-sm">
<Button severity="info" :label="t('show_vpn_portal_config')" @click="showVpnPortalConfig" />
<Button severity="info" :label="t('show_event_log')" @click="showEventLogs" />
</div>
</div>
</Panel>
<Divider />
<Card>
<template #title>
<Panel>
<template #header>
{{ t('peer_info') }}
</template>
<template #content>
<DataTable :value="peerRouteInfos" column-resize-mode="fit" table-class="w-full">
<Column :field="ipFormat" :header="t('virtual_ipv4')" />
<Column :header="t('hostname')">
<template #body="slotProps">
<div v-if="!slotProps.data.route.cost || !slotProps.data.route.feature_flag.is_public_server"
v-tooltip="slotProps.data.route.hostname">
{{
slotProps.data.route.hostname }}
</div>
<div v-else v-tooltip="slotProps.data.route.hostname" class="space-x-1">
<Tag v-if="slotProps.data.route.feature_flag.is_public_server" severity="info" value="Info">
{{ t('status.server') }}
</Tag>
<Tag v-if="slotProps.data.route.feature_flag.avoid_relay_data" severity="warn" value="Warn">
{{ t('status.relay') }}
</Tag>
</div>
</template>
</Column>
<Column :field="routeCost" :header="t('route_cost')" />
<Column :field="latencyMs" :header="t('latency')" />
<Column :field="txBytes" :header="t('upload_bytes')" />
<Column :field="rxBytes" :header="t('download_bytes')" />
<Column :field="lossRate" :header="t('loss_rate')" />
<Column :header="t('status.version')">
<template #body="slotProps">
<span>{{ version(slotProps.data) }}</span>
</template>
</Column>
</DataTable>
</template>
</Card>
<DataTable :value="peerRouteInfos" column-resize-mode="fit" table-class="w-full">
<Column :field="ipFormat" :header="t('virtual_ipv4')" />
<Column :header="t('hostname')">
<template #body="slotProps">
<div v-if="!slotProps.data.route.cost || !slotProps.data.route.feature_flag.is_public_server"
v-tooltip="slotProps.data.route.hostname">
{{
slotProps.data.route.hostname }}
</div>
<div v-else v-tooltip="slotProps.data.route.hostname" class="space-x-1">
<Tag v-if="slotProps.data.route.feature_flag.is_public_server" severity="info" value="Info">
{{ t('status.server') }}
</Tag>
<Tag v-if="slotProps.data.route.feature_flag.avoid_relay_data" severity="warn" value="Warn">
{{ t('status.relay') }}
</Tag>
</div>
</template>
</Column>
<Column :field="routeCost" :header="t('route_cost')" />
<Column :field="latencyMs" :header="t('latency')" />
<Column :field="txBytes" :header="t('upload_bytes')" />
<Column :field="rxBytes" :header="t('download_bytes')" />
<Column :field="lossRate" :header="t('loss_rate')" />
<Column :header="t('status.version')">
<template #body="slotProps">
<span>{{ version(slotProps.data) }}</span>
</template>
</Column>
</DataTable>
</Panel>
</template>
</div>
</template>

View File

@@ -92,6 +92,39 @@ bind_device_help: 仅使用物理网卡,避免 EasyTier 通过其他虚拟网
no_tun: 无 TUN 模式
no_tun_help: 不使用 TUN 网卡,适合无管理员权限时使用。本节点仅允许被访问。访问其他节点需要使用 SOCK5
enable_exit_node: 启用出口节点
enable_exit_node_help: 允许此节点成为出口节点
relay_all_peer_rpc: 转发RPC包
relay_all_peer_rpc_help: |
允许转发所有对等节点的RPC数据包即使对等节点不在转发网络白名单中。
这可以帮助白名单外网络中的对等节点建立P2P连接。
multi_thread: 启用多线程
multi_thread_help: 使用多线程运行时
proxy_forward_by_system: 系统转发
proxy_forward_by_system_help: 通过系统内核转发子网代理数据包禁用内置NAT
disable_encryption: 禁用加密
disable_encryption_help: 禁用对等节点通信的加密默认为false必须与对等节点相同
relay_network_whitelist: 网络白名单
relay_network_whitelist_help: |
仅转发白名单网络的流量,支持通配符字符串。多个网络名称间可以使用英文空格间隔。
如果该参数为空,则禁用转发。默认允许所有网络。
例如:'*'(所有网络),'def*'以def为前缀的网络'net1 net2'只允许net1和net2
manual_routes: 自定义路由
manual_routes_help: 手动分配路由CIDR将禁用子网代理和从对等节点传播的wireguard路由。例如192.168.0.0/16
socks5: socks5服务器
socks5_help: |
启用 socks5 服务器,允许 socks5 客户端访问虚拟网络. 格式: <端口>例如1080
exit_nodes: 出口节点列表
exit_nodes_help: 转发所有流量的出口节点虚拟IPv4地址优先级由列表顺序决定
status:
version: 内核版本
local: 本机

View File

@@ -91,6 +91,40 @@ bind_device_help: Use only the physical network interface to prevent EasyTier fr
no_tun: No TUN Mode
no_tun_help: Do not use a TUN interface, suitable for environments without administrator privileges. This node is only accessible; accessing other nodes requires SOCKS5.
enable_exit_node: Enable Exit Node
enable_exit_node_help: Allow this node to be an exit node
relay_all_peer_rpc: Relay RPC Packets
relay_all_peer_rpc_help: |
Relay all peer rpc packets, even if the peer is not in the relay network whitelist.
This can help peers not in relay network whitelist to establish p2p connection.
multi_thread: Multi Thread
multi_thread_help: Use multi-thread runtime
proxy_forward_by_system: System Forward
proxy_forward_by_system_help: Forward packet to proxy networks via system kernel, disable internal nat for network proxy
disable_encryption: Disable Encryption
disable_encryption_help: Disable encryption for peers communication, default is false, must be same with peers
relay_network_whitelist: Network Whitelist
relay_network_whitelist_help: |
Only forward traffic from the whitelist networks, supporting wildcard strings, multiple network names can be separated by spaces.
If this parameter is empty, forwarding is disabled. By default, all networks are allowed.
e.g.: '*' (all networks), 'def*' (networks with the prefix 'def'), 'net1 net2' (only allow net1 and net2)
manual_routes: Manual Route
manual_routes_help: |
Assign routes cidr manually, will disable subnet proxy and wireguard routes propagated from peers. e.g.:192.168.0.0/16
socks5: Socks5 Server
socks5_help: |
Enable socks5 server, allow socks5 client to access virtual network. format: <port>, e.g.: 1080
exit_nodes: Exit Nodes
exit_nodes_help: Exit nodes to forward all traffic to, a virtual ipv4 address, priority is determined by the order of the list
status:
version: Version
local: Local

View File

@@ -1,8 +1,6 @@
@import 'primeicons/primeicons.css';
@import 'floating-vue/dist/style.css';
.frontend-lib {
@layer tailwind-base, primevue, tailwind-utilities;
@layer tailwind-base {
@@ -51,4 +49,6 @@
background-color: #0000005d;
}
.v-popper__inner {
white-space: pre-wrap;
}

View File

@@ -42,6 +42,22 @@ export interface NetworkConfig {
disable_p2p?: boolean
bind_device?: boolean
no_tun?: boolean
enable_exit_node?: boolean
relay_all_peer_rpc?: boolean
multi_thread?: boolean
proxy_forward_by_system?: boolean
disable_encryption?: boolean
enable_relay_network_whitelist?: boolean
relay_network_whitelist: string[]
enable_manual_routes: boolean
routes: string[]
exit_nodes: string[]
enable_socks5?: boolean
socks5_port: number
}
export function DEFAULT_NETWORK_CONFIG(): NetworkConfig {
@@ -83,6 +99,18 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig {
disable_p2p: false,
bind_device: true,
no_tun: false,
enable_exit_node: false,
relay_all_peer_rpc: false,
multi_thread: true,
proxy_forward_by_system: false,
disable_encryption: false,
enable_relay_network_whitelist: false,
relay_network_whitelist: [],
enable_manual_routes: false,
routes: [],
exit_nodes: [],
enable_socks5: false,
socks5_port: 1080,
}
}

View File

@@ -23,10 +23,10 @@
"@vitejs/plugin-vue": "^5.1.4",
"autoprefixer": "^10.4.20",
"postcss": "^8.4.47",
"tailwindcss": "^3.4.14",
"tailwindcss": "=3.4.17",
"typescript": "~5.6.2",
"vite": "^5.4.10",
"vite-plugin-singlefile": "^2.0.3",
"vue-tsc": "^2.1.10"
}
}
}

View File

@@ -27,11 +27,11 @@ const generateConfig = (config: NetworkTypes.NetworkConfig) => {
<template>
<div class="flex items-center justify-center m-5">
<div class="flex w-full">
<div class="w-1/2 p-4">
<div class="sm:block md:flex w-full">
<div class="sm:w-full md:w-1/2 p-4">
<Config :cur-network="newNetworkConfig" @run-network="generateConfig" />
</div>
<div class="w-1/2 p-4 bg-gray-100">
<div class="sm:w-full md:w-1/2 p-4 bg-gray-100">
<pre class="whitespace-pre-wrap">{{ toml_config }}</pre>
</div>
</div>

View File

@@ -27,7 +27,7 @@ const loadDevices = async () => {
public_ip: device.client_url,
running_network_instances: device.info?.running_network_instances.map((instance: any) => Utils.UuidToStr(instance)),
running_network_count: device.info?.running_network_instances.length,
report_time: device.info?.report_time,
report_time: new Date(device.info?.report_time).toLocaleString(),
easytier_version: device.info?.easytier_version,
machine_id: Utils.UuidToStr(device.info?.machine_id),
});
@@ -102,7 +102,7 @@ const selectedDeviceHostname = computed<string | undefined>(() => {
</DataTable>
<Drawer v-model:visible="deviceManageVisible" :header="`Manage ${selectedDeviceHostname}`" position="right"
class="w-1/2 min-w-96">
:baseZIndex=1000 class="w-3/5 min-w-96">
<RouterView v-slot="{ Component }">
<component :is="Component" :api="api" :deviceList="deviceList" @update="loadDevices" />
</RouterView>

View File

@@ -1,5 +1,5 @@
<script setup lang="ts">
import { Toolbar, IftaLabel, Select, Button, ConfirmPopup, Dialog, useConfirm, useToast } from 'primevue';
import {Toolbar, IftaLabel, Select, Button, ConfirmPopup, Dialog, useConfirm, useToast, Divider} from 'primevue';
import { NetworkTypes, Status, Utils, Api, } from 'easytier-frontend-lib';
import { watch, computed, onMounted, onUnmounted, ref } from 'vue';
import { useRoute, useRouter } from 'vue-router';
@@ -27,6 +27,8 @@ const deviceInfo = computed<Utils.DeviceInfo | undefined | null>(() => {
return deviceId.value ? props.deviceList?.find((device) => device.machine_id === deviceId.value) : null;
});
const configFile = ref();
const curNetworkInfo = ref<NetworkTypes.NetworkInstance | null>(null);
const isEditing = ref(false);
@@ -207,6 +209,65 @@ const loadDeviceInfo = async () => {
} as NetworkTypes.NetworkInstance;
}
const exportConfig = async () => {
if (!deviceId.value || !instanceId.value) {
toast.add({ severity: 'error', summary: 'Error', detail: 'No network instance selected', life: 2000 });
return;
}
try {
let ret = await props.api?.get_network_config(deviceId.value, instanceId.value);
delete ret.instance_id;
exportJsonFile(JSON.stringify(ret, null, 2),instanceId.value +'.json');
} catch (e: any) {
console.error(e);
toast.add({ severity: 'error', summary: 'Error', detail: 'Failed to export network config, error: ' + JSON.stringify(e.response.data), life: 2000 });
return;
}
}
const importConfig = () => {
configFile.value.click();
}
const handleFileUpload = (event: Event) => {
const files = (event.target as HTMLInputElement).files;
const file = files ? files[0] : null;
if (file) {
const reader = new FileReader();
reader.onload = (e) => {
try {
let str = e.target?.result?.toString();
if(str){
const config = JSON.parse(str);
if(config === null || typeof config !== "object"){
throw new Error();
}
Object.assign(newNetworkConfig.value, config);
toast.add({ severity: 'success', summary: 'Import Success', detail: "Config file import success", life: 2000 });
}
} catch (error) {
toast.add({ severity: 'error', summary: 'Error', detail: 'Config file parse error.', life: 2000 });
}
configFile.value.value = null;
}
reader.readAsText(file);
}
}
const exportJsonFile = (context: string, name: string) => {
let url = window.URL.createObjectURL(new Blob([context], { type: 'application/json' }));
let link = document.createElement('a');
link.style.display = 'none';
link.href = url;
link.setAttribute('download', name);
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
window.URL.revokeObjectURL(url);
}
let periodFunc = new Utils.PeriodicTask(async () => {
try {
await Promise.all([loadNetworkInstanceIds(), loadDeviceInfo()]);
@@ -226,9 +287,16 @@ onUnmounted(() => {
</script>
<template>
<input type="file" @change="handleFileUpload" class="hidden" accept="application/json" ref="configFile"/>
<ConfirmPopup></ConfirmPopup>
<Dialog v-model:visible="showCreateNetworkDialog" modal :header="!isEditing ? 'Create New Network' : 'Edit Network'"
:style="{ width: '55rem' }">
<div class="flex flex-col">
<div class="w-11/12 self-center ">
<Button @click="importConfig" icon="pi pi-file-import" label="Import" iconPos="right" />
<Divider />
</div>
</div>
<Config :cur-network="newNetworkConfig" @run-network="createNewNetwork"></Config>
</Dialog>
@@ -245,19 +313,23 @@ onUnmounted(() => {
<div class="gap-x-3 flex">
<Button @click="confirmDeleteNetwork($event)" icon="pi pi-minus" severity="danger" label="Delete"
iconPos="right" />
<Button @click="exportConfig" icon="pi pi-file-export" severity="help" label="Export" iconPos="right" />
<Button @click="editNetwork" icon="pi pi-pen-to-square" label="Edit" iconPos="right" severity="info" />
<Button @click="newNetwork" icon="pi pi-plus" label="Create" iconPos="right" />
</div>
</template>
</Toolbar>
<Divider />
<!-- For running network, show the status -->
<div v-if="needShowNetworkStatus">
<Status v-bind:cur-network-inst="curNetworkInfo" v-if="needShowNetworkStatus">
</Status>
<center>
<Button @click="updateNetworkState(true)" label="Disable Network" severity="warn" />
</center>
<Divider />
<div class="text-center">
<Button @click="updateNetworkState(true)" label="Disable Network" severity="warn" />
</div>
</div>
<!-- For disabled network, show the config -->

View File

@@ -1,6 +1,6 @@
import { createApp } from 'vue'
import './style.css'
import 'easytier-frontend-lib/style.css'
import './style.css'
import App from './App.vue'
import EasytierFrontendLib from 'easytier-frontend-lib'
import PrimeVue from 'primevue/config'

View File

@@ -10,8 +10,9 @@ use easytier::{
common::{
config::{ConfigLoader, ConsoleLoggerConfig, FileLoggerConfig, TomlConfigLoader},
constants::EASYTIER_VERSION,
error::Error,
},
tunnel::udp::UdpTunnelListener,
tunnel::{tcp::TcpTunnelListener, udp::UdpTunnelListener, TunnelListener},
utils::{init_logger, setup_panic_handler},
};
@@ -71,6 +72,18 @@ struct Cli {
api_server_port: u16,
}
pub fn get_listener_by_url(
l: &url::Url,
) -> Result<Box<dyn TunnelListener>, Error> {
Ok(match l.scheme() {
"tcp" => Box::new(TcpTunnelListener::new(l.clone())),
"udp" => Box::new(UdpTunnelListener::new(l.clone())),
_ => {
return Err(Error::InvalidUrl(l.to_string()));
}
})
}
#[tokio::main]
async fn main() {
let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US"));
@@ -92,14 +105,10 @@ async fn main() {
// let db = db::Db::new(":memory:").await.unwrap();
let db = db::Db::new(cli.db).await.unwrap();
let listener = UdpTunnelListener::new(
format!(
"{}://0.0.0.0:{}",
cli.config_server_protocol, cli.config_server_port
)
.parse()
.unwrap(),
);
let listener = get_listener_by_url(
&format!("{}://0.0.0.0:{}", cli.config_server_protocol, cli.config_server_port).parse().unwrap(),
)
.unwrap();
let mut mgr = client_manager::ClientManager::new(db.clone());
mgr.serve(listener).await.unwrap();
let mgr = Arc::new(mgr);

View File

@@ -3,7 +3,7 @@ name = "easytier"
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
homepage = "https://github.com/EasyTier/EasyTier"
repository = "https://github.com/EasyTier/EasyTier"
version = "2.2.2"
version = "2.2.3"
edition = "2021"
authors = ["kkrainbow"]
keywords = ["vpn", "p2p", "network", "easytier"]
@@ -62,7 +62,6 @@ timedmap = "=1.0.1"
zerocopy = { version = "0.7.32", features = ["derive", "simd"] }
bytes = "1.5.0"
pin-project-lite = "0.2.13"
atomicbox = "0.4.0"
tachyonix = "0.3.0"
quinn = { version = "0.11.0", optional = true, features = ["ring"] }
@@ -99,7 +98,6 @@ uuid = { version = "1.5.0", features = [
] }
# for ring tunnel
crossbeam-queue = "0.3"
once_cell = "1.18.0"
# for rpc
@@ -126,7 +124,7 @@ serde = { version = "1.0", features = ["derive"] }
pnet = { version = "0.35.0", features = ["serde"] }
serde_json = "1"
clap = { version = "4.4.8", features = [
clap = { version = "4.5.30", features = [
"string",
"unicode",
"derive",
@@ -138,7 +136,7 @@ async-recursion = "1.0.5"
network-interface = "2.0"
# for ospf route
petgraph = "0.6.5"
petgraph = "0.7.1"
# for wireguard
boringtun = { package = "boringtun-easytier", version = "0.6.1", optional = true }
@@ -154,13 +152,9 @@ humansize = "2.1.3"
base64 = "0.22"
derivative = "2.2.0"
mimalloc-rust = { version = "0.2.1", optional = true }
# for mips
indexmap = { version = "~1.9.3", optional = false, features = ["std"] }
# mips
atomic-shim = "0.2.0"
smoltcp = { version = "0.12.0", optional = true, default-features = false, features = [
@@ -188,12 +182,18 @@ async-compression = { version = "0.4.17", default-features = false, features = [
kcp-sys = { git = "https://github.com/EasyTier/kcp-sys" }
prost-reflect = { version = "0.14.5", features = [
"serde",
prost-reflect = { version = "0.14.5", default-features = false, features = [
"derive",
"text-format"
] }
# for http connector
http_req = { git = "https://github.com/EasyTier/http_req.git", default-features = false, features = ["rust-tls"] }
# for dns connector
hickory-resolver = "0.24.4"
bounded_join_set = "0.3.0"
[target.'cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "freebsd"))'.dependencies]
machine-uid = "0.5.3"
@@ -226,7 +226,7 @@ rpc_build = { package = "easytier-rpc-build", version = "0.1.0", features = ["in
prost-reflect-build = { version = "0.14.0" }
[target.'cfg(windows)'.build-dependencies]
reqwest = { version = "0.11", features = ["blocking"] }
reqwest = { version = "0.12.12", features = ["blocking"] }
zip = "0.6.6"

View File

@@ -128,16 +128,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(target_os = "windows")]
WindowsBuild::check_for_win();
let proto_files_reflect = ["src/proto/peer_rpc.proto", "src/proto/common.proto"];
let proto_files = [
"src/proto/peer_rpc.proto",
"src/proto/common.proto",
"src/proto/error.proto",
"src/proto/tests.proto",
"src/proto/cli.proto",
"src/proto/web.proto",
];
for proto_file in &proto_files {
for proto_file in proto_files.iter().chain(proto_files_reflect.iter()) {
println!("cargo:rerun-if-changed={}", proto_file);
}
@@ -156,12 +156,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("peer_rpc.PeerInfoForGlobalMap", "#[derive(Hash)]")
.type_attribute("peer_rpc.ForeignNetworkRouteInfoKey", "#[derive(Hash, Eq)]")
.type_attribute("common.RpcDescriptor", "#[derive(Hash, Eq)]")
.field_attribute(".web.NetworkConfig", "#[serde(default)]")
.service_generator(Box::new(rpc_build::ServiceGenerator::new()))
.btree_map(["."]);
config.compile_protos(&proto_files, &["src/proto/"])?;
prost_reflect_build::Builder::new()
.file_descriptor_set_bytes("crate::proto::DESCRIPTOR_POOL_BYTES")
.compile_protos_with_config(config, &proto_files, &["src/proto/"])?;
.compile_protos_with_config(config, &proto_files_reflect, &["src/proto/"])?;
check_locale();
Ok(())

View File

@@ -96,6 +96,9 @@ core_clap:
enable_exit_node:
en: "allow this node to be an exit node"
zh-CN: "允许此节点成为出口节点"
proxy_forward_by_system:
en: "forward packet to proxy networks via system kernel, disable internal nat for network proxy"
zh-CN: "通过系统内核转发子网代理数据包禁用内置NAT"
no_tun:
en: "do not create TUN device, can use subnet proxy to access node"
zh-CN: "不创建TUN设备可以使用子网代理访问节点"

View File

@@ -20,13 +20,13 @@ pub fn gen_default_flags() -> Flags {
mtu: 1380,
latency_first: false,
enable_exit_node: false,
proxy_forward_by_system: false,
no_tun: false,
use_smoltcp: false,
relay_network_whitelist: "*".to_string(),
disable_p2p: false,
relay_all_peer_rpc: false,
disable_udp_hole_punching: false,
ipv6_listener: "udp://[::]:0".to_string(),
multi_thread: true,
data_compress_algo: CompressionAlgoPb::None.into(),
bind_device: true,

View File

@@ -68,6 +68,7 @@ pub struct GlobalCtx {
running_listeners: Mutex<Vec<url::Url>>,
enable_exit_node: bool,
proxy_forward_by_system: bool,
no_tun: bool,
feature_flags: AtomicCell<PeerFeatureFlag>,
@@ -99,6 +100,7 @@ impl GlobalCtx {
let stun_info_collection = Arc::new(StunInfoCollector::new_with_default_servers());
let enable_exit_node = config_fs.get_flags().enable_exit_node;
let proxy_forward_by_system = config_fs.get_flags().proxy_forward_by_system;
let no_tun = config_fs.get_flags().no_tun;
let mut feature_flags = PeerFeatureFlag::default();
@@ -125,6 +127,7 @@ impl GlobalCtx {
running_listeners: Mutex::new(Vec::new()),
enable_exit_node,
proxy_forward_by_system,
no_tun,
feature_flags: AtomicCell::new(feature_flags),
@@ -273,6 +276,10 @@ impl GlobalCtx {
self.enable_exit_node
}
pub fn proxy_forward_by_system(&self) -> bool {
self.proxy_forward_by_system
}
pub fn no_tun(&self) -> bool {
self.no_tun
}

View File

@@ -4,7 +4,7 @@ use std::{
io::Write as _,
sync::{Arc, Mutex},
};
use tokio::task::JoinSet;
use tokio::{task::JoinSet, time::timeout};
use tracing::Instrument;
pub mod compressor;
@@ -47,16 +47,13 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
origin: String,
) {
let js = Arc::downgrade(&js);
let o = origin.clone();
tokio::spawn(
async move {
loop {
while js.strong_count() > 0 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if js.weak_count() == 0 {
tracing::info!("joinset task exit");
break;
}
future::poll_fn(|cx| {
let fut = future::poll_fn(|cx| {
let Some(js) = js.upgrade() else {
return std::task::Poll::Ready(());
};
@@ -64,15 +61,24 @@ pub fn join_joinset_background<T: Debug + Send + Sync + 'static>(
let mut js = js.lock().unwrap();
while !js.is_empty() {
let ret = js.poll_join_next(cx);
if ret.is_pending() {
return std::task::Poll::Pending;
match ret {
std::task::Poll::Ready(Some(_)) => {
continue;
}
std::task::Poll::Ready(None) => {
break;
}
std::task::Poll::Pending => {
return std::task::Poll::Pending;
}
}
}
std::task::Poll::Ready(())
})
.await;
});
let _ = timeout(std::time::Duration::from_secs(5), fut).await;
}
tracing::debug!(?o, "joinset task exit");
}
.instrument(tracing::info_span!(
"join_joinset_background",
@@ -167,5 +173,6 @@ mod tests {
drop(js);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
assert_eq!(weak_js.weak_count(), 0);
assert_eq!(weak_js.strong_count(), 0);
}
}

View File

@@ -1,5 +1,5 @@
use std::collections::BTreeSet;
use std::net::{IpAddr, SocketAddr};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -8,6 +8,8 @@ use crate::proto::common::{NatType, StunInfo};
use anyhow::Context;
use chrono::Local;
use crossbeam::atomic::AtomicCell;
use hickory_resolver::config::{NameServerConfig, Protocol, ResolverConfig, ResolverOpts};
use hickory_resolver::TokioAsyncResolver;
use rand::seq::IteratorRandom;
use tokio::net::{lookup_host, UdpSocket};
use tokio::sync::{broadcast, Mutex};
@@ -22,21 +24,68 @@ use crate::common::error::Error;
use super::stun_codec_ext::*;
pub fn get_default_resolver_config() -> ResolverConfig {
let mut default_resolve_config = ResolverConfig::new();
default_resolve_config.add_name_server(NameServerConfig::new(
"223.5.5.5:53".parse().unwrap(),
Protocol::Udp,
));
default_resolve_config.add_name_server(NameServerConfig::new(
"180.184.1.1:53".parse().unwrap(),
Protocol::Udp,
));
default_resolve_config
}
pub async fn resolve_txt_record(
domain_name: &str,
resolver: &TokioAsyncResolver,
) -> Result<String, Error> {
let response = resolver.txt_lookup(domain_name).await.with_context(|| {
format!(
"txt_lookup failed, domain_name: {}",
domain_name.to_string()
)
})?;
let txt_record = response.iter().next().with_context(|| {
format!(
"no txt record found, domain_name: {}",
domain_name.to_string()
)
})?;
let txt_data = String::from_utf8_lossy(&txt_record.txt_data()[0]);
tracing::info!(?txt_data, ?domain_name, "get txt record");
Ok(txt_data.to_string())
}
struct HostResolverIter {
hostnames: Vec<String>,
ips: Vec<SocketAddr>,
max_ip_per_domain: u32,
use_ipv6: bool,
}
impl HostResolverIter {
fn new(hostnames: Vec<String>, max_ip_per_domain: u32) -> Self {
fn new(hostnames: Vec<String>, max_ip_per_domain: u32, use_ipv6: bool) -> Self {
Self {
hostnames,
ips: vec![],
max_ip_per_domain,
use_ipv6,
}
}
async fn get_txt_record(domain_name: &str) -> Result<Vec<String>, Error> {
let resolver = TokioAsyncResolver::tokio_from_system_conf().unwrap_or(
TokioAsyncResolver::tokio(get_default_resolver_config(), ResolverOpts::default()),
);
let txt_data = resolve_txt_record(domain_name, &resolver).await?;
Ok(txt_data.split(" ").map(|x| x.to_string()).collect())
}
#[async_recursion::async_recursion]
async fn next(&mut self) -> Option<SocketAddr> {
if self.ips.is_empty() {
@@ -51,10 +100,35 @@ impl HostResolverIter {
format!("{}:3478", host)
};
if host.starts_with("txt:") {
let domain_name = host.trim_start_matches("txt:");
match Self::get_txt_record(domain_name).await {
Ok(hosts) => {
tracing::info!(
?domain_name,
?hosts,
"get txt record success when resolve stun server"
);
// insert hosts to the head of hostnames
self.hostnames.splice(0..0, hosts.into_iter());
}
Err(e) => {
tracing::warn!(
?domain_name,
?e,
"get txt record failed when resolve stun server"
);
}
}
return self.next().await;
}
let use_ipv6 = self.use_ipv6;
match lookup_host(&host).await {
Ok(ips) => {
self.ips = ips
.filter(|x| x.is_ipv4())
.filter(|x| if use_ipv6 { x.is_ipv6() } else { x.is_ipv4() })
.choose_multiple(&mut rand::thread_rng(), self.max_ip_per_domain as usize);
if self.ips.is_empty() {
@@ -400,7 +474,7 @@ impl UdpNatTypeDetectResult {
// find resp with distinct stun server
self.stun_resps
.iter()
.map(|x| x.stun_server_addr)
.map(|x| x.recv_from_addr)
.collect::<BTreeSet<_>>()
.len()
}
@@ -555,8 +629,11 @@ impl UdpNatTypeDetector {
udp: Arc<UdpSocket>,
) -> Result<UdpNatTypeDetectResult, Error> {
let mut stun_servers = vec![];
let mut host_resolver =
HostResolverIter::new(self.stun_server_hosts.clone(), self.max_ip_per_domain);
let mut host_resolver = HostResolverIter::new(
self.stun_server_hosts.clone(),
self.max_ip_per_domain,
false,
);
while let Some(addr) = host_resolver.next().await {
stun_servers.push(addr);
}
@@ -602,7 +679,9 @@ pub trait StunInfoCollectorTrait: Send + Sync {
pub struct StunInfoCollector {
stun_servers: Arc<RwLock<Vec<String>>>,
stun_servers_v6: Arc<RwLock<Vec<String>>>,
udp_nat_test_result: Arc<RwLock<Option<UdpNatTypeDetectResult>>>,
public_ipv6: Arc<AtomicCell<Option<Ipv6Addr>>>,
nat_test_result_time: Arc<AtomicCell<chrono::DateTime<Local>>>,
redetect_notify: Arc<tokio::sync::Notify>,
tasks: std::sync::Mutex<JoinSet<()>>,
@@ -621,7 +700,12 @@ impl StunInfoCollectorTrait for StunInfoCollector {
udp_nat_type: result.nat_type() as i32,
tcp_nat_type: 0,
last_update_time: self.nat_test_result_time.load().timestamp(),
public_ip: result.public_ips().iter().map(|x| x.to_string()).collect(),
public_ip: result
.public_ips()
.iter()
.map(|x| x.to_string())
.chain(self.public_ipv6.load().map(|x| x.to_string()))
.collect(),
min_port: result.min_port() as u32,
max_port: result.max_port() as u32,
}
@@ -640,7 +724,7 @@ impl StunInfoCollectorTrait for StunInfoCollector {
if stun_servers.is_empty() {
let mut host_resolver =
HostResolverIter::new(self.stun_servers.read().unwrap().clone(), 2);
HostResolverIter::new(self.stun_servers.read().unwrap().clone(), 2, false);
while let Some(addr) = host_resolver.next().await {
stun_servers.push(addr);
if stun_servers.len() >= 2 {
@@ -680,7 +764,9 @@ impl StunInfoCollector {
pub fn new(stun_servers: Vec<String>) -> Self {
Self {
stun_servers: Arc::new(RwLock::new(stun_servers)),
stun_servers_v6: Arc::new(RwLock::new(Self::get_default_servers_v6())),
udp_nat_test_result: Arc::new(RwLock::new(None)),
public_ipv6: Arc::new(AtomicCell::new(None)),
nat_test_result_time: Arc::new(AtomicCell::new(Local::now())),
redetect_notify: Arc::new(tokio::sync::Notify::new()),
tasks: std::sync::Mutex::new(JoinSet::new()),
@@ -696,28 +782,42 @@ impl StunInfoCollector {
// NOTICE: we may need to choose stun stun server based on geo location
// stun server cross nation may return a external ip address with high latency and loss rate
vec![
"txt:stun.easytier.cn",
"stun.miwifi.com",
"stun.chat.bilibili.com",
"stun.hitv.com",
"stun.cdnbye.com",
"stun.douyucdn.cn:18000",
"fwa.lifesizecloud.com",
"global.turn.twilio.com",
"turn.cloudflare.com",
"stun.isp.net.au",
"stun.nextcloud.com",
"stun.freeswitch.org",
"stun.voip.blackberry.com",
"stunserver.stunprotocol.org",
"stun.sipnet.com",
"stun.radiojar.com",
"stun.sonetel.com",
]
.iter()
.map(|x| x.to_string())
.collect()
}
pub fn get_default_servers_v6() -> Vec<String> {
vec!["txt:stun-v6.easytier.cn"]
.iter()
.map(|x| x.to_string())
.collect()
}
async fn get_public_ipv6(servers: &Vec<String>) -> Option<Ipv6Addr> {
let mut ips = HostResolverIter::new(servers.to_vec(), 10, true);
while let Some(ip) = ips.next().await {
let udp = Arc::new(UdpSocket::bind(format!("[::]:0")).await.unwrap());
let ret = StunClientBuilder::new(udp.clone())
.new_stun_client(ip)
.bind_request(false, false)
.await;
tracing::debug!(?ret, "finish ipv6 udp nat type detect");
match ret.map(|x| x.mapped_socket_addr.map(|x| x.ip())) {
Ok(Some(IpAddr::V6(v6))) => {
return Some(v6);
}
_ => {}
}
}
None
}
fn start_stun_routine(&self) {
if self.started.load(std::sync::atomic::Ordering::Relaxed) {
return;
@@ -784,6 +884,30 @@ impl StunInfoCollector {
}
}
});
// for ipv6
let stun_servers = self.stun_servers_v6.clone();
let stored_ipv6 = self.public_ipv6.clone();
let redetect_notify = self.redetect_notify.clone();
self.tasks.lock().unwrap().spawn(async move {
loop {
let servers = stun_servers.read().unwrap().clone();
Self::get_public_ipv6(&servers)
.await
.map(|x| stored_ipv6.store(Some(x)));
let sleep_sec = if stored_ipv6.load().is_none() {
60
} else {
360
};
tokio::select! {
_ = redetect_notify.notified() => {}
_ = tokio::time::sleep(Duration::from_secs(sleep_sec)) => {}
}
}
});
}
pub fn update_stun_info(&self) {
@@ -862,6 +986,48 @@ mod tests {
let detector = UdpNatTypeDetector::new(stun_servers, 1);
let ret = detector.detect_nat_type(0).await;
println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type());
assert_eq!(ret.unwrap().nat_type(), NatType::PortRestricted);
assert_eq!(ret.unwrap().nat_type(), NatType::Restricted);
}
#[tokio::test]
async fn test_txt_public_stun_server() {
let stun_servers = vec!["txt:stun.easytier.cn".to_string()];
let detector = UdpNatTypeDetector::new(stun_servers, 1);
let ret = detector.detect_nat_type(0).await;
println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type());
assert!(!ret.unwrap().stun_resps.is_empty());
}
#[tokio::test]
async fn test_v4_stun() {
let mut udp_server = UdpTunnelListener::new("udp://0.0.0.0:55355".parse().unwrap());
let mut tasks = JoinSet::new();
tasks.spawn(async move {
udp_server.listen().await.unwrap();
loop {
udp_server.accept().await.unwrap();
}
});
let stun_servers = vec!["127.0.0.1:55355".to_string()];
let detector = UdpNatTypeDetector::new(stun_servers, 1);
let ret = detector.detect_nat_type(0).await;
println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type());
assert_eq!(ret.unwrap().nat_type(), NatType::Restricted);
}
#[tokio::test]
async fn test_v6_stun() {
let mut udp_server = UdpTunnelListener::new("udp://[::]:55355".parse().unwrap());
let mut tasks = JoinSet::new();
tasks.spawn(async move {
udp_server.listen().await.unwrap();
loop {
udp_server.accept().await.unwrap();
}
});
let stun_servers = vec!["::1:55355".to_string()];
let ret = StunInfoCollector::get_public_ipv6(&stun_servers).await;
println!("{:#?}", ret);
}
}

View File

@@ -1,7 +1,9 @@
// try connect peers directly, with either its public ip or lan ip
use std::{
net::SocketAddr,
collections::HashSet,
net::{Ipv6Addr, SocketAddr},
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@@ -79,7 +81,6 @@ struct DstListenerUrlBlackListItem(PeerId, url::Url);
struct DirectConnectorManagerData {
global_ctx: ArcGlobalCtx,
peer_manager: Arc<PeerManager>,
dst_blacklist: timedmap::TimedMap<DstBlackListItem, ()>,
dst_listener_blacklist: timedmap::TimedMap<DstListenerUrlBlackListItem, ()>,
}
@@ -88,7 +89,6 @@ impl DirectConnectorManagerData {
Self {
global_ctx,
peer_manager,
dst_blacklist: timedmap::TimedMap::new(),
dst_listener_blacklist: timedmap::TimedMap::new(),
}
}
@@ -150,7 +150,9 @@ impl DirectConnectorManager {
let peers = data.peer_manager.list_peers().await;
let mut tasks = JoinSet::new();
for peer_id in peers {
if peer_id == my_peer_id {
if peer_id == my_peer_id
|| data.peer_manager.has_directly_connected_conn(peer_id)
{
continue;
}
tasks.spawn(Self::do_try_direct_connect(data.clone(), peer_id));
@@ -173,24 +175,13 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
addr: String,
) -> Result<(), Error> {
data.dst_blacklist.cleanup();
if data
.dst_blacklist
.contains(&DstBlackListItem(dst_peer_id.clone(), addr.clone()))
{
tracing::debug!("try_connect_to_ip failed, addr in blacklist: {}", addr);
return Err(Error::UrlInBlacklist);
}
let connector = create_connector_by_url(&addr, &data.global_ctx).await?;
let (peer_id, conn_id) = timeout(
std::time::Duration::from_secs(5),
data.peer_manager.try_connect(connector),
std::time::Duration::from_secs(3),
data.peer_manager.try_direct_connect(connector),
)
.await??;
// let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?;
if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) {
tracing::info!(
"connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}",
@@ -204,6 +195,7 @@ impl DirectConnectorManager {
.await?;
return Err(Error::InvalidUrl(addr));
}
Ok(())
}
@@ -214,7 +206,7 @@ impl DirectConnectorManager {
addr: String,
) -> Result<(), Error> {
let mut rand_gen = rand::rngs::OsRng::default();
let backoff_ms = vec![1000, 2000, 4000];
let backoff_ms = vec![1000, 2000];
let mut backoff_idx = 0;
loop {
@@ -237,12 +229,6 @@ impl DirectConnectorManager {
backoff_idx += 1;
continue;
} else {
data.dst_blacklist.insert(
DstBlackListItem(dst_peer_id.clone(), addr.clone()),
(),
std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC),
);
return ret;
}
}
@@ -273,61 +259,43 @@ impl DirectConnectorManager {
tracing::debug!(?available_listeners, "got available listeners");
let mut listener = available_listeners.get(0).ok_or(anyhow::anyhow!(
"peer {} have no valid listener",
dst_peer_id
))?;
if available_listeners.is_empty() {
return Err(anyhow::anyhow!("peer {} have no valid listener", dst_peer_id).into());
}
// if have default listener, use it first
listener = available_listeners
let listener = available_listeners
.iter()
.find(|l| l.scheme() == data.global_ctx.get_flags().default_protocol)
.unwrap_or(listener);
.unwrap_or(available_listeners.get(0).unwrap());
let mut tasks = JoinSet::new();
let mut tasks = bounded_join_set::JoinSet::new(2);
let listener_host = listener.socket_addrs(|| None).unwrap().pop();
match listener_host {
Some(SocketAddr::V4(s_addr)) => {
if s_addr.ip().is_unspecified() {
ip_list.interface_ipv4s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
if let Some(public_ipv4) = ip_list.public_ipv4 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(public_ipv4.to_string().as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv4,
?listener,
?dst_peer_id,
"failed to set host for public ipv4"
);
}
}
ip_list
.interface_ipv4s
.iter()
.chain(ip_list.public_ipv4.iter())
.for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
@@ -338,47 +306,42 @@ impl DirectConnectorManager {
}
Some(SocketAddr::V6(s_addr)) => {
if s_addr.ip().is_unspecified() {
ip_list.interface_ipv6s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv6"
);
}
});
if let Some(public_ipv6) = ip_list.public_ipv6 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv6,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
}
// for ipv6, only try public ip
ip_list
.interface_ipv6s
.iter()
.chain(ip_list.public_ipv6.iter())
.filter_map(|x| Ipv6Addr::from_str(&x.to_string()).ok())
.filter(|x| {
TESTING.load(Ordering::Relaxed)
|| (!x.is_loopback()
&& !x.is_unspecified()
&& !x.is_unique_local()
&& !x.is_unicast_link_local()
&& !x.is_multicast())
})
.collect::<HashSet<_>>()
.iter()
.for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
});
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
@@ -430,14 +393,6 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
) -> Result<(), Error> {
let peer_manager = data.peer_manager.clone();
// check if we have direct connection with dst_peer_id
if let Some(c) = peer_manager.list_peer_conns(dst_peer_id).await {
// currently if we have any type of direct connection (udp or tcp), we will not try to connect
if !c.is_empty() {
return Ok(());
}
}
tracing::debug!("try direct connect to peer: {}", dst_peer_id);
let rpc_stub = peer_manager
@@ -466,8 +421,7 @@ mod tests {
use crate::{
connector::direct::{
DirectConnectorManager, DirectConnectorManagerData, DstBlackListItem,
DstListenerUrlBlackListItem,
DirectConnectorManager, DirectConnectorManagerData, DstListenerUrlBlackListItem,
},
instance::listeners::ListenerManager,
peers::tests::{
@@ -526,9 +480,7 @@ mod tests {
#[values("tcp", "udp", "wg")] proto: &str,
#[values("true", "false")] ipv6: bool,
) {
if ipv6 && proto != "udp" {
return;
}
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
@@ -544,14 +496,18 @@ mod tests {
dm_a.run_as_client();
dm_c.run_as_server();
let port = if proto == "wg" { 11040 } else { 11041 };
if !ipv6 {
let port = if proto == "wg" { 11040 } else { 11041 };
p_c.get_global_ctx().config.set_listeners(vec![format!(
"{}://0.0.0.0:{}",
proto, port
)
.parse()
.unwrap()]);
} else {
p_c.get_global_ctx()
.config
.set_listeners(vec![format!("{}://[::]:{}", proto, port).parse().unwrap()]);
}
let mut f = p_c.get_global_ctx().config.get_flags();
f.enable_ipv6 = ipv6;
@@ -592,9 +548,5 @@ mod tests {
1,
"tcp://127.0.0.1:10222".parse().unwrap()
)));
assert!(data
.dst_blacklist
.contains(&DstBlackListItem(1, ip_list.listeners[0].to_string())));
}
}

View File

@@ -0,0 +1,257 @@
use std::{net::SocketAddr, sync::Arc};
use crate::{
common::{
error::Error,
global_ctx::ArcGlobalCtx,
stun::{get_default_resolver_config, resolve_txt_record},
},
tunnel::{IpVersion, Tunnel, TunnelConnector, TunnelError, PROTO_PORT_OFFSET},
};
use anyhow::Context;
use dashmap::DashSet;
use hickory_resolver::{
config::{ResolverConfig, ResolverOpts},
proto::rr::rdata::SRV,
TokioAsyncResolver,
};
use rand::{seq::SliceRandom, Rng as _};
use crate::proto::common::TunnelInfo;
use super::{create_connector_by_url, http_connector::TunnelWithInfo};
fn weighted_choice<T>(options: &[(T, u64)]) -> Option<&T> {
let total_weight = options.iter().map(|(_, weight)| *weight).sum();
let mut rng = rand::thread_rng();
let rand_value = rng.gen_range(0..total_weight);
let mut accumulated_weight = 0;
for (item, weight) in options {
accumulated_weight += *weight;
if rand_value < accumulated_weight {
return Some(item);
}
}
None
}
#[derive(Debug)]
pub struct DNSTunnelConnector {
addr: url::Url,
bind_addrs: Vec<SocketAddr>,
global_ctx: ArcGlobalCtx,
ip_version: IpVersion,
default_resolve_config: ResolverConfig,
default_resolve_opts: ResolverOpts,
}
impl DNSTunnelConnector {
pub fn new(addr: url::Url, global_ctx: ArcGlobalCtx) -> Self {
Self {
addr,
bind_addrs: Vec::new(),
global_ctx,
ip_version: IpVersion::Both,
default_resolve_config: get_default_resolver_config(),
default_resolve_opts: ResolverOpts::default(),
}
}
#[tracing::instrument(ret, err)]
pub async fn handle_txt_record(
&self,
domain_name: &str,
) -> Result<Box<dyn TunnelConnector>, Error> {
let resolver =
TokioAsyncResolver::tokio_from_system_conf().unwrap_or(TokioAsyncResolver::tokio(
self.default_resolve_config.clone(),
self.default_resolve_opts.clone(),
));
let txt_data = resolve_txt_record(domain_name, &resolver)
.await
.with_context(|| format!("resolve txt record failed, domain_name: {}", domain_name))?;
let candidate_urls = txt_data
.split(" ")
.map(|s| s.to_string())
.filter_map(|s| url::Url::parse(s.as_str()).ok())
.collect::<Vec<_>>();
// shuffle candidate_urls and get the first one
let url = candidate_urls
.choose(&mut rand::thread_rng())
.with_context(|| {
format!(
"no valid url found, txt_data: {}, expecting an url list splitted by space",
txt_data
)
})?;
let mut connector = create_connector_by_url(url.as_str(), &self.global_ctx).await?;
connector.set_ip_version(self.ip_version);
Ok(connector)
}
fn handle_one_srv_record(record: &SRV, protocol: &str) -> Result<(url::Url, u64), Error> {
// port must be non-zero
if record.port() == 0 {
return Err(anyhow::anyhow!("port must be non-zero").into());
}
let connector_dst = record.target().to_utf8();
let dst_url = format!("{}://{}:{}", protocol, connector_dst, record.port());
Ok((
dst_url.parse().with_context(|| {
format!(
"parse dst_url failed, protocol: {}, connector_dst: {}, port: {}, dst_url: {}",
protocol,
connector_dst,
record.port(),
dst_url
)
})?,
record.priority() as _,
))
}
#[tracing::instrument(ret, err)]
pub async fn handle_srv_record(
&self,
domain_name: &str,
) -> Result<Box<dyn TunnelConnector>, Error> {
tracing::info!("handle_srv_record: {}", domain_name);
let resolver =
TokioAsyncResolver::tokio_from_system_conf().unwrap_or(TokioAsyncResolver::tokio(
self.default_resolve_config.clone(),
self.default_resolve_opts.clone(),
));
let srv_domains = PROTO_PORT_OFFSET
.iter()
.map(|(p, _)| (format!("_easytier._{}.{}", p, domain_name), *p)) // _easytier._udp.{domain_name}
.collect::<Vec<_>>();
tracing::info!("build srv_domains: {:?}", srv_domains);
let responses = Arc::new(DashSet::new());
let srv_lookup_tasks = srv_domains
.iter()
.map(|(srv_domain, protocol)| {
let resolver = resolver.clone();
let responses = responses.clone();
async move {
let response = resolver.srv_lookup(srv_domain).await.with_context(|| {
format!("srv_lookup failed, srv_domain: {}", srv_domain.to_string())
})?;
tracing::info!(?response, ?srv_domain, "srv_lookup response");
for record in response.iter() {
let parsed_record = Self::handle_one_srv_record(record, &protocol);
tracing::info!(?parsed_record, ?srv_domain, "parsed_record");
if parsed_record.is_err() {
eprintln!(
"got invalid srv record {:?}",
parsed_record.as_ref().unwrap_err()
);
continue;
}
responses.insert(parsed_record.unwrap());
}
Ok::<_, Error>(())
}
})
.collect::<Vec<_>>();
let _ = futures::future::join_all(srv_lookup_tasks).await;
let srv_records = responses.iter().map(|r| r.clone()).collect::<Vec<_>>();
if srv_records.is_empty() {
return Err(anyhow::anyhow!("no srv record found").into());
}
let url = weighted_choice(srv_records.as_slice()).with_context(|| {
format!(
"failed to choose a srv record, domain_name: {}, srv_records: {:?}",
domain_name.to_string(),
srv_records
)
})?;
let mut connector = create_connector_by_url(url.as_str(), &self.global_ctx).await?;
connector.set_ip_version(self.ip_version);
Ok(connector)
}
}
#[async_trait::async_trait]
impl super::TunnelConnector for DNSTunnelConnector {
async fn connect(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
let mut conn = if self.addr.scheme() == "txt" {
self.handle_txt_record(self.addr.host_str().as_ref().unwrap())
.await
.with_context(|| "get txt record url failed")?
} else if self.addr.scheme() == "srv" {
self.handle_srv_record(self.addr.host_str().as_ref().unwrap())
.await
.with_context(|| "get srv record url failed")?
} else {
return Err(anyhow::anyhow!(
"unsupported dns scheme: {}, expecting txt or srv",
self.addr.scheme()
)
.into());
};
let t = conn.connect().await?;
let info = t.info().unwrap_or_default();
Ok(Box::new(TunnelWithInfo::new(
t,
TunnelInfo {
local_addr: info.local_addr.clone(),
remote_addr: Some(self.addr.clone().into()),
tunnel_type: format!(
"{}-{}",
self.addr.scheme(),
info.remote_addr.unwrap_or_default()
),
},
)))
}
fn remote_url(&self) -> url::Url {
self.addr.clone()
}
fn set_bind_addrs(&mut self, addrs: Vec<SocketAddr>) {
self.bind_addrs = addrs;
}
fn set_ip_version(&mut self, ip_version: IpVersion) {
self.ip_version = ip_version;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::global_ctx::tests::get_mock_global_ctx;
#[tokio::test]
async fn test_txt() {
let url = "txt://txt.easytier.cn";
let global_ctx = get_mock_global_ctx();
let mut connector = DNSTunnelConnector::new(url.parse().unwrap(), global_ctx);
let ret = connector.connect().await.unwrap();
println!("{:?}", ret.info());
}
#[tokio::test]
async fn test_srv() {
let url = "srv://easytier.cn";
let global_ctx = get_mock_global_ctx();
let mut connector = DNSTunnelConnector::new(url.parse().unwrap(), global_ctx);
let ret = connector.connect().await.unwrap();
println!("{:?}", ret.info());
}
}

View File

@@ -0,0 +1,309 @@
use std::{
net::SocketAddr,
pin::Pin,
sync::{Arc, RwLock},
};
use anyhow::Context;
use http_req::request::{RedirectPolicy, Request};
use rand::seq::SliceRandom as _;
use url::Url;
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx},
tunnel::{IpVersion, Tunnel, TunnelConnector, TunnelError, ZCPacketSink, ZCPacketStream},
};
use crate::proto::common::TunnelInfo;
use super::create_connector_by_url;
pub struct TunnelWithInfo {
inner: Box<dyn Tunnel>,
info: TunnelInfo,
}
impl TunnelWithInfo {
pub fn new(inner: Box<dyn Tunnel>, info: TunnelInfo) -> Self {
Self { inner, info }
}
}
impl Tunnel for TunnelWithInfo {
fn split(&self) -> (Pin<Box<dyn ZCPacketStream>>, Pin<Box<dyn ZCPacketSink>>) {
self.inner.split()
}
fn info(&self) -> Option<TunnelInfo> {
Some(self.info.clone())
}
}
#[derive(Debug, PartialEq, Copy, Clone)]
enum HttpRedirectType {
Unknown,
// redirected url is in the path of new url
RedirectToQuery,
// redirected url is the entire new url
RedirectToUrl,
// redirected url is in the body of response
BodyUrls,
}
#[derive(Debug)]
pub struct HttpTunnelConnector {
addr: url::Url,
bind_addrs: Vec<SocketAddr>,
ip_version: IpVersion,
global_ctx: ArcGlobalCtx,
redirect_type: HttpRedirectType,
}
impl HttpTunnelConnector {
pub fn new(addr: url::Url, global_ctx: ArcGlobalCtx) -> Self {
Self {
addr,
bind_addrs: Vec::new(),
ip_version: IpVersion::Both,
global_ctx,
redirect_type: HttpRedirectType::Unknown,
}
}
#[tracing::instrument(ret)]
async fn handle_302_redirect(
&mut self,
new_url: url::Url,
url_str: &str,
) -> Result<Box<dyn TunnelConnector>, Error> {
// the url should be in following format:
// 1: http(s)://easytier.cn/?url=tcp://10.147.22.22:11010 (scheme is http, domain is ignored, path is splitted into proto type and addr)
// 2: http(s)://tcp://10.137.22.22:11010 (connector url is appended to the scheme)
// 3: tcp://10.137.22.22:11010 (scheme is protocol type, the url is used to construct a connector directly)
tracing::info!("redirect to {}", new_url);
let url = url::Url::parse(new_url.as_str())
.with_context(|| format!("parsing redirect url failed. url: {}", new_url))?;
if url.scheme() == "http" || url.scheme() == "https" {
let mut query = new_url
.query_pairs()
.filter_map(|x| url::Url::parse(&x.1).ok())
.collect::<Vec<_>>();
query.shuffle(&mut rand::thread_rng());
if !query.is_empty() {
tracing::info!("try to create connector by url: {}", query[0]);
self.redirect_type = HttpRedirectType::RedirectToQuery;
return create_connector_by_url(&query[0].to_string(), &self.global_ctx).await;
} else if let Some(new_url) = url_str
.strip_prefix(format!("{}://", url.scheme()).as_str())
.and_then(|x| Url::parse(x).ok())
{
// stripe the scheme and create connector by url
self.redirect_type = HttpRedirectType::RedirectToUrl;
return create_connector_by_url(new_url.as_str(), &self.global_ctx).await;
}
return Err(Error::InvalidUrl(format!(
"no valid connector url found in url: {}",
url
)));
} else {
self.redirect_type = HttpRedirectType::RedirectToUrl;
return create_connector_by_url(new_url.as_str(), &self.global_ctx).await;
}
}
#[tracing::instrument]
async fn handle_200_success(
&mut self,
body: &String,
) -> Result<Box<dyn TunnelConnector>, Error> {
// resp body should be line of connector urls, like:
// tcp://10.1.1.1:11010
// udp://10.1.1.1:11010
let mut lines = body
.lines()
.map(|line| line.trim())
.filter(|line| !line.is_empty())
.collect::<Vec<&str>>();
tracing::info!("get {} lines of connector urls", lines.len());
// shuffle the lines and pick the usable one
lines.shuffle(&mut rand::thread_rng());
for line in lines {
let url = url::Url::parse(line);
if url.is_err() {
tracing::warn!("invalid url: {}, skip it", line);
continue;
}
self.redirect_type = HttpRedirectType::BodyUrls;
return create_connector_by_url(line, &self.global_ctx).await;
}
Err(Error::InvalidUrl(format!(
"no valid connector url found, response body: {}",
body
)))
}
#[tracing::instrument(ret)]
pub async fn get_redirected_connector(
&mut self,
original_url: &str,
) -> Result<Box<dyn TunnelConnector>, Error> {
self.redirect_type = HttpRedirectType::Unknown;
tracing::info!("get_redirected_url: {}", original_url);
// Container for body of a response.
let body = Arc::new(RwLock::new(Vec::new()));
let original_url_clone = original_url.to_string();
let body_clone = body.clone();
let res = tokio::task::spawn_blocking(move || {
let uri = http_req::uri::Uri::try_from(original_url_clone.as_ref())
.with_context(|| format!("parsing url failed. url: {}", original_url_clone))?;
tracing::info!("sending http request to {}", uri);
Request::new(&uri)
.redirect_policy(RedirectPolicy::Limit(0))
.timeout(std::time::Duration::from_secs(20))
.send(&mut *body_clone.write().unwrap())
.with_context(|| format!("sending http request failed. url: {}", uri))
})
.await
.map_err(|e| Error::InvalidUrl(format!("task join error: {}", e)))??;
let body = String::from_utf8_lossy(&body.read().unwrap()).to_string();
if res.status_code().is_redirect() {
let redirect_url = res
.headers()
.get("Location")
.ok_or_else(|| Error::InvalidUrl("no redirect address found".to_string()))?;
let new_url = url::Url::parse(redirect_url.as_str())
.with_context(|| format!("parsing redirect url failed. url: {}", redirect_url))?;
return self.handle_302_redirect(new_url, &redirect_url).await;
} else if res.status_code().is_success() {
return self.handle_200_success(&body).await;
} else {
return Err(Error::InvalidUrl(format!(
"unexpected response, resp: {:?}, body: {}",
res, body,
)));
}
}
}
#[async_trait::async_trait]
impl super::TunnelConnector for HttpTunnelConnector {
async fn connect(&mut self) -> Result<Box<dyn Tunnel>, TunnelError> {
let mut conn = self
.get_redirected_connector(self.addr.to_string().as_str())
.await
.with_context(|| "get redirected url failed")?;
conn.set_ip_version(self.ip_version);
let t = conn.connect().await?;
let info = t.info().unwrap_or_default();
Ok(Box::new(TunnelWithInfo::new(
t,
TunnelInfo {
local_addr: info.local_addr.clone(),
remote_addr: Some(self.addr.clone().into()),
tunnel_type: format!(
"{:?}-{}",
self.redirect_type,
info.remote_addr.unwrap_or_default()
),
},
)))
}
fn remote_url(&self) -> url::Url {
self.addr.clone()
}
fn set_bind_addrs(&mut self, addrs: Vec<SocketAddr>) {
self.bind_addrs = addrs;
}
fn set_ip_version(&mut self, ip_version: IpVersion) {
self.ip_version = ip_version;
}
}
#[cfg(test)]
mod tests {
use tokio::{io::AsyncWriteExt as _, net::TcpListener};
use crate::{
common::global_ctx::tests::get_mock_global_ctx,
tunnel::{tcp::TcpTunnelListener, TunnelConnector, TunnelListener},
};
use super::*;
async fn run_http_redirect_server(port: u16, test_type: HttpRedirectType) -> Result<(), Error> {
let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
let (mut stream, _) = listener.accept().await?;
match test_type {
HttpRedirectType::RedirectToQuery => {
let resp = "HTTP/1.1 301 Moved Permanently\r\nLocation: http://test.com/?url=tcp://127.0.0.1:25888\r\n\r\n";
stream.write_all(resp.as_bytes()).await?;
}
HttpRedirectType::RedirectToUrl => {
let resp =
"HTTP/1.1 301 Moved Permanently\r\nLocation: tcp://127.0.0.1:25888\r\n\r\n";
stream.write_all(resp.as_bytes()).await?;
}
HttpRedirectType::BodyUrls => {
let resp = "HTTP/1.1 200 OK\r\n\r\ntcp://127.0.0.1:25888";
stream.write_all(resp.as_bytes()).await?;
}
HttpRedirectType::Unknown => {
panic!("unexpected test type");
}
}
Ok(())
}
#[rstest::rstest]
#[serial_test::serial(http_redirect_test)]
#[tokio::test]
async fn http_redirect_test(
// 1. 301 redirect
// 2. 200 success with valid connector urls
#[values(
HttpRedirectType::RedirectToQuery,
HttpRedirectType::RedirectToUrl,
HttpRedirectType::BodyUrls
)]
test_type: HttpRedirectType,
) {
let http_task = tokio::spawn(run_http_redirect_server(35888, test_type));
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let test_url: url::Url = "http://127.0.0.1:35888".parse().unwrap();
let global_ctx = get_mock_global_ctx();
let mut flags = global_ctx.config.get_flags();
flags.bind_device = false;
global_ctx.config.set_flags(flags);
let mut connector = HttpTunnelConnector::new(test_url.clone(), global_ctx.clone());
let mut listener = TcpTunnelListener::new("tcp://0.0.0.0:25888".parse().unwrap());
listener.listen().await.unwrap();
let task = tokio::spawn(async move {
let _conn = listener.accept().await.unwrap();
});
let t = connector.connect().await.unwrap();
assert_eq!(connector.redirect_type, test_type);
let info = t.info().unwrap();
let remote_addr = info.remote_addr.unwrap();
assert_eq!(remote_addr, test_url.into());
tokio::join!(task).0.unwrap();
tokio::join!(http_task).0.unwrap().unwrap();
}
}

View File

@@ -293,7 +293,6 @@ impl ManualConnectorManager {
ip_version: IpVersion,
) -> Result<ReconnResult, Error> {
let ip_collector = data.global_ctx.get_ip_collector();
let net_ns = data.net_ns.clone();
connector.lock().await.set_ip_version(ip_version);
@@ -309,18 +308,11 @@ impl ManualConnectorManager {
data.global_ctx.issue_event(GlobalCtxEvent::Connecting(
connector.lock().await.remote_url().clone(),
));
let _g = net_ns.guard();
tracing::info!("reconnect try connect... conn: {:?}", connector);
let tunnel = connector.lock().await.connect().await?;
tracing::info!("reconnect get tunnel succ: {:?}", tunnel);
assert_eq!(
dead_url,
tunnel.info().unwrap().remote_addr.unwrap().to_string(),
"info: {:?}",
tunnel.info()
);
let (peer_id, conn_id) = data.peer_manager.add_client_tunnel(tunnel).await?;
let (peer_id, conn_id) = data
.peer_manager
.try_direct_connect(connector.lock().await.as_mut())
.await?;
tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url);
Ok(ReconnResult {
dead_url,
@@ -339,7 +331,7 @@ impl ManualConnectorManager {
let mut ip_versions = vec![];
let u = url::Url::parse(&dead_url)
.with_context(|| format!("failed to parse connector url {:?}", dead_url))?;
if u.scheme() == "ring" {
if u.scheme() == "ring" || u.scheme() == "txt" || u.scheme() == "srv" {
ip_versions.push(IpVersion::Both);
} else {
let addrs = u.socket_addrs(|| Some(1000))?;
@@ -365,8 +357,12 @@ impl ManualConnectorManager {
"cannot get ip from url"
)));
for ip_version in ip_versions {
let use_long_timeout = dead_url.starts_with("http")
|| dead_url.starts_with("srv")
|| dead_url.starts_with("txt");
let ret = timeout(
std::time::Duration::from_secs(1),
// allow http connector to wait longer
std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }),
Self::conn_reconnect_with_ip_version(
data.clone(),
dead_url.clone(),

View File

@@ -3,6 +3,8 @@ use std::{
sync::Arc,
};
use http_connector::HttpTunnelConnector;
#[cfg(feature = "quic")]
use crate::tunnel::quic::QUICTunnelConnector;
#[cfg(feature = "wireguard")]
@@ -19,6 +21,9 @@ pub mod direct;
pub mod manual;
pub mod udp_hole_punch;
pub mod dns_connector;
pub mod http_connector;
async fn set_bind_addr_for_peer_connector(
connector: &mut (impl TunnelConnector + ?Sized),
is_ipv4: bool,
@@ -79,6 +84,10 @@ pub async fn create_connector_by_url(
}
return Ok(Box::new(connector));
}
"http" | "https" => {
let connector = HttpTunnelConnector::new(url, global_ctx.clone());
return Ok(Box::new(connector));
}
"ring" => {
check_scheme_and_get_socket_addr::<uuid::Uuid>(&url, "ring")?;
let connector = RingTunnelConnector::new(url);
@@ -132,6 +141,10 @@ pub async fn create_connector_by_url(
}
return Ok(Box::new(connector));
}
"txt" | "srv" => {
let connector = dns_connector::DNSTunnelConnector::new(url, global_ctx.clone());
return Ok(Box::new(connector));
}
_ => {
return Err(Error::InvalidUrl(url.into()));
}

View File

@@ -388,7 +388,7 @@ impl UdpHolePunchListener {
tracing::warn!(?conn, "udp hole punching listener got peer connection");
let peer_mgr = peer_mgr.clone();
tokio::spawn(async move {
if let Err(e) = peer_mgr.add_tunnel_as_server(conn).await {
if let Err(e) = peer_mgr.add_tunnel_as_server(conn, false).await {
tracing::error!(
?e,
"failed to add tunnel as server in hole punch listener"

View File

@@ -1,8 +1,14 @@
use std::{
ffi::OsString, fmt::Write, net::SocketAddr, path::PathBuf, sync::Mutex, time::Duration, vec,
ffi::OsString,
fmt::Write,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Mutex,
time::Duration,
vec,
};
use anyhow::{Context, Ok};
use anyhow::Context;
use clap::{command, Args, Parser, Subcommand};
use humansize::format_size;
use service_manager::*;
@@ -311,7 +317,11 @@ impl CommandHandler {
ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(),
hostname: route.hostname.clone(),
cost: cost_to_str(route.cost),
lat_ms: float_to_str(p.get_latency_ms().unwrap_or(0.0), 3),
lat_ms: if route.cost == 1 {
float_to_str(p.get_latency_ms().unwrap_or(0.0), 3)
} else {
route.path_latency_latency_first().to_string()
},
loss_rate: float_to_str(p.get_loss_rate().unwrap_or(0.0), 3),
rx_bytes: format_size(p.get_rx_bytes().unwrap_or(0), humansize::DECIMAL),
tx_bytes: format_size(p.get_tx_bytes().unwrap_or(0), humansize::DECIMAL),
@@ -1036,6 +1046,7 @@ async fn main() -> Result<(), Error> {
match sub_cmd.sub_command {
Some(NodeSubCommand::Info) | None => {
let stun_info = node_info.stun_info.clone().unwrap_or_default();
let ip_list = node_info.ip_list.clone().unwrap_or_default();
let mut builder = tabled::builder::Builder::default();
builder.push_record(vec!["Virtual IP", node_info.ipv4_addr.as_str()]);
@@ -1045,11 +1056,32 @@ async fn main() -> Result<(), Error> {
node_info.proxy_cidrs.join(", ").as_str(),
]);
builder.push_record(vec!["Peer ID", node_info.peer_id.to_string().as_str()]);
builder.push_record(vec!["Public IP", stun_info.public_ip.join(", ").as_str()]);
stun_info.public_ip.iter().for_each(|ip| {
let Ok(ip) = ip.parse::<IpAddr>() else {
return;
};
if ip.is_ipv4() {
builder.push_record(vec!["Public IPv4", ip.to_string().as_str()]);
} else {
builder.push_record(vec!["Public IPv6", ip.to_string().as_str()]);
}
});
builder.push_record(vec![
"UDP Stun Type",
format!("{:?}", stun_info.udp_nat_type()).as_str(),
]);
ip_list.interface_ipv4s.iter().for_each(|ip| {
builder.push_record(vec![
"Interface IPv4",
format!("{}", ip.to_string()).as_str(),
]);
});
ip_list.interface_ipv6s.iter().for_each(|ip| {
builder.push_record(vec![
"Interface IPv6",
format!("{}", ip.to_string()).as_str(),
]);
});
for (idx, l) in node_info.listeners.iter().enumerate() {
if l.starts_with("ring") {
continue;

View File

@@ -6,6 +6,7 @@ extern crate rust_i18n;
use std::{
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
sync::Arc,
};
use anyhow::Context;
@@ -19,12 +20,17 @@ use easytier::{
TomlConfigLoader, VpnPortalConfig,
},
constants::EASYTIER_VERSION,
global_ctx::{EventBusSubscriber, GlobalCtxEvent},
global_ctx::{EventBusSubscriber, GlobalCtx, GlobalCtxEvent},
scoped_task::ScopedTask,
stun::MockStunInfoCollector,
},
connector::{create_connector_by_url, dns_connector::DNSTunnelConnector},
launcher,
proto::{self, common::CompressionAlgoPb},
tunnel::udp::UdpTunnelConnector,
proto::{
self,
common::{CompressionAlgoPb, NatType},
},
tunnel::PROTO_PORT_OFFSET,
utils::{init_logger, setup_panic_handler},
web_client,
};
@@ -236,6 +242,13 @@ struct Cli {
)]
enable_exit_node: bool,
#[arg(
long,
help = t!("core_clap.proxy_forward_by_system").to_string(),
default_value = "false"
)]
proxy_forward_by_system: bool,
#[arg(
long,
help = t!("core_clap.no_tun").to_string(),
@@ -295,12 +308,6 @@ struct Cli {
)]
socks5: Option<u16>,
#[arg(
long,
help = t!("core_clap.ipv6_listener").to_string()
)]
ipv6_listener: Option<String>,
#[arg(
long,
help = t!("core_clap.compression").to_string(),
@@ -333,8 +340,6 @@ rust_i18n::i18n!("locales", fallback = "en");
impl Cli {
fn parse_listeners(no_listener: bool, listeners: Vec<String>) -> anyhow::Result<Vec<String>> {
let proto_port_offset = vec![("tcp", 0), ("udp", 0), ("wg", 1), ("ws", 1), ("wss", 2)];
if no_listener || listeners.is_empty() {
return Ok(vec![]);
}
@@ -343,8 +348,8 @@ impl Cli {
let mut listeners: Vec<String> = Vec::new();
if origin_listners.len() == 1 {
if let Ok(port) = origin_listners[0].parse::<u16>() {
for (proto, offset) in proto_port_offset {
listeners.push(format!("{}://0.0.0.0:{}", proto, port + offset));
for (proto, offset) in PROTO_PORT_OFFSET {
listeners.push(format!("{}://0.0.0.0:{}", proto, port + *offset));
}
return Ok(listeners);
}
@@ -359,7 +364,7 @@ impl Cli {
panic!("failed to parse listener: {}", l);
}
} else {
let Some((proto, offset)) = proto_port_offset
let Some((proto, offset)) = PROTO_PORT_OFFSET
.iter()
.find(|(proto, _)| *proto == proto_port[0])
else {
@@ -556,6 +561,7 @@ impl TryFrom<&Cli> for TomlConfigLoader {
f.mtu = mtu as u32;
}
f.enable_exit_node = cli.enable_exit_node;
f.proxy_forward_by_system = cli.proxy_forward_by_system;
f.no_tun = cli.no_tun || cfg!(not(feature = "tun"));
f.use_smoltcp = cli.use_smoltcp;
if let Some(wl) = cli.relay_network_whitelist.as_ref() {
@@ -564,11 +570,6 @@ impl TryFrom<&Cli> for TomlConfigLoader {
f.disable_p2p = cli.disable_p2p;
f.disable_udp_hole_punching = cli.disable_udp_hole_punching;
f.relay_all_peer_rpc = cli.relay_all_peer_rpc;
if let Some(ipv6_listener) = cli.ipv6_listener.as_ref() {
f.ipv6_listener = ipv6_listener
.parse()
.with_context(|| format!("failed to parse ipv6 listener: {}", ipv6_listener))?
}
f.multi_thread = cli.multi_thread;
f.data_compress_algo = match cli.compression.as_str() {
"none" => CompressionAlgoPb::None,
@@ -860,8 +861,20 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> {
panic!("empty token");
}
let _wc = web_client::WebClient::new(UdpTunnelConnector::new(c_url), token.to_string());
let config = TomlConfigLoader::default();
let global_ctx = Arc::new(GlobalCtx::new(config));
global_ctx.replace_stun_info_collector(Box::new(MockStunInfoCollector {
udp_nat_type: NatType::Unknown,
}));
let mut flags = global_ctx.get_flags();
flags.bind_device = false;
global_ctx.set_flags(flags);
let _wc = web_client::WebClient::new(
create_connector_by_url(c_url.as_str(), &global_ctx).await?,
token.to_string(),
);
tokio::signal::ctrl_c().await.unwrap();
DNSTunnelConnector::new("".parse().unwrap(), global_ctx);
return Ok(());
}

View File

@@ -361,7 +361,9 @@ impl KcpProxyDst {
proxy_entries.remove(&conn_id);
}
if Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address())) {
if Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address()))
&& global_ctx.no_tun()
{
dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap();
}

View File

@@ -8,14 +8,17 @@ use pnet::packet::ipv4::{Ipv4Packet, MutableIpv4Packet};
use pnet::packet::tcp::{ipv4_checksum, MutableTcpPacket, TcpPacket};
use pnet::packet::MutablePacket;
use pnet::packet::Packet;
use socket2::{SockRef, TcpKeepalive};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::atomic::{AtomicBool, AtomicU16};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::io::{copy_bidirectional, AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::select;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinSet;
use tokio::time::timeout;
use tracing::Instrument;
use crate::common::error::Result;
@@ -59,18 +62,31 @@ pub struct NatDstTcpConnector;
#[async_trait::async_trait]
impl NatDstConnector for NatDstTcpConnector {
type DstStream = TcpStream;
async fn connect(&self, _src: SocketAddr, nat_dst: SocketAddr) -> Result<Self::DstStream> {
let socket = TcpSocket::new_v4().unwrap();
if let Err(e) = socket.set_nodelay(true) {
tracing::warn!("set_nodelay failed, ignore it: {:?}", e);
}
Ok(
tokio::time::timeout(Duration::from_secs(10), socket.connect(nat_dst))
.await?
.with_context(|| format!("connect to nat dst failed: {:?}", nat_dst))?,
)
const TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
const TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(2);
const TCP_KEEPALIVE_RETRIES: u32 = 2;
let stream = timeout(Duration::from_secs(10), socket.connect(nat_dst))
.await?
.with_context(|| format!("connect to nat dst failed: {:?}", nat_dst))?;
let ka = TcpKeepalive::new()
.with_time(TCP_KEEPALIVE_TIME)
.with_interval(TCP_KEEPALIVE_INTERVAL);
#[cfg(not(target_os = "windows"))]
let ka = ka.with_retries(TCP_KEEPALIVE_RETRIES);
let sf = SockRef::from(&stream);
sf.set_tcp_keepalive(&ka)?;
Ok(stream)
}
fn check_packet_from_peer_fast(&self, cidr_set: &CidrSet, global_ctx: &GlobalCtx) -> bool {
@@ -214,11 +230,29 @@ impl SmolTcpListener {
.unwrap();
let tx = tx.clone();
tasks.spawn(async move {
let mut not_listening_count = 0;
loop {
tx.send(tcp.accept().await.map_err(|e| {
anyhow::anyhow!("smol tcp listener accept failed: {:?}", e).into()
}))
.unwrap();
select! {
_ = tokio::time::sleep(Duration::from_secs(2)) => {
if tcp.is_listening() {
not_listening_count = 0;
continue;
}
not_listening_count += 1;
if not_listening_count >= 2 {
tracing::error!("smol tcp listener not listening");
tcp.relisten();
}
}
accept_ret = tcp.accept() => {
tx.send(accept_ret.map_err(|e| {
anyhow::anyhow!("smol tcp listener accept failed: {:?}", e).into()
}))
.unwrap();
not_listening_count = 0;
}
}
}
});
}
@@ -705,17 +739,19 @@ impl<C: NatDstConnector> TcpProxy<C> {
nat_entry.tasks.lock().await.spawn(async move {
let ret = src_tcp_stream.copy_bidirectional(&mut dst_tcp_stream).await;
tracing::info!(nat_entry = ?nat_entry_clone, ret = ?ret, "nat tcp connection closed");
nat_entry_clone.state.store(NatDstEntryState::Closed);
let ret = src_tcp_stream.shutdown().await;
nat_entry_clone.state.store(NatDstEntryState::ClosingSrc);
let ret = timeout(Duration::from_secs(10), src_tcp_stream.shutdown()).await;
tracing::info!(nat_entry = ?nat_entry_clone, ret = ?ret, "src tcp stream shutdown");
let ret = dst_tcp_stream.shutdown().await;
nat_entry_clone.state.store(NatDstEntryState::ClosingDst);
let ret = timeout(Duration::from_secs(10), dst_tcp_stream.shutdown()).await;
tracing::info!(nat_entry = ?nat_entry_clone, ret = ?ret, "dst tcp stream shutdown");
drop(src_tcp_stream);
drop(dst_tcp_stream);
nat_entry_clone.state.store(NatDstEntryState::Closed);
// sleep later so the fin packet can be processed
tokio::time::sleep(Duration::from_secs(10)).await;

View File

@@ -91,6 +91,19 @@ async fn run(
&mut device,
&mut socket_allocator.sockets().lock(),
);
// wake up all closed sockets (smoltcp seems have a bug that it doesn't wake up closed sockets)
for (_, socket) in socket_allocator.sockets().lock().iter_mut() {
match socket {
Socket::Tcp(tcp) => {
if tcp.state() == smoltcp::socket::tcp::State::Closed {
tcp.abort();
}
}
#[allow(unreachable_patterns)]
_ => {}
}
}
}
Ok(())

View File

@@ -67,6 +67,19 @@ impl TcpListener {
pub fn local_addr(&self) -> io::Result<SocketAddr> {
Ok(self.local_addr)
}
pub fn relisten(&mut self) {
let mut socket = self.reactor.get_socket::<tcp::Socket>(*self.handle);
let local_endpoint = socket.local_endpoint().unwrap();
socket.abort();
socket.listen(local_endpoint).unwrap();
self.reactor.notify();
}
pub fn is_listening(&self) -> bool {
let socket = self.reactor.get_socket::<tcp::Socket>(*self.handle);
socket.is_listening()
}
}
pub struct Incoming(TcpListener);
@@ -210,6 +223,9 @@ impl AsyncWrite for TcpStream {
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let mut socket = self.reactor.get_socket::<tcp::Socket>(*self.handle);
if !socket.may_send() {
return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
}
if socket.send_queue() == 0 {
return Poll::Ready(Ok(()));
}

View File

@@ -65,7 +65,9 @@ impl IpProxy {
}
async fn start(&self) -> Result<(), Error> {
if (self.global_ctx.get_proxy_cidrs().is_empty() || self.started.load(Ordering::Relaxed))
if (self.global_ctx.get_proxy_cidrs().is_empty()
|| self.global_ctx.proxy_forward_by_system()
|| self.started.load(Ordering::Relaxed))
&& !self.global_ctx.enable_exit_node()
&& !self.global_ctx.no_tun()
{

View File

@@ -1,5 +1,6 @@
use std::{fmt::Debug, sync::Arc};
use anyhow::Context;
use async_trait::async_trait;
use tokio::task::JoinSet;
@@ -49,6 +50,10 @@ pub fn get_listener_by_url(
})
}
pub fn is_url_host_ipv6(l: &url::Url) -> bool {
l.host_str().map_or(false, |h| h.contains(':'))
}
#[async_trait]
pub trait TunnelHandlerForListener {
async fn handle_tunnel(&self, tunnel: Box<dyn Tunnel>) -> Result<(), Error>;
@@ -58,7 +63,7 @@ pub trait TunnelHandlerForListener {
impl TunnelHandlerForListener for PeerManager {
#[tracing::instrument]
async fn handle_tunnel(&self, tunnel: Box<dyn Tunnel>) -> Result<(), Error> {
self.add_tunnel_as_server(tunnel).await
self.add_tunnel_as_server(tunnel, true).await
}
}
@@ -113,22 +118,26 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
continue;
};
let ctx = self.global_ctx.clone();
self.add_listener(move || get_listener_by_url(&l, ctx.clone()).unwrap(), true)
.await?;
}
if self.global_ctx.config.get_flags().enable_ipv6 {
let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone();
let _ = self
.add_listener(
move || {
Box::new(UdpTunnelListener::new(
ipv6_listener.clone().parse().unwrap(),
))
},
let listener = l.clone();
self.add_listener(
move || get_listener_by_url(&listener, ctx.clone()).unwrap(),
true,
)
.await?;
if self.global_ctx.config.get_flags().enable_ipv6 && !is_url_host_ipv6(&l) {
let mut ipv6_listener = l.clone();
ipv6_listener
.set_host(Some("[::]".to_string().as_str()))
.with_context(|| format!("failed to set ipv6 host for listener: {}", l))?;
let ctx = self.global_ctx.clone();
self.add_listener(
move || get_listener_by_url(&ipv6_listener, ctx.clone()).unwrap(),
false,
)
.await?;
}
}
Ok(())
@@ -161,11 +170,11 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
}
Err(e) => {
tracing::error!(?e, ?l, "listener listen error");
global_ctx.issue_event(GlobalCtxEvent::ListenerAddFailed(
l.local_url(),
format!("error: {:?}, retry listen later...", e),
));
tracing::error!(?e, ?l, "listener listen error");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
@@ -217,6 +226,15 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
pub async fn run(&mut self) -> Result<(), Error> {
for listener in &self.listeners {
if listener.must_succ {
// try listen once
let mut l = (listener.creator_fn)();
let _g = self.net_ns.guard();
l.listen()
.await
.with_context(|| format!("failed to listen on {}", l.local_url()))?;
}
self.tasks.spawn(Self::run_listener(
listener.creator_fn.clone(),
self.peer_manager.clone(),

View File

@@ -517,6 +517,39 @@ impl NetworkConfig {
})?,
});
}
if self.enable_manual_routes.unwrap_or_default() {
let mut routes = Vec::<cidr::Ipv4Cidr>::with_capacity(self.routes.len());
for route in self.routes.iter() {
routes.push(
route.parse()
.with_context(|| format!("failed to parse route: {}", route))?,
);
}
cfg.set_routes(Some(routes));
}
if self.exit_nodes.len() > 0 {
let mut exit_nodes = Vec::<std::net::Ipv4Addr>::with_capacity(self.exit_nodes.len());
for node in self.exit_nodes.iter() {
exit_nodes.push(
node.parse()
.with_context(|| format!("failed to parse exit node: {}", node))?,
);
}
cfg.set_exit_nodes(exit_nodes);
}
if self.enable_socks5.unwrap_or_default() {
if let Some(socks5_port) = self.socks5_port {
cfg.set_socks5_portal(Some(
format!("socks5://0.0.0.0:{}", socks5_port)
.parse()
.unwrap(),
));
}
}
let mut flags = gen_default_flags();
if let Some(latency_first) = self.latency_first {
flags.latency_first = latency_first;
@@ -550,6 +583,35 @@ impl NetworkConfig {
flags.no_tun = no_tun;
}
if let Some(enable_exit_node) = self.enable_exit_node {
flags.enable_exit_node = enable_exit_node;
}
if let Some(relay_all_peer_rpc) = self.relay_all_peer_rpc {
flags.relay_all_peer_rpc = relay_all_peer_rpc;
}
if let Some(multi_thread) = self.multi_thread {
flags.multi_thread = multi_thread;
}
if let Some(proxy_forward_by_system) = self.proxy_forward_by_system {
flags.proxy_forward_by_system = proxy_forward_by_system;
}
if let Some(disable_encryption) = self.disable_encryption {
flags.enable_encryption = !disable_encryption;
}
if self.enable_relay_network_whitelist.unwrap_or_default() {
if self.relay_network_whitelist.len() > 0 {
flags.relay_network_whitelist = self.relay_network_whitelist.join(" ")
} else {
flags.relay_network_whitelist = "".to_string()
}
}
cfg.set_flags(flags);
Ok(cfg)
}

View File

@@ -1,13 +1,13 @@
#![allow(dead_code)]
mod arch;
mod connector;
mod gateway;
mod instance;
mod peer_center;
mod vpn_portal;
pub mod common;
pub mod connector;
pub mod launcher;
pub mod peers;
pub mod proto;

View File

@@ -24,6 +24,7 @@ use crate::{
config::{ConfigLoader, TomlConfigLoader},
error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent, NetworkIdentity},
join_joinset_background,
stun::MockStunInfoCollector,
PeerId,
},
@@ -181,6 +182,15 @@ impl ForeignNetworkEntry {
}
}
impl Drop for RpcTransport {
fn drop(&mut self) {
tracing::debug!(
"drop rpc transport for foreign network manager, my_peer_id: {:?}",
self.my_peer_id
);
}
}
let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel();
let tspt = RpcTransport {
my_peer_id,
@@ -216,7 +226,6 @@ impl ForeignNetworkEntry {
.list_global_foreign_peer(&self.network_identity)
.await;
let local = peer_map.list_peers_with_conn().await;
tracing::debug!(?global, ?local, ?self.my_peer_id, "list peers in foreign network manager");
global.extend(local.iter().cloned());
global
.into_iter()
@@ -426,7 +435,7 @@ pub struct ForeignNetworkManager {
data: Arc<ForeignNetworkManagerData>,
tasks: Mutex<JoinSet<()>>,
tasks: Arc<std::sync::Mutex<JoinSet<()>>>,
}
impl ForeignNetworkManager {
@@ -444,6 +453,9 @@ impl ForeignNetworkManager {
lock: std::sync::Mutex::new(()),
});
let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new()));
join_joinset_background(tasks.clone(), "ForeignNetworkManager".to_string());
Self {
my_peer_id,
global_ctx,
@@ -451,7 +463,7 @@ impl ForeignNetworkManager {
data,
tasks: Mutex::new(JoinSet::new()),
tasks,
}
}
@@ -503,7 +515,7 @@ impl ForeignNetworkManager {
let data = self.data.clone();
let network_name = entry.network.network_name.clone();
let mut s = entry.global_ctx.subscribe();
self.tasks.lock().await.spawn(async move {
self.tasks.lock().unwrap().spawn(async move {
while let Ok(e) = s.recv().await {
match &e {
GlobalCtxEvent::PeerRemoved(peer_id) => {
@@ -683,7 +695,8 @@ mod tests {
let (a_ring, b_ring) = crate::tunnel::ring::create_ring_tunnel_pair();
let b_mgr_copy = pm_center.clone();
let s_ret = tokio::spawn(async move { b_mgr_copy.add_tunnel_as_server(b_ring).await });
let s_ret =
tokio::spawn(async move { b_mgr_copy.add_tunnel_as_server(b_ring, true).await });
pma_net1.add_client_tunnel(a_ring).await.unwrap();

View File

@@ -11,7 +11,7 @@ use super::{
peer_conn::{PeerConn, PeerConnId},
PacketRecvChan,
};
use crate::proto::cli::PeerConnInfo;
use crate::{common::scoped_task::ScopedTask, proto::cli::PeerConnInfo};
use crate::{
common::{
error::Error,
@@ -36,7 +36,8 @@ pub struct Peer {
shutdown_notifier: Arc<tokio::sync::Notify>,
default_conn_id: AtomicCell<PeerConnId>,
default_conn_id: Arc<AtomicCell<PeerConnId>>,
default_conn_id_clear_task: ScopedTask<()>,
}
impl Peer {
@@ -88,6 +89,19 @@ impl Peer {
)),
);
let default_conn_id = Arc::new(AtomicCell::new(PeerConnId::default()));
let conns_copy = conns.clone();
let default_conn_id_copy = default_conn_id.clone();
let default_conn_id_clear_task = ScopedTask::from(tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
if conns_copy.len() > 1 {
default_conn_id_copy.store(PeerConnId::default());
}
}
}));
Peer {
peer_node_id,
conns: conns.clone(),
@@ -98,7 +112,8 @@ impl Peer {
close_event_listener,
shutdown_notifier,
default_conn_id: AtomicCell::new(PeerConnId::default()),
default_conn_id,
default_conn_id_clear_task,
}
}
@@ -117,14 +132,19 @@ impl Peer {
return Some(conn.clone());
}
let conn = self.conns.iter().next();
if conn.is_none() {
return None;
// find a conn with the smallest latency
let mut min_latency = std::u64::MAX;
for conn in self.conns.iter() {
let latency = conn.value().get_stats().latency_us;
if latency < min_latency {
min_latency = latency;
self.default_conn_id.store(conn.get_conn_id());
}
}
let conn = conn.unwrap().clone();
self.default_conn_id.store(conn.get_conn_id());
Some(conn)
self.conns
.get(&self.default_conn_id.load())
.map(|conn| conn.clone())
}
pub async fn send_msg(&self, msg: ZCPacket) -> Result<(), Error> {
@@ -158,6 +178,10 @@ impl Peer {
}
ret
}
pub fn get_default_conn_id(&self) -> PeerConnId {
self.default_conn_id.load()
}
}
// pritn on drop

View File

@@ -84,9 +84,7 @@ impl PingIntervalController {
self.throughput.rx_packets() > self.last_throughput.rx_packets()
}
#[tracing::instrument]
fn should_send_ping(&mut self) -> bool {
tracing::trace!(?self, "check should_send_ping");
if self.loss_counter.load(Ordering::Relaxed) > 0 {
self.backoff_idx = 0;
} else if self.tx_increase() && !self.rx_increase() {
@@ -253,6 +251,13 @@ impl PeerConnPinger {
continue;
}
tracing::debug!(
"pingpong controller send pingpong task, seq: {}, node_id: {}, controller: {:?}",
req_seq,
my_node_id,
controller
);
let mut sink = sink.clone();
let receiver = ctrl_resp_sender.subscribe();
let ping_res_sender = ping_res_sender.clone();

View File

@@ -8,7 +8,7 @@ use std::{
use anyhow::Context;
use async_trait::async_trait;
use dashmap::DashMap;
use dashmap::{DashMap, DashSet};
use tokio::{
sync::{
@@ -23,7 +23,7 @@ use crate::{
compressor::{Compressor as _, DefaultCompressor},
constants::EASYTIER_VERSION,
error::Error,
global_ctx::{ArcGlobalCtx, NetworkIdentity},
global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity},
stun::StunInfoCollectorTrait,
PeerId,
},
@@ -141,6 +141,9 @@ pub struct PeerManager {
data_compress_algo: CompressorAlgo,
exit_nodes: Vec<Ipv4Addr>,
// conns that are directly connected (which are not hole punched)
directly_connected_conn_map: Arc<DashMap<PeerId, DashSet<uuid::Uuid>>>,
}
impl Debug for PeerManager {
@@ -267,6 +270,8 @@ impl PeerManager {
data_compress_algo,
exit_nodes,
directly_connected_conn_map: Arc::new(DashMap::new()),
}
}
@@ -325,8 +330,48 @@ impl PeerManager {
Ok((peer_id, conn_id))
}
fn add_directly_connected_conn(&self, peer_id: PeerId, conn_id: uuid::Uuid) {
let _ = self
.directly_connected_conn_map
.entry(peer_id)
.or_insert_with(DashSet::new)
.insert(conn_id);
}
pub fn has_directly_connected_conn(&self, peer_id: PeerId) -> bool {
self.directly_connected_conn_map
.get(&peer_id)
.map_or(false, |x| !x.is_empty())
}
async fn start_peer_conn_close_event_handler(&self) {
let dmap = self.directly_connected_conn_map.clone();
let mut event_recv = self.global_ctx.subscribe();
self.tasks.lock().await.spawn(async move {
while let Ok(event) = event_recv.recv().await {
match event {
GlobalCtxEvent::PeerConnRemoved(info) => {
if let Some(set) = dmap.get_mut(&info.peer_id) {
let conn_id = info.conn_id.parse().unwrap();
let old = set.remove(&conn_id);
tracing::info!(
?old,
?info,
"try remove conn id from directly connected map"
);
}
}
_ => {}
}
}
});
}
#[tracing::instrument]
pub async fn try_connect<C>(&self, mut connector: C) -> Result<(PeerId, PeerConnId), Error>
pub async fn try_direct_connect<C>(
&self,
mut connector: C,
) -> Result<(PeerId, PeerConnId), Error>
where
C: TunnelConnector + Debug,
{
@@ -334,18 +379,28 @@ impl PeerManager {
let t = ns
.run_async(|| async move { connector.connect().await })
.await?;
self.add_client_tunnel(t).await
let (peer_id, conn_id) = self.add_client_tunnel(t).await?;
self.add_directly_connected_conn(peer_id, conn_id);
Ok((peer_id, conn_id))
}
#[tracing::instrument]
pub async fn add_tunnel_as_server(&self, tunnel: Box<dyn Tunnel>) -> Result<(), Error> {
pub async fn add_tunnel_as_server(
&self,
tunnel: Box<dyn Tunnel>,
is_directly_connected: bool,
) -> Result<(), Error> {
tracing::info!("add tunnel as server start");
let mut peer = PeerConn::new(self.my_peer_id, self.global_ctx.clone(), tunnel);
peer.do_handshake_as_server().await?;
if peer.get_network_identity().network_name
== self.global_ctx.get_network_identity().network_name
{
let (peer_id, conn_id) = (peer.get_peer_id(), peer.get_conn_id());
self.add_new_peer_conn(peer).await?;
if is_directly_connected {
self.add_directly_connected_conn(peer_id, conn_id);
}
} else {
self.foreign_network_manager.add_peer_conn(peer).await?;
}
@@ -417,6 +472,7 @@ impl PeerManager {
let foreign_client = self.foreign_network_client.clone();
let foreign_mgr = self.foreign_network_manager.clone();
let encryptor = self.encryptor.clone();
let compress_algo = self.data_compress_algo;
self.tasks.lock().await.spawn(async move {
tracing::trace!("start_peer_recv");
while let Ok(ret) = recv_packet_from_chan(&mut recv).await {
@@ -447,6 +503,16 @@ impl PeerManager {
}
hdr.forward_counter += 1;
if from_peer_id == my_peer_id
&& (hdr.packet_type == PacketType::Data as u8
|| hdr.packet_type == PacketType::KcpSrc as u8
|| hdr.packet_type == PacketType::KcpDst as u8)
{
let _ = Self::try_compress_and_encrypt(compress_algo, &encryptor, &mut ret)
.await;
}
tracing::trace!(?to_peer_id, ?my_peer_id, "need forward");
let ret =
Self::send_msg_internal(&peers, &foreign_client, ret, to_peer_id).await;
@@ -757,6 +823,20 @@ impl PeerManager {
(dst_peers, is_exit_node)
}
pub async fn try_compress_and_encrypt(
compress_algo: CompressorAlgo,
encryptor: &Box<dyn Encryptor>,
msg: &mut ZCPacket,
) -> Result<(), Error> {
let compressor = DefaultCompressor {};
compressor
.compress(msg, compress_algo)
.await
.with_context(|| "compress failed")?;
encryptor.encrypt(msg).with_context(|| "encrypt failed")?;
Ok(())
}
pub async fn send_msg_ipv4(&self, mut msg: ZCPacket, ipv4_addr: Ipv4Addr) -> Result<(), Error> {
tracing::trace!(
"do send_msg in peer manager, msg: {:?}, ipv4_addr: {}",
@@ -788,14 +868,7 @@ impl PeerManager {
return Ok(());
}
let compressor = DefaultCompressor {};
compressor
.compress(&mut msg, self.data_compress_algo)
.await
.with_context(|| "compress failed")?;
self.encryptor
.encrypt(&mut msg)
.with_context(|| "encrypt failed")?;
Self::try_compress_and_encrypt(self.data_compress_algo, &self.encryptor, &mut msg).await?;
let is_latency_first = self.global_ctx.get_flags().latency_first;
msg.mut_peer_manager_header()
@@ -839,9 +912,11 @@ impl PeerManager {
async fn run_clean_peer_without_conn_routine(&self) {
let peer_map = self.peers.clone();
let dmap = self.directly_connected_conn_map.clone();
self.tasks.lock().await.spawn(async move {
loop {
peer_map.clean_peer_without_conn().await;
dmap.retain(|p, v| peer_map.has_peer(*p) && !v.is_empty());
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
}
});
@@ -858,6 +933,8 @@ impl PeerManager {
}
pub async fn run(&self) -> Result<(), Error> {
self.start_peer_conn_close_event_handler().await;
match &self.route_algo_inst {
RouteAlgoInst::Ospf(route) => self.add_route(route.clone()).await,
RouteAlgoInst::None => {}
@@ -906,7 +983,7 @@ impl PeerManager {
self.foreign_network_client.clone()
}
pub fn get_my_info(&self) -> cli::NodeInfo {
pub async fn get_my_info(&self) -> cli::NodeInfo {
cli::NodeInfo {
peer_id: self.my_peer_id,
ipv4_addr: self
@@ -932,6 +1009,7 @@ impl PeerManager {
config: self.global_ctx.config.dump(),
version: EASYTIER_VERSION.to_string(),
feature_flag: Some(self.global_ctx.get_feature_flags()),
ip_list: Some(self.global_ctx.get_ip_collector().collect_ip_addrs().await),
}
}
@@ -940,6 +1018,13 @@ impl PeerManager {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
pub fn get_directly_connections(&self, peer_id: PeerId) -> DashSet<uuid::Uuid> {
self.directly_connected_conn_map
.get(&peer_id)
.map(|x| x.clone())
.unwrap_or_default()
}
}
#[cfg(test)]
@@ -1008,7 +1093,7 @@ mod tests {
tokio::spawn(async move {
client.set_bind_addrs(vec![]);
client_mgr.try_connect(client).await.unwrap();
client_mgr.try_direct_connect(client).await.unwrap();
});
server_mgr

View File

@@ -212,6 +212,11 @@ impl PeerMap {
}
}
pub async fn get_peer_default_conn_id(&self, peer_id: PeerId) -> Option<PeerConnId> {
self.get_peer_by_id(peer_id)
.map(|p| p.get_default_conn_id())
}
pub async fn close_peer_conn(
&self,
peer_id: PeerId,

View File

@@ -1450,9 +1450,6 @@ impl PeerRouteServiceImpl {
let my_peer_id = self.my_peer_id;
let (peer_infos, conn_bitmap, foreign_network) = self.build_sync_request(&session);
tracing::trace!(?foreign_network, "building sync_route request. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}",
my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session);
if peer_infos.is_none()
&& conn_bitmap.is_none()
&& foreign_network.is_none()
@@ -1462,6 +1459,9 @@ impl PeerRouteServiceImpl {
return true;
}
tracing::debug!(?foreign_network, "sync_route request need send to peer. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}",
my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session);
session
.need_sync_initiator_info
.store(false, Ordering::Relaxed);
@@ -1728,7 +1728,6 @@ impl RouteSessionManager {
Ok(session)
}
#[tracing::instrument(skip(self))]
async fn maintain_sessions(&self, service_impl: Arc<PeerRouteServiceImpl>) -> bool {
let mut cur_dst_peer_id_to_initiate = None;
let mut next_sleep_ms = 0;
@@ -1764,8 +1763,6 @@ impl RouteSessionManager {
.map(|x| *x)
.collect::<Vec<_>>();
tracing::trace!(?service_impl.my_peer_id, ?peers, ?session_peers, ?initiator_candidates, "maintain_sessions begin");
if initiator_candidates.is_empty() {
next_sleep_ms = 1000;
continue;

View File

@@ -83,12 +83,6 @@ where
}
}
tracing::debug!(
?peers_to_connect,
?to_remove,
"got peers to connect and remove"
);
for key in to_remove {
if let Some((_, task)) = peer_task_map.remove(&key) {
task.abort();
@@ -115,7 +109,6 @@ where
.insert(item.clone(), launcher.launch_task(&data, item).await.into());
}
} else if peer_task_map.is_empty() {
tracing::debug!("all task done");
launcher.all_task_done(&data).await;
}

View File

@@ -32,12 +32,23 @@ impl PeerManagerRpcService {
.await
.iter(),
);
let peer_map = self.peer_manager.get_peer_map();
let mut peer_infos = Vec::new();
for peer in peers {
let mut peer_info = PeerInfo::default();
peer_info.peer_id = peer;
peer_info.default_conn_id = peer_map
.get_peer_default_conn_id(peer)
.await
.map(Into::into);
peer_info.directly_connected_conns = self
.peer_manager
.get_directly_connections(peer)
.into_iter()
.map(Into::into)
.collect();
if let Some(conns) = self.peer_manager.get_peer_map().list_peer_conns(peer).await {
if let Some(conns) = peer_map.list_peer_conns(peer).await {
peer_info.conns = conns;
} else if let Some(conns) = self
.peer_manager
@@ -121,7 +132,7 @@ impl PeerManageRpc for PeerManagerRpcService {
_request: ShowNodeInfoRequest, // Accept request of type HelloRequest
) -> Result<ShowNodeInfoResponse, rpc_types::error::Error> {
Ok(ShowNodeInfoResponse {
node_info: Some(self.peer_manager.get_my_info()),
node_info: Some(self.peer_manager.get_my_info().await),
})
}
}

View File

@@ -45,7 +45,7 @@ pub async fn connect_peer_manager(client: Arc<PeerManager>, server: Arc<PeerMana
});
let b_mgr_copy = server.clone();
tokio::spawn(async move {
b_mgr_copy.add_tunnel_as_server(b_ring).await.unwrap();
b_mgr_copy.add_tunnel_as_server(b_ring, true).await.unwrap();
});
}

View File

@@ -1,6 +1,7 @@
syntax = "proto3";
import "common.proto";
import "peer_rpc.proto";
package cli;
@@ -34,6 +35,8 @@ message PeerConnInfo {
message PeerInfo {
uint32 peer_id = 1;
repeated PeerConnInfo conns = 2;
common.UUID default_conn_id = 3;
repeated common.UUID directly_connected_conns = 4;
}
message ListPeerRequest {}
@@ -79,6 +82,7 @@ message NodeInfo {
string config = 8;
string version = 9;
common.PeerFeatureFlag feature_flag = 10;
peer_rpc.GetIpListResponse ip_list = 11;
}
message ShowNodeInfoRequest {}
@@ -193,6 +197,10 @@ enum TcpProxyEntryState {
Connected = 3;
// connection closed
Closed = 4;
// closing src
ClosingSrc = 5;
// closing dst
ClosingDst = 6;
}
message TcpProxyEntry {

View File

@@ -4,10 +4,14 @@ impl PeerRoutePair {
pub fn get_latency_ms(&self) -> Option<f64> {
let mut ret = u64::MAX;
let p = self.peer.as_ref()?;
let default_conn_id = p.default_conn_id.map(|id| id.to_string());
for conn in p.conns.iter() {
let Some(stats) = &conn.stats else {
continue;
};
if default_conn_id == Some(conn.conn_id.to_string()) {
return Some(f64::from(stats.latency_us as u32) / 1000.0);
}
ret = ret.min(stats.latency_us);
}

View File

@@ -18,7 +18,7 @@ message FlagsInConfig {
bool disable_p2p = 11;
bool relay_all_peer_rpc = 12;
bool disable_udp_hole_punching = 13;
string ipv6_listener = 14;
// string ipv6_listener = 14; [deprecated = true]; use -l udp://[::]:12345 instead
bool multi_thread = 15;
CompressionAlgoPb data_compress_algo = 16;
bool bind_device = 17;
@@ -29,6 +29,7 @@ message FlagsInConfig {
bool disable_kcp_input = 19;
// allow relay kcp packets (for public server, this can reduce the throughput)
bool disable_relay_kcp = 20;
bool proxy_forward_by_system = 21;
}
message RpcDescriptor {

View File

@@ -47,6 +47,19 @@ message NetworkConfig {
optional bool disable_p2p = 24;
optional bool bind_device = 25;
optional bool no_tun = 26;
optional bool enable_exit_node = 27;
optional bool relay_all_peer_rpc = 28;
optional bool multi_thread = 29;
optional bool enable_relay_network_whitelist = 30;
repeated string relay_network_whitelist = 31;
optional bool enable_manual_routes = 32;
repeated string routes = 33;
repeated string exit_nodes = 34;
optional bool proxy_forward_by_system = 35;
optional bool disable_encryption = 36;
optional bool enable_socks5 = 37;
optional int32 socks5_port = 38;
}
message MyNodeInfo {

View File

@@ -499,17 +499,20 @@ pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str
let task = tokio::spawn(async move {
for _ in 1..=2 {
tokio::time::sleep(tokio::time::Duration::from_secs(8)).await;
// inst4 should be in inst1's route list
let routes = insts[0].get_peer_manager().list_routes().await;
assert!(
routes
.iter()
.find(|r| r.peer_id == inst4.peer_id())
.is_some(),
"inst4 should be in inst1's route list, {:?}",
routes
);
wait_for_condition(
|| async {
insts[0]
.get_peer_manager()
.list_routes()
.await
.iter()
.find(|r| r.peer_id == inst4.peer_id())
.is_some()
},
Duration::from_secs(8),
)
.await;
set_link_status("net_d", false);
let _t = ScopedTask::from(tokio::spawn(async move {
@@ -520,14 +523,25 @@ pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str
}));
wait_for_condition(
|| async {
insts[2]
let ret = insts[2]
.get_peer_manager()
.get_peer_map()
.list_peers_with_conn()
.await
.iter()
.find(|r| **r == inst4.peer_id())
.is_none()
.is_none();
if !ret {
println!(
"conn info: {:?}",
insts[2]
.get_peer_manager()
.get_peer_map()
.list_peer_conns(inst4.peer_id())
.await
);
}
ret
},
// 0 down, assume last packet is recv in -0.01
// [2, 7) send ping

View File

@@ -360,7 +360,13 @@ pub(crate) fn setup_sokcet2_ext(
socket2_socket.set_nonblocking(true)?;
socket2_socket.set_reuse_address(true)?;
socket2_socket.bind(&socket2::SockAddr::from(*bind_addr))?;
if let Err(e) = socket2_socket.bind(&socket2::SockAddr::from(*bind_addr)) {
if bind_addr.is_ipv4() {
return Err(e.into());
} else {
tracing::warn!(?e, "bind failed, do not return error for ipv6");
}
}
// #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
// socket2_socket.set_reuse_port(true)?;

View File

@@ -22,6 +22,9 @@ pub mod stats;
pub mod tcp;
pub mod udp;
pub const PROTO_PORT_OFFSET: &[(&str, u16)] =
&[("tcp", 0), ("udp", 0), ("wg", 1), ("ws", 1), ("wss", 2)];
#[cfg(feature = "wireguard")]
pub mod wireguard;
@@ -123,7 +126,7 @@ pub trait TunnelListener: Send {
}
#[async_trait]
#[auto_impl::auto_impl(Box)]
#[auto_impl::auto_impl(Box, &mut)]
pub trait TunnelConnector: Send {
async fn connect(&mut self) -> Result<Box<dyn Tunnel>, TunnelError>;
fn remote_url(&self) -> url::Url;
@@ -201,9 +204,21 @@ where
Ok(T::from_url(url.clone(), IpVersion::Both)?)
}
fn default_port(scheme: &str) -> Option<u16> {
match scheme {
"tcp" => Some(11010),
"udp" => Some(11010),
"ws" => Some(11011),
"wss" => Some(11012),
"quic" => Some(11012),
"wg" => Some(11011),
_ => None,
}
}
impl FromUrl for SocketAddr {
fn from_url(url: url::Url, ip_version: IpVersion) -> Result<Self, TunnelError> {
let addrs = url.socket_addrs(|| None)?;
let addrs = url.socket_addrs(|| default_port(url.scheme()))?;
tracing::debug!(?addrs, ?ip_version, ?url, "convert url to socket addrs");
let addrs = addrs
.into_iter()

View File

@@ -150,9 +150,9 @@ impl TcpTunnelConnector {
&mut self,
addr: SocketAddr,
) -> Result<Box<dyn Tunnel>, super::TunnelError> {
tracing::info!(addr = ?self.addr, "connect tcp start");
tracing::info!(url = ?self.addr, ?addr, "connect tcp start, bind addrs: {:?}", self.bind_addrs);
let stream = TcpStream::connect(addr).await?;
tracing::info!(addr = ?self.addr, "connect tcp succ");
tracing::info!(url = ?self.addr, ?addr, "connect tcp succ");
return get_tunnel_with_tcp_stream(stream, self.addr.clone().into());
}
@@ -190,7 +190,7 @@ impl super::TunnelConnector for TcpTunnelConnector {
async fn connect(&mut self) -> Result<Box<dyn Tunnel>, super::TunnelError> {
let addr =
check_scheme_and_get_socket_addr_ext::<SocketAddr>(&self.addr, "tcp", self.ip_version)?;
if self.bind_addrs.is_empty() || addr.is_ipv6() {
if self.bind_addrs.is_empty() {
self.connect_with_default_bind(addr).await
} else {
self.connect_with_custom_bind(addr).await

View File

@@ -141,12 +141,27 @@ async fn respond_stun_packet(
.encode_into_bytes(resp_msg.clone())
.map_err(|e| anyhow::anyhow!("stun encode error: {:?}", e))?;
socket
.send_to(&rsp_buf, addr.clone())
.await
.with_context(|| "send stun response error")?;
let change_req = req_msg
.get_attribute::<ChangeRequest>()
.map(|r| r.ip() || r.port())
.unwrap_or(false);
tracing::debug!(?addr, ?req_msg, "udp respond stun packet done");
if !change_req {
socket
.send_to(&rsp_buf, addr.clone())
.await
.with_context(|| "send stun response error")?;
} else {
// send from a new udp socket
let socket = if addr.is_ipv4() {
UdpSocket::bind("0.0.0.0:0").await?
} else {
UdpSocket::bind("[::]:0").await?
};
socket.send_to(&rsp_buf, addr.clone()).await?;
}
tracing::debug!(?addr, ?req_msg, ?change_req, "udp respond stun packet done");
Ok(())
}

View File

@@ -1,7 +1,6 @@
use std::{
net::{Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
use anyhow::Context;
@@ -10,7 +9,7 @@ use cidr::Ipv4Inet;
use dashmap::DashMap;
use futures::StreamExt;
use pnet::packet::ipv4::Ipv4Packet;
use tokio::{task::JoinSet, time::timeout};
use tokio::task::JoinSet;
use tracing::Level;
use crate::{
@@ -96,20 +95,16 @@ impl WireGuardImpl {
let mut map_key = None;
loop {
let msg = match timeout(Duration::from_secs(120), stream.next()).await {
Ok(Some(Ok(msg))) => msg,
Ok(Some(Err(err))) => {
let msg = match stream.next().await {
Some(Ok(msg)) => msg,
Some(Err(err)) => {
tracing::error!(?err, "Failed to receive from wg client");
break;
}
Ok(None) => {
None => {
tracing::info!("Wireguard client disconnected");
break;
}
Err(err) => {
tracing::error!(?err, "Timeout while receiving from wg client");
break;
}
};
assert_eq!(msg.packet_type(), ZCPacketType::WG);

42
pnpm-lock.yaml generated
View File

@@ -103,8 +103,8 @@ importers:
specifier: ^8.4.47
version: 8.4.47
tailwindcss:
specifier: ^3.4.13
version: 3.4.14
specifier: '=3.4.17'
version: 3.4.17
typescript:
specifier: ^5.6.2
version: 5.6.3
@@ -161,7 +161,7 @@ importers:
version: 4.2.1(vue@3.5.12(typescript@5.6.3))
tailwindcss-primeui:
specifier: ^0.3.4
version: 0.3.4(tailwindcss@3.4.14)
version: 0.3.4(tailwindcss@3.4.17)
vue:
specifier: ^3.5.12
version: 3.5.12(typescript@5.6.3)
@@ -182,8 +182,8 @@ importers:
specifier: ^8.4.47
version: 8.4.47
tailwindcss:
specifier: ^3.4.14
version: 3.4.14
specifier: '=3.4.17'
version: 3.4.17
typescript:
specifier: ~5.6.2
version: 5.6.3
@@ -225,7 +225,7 @@ importers:
version: 4.2.1(vue@3.5.12(typescript@5.6.3))
tailwindcss-primeui:
specifier: ^0.3.4
version: 0.3.4(tailwindcss@3.4.14)
version: 0.3.4(tailwindcss@3.4.17)
ts-md5:
specifier: ^1.3.1
version: 1.3.1
@@ -261,8 +261,8 @@ importers:
specifier: ^7.0.2
version: 7.0.2(postcss@8.4.47)
tailwindcss:
specifier: ^3.4.14
version: 3.4.14
specifier: '=3.4.17'
version: 3.4.17
typescript:
specifier: ~5.6.3
version: 5.6.3
@@ -2699,12 +2699,8 @@ packages:
resolution: {integrity: sha512-+bT2uH4E5LGE7h/n3evcS/sQlJXCpIp6ym8OWJ5eV6+67Dsql/LaaT7qJBAt2rzfoa/5QBGBhxDix1dMt2kQKQ==}
engines: {node: '>= 0.8.0'}
lilconfig@2.1.0:
resolution: {integrity: sha512-utWOt/GHzuUxnLKxB6dk81RoOeoNeHgbrXiuGk4yyF5qlRz+iIVWu56E2fqGHFrXz0QNUhLB/8nKqvRH66JKGQ==}
engines: {node: '>=10'}
lilconfig@3.1.2:
resolution: {integrity: sha512-eop+wDAvpItUys0FWkHIKeC9ybYrTGbU41U5K7+bttZZeohvnY7M9dZ5kB21GNWiFT2q1OoPTvncPCgSOVO5ow==}
lilconfig@3.1.3:
resolution: {integrity: sha512-/vlFKAoH5Cgt3Ie+JLhRbwOsCQePABiU3tJ1egGvyQ+33R/vcwM2Zl2QR/LzjsBeItPt3oSVXapn+m4nQDvpzw==}
engines: {node: '>=14'}
lines-and-columns@1.2.4:
@@ -3443,8 +3439,8 @@ packages:
peerDependencies:
tailwindcss: '>=3.1.0'
tailwindcss@3.4.14:
resolution: {integrity: sha512-IcSvOcTRcUtQQ7ILQL5quRDg7Xs93PdJEk1ZLbhhvJc7uj/OAhYOnruEiwnGgBvUtaUAJ8/mhSw1o8L2jCiENA==}
tailwindcss@3.4.17:
resolution: {integrity: sha512-w33E2aCvSDP0tW9RZuNXadXlkHXqFzSkQew/aIa2i/Sj8fThxwovwlXHSPXTbAHwEIhBFXAedUhP2tueAKP8Og==}
engines: {node: '>=14.0.0'}
hasBin: true
@@ -6445,9 +6441,7 @@ snapshots:
prelude-ls: 1.2.1
type-check: 0.4.0
lilconfig@2.1.0: {}
lilconfig@3.1.2: {}
lilconfig@3.1.3: {}
lines-and-columns@1.2.4: {}
@@ -7029,7 +7023,7 @@ snapshots:
postcss-load-config@4.0.2(postcss@8.4.47):
dependencies:
lilconfig: 3.1.2
lilconfig: 3.1.3
yaml: 2.6.0
optionalDependencies:
postcss: 8.4.47
@@ -7324,11 +7318,11 @@ snapshots:
'@pkgr/core': 0.1.1
tslib: 2.8.1
tailwindcss-primeui@0.3.4(tailwindcss@3.4.14):
tailwindcss-primeui@0.3.4(tailwindcss@3.4.17):
dependencies:
tailwindcss: 3.4.14
tailwindcss: 3.4.17
tailwindcss@3.4.14:
tailwindcss@3.4.17:
dependencies:
'@alloc/quick-lru': 5.2.0
arg: 5.0.2
@@ -7339,7 +7333,7 @@ snapshots:
glob-parent: 6.0.2
is-glob: 4.0.3
jiti: 1.21.6
lilconfig: 2.1.0
lilconfig: 3.1.3
micromatch: 4.0.8
normalize-path: 3.0.0
object-hash: 3.0.0