better user interface

This commit is contained in:
sijie.sun
2024-03-23 00:56:27 +08:00
committed by Sijie.Sun
parent a4af83e82d
commit 2cfc5a6ef6
18 changed files with 872 additions and 511 deletions
+55 -78
View File
@@ -1,7 +1,8 @@
use std::borrow::BorrowMut;
use std::io::Write;
use std::net::Ipv4Addr;
use std::sync::Arc;
use anyhow::Context;
use futures::StreamExt;
use pnet::packet::ethernet::EthernetPacket;
use pnet::packet::ipv4::Ipv4Packet;
@@ -10,10 +11,9 @@ use tokio::{sync::Mutex, task::JoinSet};
use tokio_util::bytes::{Bytes, BytesMut};
use tonic::transport::Server;
use crate::common::config_fs::ConfigFs;
use crate::common::config::ConfigLoader;
use crate::common::error::Error;
use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx};
use crate::common::netns::NetNS;
use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent};
use crate::common::PeerId;
use crate::connector::direct::DirectConnectorManager;
use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManager};
@@ -32,43 +32,6 @@ use tokio_stream::wrappers::ReceiverStream;
use super::listeners::ListenerManager;
use super::virtual_nic;
pub struct InstanceConfigWriter {
config: ConfigFs,
}
impl InstanceConfigWriter {
pub fn new(inst_name: &str) -> Self {
InstanceConfigWriter {
config: ConfigFs::new(inst_name),
}
}
pub fn set_ns(self, net_ns: Option<String>) -> Self {
let net_ns_in_conf = if let Some(net_ns) = net_ns {
net_ns
} else {
"".to_string()
};
self.config
.add_file("net_ns")
.unwrap()
.write_all(net_ns_in_conf.as_bytes())
.unwrap();
self
}
pub fn set_addr(self, addr: String) -> Self {
self.config
.add_file("ipv4")
.unwrap()
.write_all(addr.as_bytes())
.unwrap();
self
}
}
pub struct Instance {
inst_name: String,
@@ -95,30 +58,16 @@ pub struct Instance {
}
impl Instance {
pub fn new(inst_name: &str) -> Self {
let config = ConfigFs::new(inst_name);
let net_ns_in_conf = config.get_or_default("net_ns", || "".to_string()).unwrap();
let net_ns = NetNS::new(if net_ns_in_conf.is_empty() {
None
} else {
Some(net_ns_in_conf.clone())
});
let addr = config
.get_or_default("ipv4", || "10.144.144.10".to_string())
.unwrap();
pub fn new(config: impl ConfigLoader + Send + Sync + 'static) -> Self {
let global_ctx = Arc::new(GlobalCtx::new(config));
log::info!(
"[INIT] instance creating. inst_name: {}, addr: {}, netns: {}",
inst_name,
addr,
net_ns_in_conf
"[INIT] instance creating. config: {}",
global_ctx.config.dump()
);
let (peer_packet_sender, peer_packet_receiver) = tokio::sync::mpsc::channel(100);
let global_ctx = Arc::new(GlobalCtx::new(inst_name, config, net_ns.clone(), None));
let id = global_ctx.get_id();
let peer_manager = Arc::new(PeerManager::new(
@@ -128,8 +77,7 @@ impl Instance {
));
let listener_manager = Arc::new(Mutex::new(ListenerManager::new(
peer_manager.my_node_id(),
net_ns.clone(),
global_ctx.clone(),
peer_manager.clone(),
)));
@@ -145,13 +93,17 @@ impl Instance {
let udp_hole_puncher = UdpHolePunchConnector::new(global_ctx.clone(), peer_manager.clone());
let arc_tcp_proxy = TcpProxy::new(global_ctx.clone(), peer_manager.clone());
let arc_icmp_proxy = IcmpProxy::new(global_ctx.clone(), peer_manager.clone()).unwrap();
let arc_udp_proxy = UdpProxy::new(global_ctx.clone(), peer_manager.clone()).unwrap();
let arc_icmp_proxy = IcmpProxy::new(global_ctx.clone(), peer_manager.clone())
.with_context(|| "create icmp proxy failed")
.unwrap();
let arc_udp_proxy = UdpProxy::new(global_ctx.clone(), peer_manager.clone())
.with_context(|| "create udp proxy failed")
.unwrap();
let peer_center = Arc::new(PeerCenterInstance::new(peer_manager.clone()));
Instance {
inst_name: inst_name.to_string(),
inst_name: global_ctx.inst_name.clone(),
id,
virtual_nic: None,
@@ -251,22 +203,22 @@ impl Instance {
});
}
pub async fn run(&mut self) -> Result<(), Error> {
let ipv4_addr = self.global_ctx.get_ipv4().unwrap();
async fn add_initial_peers(&mut self) -> Result<(), Error> {
for peer in self.global_ctx.config.get_peers().iter() {
self.get_conn_manager()
.add_connector_by_url(peer.uri.as_str())
.await?;
}
Ok(())
}
let mut nic = virtual_nic::VirtualNic::new(self.get_global_ctx())
async fn prepare_tun_device(&mut self) -> Result<(), Error> {
let nic = virtual_nic::VirtualNic::new(self.get_global_ctx())
.create_dev()
.await?
.link_up()
.await?
.remove_ip(None)
.await?
.add_ip(ipv4_addr, 24)
.await?;
if cfg!(target_os = "macos") {
nic = nic.add_route(ipv4_addr, 24).await?;
}
self.global_ctx
.issue_event(GlobalCtxEvent::TunDeviceReady(nic.ifname().to_string()));
self.virtual_nic = Some(Arc::new(nic));
@@ -277,6 +229,26 @@ impl Instance {
self.peer_packet_receiver.take(),
);
Ok(())
}
async fn assign_ipv4_to_tun_device(&mut self, ipv4_addr: Ipv4Addr) -> Result<(), Error> {
let nic = self.virtual_nic.as_ref().unwrap().clone();
nic.link_up().await?;
nic.remove_ip(None).await?;
nic.add_ip(ipv4_addr, 24).await?;
if cfg!(target_os = "macos") {
nic.add_route(ipv4_addr, 24).await?;
}
Ok(())
}
pub async fn run(&mut self) -> Result<(), Error> {
self.prepare_tun_device().await?;
if let Some(ipv4_addr) = self.global_ctx.get_ipv4() {
self.assign_ipv4_to_tun_device(ipv4_addr).await?;
}
self.listener_manager
.lock()
.await
@@ -296,6 +268,8 @@ impl Instance {
self.peer_center.init().await;
self.add_initial_peers().await?;
Ok(())
}
@@ -331,7 +305,10 @@ impl Instance {
}
fn run_rpc_server(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:15888".parse()?;
let Some(addr) = self.global_ctx.config.get_rpc_portal() else {
tracing::info!("rpc server not enabled, because rpc_portal is not set.");
return Ok(());
};
let peer_mgr = self.peer_manager.clone();
let conn_manager = self.conn_manager.clone();
let net_ns = self.global_ctx.net_ns.clone();
@@ -339,7 +316,6 @@ impl Instance {
self.tasks.spawn(async move {
let _g = net_ns.guard();
log::info!("[INIT RPC] start rpc server. addr: {}", addr);
Server::builder()
.add_service(
crate::rpc::peer_manage_rpc_server::PeerManageRpcServer::new(
@@ -358,6 +334,7 @@ impl Instance {
)
.serve(addr)
.await
.with_context(|| format!("rpc server failed. addr: {}", addr))
.unwrap();
});
Ok(())
+57 -20
View File
@@ -1,10 +1,15 @@
use std::{fmt::Debug, sync::Arc};
use anyhow::Context;
use async_trait::async_trait;
use tokio::{sync::Mutex, task::JoinSet};
use crate::{
common::{error::Error, netns::NetNS},
common::{
error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtxEvent},
netns::NetNS,
},
peers::peer_manager::PeerManager,
tunnels::{
ring_tunnel::RingTunnelListener, tcp_tunnel::TcpTunnelListener,
@@ -26,7 +31,7 @@ impl TunnelHandlerForListener for PeerManager {
}
pub struct ListenerManager<H> {
my_node_id: uuid::Uuid,
global_ctx: ArcGlobalCtx,
net_ns: NetNS,
listeners: Vec<Arc<Mutex<dyn TunnelListener>>>,
peer_manager: Arc<H>,
@@ -35,10 +40,10 @@ pub struct ListenerManager<H> {
}
impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManager<H> {
pub fn new(my_node_id: uuid::Uuid, net_ns: NetNS, peer_manager: Arc<H>) -> Self {
pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc<H>) -> Self {
Self {
my_node_id,
net_ns,
global_ctx: global_ctx.clone(),
net_ns: global_ctx.net_ns.clone(),
listeners: Vec::new(),
peer_manager,
tasks: JoinSet::new(),
@@ -46,18 +51,27 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
}
pub async fn prepare_listeners(&mut self) -> Result<(), Error> {
self.add_listener(UdpTunnelListener::new(
"udp://0.0.0.0:11010".parse().unwrap(),
))
.await?;
self.add_listener(TcpTunnelListener::new(
"tcp://0.0.0.0:11010".parse().unwrap(),
))
.await?;
self.add_listener(RingTunnelListener::new(
format!("ring://{}", self.my_node_id).parse().unwrap(),
format!("ring://{}", self.global_ctx.get_id())
.parse()
.unwrap(),
))
.await?;
for l in self.global_ctx.config.get_listener_uris().iter() {
match l.scheme() {
"tcp" => {
self.add_listener(TcpTunnelListener::new(l.clone())).await?;
}
"udp" => {
self.add_listener(UdpTunnelListener::new(l.clone())).await?;
}
_ => {
log::warn!("unsupported listener uri: {}", l);
}
}
}
Ok(())
}
@@ -71,12 +85,27 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
}
#[tracing::instrument]
async fn run_listener(listener: Arc<Mutex<dyn TunnelListener>>, peer_manager: Arc<H>) {
async fn run_listener(
listener: Arc<Mutex<dyn TunnelListener>>,
peer_manager: Arc<H>,
global_ctx: ArcGlobalCtx,
) {
let mut l = listener.lock().await;
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
while let Ok(ret) = l.accept().await {
let tunnel_info = ret.info().unwrap();
global_ctx.issue_event(GlobalCtxEvent::ConnectionAccepted(
tunnel_info.local_addr.clone(),
tunnel_info.remote_addr.clone(),
));
tracing::info!(ret = ?ret, "conn accepted");
let server_ret = peer_manager.handle_tunnel(ret).await;
if let Err(e) = &server_ret {
global_ctx.issue_event(GlobalCtxEvent::ConnectionError(
tunnel_info.local_addr,
tunnel_info.remote_addr,
e.to_string(),
));
tracing::error!(error = ?e, "handle conn error");
}
}
@@ -85,11 +114,18 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
pub async fn run(&mut self) -> Result<(), Error> {
for listener in &self.listeners {
let _guard = self.net_ns.guard();
let addr = listener.lock().await.local_url();
log::warn!("run listener: {:?}", listener);
listener.lock().await.listen().await?;
listener
.lock()
.await
.listen()
.await
.with_context(|| format!("failed to add listener {}", addr))?;
self.tasks.spawn(Self::run_listener(
listener.clone(),
self.peer_manager.clone(),
self.global_ctx.clone(),
));
}
@@ -102,7 +138,10 @@ mod tests {
use futures::{SinkExt, StreamExt};
use tokio::time::timeout;
use crate::tunnels::{ring_tunnel::RingTunnelConnector, TunnelConnector};
use crate::{
common::global_ctx::tests::get_mock_global_ctx,
tunnels::{ring_tunnel::RingTunnelConnector, TunnelConnector},
};
use super::*;
@@ -120,10 +159,8 @@ mod tests {
#[tokio::test]
async fn handle_error_in_accept() {
let net_ns = NetNS::new(None);
let handler = Arc::new(MockListenerHandler {});
let mut listener_mgr =
ListenerManager::new(uuid::Uuid::new_v4(), net_ns.clone(), handler.clone());
let mut listener_mgr = ListenerManager::new(get_mock_global_ctx(), handler.clone());
let ring_id = format!("ring://{}", uuid::Uuid::new_v4());
+12 -14
View File
@@ -119,32 +119,32 @@ impl VirtualNic {
self.ifname.as_ref().unwrap().as_str()
}
pub async fn link_up(self) -> Result<Self> {
pub async fn link_up(&self) -> Result<()> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg.set_link_status(self.ifname(), true).await?;
Ok(self)
Ok(())
}
pub async fn add_route(self, address: Ipv4Addr, cidr: u8) -> Result<Self> {
pub async fn add_route(&self, address: Ipv4Addr, cidr: u8) -> Result<()> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg
.add_ipv4_route(self.ifname(), address, cidr)
.await?;
Ok(self)
Ok(())
}
pub async fn remove_ip(self, ip: Option<Ipv4Addr>) -> Result<Self> {
pub async fn remove_ip(&self, ip: Option<Ipv4Addr>) -> Result<()> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg.remove_ip(self.ifname(), ip).await?;
Ok(self)
Ok(())
}
pub async fn add_ip(self, ip: Ipv4Addr, cidr: i32) -> Result<Self> {
pub async fn add_ip(&self, ip: Ipv4Addr, cidr: i32) -> Result<()> {
let _g = self.global_ctx.net_ns.guard();
self.ifcfg
.add_ipv4_ip(self.ifname(), ip, cidr as u8)
.await?;
Ok(self)
Ok(())
}
pub fn pin_recv_stream(&self) -> Pin<Box<dyn DatagramStream>> {
@@ -170,12 +170,10 @@ mod tests {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
dev.link_up()
.await?
.remove_ip(None)
.await?
.add_ip("10.144.111.1".parse().unwrap(), 24)
.await
dev.link_up().await?;
dev.remove_ip(None).await?;
dev.add_ip("10.144.111.1".parse().unwrap(), 24).await?;
Ok(dev)
}
#[tokio::test]