refactor route so we can add other router (#15)
This commit is contained in:
@@ -1,8 +1,4 @@
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
net::Ipv4Addr,
|
||||
sync::{atomic::AtomicU8, Arc},
|
||||
};
|
||||
use std::{fmt::Debug, net::Ipv4Addr, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{StreamExt, TryFutureExt};
|
||||
@@ -32,6 +28,7 @@ use crate::{
|
||||
|
||||
use super::{
|
||||
peer_map::PeerMap,
|
||||
peer_rip_route::BasicRoute,
|
||||
peer_rpc::PeerRpcManager,
|
||||
route_trait::{ArcRoute, Route},
|
||||
PeerId,
|
||||
@@ -43,8 +40,6 @@ struct RpcTransport {
|
||||
|
||||
packet_recv: Mutex<UnboundedReceiver<Bytes>>,
|
||||
peer_rpc_tspt_sender: UnboundedSender<Bytes>,
|
||||
|
||||
route: Arc<Mutex<Option<ArcRoute>>>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -54,14 +49,8 @@ impl PeerRpcManagerTransport for RpcTransport {
|
||||
}
|
||||
|
||||
async fn send(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> {
|
||||
let route = self.route.lock().await;
|
||||
if route.is_none() {
|
||||
log::error!("no route info when send rpc msg");
|
||||
return Err(Error::RouteError("No route info".to_string()));
|
||||
}
|
||||
|
||||
self.peers
|
||||
.send_msg(msg, dst_peer_id, route.as_ref().unwrap().clone())
|
||||
.send_msg(msg, dst_peer_id)
|
||||
.map_err(|e| e.into())
|
||||
.await
|
||||
}
|
||||
@@ -104,14 +93,14 @@ pub struct PeerManager {
|
||||
packet_recv: Arc<Mutex<Option<mpsc::Receiver<Bytes>>>>,
|
||||
|
||||
peers: Arc<PeerMap>,
|
||||
route: Arc<Mutex<Option<ArcRoute>>>,
|
||||
cur_route_id: AtomicU8,
|
||||
|
||||
peer_rpc_mgr: Arc<PeerRpcManager>,
|
||||
peer_rpc_tspt: Arc<RpcTransport>,
|
||||
|
||||
peer_packet_process_pipeline: Arc<RwLock<Vec<BoxPeerPacketFilter>>>,
|
||||
nic_packet_process_pipeline: Arc<RwLock<Vec<BoxNicPacketFilter>>>,
|
||||
|
||||
basic_route: Arc<BasicRoute>,
|
||||
}
|
||||
|
||||
impl Debug for PeerManager {
|
||||
@@ -120,7 +109,6 @@ impl Debug for PeerManager {
|
||||
.field("my_node_id", &self.my_node_id)
|
||||
.field("instance_name", &self.global_ctx.inst_name)
|
||||
.field("net_ns", &self.global_ctx.net_ns.name())
|
||||
.field("cur_route_id", &self.cur_route_id)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -137,9 +125,10 @@ impl PeerManager {
|
||||
peers: peers.clone(),
|
||||
packet_recv: Mutex::new(peer_rpc_tspt_recv),
|
||||
peer_rpc_tspt_sender,
|
||||
route: Arc::new(Mutex::new(None)),
|
||||
});
|
||||
|
||||
let basic_route = Arc::new(BasicRoute::new(global_ctx.get_id(), global_ctx.clone()));
|
||||
|
||||
PeerManager {
|
||||
my_node_id: global_ctx.get_id(),
|
||||
global_ctx,
|
||||
@@ -150,14 +139,14 @@ impl PeerManager {
|
||||
packet_recv: Arc::new(Mutex::new(Some(packet_recv))),
|
||||
|
||||
peers: peers.clone(),
|
||||
route: Arc::new(Mutex::new(None)),
|
||||
cur_route_id: AtomicU8::new(0),
|
||||
|
||||
peer_rpc_mgr: Arc::new(PeerRpcManager::new(rpc_tspt.clone())),
|
||||
peer_rpc_tspt: rpc_tspt,
|
||||
|
||||
peer_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())),
|
||||
nic_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())),
|
||||
|
||||
basic_route,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,7 +185,6 @@ impl PeerManager {
|
||||
let mut recv = ReceiverStream::new(self.packet_recv.lock().await.take().unwrap());
|
||||
let my_node_id = self.my_node_id;
|
||||
let peers = self.peers.clone();
|
||||
let arc_route = self.route.clone();
|
||||
let pipe_line = self.peer_packet_process_pipeline.clone();
|
||||
self.tasks.lock().await.spawn(async move {
|
||||
log::trace!("start_peer_recv");
|
||||
@@ -206,22 +194,12 @@ impl PeerManager {
|
||||
let from_peer_uuid = packet.from_peer.to_uuid();
|
||||
let to_peer_uuid = packet.to_peer.as_ref().unwrap().to_uuid();
|
||||
if to_peer_uuid != my_node_id {
|
||||
let locked_arc_route = arc_route.lock().await;
|
||||
if locked_arc_route.is_none() {
|
||||
log::error!("no route info after recv a packet");
|
||||
continue;
|
||||
}
|
||||
|
||||
let route = locked_arc_route.as_ref().unwrap().clone();
|
||||
drop(locked_arc_route);
|
||||
log::trace!(
|
||||
"need forward: to_peer_uuid: {:?}, my_uuid: {:?}",
|
||||
to_peer_uuid,
|
||||
my_node_id
|
||||
);
|
||||
let ret = peers
|
||||
.send_msg(ret.clone(), &to_peer_uuid, route.clone())
|
||||
.await;
|
||||
let ret = peers.send_msg(ret.clone(), &to_peer_uuid).await;
|
||||
if ret.is_err() {
|
||||
log::error!(
|
||||
"forward packet error: {:?}, dst: {:?}, from: {:?}",
|
||||
@@ -295,46 +273,9 @@ impl PeerManager {
|
||||
}))
|
||||
.await;
|
||||
|
||||
// for peer manager router packet
|
||||
struct RoutePacketProcessor {
|
||||
route: Arc<Mutex<Option<ArcRoute>>>,
|
||||
}
|
||||
#[async_trait::async_trait]
|
||||
impl PeerPacketFilter for RoutePacketProcessor {
|
||||
async fn try_process_packet_from_peer(
|
||||
&self,
|
||||
packet: &packet::ArchivedPacket,
|
||||
data: &Bytes,
|
||||
) -> Option<()> {
|
||||
if let ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::RoutePacket(
|
||||
route_packet,
|
||||
)) = &packet.body
|
||||
{
|
||||
let r = self.route.lock().await;
|
||||
match r.as_ref() {
|
||||
Some(x) => {
|
||||
let x = x.clone();
|
||||
drop(r);
|
||||
x.handle_route_packet(
|
||||
packet.from_peer.to_uuid(),
|
||||
extract_bytes_from_archived_vec(&data, &route_packet.body),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
None => {
|
||||
log::error!("no route info when handle route packet");
|
||||
}
|
||||
}
|
||||
Some(())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
self.add_packet_process_pipeline(Box::new(RoutePacketProcessor {
|
||||
route: self.route.clone(),
|
||||
}))
|
||||
.await;
|
||||
// for route
|
||||
self.add_packet_process_pipeline(Box::new(self.basic_route.clone()))
|
||||
.await;
|
||||
|
||||
// for peer rpc packet
|
||||
struct PeerRpcPacketProcessor {
|
||||
@@ -364,7 +305,7 @@ impl PeerManager {
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn set_route<T>(&self, route: T)
|
||||
pub async fn add_route<T>(&self, route: T)
|
||||
where
|
||||
T: Route + Send + Sync + 'static,
|
||||
{
|
||||
@@ -400,7 +341,7 @@ impl PeerManager {
|
||||
}
|
||||
|
||||
let my_node_id = self.my_node_id;
|
||||
let route_id = route
|
||||
let _route_id = route
|
||||
.open(Box::new(Interface {
|
||||
my_node_id,
|
||||
peers: self.peers.clone(),
|
||||
@@ -408,28 +349,12 @@ impl PeerManager {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
self.cur_route_id
|
||||
.store(route_id, std::sync::atomic::Ordering::Relaxed);
|
||||
let arc_route: ArcRoute = Arc::new(Box::new(route));
|
||||
|
||||
self.route.lock().await.replace(arc_route.clone());
|
||||
|
||||
self.peer_rpc_tspt
|
||||
.route
|
||||
.lock()
|
||||
.await
|
||||
.replace(arc_route.clone());
|
||||
self.peers.add_route(arc_route).await;
|
||||
}
|
||||
|
||||
pub async fn list_routes(&self) -> Vec<easytier_rpc::Route> {
|
||||
let route_info = self.route.lock().await;
|
||||
if route_info.is_none() {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let route = route_info.as_ref().unwrap().clone();
|
||||
drop(route_info);
|
||||
route.list_routes().await
|
||||
self.basic_route.list_routes().await
|
||||
}
|
||||
|
||||
async fn run_nic_packet_process_pipeline(&self, mut data: BytesMut) -> BytesMut {
|
||||
@@ -440,45 +365,33 @@ impl PeerManager {
|
||||
}
|
||||
|
||||
pub async fn send_msg(&self, msg: Bytes, dst_peer_id: &PeerId) -> Result<(), Error> {
|
||||
self.peer_rpc_tspt.send(msg, dst_peer_id).await
|
||||
self.peers.send_msg(msg, dst_peer_id).await
|
||||
}
|
||||
|
||||
pub async fn send_msg_ipv4(&self, msg: BytesMut, ipv4_addr: Ipv4Addr) -> Result<(), Error> {
|
||||
let route_info = self.route.lock().await;
|
||||
if route_info.is_none() {
|
||||
log::error!("no route info");
|
||||
return Err(Error::RouteError("No route info".to_string()));
|
||||
}
|
||||
|
||||
let route = route_info.as_ref().unwrap().clone();
|
||||
drop(route_info);
|
||||
|
||||
log::trace!(
|
||||
"do send_msg in peer manager, msg: {:?}, ipv4_addr: {}",
|
||||
msg,
|
||||
ipv4_addr
|
||||
);
|
||||
|
||||
match route.get_peer_id_by_ipv4(&ipv4_addr).await {
|
||||
Some(peer_id) => {
|
||||
let msg = self.run_nic_packet_process_pipeline(msg).await;
|
||||
self.peers
|
||||
.send_msg(
|
||||
packet::Packet::new_data_packet(self.my_node_id, peer_id, &msg).into(),
|
||||
&peer_id,
|
||||
route.clone(),
|
||||
)
|
||||
.await?;
|
||||
log::trace!(
|
||||
"do send_msg in peer manager done, dst_peer_id: {:?}",
|
||||
peer_id
|
||||
);
|
||||
}
|
||||
None => {
|
||||
log::trace!("no peer id for ipv4: {}", ipv4_addr);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
let Some(peer_id) = self.peers.get_peer_id_by_ipv4(&ipv4_addr).await else {
|
||||
log::trace!("no peer id for ipv4: {}", ipv4_addr);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let msg = self.run_nic_packet_process_pipeline(msg).await;
|
||||
self.peers
|
||||
.send_msg(
|
||||
packet::Packet::new_data_packet(self.my_node_id, peer_id, &msg).into(),
|
||||
&peer_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
log::trace!(
|
||||
"do send_msg in peer manager done, dst_peer_id: {:?}",
|
||||
peer_id
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -506,6 +419,8 @@ impl PeerManager {
|
||||
}
|
||||
|
||||
pub async fn run(&self) -> Result<(), Error> {
|
||||
self.add_route(self.basic_route.clone()).await;
|
||||
|
||||
self.init_packet_process_pipeline().await;
|
||||
self.start_peer_recv().await;
|
||||
self.peer_rpc_mgr.run();
|
||||
|
||||
Reference in New Issue
Block a user