refactor(gui): refactor gui to use RemoteClient trait and RemoteManagement component (#1489)

* refactor(gui): refactor gui to use RemoteClient trait and RemoteManagement component
* feat(gui): Add network config saving and refactor RemoteManagement
This commit is contained in:
Mg Pig
2025-10-20 22:07:01 +08:00
committed by GitHub
parent 67ac9b00ff
commit eba9504fc2
27 changed files with 1040 additions and 793 deletions

View File

@@ -474,17 +474,16 @@ impl IfConfiguerTrait for NetlinkIfConfiger {
}
async fn remove_ip(&self, name: &str, ip: Option<Ipv4Inet>) -> Result<(), Error> {
if ip.is_none() {
if let Some(ip) = ip {
let prefix_len = Self::get_prefix_len(name, ip.address())?;
Self::remove_one_ip(name, ip.address(), prefix_len)?;
} else {
let addrs = Self::list_addresses(name)?;
for addr in addrs {
if let IpAddr::V4(ipv4) = addr.address() {
Self::remove_one_ip(name, ipv4, addr.network_length())?;
}
}
} else {
let ip = ip.unwrap();
let prefix_len = Self::get_prefix_len(name, ip.address())?;
Self::remove_one_ip(name, ip.address(), prefix_len)?;
}
Ok(())
@@ -520,7 +519,10 @@ impl IfConfiguerTrait for NetlinkIfConfiger {
}
async fn remove_ipv6(&self, name: &str, ip: Option<Ipv6Inet>) -> Result<(), Error> {
if ip.is_none() {
if let Some(ipv6) = ip {
let prefix_len = Self::get_prefix_len_ipv6(name, ipv6.address())?;
Self::remove_one_ipv6(name, ipv6.address(), prefix_len)?;
} else {
let addrs = Self::list_addresses(name)?;
for addr in addrs {
if let IpAddr::V6(ipv6) = addr.address() {
@@ -528,10 +530,6 @@ impl IfConfiguerTrait for NetlinkIfConfiger {
Self::remove_one_ipv6(name, ipv6, prefix_len)?;
}
}
} else {
let ipv6 = ip.unwrap();
let prefix_len = Self::get_prefix_len_ipv6(name, ipv6.address())?;
Self::remove_one_ipv6(name, ipv6.address(), prefix_len)?;
}
Ok(())

View File

@@ -17,6 +17,7 @@ pub struct NetworkInstanceManager {
instance_map: Arc<DashMap<uuid::Uuid, NetworkInstance>>,
instance_stop_tasks: Arc<DashMap<uuid::Uuid, ScopedTask<()>>>,
stop_check_notifier: Arc<tokio::sync::Notify>,
instance_error_messages: Arc<DashMap<uuid::Uuid, String>>,
}
impl Default for NetworkInstanceManager {
@@ -31,6 +32,7 @@ impl NetworkInstanceManager {
instance_map: Arc::new(DashMap::new()),
instance_stop_tasks: Arc::new(DashMap::new()),
stop_check_notifier: Arc::new(tokio::sync::Notify::new()),
instance_error_messages: Arc::new(DashMap::new()),
}
}
@@ -64,6 +66,7 @@ impl NetworkInstanceManager {
let instance_map = self.instance_map.clone();
let instance_stop_tasks = self.instance_stop_tasks.clone();
let instance_error_messages = self.instance_error_messages.clone();
let stop_check_notifier = self.stop_check_notifier.clone();
self.instance_stop_tasks.insert(
@@ -80,6 +83,7 @@ impl NetworkInstanceManager {
if let Some(e) = instance.get_latest_error_msg() {
tracing::error!(?e, ?instance_id, "instance stopped with error");
eprintln!("instance {} stopped with error: {}", instance_id, e);
instance_error_messages.insert(instance_id, e);
}
}
stop_check_notifier.notify_one();
@@ -114,6 +118,9 @@ impl NetworkInstanceManager {
) -> Result<Vec<uuid::Uuid>, anyhow::Error> {
self.instance_map.retain(|k, _| instance_ids.contains(k));
self.instance_map.shrink_to_fit();
self.instance_error_messages
.retain(|k, _| instance_ids.contains(k));
self.instance_error_messages.shrink_to_fit();
Ok(self.list_network_instance_ids())
}
@@ -123,6 +130,9 @@ impl NetworkInstanceManager {
) -> Result<Vec<uuid::Uuid>, anyhow::Error> {
self.instance_map.retain(|k, _| !instance_ids.contains(k));
self.instance_map.shrink_to_fit();
self.instance_error_messages
.retain(|k, _| !instance_ids.contains(k));
self.instance_error_messages.shrink_to_fit();
Ok(self.list_network_instance_ids())
}
@@ -135,6 +145,15 @@ impl NetworkInstanceManager {
ret.insert(*instance.key(), info);
}
}
for v in self.instance_error_messages.iter() {
ret.insert(
*v.key(),
NetworkInstanceRunningInfo {
error_msg: Some(v.value().clone()),
..Default::default()
},
);
}
Ok(ret)
}
@@ -148,6 +167,12 @@ impl NetworkInstanceManager {
&self,
instance_id: &uuid::Uuid,
) -> Option<NetworkInstanceRunningInfo> {
if let Some(err_msg) = self.instance_error_messages.get(instance_id) {
return Some(NetworkInstanceRunningInfo {
error_msg: Some(err_msg.value().clone()),
..Default::default()
});
}
self.instance_map
.get(instance_id)?
.get_running_info()

View File

@@ -33,7 +33,7 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer {
.config
.get_mapped_listeners()
.into_iter()
.chain(self.global_ctx.get_running_listeners().into_iter())
.chain(self.global_ctx.get_running_listeners())
.map(Into::into)
.collect();
// remove et ipv6 from the interface ipv6 list

View File

@@ -27,36 +27,52 @@ use crate::{
peer_manage::PeerManageRpcService, port_forward_manage::PortForwardManageRpcService,
proxy::TcpProxyRpcService, stats::StatsRpcService, vpn_portal::VpnPortalRpcService,
},
tunnel::tcp::TcpTunnelListener,
tunnel::{tcp::TcpTunnelListener, TunnelListener},
};
pub struct ApiRpcServer {
rpc_server: StandAloneServer<TcpTunnelListener>,
pub struct ApiRpcServer<T: TunnelListener + 'static> {
rpc_server: StandAloneServer<T>,
}
impl ApiRpcServer {
impl ApiRpcServer<TcpTunnelListener> {
pub fn new(
rpc_portal: Option<String>,
rpc_portal_whitelist: Option<Vec<IpCidr>>,
instance_manager: Arc<NetworkInstanceManager>,
) -> anyhow::Result<Self> {
let mut rpc_server = StandAloneServer::new(TcpTunnelListener::new(
format!("tcp://{}", parse_rpc_portal(rpc_portal)?)
.parse()
.context("failed to parse rpc portal address")?,
));
rpc_server.set_hook(Arc::new(InstanceRpcServerHook::new(rpc_portal_whitelist)));
register_api_rpc_service(&instance_manager, rpc_server.registry());
Ok(Self { rpc_server })
}
let mut server = Self::from_tunnel(
TcpTunnelListener::new(
format!("tcp://{}", parse_rpc_portal(rpc_portal)?)
.parse()
.context("failed to parse rpc portal address")?,
),
instance_manager,
);
server
.rpc_server
.set_hook(Arc::new(InstanceRpcServerHook::new(rpc_portal_whitelist)));
Ok(server)
}
}
impl<T: TunnelListener + 'static> ApiRpcServer<T> {
pub fn from_tunnel(tunnel: T, instance_manager: Arc<NetworkInstanceManager>) -> Self {
let rpc_server = StandAloneServer::new(tunnel);
register_api_rpc_service(&instance_manager, rpc_server.registry());
Self { rpc_server }
}
}
impl<T: TunnelListener + 'static> ApiRpcServer<T> {
pub async fn serve(mut self) -> Result<Self, Error> {
self.rpc_server.serve().await?;
Ok(self)
}
}
impl Drop for ApiRpcServer {
impl<T: TunnelListener + 'static> Drop for ApiRpcServer<T> {
fn drop(&mut self) {
self.rpc_server.registry().unregister_all();
}

View File

@@ -13,7 +13,7 @@ pub mod instance_manage;
pub mod logger;
pub mod remote_client;
pub type ApiRpcServer = self::api::ApiRpcServer;
pub type ApiRpcServer<T> = self::api::ApiRpcServer<T>;
pub trait InstanceRpcService: Sync + Send {
fn get_peer_manage_service(

View File

@@ -6,8 +6,8 @@ use crate::proto::{api::manage::*, rpc_types::controller::BaseController};
#[async_trait]
pub trait RemoteClientManager<T, C, E>
where
T: Copy + Send + 'static,
C: PersistentConfig + Send + 'static,
T: Clone + Send + 'static,
C: PersistentConfig<E> + Send + 'static,
E: Send + 'static,
{
fn get_rpc_client(
@@ -42,17 +42,14 @@ where
config: NetworkConfig,
) -> Result<(), RemoteClientError<E>> {
let client = self
.get_rpc_client(identify)
.get_rpc_client(identify.clone())
.ok_or(RemoteClientError::ClientNotFound)?;
let network_config_json = serde_json::to_string(&config).map_err(|e| {
RemoteClientError::Other(format!("Failed to serialize config: {:?}", e))
})?;
let resp = client
.run_network_instance(
BaseController::default(),
RunNetworkInstanceRequest {
inst_id: None,
config: Some(config),
config: Some(config.clone()),
},
)
.await?;
@@ -61,7 +58,7 @@ where
.insert_or_update_user_network_config(
identify,
resp.inst_id.unwrap_or_default().into(),
network_config_json,
config,
)
.await
.map_err(RemoteClientError::PersistentError)?;
@@ -98,7 +95,7 @@ where
identify: T,
) -> Result<ListNetworkInstanceIdsJsonResp, RemoteClientError<E>> {
let client = self
.get_rpc_client(identify)
.get_rpc_client(identify.clone())
.ok_or(RemoteClientError::ClientNotFound)?;
let ret = client
.list_network_instance(BaseController::default(), ListNetworkInstanceRequest {})
@@ -122,16 +119,19 @@ where
})
}
async fn handle_remove_network_instance(
async fn handle_remove_network_instances(
&self,
identify: T,
inst_id: uuid::Uuid,
inst_ids: Vec<uuid::Uuid>,
) -> Result<(), RemoteClientError<E>> {
if inst_ids.is_empty() {
return Ok(());
}
let client = self
.get_rpc_client(identify)
.get_rpc_client(identify.clone())
.ok_or(RemoteClientError::ClientNotFound)?;
self.get_storage()
.delete_network_config(identify, inst_id)
.delete_network_configs(identify, &inst_ids)
.await
.map_err(RemoteClientError::PersistentError)?;
@@ -139,7 +139,7 @@ where
.delete_network_instance(
BaseController::default(),
DeleteNetworkInstanceRequest {
inst_ids: vec![inst_id.into()],
inst_ids: inst_ids.into_iter().map(|id| id.into()).collect(),
},
)
.await?;
@@ -154,7 +154,7 @@ where
disabled: bool,
) -> Result<(), RemoteClientError<E>> {
let client = self
.get_rpc_client(identify)
.get_rpc_client(identify.clone())
.ok_or(RemoteClientError::ClientNotFound)?;
let cfg = self
.get_storage()
@@ -177,14 +177,10 @@ where
BaseController::default(),
RunNetworkInstanceRequest {
inst_id: Some(inst_id.into()),
config: Some(serde_json::from_str(cfg.get_network_config()).map_err(
|e| {
RemoteClientError::Other(format!(
"Failed to parse network config: {:?}",
e
))
},
)?),
config: Some(
cfg.get_network_config()
.map_err(RemoteClientError::PersistentError)?,
),
},
)
.await?;
@@ -193,6 +189,23 @@ where
Ok(())
}
async fn handle_save_network_config(
&self,
identify: T,
inst_id: uuid::Uuid,
config: NetworkConfig,
) -> Result<(), RemoteClientError<E>> {
self.get_storage()
.insert_or_update_user_network_config(identify.clone(), inst_id, config)
.await
.map_err(RemoteClientError::PersistentError)?;
self.get_storage()
.update_network_config_state(identify, inst_id, true)
.await
.map_err(RemoteClientError::PersistentError)?;
Ok(())
}
async fn handle_get_network_config(
&self,
identify: T,
@@ -210,21 +223,23 @@ where
inst_id
)))?;
Ok(
serde_json::from_str::<NetworkConfig>(db_row.get_network_config()).map_err(|e| {
RemoteClientError::Other(format!("Failed to parse network config: {:?}", e))
})?,
)
Ok(db_row
.get_network_config()
.map_err(RemoteClientError::PersistentError)?)
}
}
#[derive(Debug, thiserror::Error)]
pub enum RemoteClientError<E> {
#[error("Client not found")]
ClientNotFound,
#[error("Not found: {0}")]
NotFound(String),
#[error(transparent)]
RpcError(#[from] crate::proto::rpc_types::error::Error),
#[error(transparent)]
PersistentError(E),
#[error("Other error: {0}")]
Other(String),
}
@@ -240,24 +255,25 @@ pub struct ListNetworkInstanceIdsJsonResp {
disabled_inst_ids: Vec<crate::proto::common::Uuid>,
}
pub trait PersistentConfig {
pub trait PersistentConfig<E> {
fn get_network_inst_id(&self) -> &str;
fn get_network_config(&self) -> &str;
fn get_network_config(&self) -> Result<NetworkConfig, E>;
}
#[async_trait]
pub trait Storage<T, C, E>: Send + Sync
where
C: PersistentConfig,
C: PersistentConfig<E>,
{
async fn insert_or_update_user_network_config(
&self,
identify: T,
network_inst_id: Uuid,
network_config: impl ToString + Send,
network_config: NetworkConfig,
) -> Result<(), E>;
async fn delete_network_config(&self, identify: T, network_inst_id: Uuid) -> Result<(), E>;
async fn delete_network_configs(&self, identify: T, network_inst_ids: &[Uuid])
-> Result<(), E>;
async fn update_network_config_state(
&self,