refactor(config): unify runtime configuration management via ConfigRpc (#1397)

* refactor(config): unify runtime configuration management via ConfigRpc
* feat(tests): add config patch test and fix problem
This commit is contained in:
Mg Pig
2025-10-01 00:32:28 +08:00
committed by GitHub
parent 4d91582fd8
commit 020bf04ec4
16 changed files with 772 additions and 280 deletions

View File

@@ -148,6 +148,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"src/proto/web.proto",
"src/proto/magic_dns.proto",
"src/proto/acl.proto",
"src/proto/config.proto",
];
for proto_file in proto_files.iter().chain(proto_files_reflect.iter()) {
@@ -162,6 +163,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute(".error", "#[derive(serde::Serialize, serde::Deserialize)]")
.type_attribute(".cli", "#[derive(serde::Serialize, serde::Deserialize)]")
.type_attribute(".web", "#[derive(serde::Serialize, serde::Deserialize)]")
.type_attribute(".config", "#[derive(serde::Serialize, serde::Deserialize)]")
.type_attribute(
"peer_rpc.GetIpListResponse",
"#[derive(serde::Serialize, serde::Deserialize)]",

View File

@@ -152,6 +152,7 @@ pub trait ConfigLoader: Send + Sync {
mapped_cidr: Option<cidr::Ipv4Cidr>,
) -> Result<(), anyhow::Error>;
fn remove_proxy_cidr(&self, cidr: cidr::Ipv4Cidr);
fn clear_proxy_cidrs(&self);
fn get_proxy_cidrs(&self) -> Vec<ProxyNetworkConfig>;
fn get_network_identity(&self) -> NetworkIdentity;
@@ -610,6 +611,11 @@ impl ConfigLoader for TomlConfigLoader {
}
}
fn clear_proxy_cidrs(&self) {
let mut locked_config = self.config.lock().unwrap();
locked_config.proxy_network = None;
}
fn get_proxy_cidrs(&self) -> Vec<ProxyNetworkConfig> {
self.config
.lock()

View File

@@ -12,6 +12,7 @@ use crate::peers::acl_filter::AclFilter;
use crate::proto::acl::GroupIdentity;
use crate::proto::cli::PeerConnInfo;
use crate::proto::common::{PeerFeatureFlag, PortForwardConfigPb};
use crate::proto::config::InstanceConfigPatch;
use crate::proto::peer_rpc::PeerGroupInfo;
use crossbeam::atomic::AtomicCell;
@@ -52,6 +53,8 @@ pub enum GlobalCtxEvent {
DhcpIpv4Conflicted(Option<cidr::Ipv4Inet>),
PortForwardAdded(PortForwardConfigPb),
ConfigPatched(InstanceConfigPatch),
}
pub type EventBus = tokio::sync::broadcast::Sender<GlobalCtxEvent>;

View File

@@ -21,29 +21,30 @@ use tokio::time::timeout;
use easytier::{
common::{
config::PortForwardConfig,
constants::EASYTIER_VERSION,
stun::{StunInfoCollector, StunInfoCollectorTrait},
},
peers,
proto::{
cli::{
list_peer_route_pair, AclManageRpc, AclManageRpcClientFactory, AddPortForwardRequest,
ConnectorManageRpc, ConnectorManageRpcClientFactory, DumpRouteRequest,
GetAclStatsRequest, GetLoggerConfigRequest, GetPrometheusStatsRequest, GetStatsRequest,
list_peer_route_pair, AclManageRpc, AclManageRpcClientFactory, ConnectorManageRpc,
ConnectorManageRpcClientFactory, DumpRouteRequest, GetAclStatsRequest,
GetLoggerConfigRequest, GetPrometheusStatsRequest, GetStatsRequest,
GetVpnPortalInfoRequest, GetWhitelistRequest, ListConnectorRequest,
ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListMappedListenerRequest,
ListPeerRequest, ListPeerResponse, ListPortForwardRequest, ListRouteRequest,
ListRouteResponse, LogLevel, LoggerRpc, LoggerRpcClientFactory,
ManageMappedListenerRequest, MappedListenerManageAction, MappedListenerManageRpc,
MappedListenerManageRpcClientFactory, NodeInfo, PeerManageRpc,
MappedListenerManageRpc, MappedListenerManageRpcClientFactory, NodeInfo, PeerManageRpc,
PeerManageRpcClientFactory, PortForwardManageRpc, PortForwardManageRpcClientFactory,
RemovePortForwardRequest, SetLoggerConfigRequest, SetWhitelistRequest,
ShowNodeInfoRequest, StatsRpc, StatsRpcClientFactory, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc,
VpnPortalRpcClientFactory,
SetLoggerConfigRequest, ShowNodeInfoRequest, StatsRpc, StatsRpcClientFactory,
TcpProxyEntryState, TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory,
VpnPortalRpc, VpnPortalRpcClientFactory,
},
common::{NatType, PortForwardConfigPb, SocketType},
config::{
AclPatch, ConfigPatchAction, ConfigRpc, ConfigRpcClientFactory, InstanceConfigPatch,
PatchConfigRequest, PortForwardPatch, StringPatch, UrlPatch,
},
common::{NatType, SocketType},
peer_rpc::{GetGlobalPeerMapRequest, PeerCenterRpc, PeerCenterRpcClientFactory},
rpc_impl::standalone::StandAloneClient,
rpc_types::controller::BaseController,
@@ -476,6 +477,18 @@ impl CommandHandler<'_> {
.with_context(|| "failed to get logger client")?)
}
async fn get_config_client(
&self,
) -> Result<Box<dyn ConfigRpc<Controller = BaseController>>, Error> {
Ok(self
.client
.lock()
.await
.scoped_client::<ConfigRpcClientFactory<BaseController>>("".to_string())
.await
.with_context(|| "failed to get config client")?)
}
async fn list_peers(&self) -> Result<ListPeerResponse, Error> {
let client = self.get_peer_manager_client().await?;
let request = ListPeerRequest::default();
@@ -931,28 +944,24 @@ impl CommandHandler<'_> {
Ok(())
}
async fn handle_mapped_listener_add(&self, url: &str) -> Result<(), Error> {
async fn handle_mapped_listener_modify(
&self,
url: &str,
action: ConfigPatchAction,
) -> Result<(), Error> {
let url = Self::mapped_listener_validate_url(url)?;
let client = self.get_mapped_listener_manager_client().await?;
let request = ManageMappedListenerRequest {
action: MappedListenerManageAction::MappedListenerAdd as i32,
url: Some(url.into()),
let client = self.get_config_client().await?;
let request = PatchConfigRequest {
patch: Some(InstanceConfigPatch {
mapped_listeners: vec![UrlPatch {
action: action.into(),
url: Some(url.into()),
}],
..Default::default()
}),
};
let _response = client
.manage_mapped_listener(BaseController::default(), request)
.await?;
Ok(())
}
async fn handle_mapped_listener_remove(&self, url: &str) -> Result<(), Error> {
let url = Self::mapped_listener_validate_url(url)?;
let client = self.get_mapped_listener_manager_client().await?;
let request = ManageMappedListenerRequest {
action: MappedListenerManageAction::MappedListenerRemove as i32,
url: Some(url.into()),
};
let _response = client
.manage_mapped_listener(BaseController::default(), request)
.patch_config(BaseController::default(), request)
.await?;
Ok(())
}
@@ -969,47 +978,9 @@ impl CommandHandler<'_> {
Ok(url)
}
async fn handle_port_forward_add(
&self,
protocol: &str,
bind_addr: &str,
dst_addr: &str,
) -> Result<(), Error> {
let bind_addr: std::net::SocketAddr = bind_addr
.parse()
.with_context(|| format!("Invalid bind address: {}", bind_addr))?;
let dst_addr: std::net::SocketAddr = dst_addr
.parse()
.with_context(|| format!("Invalid destination address: {}", dst_addr))?;
if protocol != "tcp" && protocol != "udp" {
return Err(anyhow::anyhow!("Protocol must be 'tcp' or 'udp'"));
}
let client = self.get_port_forward_manager_client().await?;
let request = AddPortForwardRequest {
cfg: Some(
PortForwardConfig {
proto: protocol.to_string(),
bind_addr,
dst_addr,
}
.into(),
),
};
client
.add_port_forward(BaseController::default(), request)
.await?;
println!(
"Port forward rule added: {} {} -> {}",
protocol, bind_addr, dst_addr
);
Ok(())
}
async fn handle_port_forward_remove(
async fn handle_port_forward_modify(
&self,
action: ConfigPatchAction,
protocol: &str,
bind_addr: &str,
dst_addr: Option<&str>,
@@ -1018,28 +989,36 @@ impl CommandHandler<'_> {
.parse()
.with_context(|| format!("Invalid bind address: {}", bind_addr))?;
if protocol != "tcp" && protocol != "udp" {
return Err(anyhow::anyhow!("Protocol must be 'tcp' or 'udp'"));
}
let socket_type = match protocol {
"tcp" => SocketType::Tcp,
"udp" => SocketType::Udp,
_ => return Err(anyhow::anyhow!("Protocol must be 'tcp' or 'udp'")),
};
let client = self.get_port_forward_manager_client().await?;
let request = RemovePortForwardRequest {
cfg: Some(
PortForwardConfig {
proto: protocol.to_string(),
bind_addr,
dst_addr: dst_addr
.map(|s| s.parse::<SocketAddr>().unwrap())
.unwrap_or("0.0.0.0:0".parse::<SocketAddr>().unwrap()),
}
.into(),
),
let client = self.get_config_client().await?;
let request = PatchConfigRequest {
patch: Some(InstanceConfigPatch {
port_forwards: vec![PortForwardPatch {
action: action.into(),
cfg: Some(PortForwardConfigPb {
bind_addr: Some(bind_addr.into()),
dst_addr: dst_addr.map(|s| s.parse::<SocketAddr>().unwrap().into()),
socket_type: socket_type.into(),
}),
}],
..Default::default()
}),
};
client
.remove_port_forward(BaseController::default(), request)
.patch_config(BaseController::default(), request)
.await?;
println!("Port forward rule removed: {} {}", protocol, bind_addr);
println!(
"Port forward rule {}: {} {}",
action.as_str_name().to_lowercase(),
protocol,
bind_addr
);
Ok(())
}
@@ -1086,78 +1065,114 @@ impl CommandHandler<'_> {
}
async fn handle_whitelist_set_tcp(&self, ports: &str) -> Result<(), Error> {
let tcp_ports = Self::parse_port_list(ports)?;
let client = self.get_acl_manager_client().await?;
let mut whitelist = Self::parse_port_list(ports)?
.into_iter()
.map(|p| StringPatch {
action: ConfigPatchAction::Add.into(),
value: p,
})
.collect::<Vec<_>>();
whitelist.insert(
0,
StringPatch {
action: ConfigPatchAction::Clear.into(),
value: "".to_string(),
},
);
let client = self.get_config_client().await?;
// Get current UDP ports to preserve them
let current = client
.get_whitelist(BaseController::default(), GetWhitelistRequest::default())
.await?;
let request = SetWhitelistRequest {
tcp_ports,
udp_ports: current.udp_ports,
let request = PatchConfigRequest {
patch: Some(InstanceConfigPatch {
acl: Some(AclPatch {
tcp_whitelist: whitelist,
..Default::default()
}),
..Default::default()
}),
};
client
.set_whitelist(BaseController::default(), request)
.patch_config(BaseController::default(), request)
.await?;
println!("TCP whitelist updated: {}", ports);
Ok(())
}
async fn handle_whitelist_set_udp(&self, ports: &str) -> Result<(), Error> {
let udp_ports = Self::parse_port_list(ports)?;
let client = self.get_acl_manager_client().await?;
let mut whitelist = Self::parse_port_list(ports)?
.into_iter()
.map(|p| StringPatch {
action: ConfigPatchAction::Add.into(),
value: p,
})
.collect::<Vec<_>>();
whitelist.insert(
0,
StringPatch {
action: ConfigPatchAction::Clear.into(),
value: "".to_string(),
},
);
let client = self.get_config_client().await?;
// Get current TCP ports to preserve them
let current = client
.get_whitelist(BaseController::default(), GetWhitelistRequest::default())
.await?;
let request = SetWhitelistRequest {
tcp_ports: current.tcp_ports,
udp_ports,
let request = PatchConfigRequest {
patch: Some(InstanceConfigPatch {
acl: Some(AclPatch {
udp_whitelist: whitelist,
..Default::default()
}),
..Default::default()
}),
};
client
.set_whitelist(BaseController::default(), request)
.patch_config(BaseController::default(), request)
.await?;
println!("UDP whitelist updated: {}", ports);
Ok(())
}
async fn handle_whitelist_clear_tcp(&self) -> Result<(), Error> {
let client = self.get_acl_manager_client().await?;
let client = self.get_config_client().await?;
// Get current UDP ports to preserve them
let current = client
.get_whitelist(BaseController::default(), GetWhitelistRequest::default())
.await?;
let request = SetWhitelistRequest {
tcp_ports: vec![],
udp_ports: current.udp_ports,
let request = PatchConfigRequest {
patch: Some(InstanceConfigPatch {
acl: Some(AclPatch {
tcp_whitelist: vec![StringPatch {
action: ConfigPatchAction::Clear.into(),
value: "".to_string(),
}],
..Default::default()
}),
..Default::default()
}),
};
client
.set_whitelist(BaseController::default(), request)
.patch_config(BaseController::default(), request)
.await?;
println!("TCP whitelist cleared");
Ok(())
}
async fn handle_whitelist_clear_udp(&self) -> Result<(), Error> {
let client = self.get_acl_manager_client().await?;
let client = self.get_config_client().await?;
// Get current TCP ports to preserve them
let current = client
.get_whitelist(BaseController::default(), GetWhitelistRequest::default())
.await?;
let request = SetWhitelistRequest {
tcp_ports: current.tcp_ports,
udp_ports: vec![],
let request = PatchConfigRequest {
patch: Some(InstanceConfigPatch {
acl: Some(AclPatch {
udp_whitelist: vec![StringPatch {
action: ConfigPatchAction::Clear.into(),
value: "".to_string(),
}],
..Default::default()
}),
..Default::default()
}),
};
client
.set_whitelist(BaseController::default(), request)
.patch_config(BaseController::default(), request)
.await?;
println!("UDP whitelist cleared");
Ok(())
@@ -1626,11 +1641,15 @@ async fn main() -> Result<(), Error> {
SubCommand::MappedListener(mapped_listener_args) => {
match mapped_listener_args.sub_command {
Some(MappedListenerSubCommand::Add { url }) => {
handler.handle_mapped_listener_add(&url).await?;
handler
.handle_mapped_listener_modify(&url, ConfigPatchAction::Add)
.await?;
println!("add mapped listener: {url}");
}
Some(MappedListenerSubCommand::Remove { url }) => {
handler.handle_mapped_listener_remove(&url).await?;
handler
.handle_mapped_listener_modify(&url, ConfigPatchAction::Remove)
.await?;
println!("remove mapped listener: {url}");
}
Some(MappedListenerSubCommand::List) | None => {
@@ -1993,7 +2012,12 @@ async fn main() -> Result<(), Error> {
dst_addr,
}) => {
handler
.handle_port_forward_add(protocol, bind_addr, dst_addr)
.handle_port_forward_modify(
ConfigPatchAction::Add,
protocol,
bind_addr,
Some(dst_addr),
)
.await?;
}
Some(PortForwardSubCommand::Remove {
@@ -2002,7 +2026,12 @@ async fn main() -> Result<(), Error> {
dst_addr,
}) => {
handler
.handle_port_forward_remove(protocol, bind_addr, dst_addr.as_deref())
.handle_port_forward_modify(
ConfigPatchAction::Remove,
protocol,
bind_addr,
dst_addr.as_deref(),
)
.await?;
}
Some(PortForwardSubCommand::List) | None => {

View File

@@ -34,15 +34,16 @@ use crate::peers::rpc_service::PeerManagerRpcService;
use crate::peers::{create_packet_recv_chan, recv_packet_from_chan, PacketRecvChanReceiver};
use crate::proto::cli::VpnPortalRpc;
use crate::proto::cli::{
AddPortForwardRequest, AddPortForwardResponse, GetPrometheusStatsRequest,
GetPrometheusStatsResponse, GetStatsRequest, GetStatsResponse, ListMappedListenerRequest,
ListMappedListenerResponse, ListPortForwardRequest, ListPortForwardResponse,
ManageMappedListenerRequest, ManageMappedListenerResponse, MappedListener,
MappedListenerManageAction, MappedListenerManageRpc, MetricSnapshot, PortForwardManageRpc,
RemovePortForwardRequest, RemovePortForwardResponse, StatsRpc,
GetPrometheusStatsRequest, GetPrometheusStatsResponse, GetStatsRequest, GetStatsResponse,
ListMappedListenerRequest, ListMappedListenerResponse, ListPortForwardRequest,
ListPortForwardResponse, MappedListener, MappedListenerManageRpc, MetricSnapshot,
PortForwardManageRpc, StatsRpc,
};
use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo};
use crate::proto::common::{PortForwardConfigPb, TunnelInfo};
use crate::proto::common::{PortForwardConfigPb, TunnelInfo, Void};
use crate::proto::config::{
ConfigPatchAction, ConfigRpc, ConfigRpcServer, PatchConfigRequest, PortForwardPatch,
};
use crate::proto::peer_rpc::PeerCenterRpcServer;
use crate::proto::rpc_impl::standalone::{RpcServerHook, StandAloneServer};
use crate::proto::rpc_types;
@@ -226,6 +227,234 @@ impl RpcServerHook for InstanceRpcServerHook {
}
}
#[derive(Clone)]
pub struct InstanceConfigPatcher {
global_ctx: ArcGlobalCtx,
socks5_server: Weak<Socks5Server>,
peer_manager: Arc<PeerManager>,
}
impl InstanceConfigPatcher {
pub async fn apply_patch(
&self,
patch: crate::proto::config::InstanceConfigPatch,
) -> Result<(), anyhow::Error> {
let patch_for_event = patch.clone();
self.patch_port_forwards(patch.port_forwards).await?;
self.patch_acl(patch.acl).await?;
self.patch_proxy_networks(patch.proxy_networks).await?;
self.patch_routes(patch.routes).await?;
self.patch_exit_nodes(patch.exit_nodes).await?;
self.patch_mapped_listeners(patch.mapped_listeners).await?;
if let Some(hostname) = patch.hostname {
self.global_ctx.set_hostname(hostname.clone());
self.global_ctx.config.set_hostname(Some(hostname));
}
if let Some(ipv4) = patch.ipv4 {
if !self.global_ctx.config.get_dhcp() {
self.global_ctx.set_ipv4(Some(ipv4.into()));
self.global_ctx.config.set_ipv4(Some(ipv4.into()));
}
}
if let Some(ipv6) = patch.ipv6 {
self.global_ctx.set_ipv6(Some(ipv6.into()));
self.global_ctx.config.set_ipv6(Some(ipv6.into()));
}
self.global_ctx
.issue_event(GlobalCtxEvent::ConfigPatched(patch_for_event));
Ok(())
}
fn trace_patchables<T: std::fmt::Debug>(patches: &Vec<crate::proto::config::Patchable<T>>) {
for patch in patches {
match patch.action {
Some(ConfigPatchAction::Add) | Some(ConfigPatchAction::Remove) => {
if let Some(value) = &patch.value {
tracing::info!("{:?} {:?}", patch.action, value);
} else {
tracing::warn!(
"Ignored {:?} patch with no value for type '{}'. Please ensure the patch value is provided.",
patch.action,
std::any::type_name::<T>()
);
}
}
Some(ConfigPatchAction::Clear) => {
tracing::info!("Clear all for type '{}'", std::any::type_name::<T>());
}
None => {
tracing::warn!(
"Invalid patch action for type '{}'",
std::any::type_name::<T>()
);
}
}
}
}
async fn patch_port_forwards(
&self,
port_forwards: Vec<PortForwardPatch>,
) -> Result<(), anyhow::Error> {
if port_forwards.is_empty() {
return Ok(());
}
let Some(socks5_server) = self.socks5_server.upgrade() else {
return Err(anyhow::anyhow!("socks5 server not available"));
};
let mut current_forwards = self.global_ctx.config.get_port_forwards();
let patches = port_forwards.into_iter().map(Into::into).collect();
InstanceConfigPatcher::trace_patchables(&patches);
crate::proto::config::patch_vec(&mut current_forwards, patches);
self.global_ctx
.config
.set_port_forwards(current_forwards.clone());
socks5_server
.reload_port_forwards(&current_forwards)
.await
.with_context(|| "Failed to reload port forwards")?;
Ok(())
}
async fn patch_acl(
&self,
acl_patch: Option<crate::proto::config::AclPatch>,
) -> Result<(), anyhow::Error> {
let Some(acl_patch) = acl_patch else {
return Ok(());
};
if let Some(acl) = acl_patch.acl {
self.global_ctx.config.set_acl(Some(acl));
}
if !acl_patch.tcp_whitelist.is_empty() {
let mut current_whitelist = self.global_ctx.config.get_tcp_whitelist();
let patches = acl_patch
.tcp_whitelist
.into_iter()
.map(Into::into)
.collect();
InstanceConfigPatcher::trace_patchables(&patches);
crate::proto::config::patch_vec(&mut current_whitelist, patches);
self.global_ctx.config.set_tcp_whitelist(current_whitelist);
}
if !acl_patch.udp_whitelist.is_empty() {
let mut current_whitelist = self.global_ctx.config.get_udp_whitelist();
let patches = acl_patch
.udp_whitelist
.into_iter()
.map(Into::into)
.collect();
InstanceConfigPatcher::trace_patchables(&patches);
crate::proto::config::patch_vec(&mut current_whitelist, patches);
self.global_ctx.config.set_udp_whitelist(current_whitelist);
}
self.global_ctx
.get_acl_filter()
.reload_rules(AclRuleBuilder::build(&self.global_ctx)?.as_ref());
Ok(())
}
async fn patch_proxy_networks(
&self,
proxy_networks: Vec<crate::proto::config::ProxyNetworkPatch>,
) -> Result<(), anyhow::Error> {
if proxy_networks.is_empty() {
return Ok(());
}
for proxy_network_patch in proxy_networks {
let Some(cidr) = proxy_network_patch.cidr.map(|c| c.into()) else {
tracing::warn!("Proxy network cidr is None, skipping.");
continue;
};
let mapped_cidr: Option<cidr::Ipv4Cidr> =
proxy_network_patch.mapped_cidr.map(|s| s.into());
match ConfigPatchAction::try_from(proxy_network_patch.action) {
Ok(ConfigPatchAction::Add) => {
tracing::info!("Proxy network added: {}", cidr);
self.global_ctx.config.add_proxy_cidr(cidr, mapped_cidr)?;
}
Ok(ConfigPatchAction::Remove) => {
tracing::info!("Proxy network removed: {}", cidr);
self.global_ctx.config.remove_proxy_cidr(cidr);
}
Ok(ConfigPatchAction::Clear) => {
tracing::info!("Proxy networks cleared.");
self.global_ctx.config.clear_proxy_cidrs();
}
Err(_) => {
tracing::warn!(
"Invalid proxy network action: {}",
proxy_network_patch.action
);
}
}
}
Ok(())
}
async fn patch_routes(
&self,
routes: Vec<crate::proto::config::RoutePatch>,
) -> Result<(), anyhow::Error> {
if routes.is_empty() {
return Ok(());
}
let mut current_routes = self.global_ctx.config.get_routes().unwrap_or_default();
let patches = routes.into_iter().map(Into::into).collect();
InstanceConfigPatcher::trace_patchables(&patches);
crate::proto::config::patch_vec(&mut current_routes, patches);
if current_routes.is_empty() {
self.global_ctx.config.set_routes(None);
} else {
self.global_ctx.config.set_routes(Some(current_routes));
}
Ok(())
}
async fn patch_exit_nodes(
&self,
exit_nodes: Vec<crate::proto::config::ExitNodePatch>,
) -> Result<(), anyhow::Error> {
if exit_nodes.is_empty() {
return Ok(());
}
let mut current_exit_nodes = self.global_ctx.config.get_exit_nodes();
let patches = exit_nodes.into_iter().map(Into::into).collect();
InstanceConfigPatcher::trace_patchables(&patches);
crate::proto::config::patch_vec(&mut current_exit_nodes, patches);
self.global_ctx.config.set_exit_nodes(current_exit_nodes);
self.peer_manager.update_exit_nodes().await;
Ok(())
}
async fn patch_mapped_listeners(
&self,
mapped_listeners: Vec<crate::proto::config::UrlPatch>,
) -> Result<(), anyhow::Error> {
if mapped_listeners.is_empty() {
return Ok(());
}
let mut current_mapped_listeners = self.global_ctx.config.get_mapped_listeners();
let patches = mapped_listeners.into_iter().map(Into::into).collect();
InstanceConfigPatcher::trace_patchables(&patches);
crate::proto::config::patch_vec(&mut current_mapped_listeners, patches);
if current_mapped_listeners.is_empty() {
self.global_ctx.config.set_mapped_listeners(None);
} else {
self.global_ctx
.config
.set_mapped_listeners(Some(current_mapped_listeners));
}
Ok(())
}
}
pub struct Instance {
inst_name: String,
@@ -827,25 +1056,6 @@ impl Instance {
ret.mappedlisteners = mapped_listeners;
Ok(ret)
}
async fn manage_mapped_listener(
&self,
_: BaseController,
req: ManageMappedListenerRequest,
) -> Result<ManageMappedListenerResponse, rpc_types::error::Error> {
let url: url::Url = req.url.ok_or(anyhow::anyhow!("url is empty"))?.into();
let urls = self.0.config.get_mapped_listeners();
let mut set_urls: HashSet<url::Url> = urls.into_iter().collect();
if req.action == MappedListenerManageAction::MappedListenerRemove as i32 {
set_urls.remove(&url);
} else if req.action == MappedListenerManageAction::MappedListenerAdd as i32 {
set_urls.insert(url);
}
let urls: Vec<url::Url> = set_urls.into_iter().collect();
self.0.config.set_mapped_listeners(Some(urls));
Ok(ManageMappedListenerResponse::default())
}
}
MappedListenerManagerRpcService(self.global_ctx.clone())
@@ -864,55 +1074,6 @@ impl Instance {
impl PortForwardManageRpc for PortForwardManagerRpcService {
type Controller = BaseController;
async fn add_port_forward(
&self,
_: BaseController,
request: AddPortForwardRequest,
) -> Result<AddPortForwardResponse, rpc_types::error::Error> {
let Some(socks5_server) = self.socks5_server.upgrade() else {
return Err(anyhow::anyhow!("socks5 server not available").into());
};
if let Some(cfg) = request.cfg {
tracing::info!("Port forward rule added: {:?}", cfg);
let mut current_forwards = self.global_ctx.config.get_port_forwards();
current_forwards.push(cfg.into());
self.global_ctx
.config
.set_port_forwards(current_forwards.clone());
socks5_server
.reload_port_forwards(&current_forwards)
.await
.with_context(|| "Failed to reload port forwards")?;
}
Ok(AddPortForwardResponse {})
}
async fn remove_port_forward(
&self,
_: BaseController,
request: RemovePortForwardRequest,
) -> Result<RemovePortForwardResponse, rpc_types::error::Error> {
let Some(socks5_server) = self.socks5_server.upgrade() else {
return Err(anyhow::anyhow!("socks5 server not available").into());
};
let Some(cfg) = request.cfg else {
return Err(anyhow::anyhow!("port forward config is empty").into());
};
let cfg = cfg.into();
let mut current_forwards = self.global_ctx.config.get_port_forwards();
current_forwards.retain(|e| *e != cfg);
self.global_ctx
.config
.set_port_forwards(current_forwards.clone());
socks5_server
.reload_port_forwards(&current_forwards)
.await
.with_context(|| "Failed to reload port forwards")?;
tracing::info!("Port forward rule removed: {:?}", cfg);
Ok(RemovePortForwardResponse {})
}
async fn list_port_forward(
&self,
_: BaseController,
@@ -984,6 +1145,43 @@ impl Instance {
}
}
pub fn get_config_patcher(&self) -> InstanceConfigPatcher {
InstanceConfigPatcher {
global_ctx: self.global_ctx.clone(),
socks5_server: Arc::downgrade(&self.socks5_server),
peer_manager: self.peer_manager.clone(),
}
}
fn get_config_service(&self) -> impl ConfigRpc<Controller = BaseController> + Clone {
#[derive(Clone)]
pub struct ConfigRpcService {
patcher: InstanceConfigPatcher,
}
#[async_trait::async_trait]
impl ConfigRpc for ConfigRpcService {
type Controller = BaseController;
async fn patch_config(
&self,
_: Self::Controller,
request: PatchConfigRequest,
) -> crate::proto::rpc_types::error::Result<Void> {
let Some(patch) = request.patch else {
return Ok(Void::default());
};
self.patcher.apply_patch(patch).await?;
Ok(Void::default())
}
}
ConfigRpcService {
patcher: self.get_config_patcher(),
}
}
async fn run_rpc_server(&mut self) -> Result<(), Error> {
let Some(_) = self.global_ctx.config.get_rpc_portal() else {
tracing::info!("rpc server not enabled, because rpc_portal is not set.");
@@ -1001,6 +1199,7 @@ impl Instance {
let port_forward_manager_rpc = self.get_port_forward_manager_rpc_service();
let stats_rpc_service = self.get_stats_rpc_service();
let logger_rpc_service = LoggerRpcService::new();
let config_rpc_service = self.get_config_service();
let s = self.rpc_server.as_mut().unwrap();
let peer_mgr_rpc_service = PeerManagerRpcService::new(peer_mgr.clone());
@@ -1031,6 +1230,8 @@ impl Instance {
);
s.registry()
.register(LoggerRpcServer::new(logger_rpc_service), "");
s.registry()
.register(ConfigRpcServer::new(config_rpc_service), "");
if let Some(ip_proxy) = self.ip_proxy.as_ref() {
s.registry().register(

View File

@@ -319,6 +319,10 @@ fn handle_event(
),
);
}
GlobalCtxEvent::ConfigPatched(patch) => {
print_event(instance_id, format!("config patched. patch: {:?}", patch));
}
}
} else {
events = events.resubscribe();

View File

@@ -152,7 +152,7 @@ pub struct PeerManager {
encryptor: Arc<dyn Encryptor + 'static>,
data_compress_algo: CompressorAlgo,
exit_nodes: Vec<IpAddr>,
exit_nodes: RwLock<Vec<IpAddr>>,
reserved_my_peer_id_map: DashMap<String, PeerId>,
@@ -304,7 +304,7 @@ impl PeerManager {
encryptor,
data_compress_algo,
exit_nodes,
exit_nodes: RwLock::new(exit_nodes),
reserved_my_peer_id_map: DashMap::new(),
@@ -1068,7 +1068,7 @@ impl PeerManager {
.global_ctx
.is_ip_in_same_network(&std::net::IpAddr::V4(*ipv4_addr))
{
for exit_node in &self.exit_nodes {
for exit_node in self.exit_nodes.read().await.iter() {
let IpAddr::V4(exit_node) = exit_node else {
continue;
};
@@ -1109,7 +1109,7 @@ impl PeerManager {
dst_peers.push(peer_id);
} else if !ipv6_addr.is_unicast_link_local() {
// NOTE: never route link local address to exit node.
for exit_node in &self.exit_nodes {
for exit_node in self.exit_nodes.read().await.iter() {
let IpAddr::V6(exit_node) = exit_node else {
continue;
};
@@ -1419,6 +1419,11 @@ impl PeerManager {
true
}
pub async fn update_exit_nodes(&self) {
let exit_nodes = self.global_ctx.config.get_exit_nodes();
*self.exit_nodes.write().await = exit_nodes;
}
}
#[cfg(test)]

View File

@@ -1,18 +1,14 @@
use std::sync::Arc;
use crate::{
common::acl_processor::AclRuleBuilder,
proto::{
cli::{
AclManageRpc, DumpRouteRequest, DumpRouteResponse, GetAclStatsRequest,
GetAclStatsResponse, GetWhitelistRequest, GetWhitelistResponse,
ListForeignNetworkRequest, ListForeignNetworkResponse, ListGlobalForeignNetworkRequest,
ListGlobalForeignNetworkResponse, ListPeerRequest, ListPeerResponse, ListRouteRequest,
ListRouteResponse, PeerInfo, PeerManageRpc, SetWhitelistRequest, SetWhitelistResponse,
ShowNodeInfoRequest, ShowNodeInfoResponse,
},
rpc_types::{self, controller::BaseController},
use crate::proto::{
cli::{
AclManageRpc, DumpRouteRequest, DumpRouteResponse, GetAclStatsRequest, GetAclStatsResponse,
GetWhitelistRequest, GetWhitelistResponse, ListForeignNetworkRequest,
ListForeignNetworkResponse, ListGlobalForeignNetworkRequest,
ListGlobalForeignNetworkResponse, ListPeerRequest, ListPeerResponse, ListRouteRequest,
ListRouteResponse, PeerInfo, PeerManageRpc, ShowNodeInfoRequest, ShowNodeInfoResponse,
},
rpc_types::{self, controller::BaseController},
};
use super::peer_manager::PeerManager;
@@ -163,28 +159,6 @@ impl AclManageRpc for PeerManagerRpcService {
})
}
async fn set_whitelist(
&self,
_: BaseController,
request: SetWhitelistRequest,
) -> Result<SetWhitelistResponse, rpc_types::error::Error> {
tracing::info!(
"Setting whitelist - TCP: {:?}, UDP: {:?}",
request.tcp_ports,
request.udp_ports
);
let global_ctx = self.peer_manager.get_global_ctx();
global_ctx.config.set_tcp_whitelist(request.tcp_ports);
global_ctx.config.set_udp_whitelist(request.udp_ports);
global_ctx
.get_acl_filter()
.reload_rules(AclRuleBuilder::build(&global_ctx)?.as_ref());
Ok(SetWhitelistResponse {})
}
async fn get_whitelist(
&self,
_: BaseController,

View File

@@ -181,21 +181,8 @@ message ListMappedListenerRequest {}
message ListMappedListenerResponse { repeated MappedListener mappedlisteners = 1; }
enum MappedListenerManageAction {
MAPPED_LISTENER_ADD = 0;
MAPPED_LISTENER_REMOVE = 1;
}
message ManageMappedListenerRequest {
MappedListenerManageAction action = 1;
common.Url url = 2;
}
message ManageMappedListenerResponse {}
service MappedListenerManageRpc {
rpc ListMappedListener(ListMappedListenerRequest) returns (ListMappedListenerResponse);
rpc ManageMappedListener(ManageMappedListenerRequest) returns (ManageMappedListenerResponse);
}
message VpnPortalInfo {
@@ -261,17 +248,9 @@ message GetAclStatsResponse {
service AclManageRpc {
rpc GetAclStats(GetAclStatsRequest) returns (GetAclStatsResponse);
rpc SetWhitelist(SetWhitelistRequest) returns (SetWhitelistResponse);
rpc GetWhitelist(GetWhitelistRequest) returns (GetWhitelistResponse);
}
message SetWhitelistRequest {
repeated string tcp_ports = 1;
repeated string udp_ports = 2;
}
message SetWhitelistResponse {}
message GetWhitelistRequest {}
message GetWhitelistResponse {
@@ -279,18 +258,6 @@ message GetWhitelistResponse {
repeated string udp_ports = 2;
}
message AddPortForwardRequest {
common.PortForwardConfigPb cfg = 1;
}
message AddPortForwardResponse {}
message RemovePortForwardRequest {
common.PortForwardConfigPb cfg = 1;
}
message RemovePortForwardResponse {}
message ListPortForwardRequest {}
message ListPortForwardResponse {
@@ -298,8 +265,6 @@ message ListPortForwardResponse {
}
service PortForwardManageRpc {
rpc AddPortForward(AddPortForwardRequest) returns (AddPortForwardResponse);
rpc RemovePortForward(RemovePortForwardRequest) returns (RemovePortForwardResponse);
rpc ListPortForward(ListPortForwardRequest) returns (ListPortForwardResponse);
}

View File

@@ -49,10 +49,10 @@ message FlagsInConfig {
// enable relay foreign network kcp packets
bool enable_relay_foreign_network_kcp = 28;
// encryption algorithm to use, empty string means default (aes-gcm)
// encryption algorithm to use, empty string means default (aes-gcm)
string encryption_algorithm = 29;
// disable symmetric nat hole punching, treat symmetric as cone when enabled
bool disable_sym_hole_punching = 30;
}
@@ -144,6 +144,13 @@ message Ipv6Addr {
uint32 part4 = 4;
}
message IpAddr {
oneof ip {
Ipv4Addr ipv4 = 1;
Ipv6Addr ipv6 = 2;
};
}
message Ipv4Inet {
Ipv4Addr address = 1;
uint32 network_length = 2;

View File

@@ -108,6 +108,43 @@ impl From<cidr::Ipv4Inet> for Ipv4Inet {
}
}
impl From<std::net::IpAddr> for IpAddr {
fn from(value: std::net::IpAddr) -> Self {
match value {
std::net::IpAddr::V4(v4) => IpAddr {
ip: Some(ip_addr::Ip::Ipv4(Ipv4Addr::from(v4))),
},
std::net::IpAddr::V6(v6) => IpAddr {
ip: Some(ip_addr::Ip::Ipv6(Ipv6Addr::from(v6))),
},
}
}
}
impl From<IpAddr> for std::net::IpAddr {
fn from(value: IpAddr) -> Self {
match value.ip {
Some(ip_addr::Ip::Ipv4(v4)) => std::net::IpAddr::V4(v4.into()),
Some(ip_addr::Ip::Ipv6(v6)) => std::net::IpAddr::V6(v6.into()),
None => panic!("IpAddr is None"),
}
}
}
impl Display for IpAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", std::net::IpAddr::from(*self))
}
}
impl FromStr for IpAddr {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(IpAddr::from(std::net::IpAddr::from_str(s)?))
}
}
impl From<Ipv4Inet> for cidr::Ipv4Inet {
fn from(value: Ipv4Inet) -> Self {
cidr::Ipv4Inet::new(
@@ -118,6 +155,16 @@ impl From<Ipv4Inet> for cidr::Ipv4Inet {
}
}
impl From<Ipv4Inet> for cidr::Ipv4Cidr {
fn from(value: Ipv4Inet) -> Self {
cidr::Ipv4Cidr::new(
value.address.unwrap_or_default().into(),
value.network_length as u8,
)
.unwrap()
}
}
impl fmt::Display for Ipv4Inet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", cidr::Ipv4Inet::from(*self))

View File

@@ -0,0 +1,65 @@
syntax = "proto3";
import "common.proto";
import "acl.proto";
package config;
enum ConfigPatchAction {
ADD = 0;
REMOVE = 1;
CLEAR = 2;
}
message InstanceConfigPatch {
optional string hostname = 1;
optional common.Ipv4Inet ipv4 = 2;
optional common.Ipv6Inet ipv6 = 3;
repeated PortForwardPatch port_forwards = 4;
optional AclPatch acl = 5;
repeated ProxyNetworkPatch proxy_networks = 6;
repeated RoutePatch routes = 7;
repeated ExitNodePatch exit_nodes = 8;
repeated UrlPatch mapped_listeners = 9;
}
message PortForwardPatch {
ConfigPatchAction action = 1;
common.PortForwardConfigPb cfg = 2;
}
message StringPatch {
ConfigPatchAction action = 1;
string value = 2;
}
message UrlPatch {
ConfigPatchAction action = 1;
common.Url url = 2;
}
message AclPatch {
optional acl.Acl acl = 1;
repeated StringPatch tcp_whitelist = 2;
repeated StringPatch udp_whitelist = 3;
}
message ProxyNetworkPatch {
ConfigPatchAction action = 1;
common.Ipv4Inet cidr = 2;
optional common.Ipv4Inet mapped_cidr = 3;
}
message RoutePatch {
ConfigPatchAction action = 1;
common.Ipv4Inet cidr = 2;
}
message ExitNodePatch {
ConfigPatchAction action = 1;
common.IpAddr node = 2;
}
message PatchConfigRequest { InstanceConfigPatch patch = 1; }
service ConfigRpc { rpc PatchConfig(PatchConfigRequest) returns (common.Void); }

View File

@@ -0,0 +1,75 @@
include!(concat!(env!("OUT_DIR"), "/config.rs"));
pub struct Patchable<T> {
pub action: Option<ConfigPatchAction>,
pub value: Option<T>,
}
impl From<PortForwardPatch> for Patchable<crate::common::config::PortForwardConfig> {
fn from(patch: PortForwardPatch) -> Self {
Patchable {
action: ConfigPatchAction::try_from(patch.action).ok(),
value: patch.cfg.map(Into::into),
}
}
}
impl From<RoutePatch> for Patchable<cidr::Ipv4Cidr> {
fn from(value: RoutePatch) -> Self {
Patchable {
action: ConfigPatchAction::try_from(value.action).ok(),
value: value.cidr.map(Into::into),
}
}
}
impl From<ExitNodePatch> for Patchable<std::net::IpAddr> {
fn from(value: ExitNodePatch) -> Self {
Patchable {
action: ConfigPatchAction::try_from(value.action).ok(),
value: value.node.map(Into::into),
}
}
}
impl From<StringPatch> for Patchable<String> {
fn from(value: StringPatch) -> Self {
Patchable {
action: ConfigPatchAction::try_from(value.action).ok(),
value: Some(value.value),
}
}
}
impl From<UrlPatch> for Patchable<url::Url> {
fn from(value: UrlPatch) -> Self {
Patchable {
action: ConfigPatchAction::try_from(value.action).ok(),
value: value.url.map(Into::into),
}
}
}
pub fn patch_vec<T>(v: &mut Vec<T>, patches: Vec<Patchable<T>>)
where
T: PartialEq,
{
for patch in patches {
match patch.action {
Some(ConfigPatchAction::Add) => {
if let Some(value) = patch.value {
v.push(value);
}
}
Some(ConfigPatchAction::Remove) => {
if let Some(value) = patch.value {
v.retain(|x| x != &value);
}
}
Some(ConfigPatchAction::Clear) => {
v.clear();
}
None => {}
}
}
}

View File

@@ -4,6 +4,7 @@ pub mod rpc_types;
pub mod acl;
pub mod cli;
pub mod common;
pub mod config;
pub mod error;
pub mod magic_dns;
pub mod peer_rpc;

View File

@@ -147,6 +147,21 @@ fn check_route(ipv4: &str, dst_peer_id: PeerId, routes: Vec<crate::proto::cli::R
);
}
fn check_route_ex(
routes: Vec<crate::proto::cli::Route>,
peer_id: PeerId,
checker: impl Fn(&crate::proto::cli::Route) -> bool,
) {
let mut found = false;
for r in routes.iter() {
if r.peer_id == peer_id {
found = true;
assert!(checker(r), "{:?}", routes);
}
}
assert!(found, "routes: {:?}, dst_peer_id: {}", routes, peer_id);
}
async fn wait_proxy_route_appear(
mgr: &std::sync::Arc<PeerManager>,
ipv4: &str,

View File

@@ -2094,3 +2094,96 @@ pub async fn acl_group_based_test(
drop_insts(insts).await;
}
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]
pub async fn config_patch_test() {
use crate::proto::{
common::{PortForwardConfigPb, SocketType},
config::{ConfigPatchAction, InstanceConfigPatch, PortForwardPatch, ProxyNetworkPatch},
};
use crate::tunnel::common::tests::_tunnel_pingpong_netns_with_timeout;
let insts = init_three_node("udp").await;
check_route(
"10.144.144.2/24",
insts[1].peer_id(),
insts[0].get_peer_manager().list_routes().await,
);
check_route(
"10.144.144.3/24",
insts[2].peer_id(),
insts[0].get_peer_manager().list_routes().await,
);
// 测试1 修改hostname、ip、子网代理
let patch = InstanceConfigPatch {
hostname: Some("new_inst1".to_string()),
ipv4: Some("10.144.144.22/24".parse().unwrap()),
proxy_networks: vec![ProxyNetworkPatch {
action: ConfigPatchAction::Add as i32,
cidr: Some("10.144.145.0/24".parse().unwrap()),
mapped_cidr: None,
}],
..Default::default()
};
insts[1]
.get_config_patcher()
.apply_patch(patch)
.await
.unwrap();
assert_eq!(insts[1].get_global_ctx().get_hostname(), "new_inst1");
assert_eq!(
insts[1].get_global_ctx().get_ipv4().unwrap(),
"10.144.144.22/24".parse().unwrap()
);
tokio::time::sleep(Duration::from_secs(1)).await;
check_route_ex(
insts[0].get_peer_manager().list_routes().await,
insts[1].peer_id(),
|r| {
assert_eq!(r.hostname, "new_inst1");
assert_eq!(r.ipv4_addr, Some("10.144.144.22/24".parse().unwrap()));
assert_eq!(r.proxy_cidrs[0], "10.144.145.0/24");
true
},
);
// 测试2: 端口转发
let patch = InstanceConfigPatch {
port_forwards: vec![PortForwardPatch {
action: ConfigPatchAction::Add as i32,
cfg: Some(PortForwardConfigPb {
bind_addr: Some("0.0.0.0:23458".parse::<SocketAddr>().unwrap().into()),
dst_addr: Some("10.144.144.3:23457".parse::<SocketAddr>().unwrap().into()),
socket_type: SocketType::Tcp as i32,
}),
}],
..Default::default()
};
insts[0]
.get_config_patcher()
.apply_patch(patch)
.await
.unwrap();
let mut buf = vec![0; 32];
rand::thread_rng().fill(&mut buf[..]);
let tcp_listener = TcpTunnelListener::new("tcp://0.0.0.0:23457".parse().unwrap());
let tcp_connector = TcpTunnelConnector::new("tcp://127.0.0.1:23458".parse().unwrap());
let result = _tunnel_pingpong_netns_with_timeout(
tcp_listener,
tcp_connector,
NetNS::new(Some("net_c".into())),
NetNS::new(Some("net_a".into())),
buf.clone(),
std::time::Duration::from_millis(30000),
)
.await;
assert!(result.is_ok(), "Port forward pingpong should succeed");
drop_insts(insts).await;
}