introduce a link-state route algo

This commit is contained in:
sijie.sun
2024-03-18 17:16:33 +08:00
committed by Sijie.Sun
parent d70d085553
commit ba1795a113
17 changed files with 1690 additions and 48 deletions
+3
View File
@@ -92,6 +92,9 @@ async-recursion = "1.0.5"
network-interface = "1.1.1"
# for ospf route
pathfinding = "4.9.1"
[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.52", features = [
"Win32_Networking_WinSock",
+1
View File
@@ -69,6 +69,7 @@ message Route {
repeated string proxy_cidrs = 5;
string hostname = 6;
StunInfo stun_info = 7;
string inst_id = 8;
}
message ListRouteRequest {}
+1 -3
View File
@@ -288,9 +288,7 @@ mod tests {
connect_peer_manager(p_a.clone(), p_b.clone()).await;
connect_peer_manager(p_b.clone(), p_c.clone()).await;
wait_route_appear(p_a.clone(), p_c.my_peer_id())
.await
.unwrap();
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
let mut dm_a = DirectConnectorManager::new(p_a.get_global_ctx(), p_a.clone());
let mut dm_c = DirectConnectorManager::new(p_c.get_global_ctx(), p_c.clone());
@@ -522,9 +522,7 @@ pub mod tests {
connect_peer_manager(p_a.clone(), p_b.clone()).await;
connect_peer_manager(p_b.clone(), p_c.clone()).await;
wait_route_appear(p_a.clone(), p_c.my_peer_id())
.await
.unwrap();
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
println!("{:?}", p_a.list_routes().await);
+2 -1
View File
@@ -23,7 +23,7 @@ use crate::gateway::tcp_proxy::TcpProxy;
use crate::gateway::udp_proxy::UdpProxy;
use crate::peer_center::instance::PeerCenterInstance;
use crate::peers::peer_conn::PeerConnId;
use crate::peers::peer_manager::PeerManager;
use crate::peers::peer_manager::{PeerManager, RouteAlgoType};
use crate::peers::rpc_service::PeerManagerRpcService;
use crate::tunnels::SinkItem;
@@ -122,6 +122,7 @@ impl Instance {
let id = global_ctx.get_id();
let peer_manager = Arc::new(PeerManager::new(
RouteAlgoType::Ospf,
global_ctx.clone(),
peer_packet_sender.clone(),
));
+4 -4
View File
@@ -38,7 +38,7 @@ struct Cli {
peers: Vec<String>,
}
fn init_logger() {
fn init_logger(dir: Option<&str>, file: Option<&str>) {
// logger to rolling file
let file_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
@@ -47,8 +47,8 @@ fn init_logger() {
let file_appender = tracing_appender::rolling::Builder::new()
.rotation(tracing_appender::rolling::Rotation::DAILY)
.max_log_files(5)
.filename_prefix("core.log")
.build("/var/log/easytier")
.filename_prefix(file.unwrap_or("easytier"))
.build(dir.unwrap_or("/tmp/easytier"))
.expect("failed to initialize rolling file appender");
let mut file_layer = tracing_subscriber::fmt::layer();
file_layer.set_ansi(false);
@@ -77,7 +77,7 @@ fn init_logger() {
#[tokio::main(flavor = "current_thread")]
#[tracing::instrument]
pub async fn main() {
init_logger();
init_logger(Some("/var/log/easytier"), Some("core.log"));
let cli = Cli::parse();
tracing::info!(cli = ?cli, "cli args parsed");
+1 -1
View File
@@ -319,7 +319,7 @@ mod tests {
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await;
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_peer_id())
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.clone())
.await
.unwrap();
@@ -98,7 +98,7 @@ impl ForeignNetworkClient {
let peers = peer_map.list_peers().await;
let mut tasks = JoinSet::new();
if !peers.is_empty() {
tracing::warn!(?peers, "collect next hop in foreign network");
tracing::warn!(?peers, my_peer_id = ?peer_rpc.my_peer_id(), "collect next hop in foreign network");
}
for peer in peers {
let peer_rpc = peer_rpc.clone();
@@ -297,7 +297,7 @@ mod tests {
create_mock_peer_manager_with_mock_stun, replace_stun_info_collector,
},
peers::{
peer_manager::PeerManager,
peer_manager::{PeerManager, RouteAlgoType},
tests::{connect_peer_manager, wait_route_appear},
},
rpc::NatType,
@@ -308,6 +308,7 @@ mod tests {
async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc<PeerManager> {
let (s, _r) = tokio::sync::mpsc::channel(1000);
let peer_mgr = Arc::new(PeerManager::new(
RouteAlgoType::Ospf,
get_mock_global_ctx_with_network(Some(NetworkIdentity {
network_name: network.to_string(),
network_secret: network.to_string(),
@@ -359,7 +360,7 @@ mod tests {
.list_peers()
.await
);
wait_route_appear(pma_net1.clone(), pmb_net1.my_peer_id())
wait_route_appear(pma_net1.clone(), pmb_net1.clone())
.await
.unwrap();
assert_eq!(1, pma_net1.list_routes().await.len());
@@ -367,10 +368,10 @@ mod tests {
let pmc_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
connect_peer_manager(pmc_net1.clone(), pm_center.clone()).await;
wait_route_appear(pma_net1.clone(), pmc_net1.my_peer_id())
wait_route_appear(pma_net1.clone(), pmc_net1.clone())
.await
.unwrap();
wait_route_appear(pmb_net1.clone(), pmc_net1.my_peer_id())
wait_route_appear(pmb_net1.clone(), pmc_net1.clone())
.await
.unwrap();
assert_eq!(2, pmc_net1.list_routes().await.len());
@@ -379,7 +380,7 @@ mod tests {
let pmb_net2 = create_mock_peer_manager_for_foreign_network("net2").await;
connect_peer_manager(pma_net2.clone(), pm_center.clone()).await;
connect_peer_manager(pmb_net2.clone(), pm_center.clone()).await;
wait_route_appear(pma_net2.clone(), pmb_net2.my_peer_id())
wait_route_appear(pma_net2.clone(), pmb_net2.clone())
.await
.unwrap();
assert_eq!(1, pma_net2.list_routes().await.len());
+1
View File
@@ -3,6 +3,7 @@ pub mod peer;
pub mod peer_conn;
pub mod peer_manager;
pub mod peer_map;
pub mod peer_ospf_route;
pub mod peer_rip_route;
pub mod peer_rpc;
pub mod route_trait;
+57 -13
View File
@@ -34,6 +34,7 @@ use super::{
foreign_network_manager::ForeignNetworkManager,
peer_conn::PeerConnId,
peer_map::PeerMap,
peer_ospf_route::PeerRoute,
peer_rip_route::BasicRoute,
peer_rpc::PeerRpcManager,
route_trait::{ArcRoute, Route},
@@ -43,7 +44,7 @@ use super::{
struct RpcTransport {
my_peer_id: PeerId,
peers: Weak<PeerMap>,
foreign_peers: Mutex<Option<Weak<PeerMap>>>,
foreign_peers: Mutex<Option<Weak<ForeignNetworkClient>>>,
packet_recv: Mutex<UnboundedReceiver<Bytes>>,
peer_rpc_tspt_sender: UnboundedSender<Bytes>,
@@ -66,7 +67,7 @@ impl PeerRpcManagerTransport for RpcTransport {
.ok_or(Error::Unknown)?;
let peers = self.peers.upgrade().ok_or(Error::Unknown)?;
if foreign_peers.has_peer(dst_peer_id) {
if foreign_peers.has_next_hop(dst_peer_id) {
return foreign_peers.send_msg(msg, dst_peer_id).await;
}
@@ -82,6 +83,18 @@ impl PeerRpcManagerTransport for RpcTransport {
}
}
pub enum RouteAlgoType {
Rip,
Ospf,
None,
}
enum RouteAlgoInst {
Rip(Arc<BasicRoute>),
Ospf(Arc<PeerRoute>),
None,
}
pub struct PeerManager {
my_peer_id: PeerId,
@@ -100,7 +113,7 @@ pub struct PeerManager {
peer_packet_process_pipeline: Arc<RwLock<Vec<BoxPeerPacketFilter>>>,
nic_packet_process_pipeline: Arc<RwLock<Vec<BoxNicPacketFilter>>>,
basic_route: Arc<BasicRoute>,
route_algo_inst: RouteAlgoInst,
foreign_network_manager: Arc<ForeignNetworkManager>,
foreign_network_client: Arc<ForeignNetworkClient>,
@@ -117,7 +130,11 @@ impl Debug for PeerManager {
}
impl PeerManager {
pub fn new(global_ctx: ArcGlobalCtx, nic_channel: mpsc::Sender<SinkItem>) -> Self {
pub fn new(
route_algo: RouteAlgoType,
global_ctx: ArcGlobalCtx,
nic_channel: mpsc::Sender<SinkItem>,
) -> Self {
let my_peer_id = rand::random();
let (packet_send, packet_recv) = mpsc::channel(100);
@@ -138,7 +155,17 @@ impl PeerManager {
});
let peer_rpc_mgr = Arc::new(PeerRpcManager::new(rpc_tspt.clone()));
let basic_route = Arc::new(BasicRoute::new(my_peer_id, global_ctx.clone()));
let route_algo_inst = match route_algo {
RouteAlgoType::Rip => {
RouteAlgoInst::Rip(Arc::new(BasicRoute::new(my_peer_id, global_ctx.clone())))
}
RouteAlgoType::Ospf => RouteAlgoInst::Ospf(PeerRoute::new(
my_peer_id,
global_ctx.clone(),
peer_rpc_mgr.clone(),
)),
RouteAlgoType::None => RouteAlgoInst::None,
};
let foreign_network_manager = Arc::new(ForeignNetworkManager::new(
my_peer_id,
@@ -170,7 +197,7 @@ impl PeerManager {
peer_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())),
nic_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())),
basic_route,
route_algo_inst,
foreign_network_manager,
foreign_network_client,
@@ -404,8 +431,16 @@ impl PeerManager {
self.peers.add_route(arc_route).await;
}
pub fn get_route(&self) -> Box<dyn Route + Send + Sync + 'static> {
match &self.route_algo_inst {
RouteAlgoInst::Rip(route) => Box::new(route.clone()),
RouteAlgoInst::Ospf(route) => Box::new(route.clone()),
RouteAlgoInst::None => panic!("no route"),
}
}
pub async fn list_routes(&self) -> Vec<crate::rpc::Route> {
self.basic_route.list_routes().await
self.get_route().list_routes().await
}
async fn run_nic_packet_process_pipeline(&self, mut data: BytesMut) -> BytesMut {
@@ -487,21 +522,27 @@ impl PeerManager {
.foreign_peers
.lock()
.await
.replace(Arc::downgrade(&self.foreign_network_client.get_peer_map()));
.replace(Arc::downgrade(&self.foreign_network_client));
self.foreign_network_manager.run().await;
self.foreign_network_client.run().await;
}
pub async fn run(&self) -> Result<(), Error> {
self.add_route(self.basic_route.clone()).await;
match &self.route_algo_inst {
RouteAlgoInst::Ospf(route) => self.add_route(route.clone()).await,
RouteAlgoInst::Rip(route) => self.add_route(route.clone()).await,
RouteAlgoInst::None => {}
};
self.init_packet_process_pipeline().await;
self.start_peer_recv().await;
self.peer_rpc_mgr.run();
self.start_peer_recv().await;
self.run_clean_peer_without_conn_routine().await;
self.run_foriegn_network().await;
Ok(())
}
@@ -530,7 +571,10 @@ impl PeerManager {
}
pub fn get_basic_route(&self) -> Arc<BasicRoute> {
self.basic_route.clone()
match &self.route_algo_inst {
RouteAlgoInst::Rip(route) => route.clone(),
_ => panic!("not rip route"),
}
}
pub fn get_foreign_network_manager(&self) -> Arc<ForeignNetworkManager> {
@@ -560,10 +604,10 @@ mod tests {
connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_c.clone()).await;
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_peer_id())
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone())
.await
.unwrap();
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_peer_id())
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.clone())
.await
.unwrap();
+6 -1
View File
@@ -119,7 +119,12 @@ impl PeerMap {
}
let Some(gateway_peer_id) = gateway_peer_id else {
log::error!("no gateway for dst_peer_id: {}", dst_peer_id);
log::error!(
"no gateway for dst_peer_id: {}, peers: {:?}, my_peer_id: {}",
dst_peer_id,
self.peer_map.iter().map(|v| *v.key()).collect::<Vec<_>>(),
self.my_peer_id
);
return Ok(());
};
File diff suppressed because it is too large Load Diff
+20 -8
View File
@@ -653,27 +653,39 @@ mod tests {
use std::sync::Arc;
use crate::{
common::PeerId,
connector::udp_hole_punch::tests::create_mock_peer_manager_with_mock_stun,
common::{global_ctx::tests::get_mock_global_ctx, PeerId},
connector::udp_hole_punch::tests::replace_stun_info_collector,
peers::{
peer_manager::PeerManager,
peer_manager::{PeerManager, RouteAlgoType},
peer_rip_route::Version,
tests::{connect_peer_manager, wait_route_appear},
},
rpc::NatType,
};
async fn create_mock_pmgr() -> Arc<PeerManager> {
let (s, _r) = tokio::sync::mpsc::channel(1000);
let peer_mgr = Arc::new(PeerManager::new(
RouteAlgoType::Rip,
get_mock_global_ctx(),
s,
));
replace_stun_info_collector(peer_mgr.clone(), NatType::Unknown);
peer_mgr.run().await.unwrap();
peer_mgr
}
#[tokio::test]
async fn test_rip_route() {
let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_mgr_c = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_mgr_a = create_mock_pmgr().await;
let peer_mgr_b = create_mock_pmgr().await;
let peer_mgr_c = create_mock_pmgr().await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await;
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_peer_id())
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone())
.await
.unwrap();
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_peer_id())
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.clone())
.await
.unwrap();
+3 -3
View File
@@ -451,10 +451,10 @@ mod tests {
let peer_mgr_c = create_mock_peer_manager().await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await;
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_peer_id())
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone())
.await
.unwrap();
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_peer_id())
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.clone())
.await
.unwrap();
@@ -503,7 +503,7 @@ mod tests {
let peer_mgr_a = create_mock_peer_manager().await;
let peer_mgr_b = create_mock_peer_manager().await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_peer_id())
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone())
.await
.unwrap();
+12 -4
View File
@@ -7,11 +7,15 @@ use crate::{
tunnels::ring_tunnel::create_ring_tunnel_pair,
};
use super::peer_manager::PeerManager;
use super::peer_manager::{PeerManager, RouteAlgoType};
pub async fn create_mock_peer_manager() -> Arc<PeerManager> {
let (s, _r) = tokio::sync::mpsc::channel(1000);
let peer_mgr = Arc::new(PeerManager::new(get_mock_global_ctx(), s));
let peer_mgr = Arc::new(PeerManager::new(
RouteAlgoType::Ospf,
get_mock_global_ctx(),
s,
));
peer_mgr.run().await.unwrap();
peer_mgr
}
@@ -47,8 +51,12 @@ pub async fn wait_route_appear_with_cost(
return Err(Error::NotFound);
}
pub async fn wait_route_appear(peer_mgr: Arc<PeerManager>, node_id: PeerId) -> Result<(), Error> {
wait_route_appear_with_cost(peer_mgr, node_id, None).await
pub async fn wait_route_appear(
peer_mgr: Arc<PeerManager>,
target_peer: Arc<PeerManager>,
) -> Result<(), Error> {
wait_route_appear_with_cost(peer_mgr.clone(), target_peer.my_peer_id(), None).await?;
wait_route_appear_with_cost(target_peer, peer_mgr.my_peer_id(), None).await
}
pub async fn wait_for_condition<F, FRet>(mut condition: F, timeout: std::time::Duration) -> ()
+5 -1
View File
@@ -134,7 +134,11 @@ fn check_route(ipv4: &str, dst_peer_id: PeerId, routes: Vec<crate::rpc::Route>)
assert_eq!(r.peer_id, dst_peer_id, "{:?}", routes);
}
}
assert!(found);
assert!(
found,
"routes: {:?}, dst_peer_id: {}, ipv4: {}",
routes, dst_peer_id, ipv4
);
}
async fn wait_proxy_route_appear(