// Source: Easytier_lkddi/easytier/src/peers/peer_rpc.rs (601 lines, 22 KiB, Rust)
2024-03-21 23:33:19 +08:00
use std::sync::{atomic::AtomicU32, Arc};
2023-09-23 01:53:45 +00:00
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
2024-04-24 23:12:46 +08:00
use prost::Message;
2023-09-23 01:53:45 +00:00
use tarpc::{server::Channel, transport::channel::UnboundedChannel};
use tokio::{
sync::mpsc::{self, UnboundedSender},
task::JoinSet,
};
2024-04-24 23:12:46 +08:00
2023-09-23 01:53:45 +00:00
use tracing::Instrument;
2024-03-13 00:15:22 +08:00
use crate::{
common::{error::Error, PeerId},
2024-04-24 23:12:46 +08:00
rpc::TaRpcPacket,
tunnel::packet_def::{PacketType, ZCPacket},
2024-03-13 00:15:22 +08:00
};
2023-09-23 01:53:45 +00:00
/// Identifier of an RPC service registered on a peer.
type PeerRpcServiceId = u32;
/// Identifier pairing an RPC request with its response.
type PeerRpcTransactId = u32;
2023-09-23 01:53:45 +00:00
#[async_trait::async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait PeerRpcManagerTransport: Send + Sync + 'static {
2024-03-13 00:15:22 +08:00
fn my_peer_id(&self) -> PeerId;
2024-04-24 23:12:46 +08:00
async fn send(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error>;
async fn recv(&self) -> Result<ZCPacket, Error>;
2023-09-23 01:53:45 +00:00
}
2024-04-24 23:12:46 +08:00
/// Sending half of the unbounded channel used to hand [`ZCPacket`]s to a task.
type PacketSender = UnboundedSender<ZCPacket>;
2023-09-23 01:53:45 +00:00
struct PeerRpcEndPoint {
2024-03-13 00:15:22 +08:00
peer_id: PeerId,
2023-09-23 01:53:45 +00:00
packet_sender: PacketSender,
tasks: JoinSet<()>,
}
2024-03-13 00:15:22 +08:00
/// Factory that builds the [`PeerRpcEndPoint`] serving the given remote peer.
type PeerRpcEndPointCreator = Box<dyn Fn(PeerId) -> PeerRpcEndPoint + Send + Sync + 'static>;
2023-09-23 01:53:45 +00:00
#[derive(Hash, Eq, PartialEq, Clone)]
2024-03-21 23:33:19 +08:00
struct PeerRpcClientCtxKey(PeerId, PeerRpcServiceId, PeerRpcTransactId);
2023-09-23 01:53:45 +00:00
// handle rpc request from one peer
pub struct PeerRpcManager {
service_map: Arc<DashMap<PeerRpcServiceId, PacketSender>>,
tasks: JoinSet<()>,
tspt: Arc<Box<dyn PeerRpcManagerTransport>>,
service_registry: Arc<DashMap<PeerRpcServiceId, PeerRpcEndPointCreator>>,
2024-03-13 00:15:22 +08:00
peer_rpc_endpoints: Arc<DashMap<(PeerId, PeerRpcServiceId), PeerRpcEndPoint>>,
2023-09-23 01:53:45 +00:00
client_resp_receivers: Arc<DashMap<PeerRpcClientCtxKey, PacketSender>>,
2024-03-21 23:33:19 +08:00
transact_id: AtomicU32,
2023-09-23 01:53:45 +00:00
}
/// Debug output shows only the local peer id, labelled `node_id`.
impl std::fmt::Debug for PeerRpcManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let node_id = self.tspt.my_peer_id();
        let mut builder = f.debug_struct("PeerRpcManager");
        builder.field("node_id", &node_id);
        builder.finish()
    }
}
impl PeerRpcManager {
pub fn new(tspt: impl PeerRpcManagerTransport) -> Self {
Self {
service_map: Arc::new(DashMap::new()),
tasks: JoinSet::new(),
tspt: Arc::new(Box::new(tspt)),
service_registry: Arc::new(DashMap::new()),
peer_rpc_endpoints: Arc::new(DashMap::new()),
client_resp_receivers: Arc::new(DashMap::new()),
2024-03-21 23:33:19 +08:00
transact_id: AtomicU32::new(0),
2023-09-23 01:53:45 +00:00
}
}
pub fn run_service<S, Req>(self: &Self, service_id: PeerRpcServiceId, s: S) -> ()
where
S: tarpc::server::Serve<Req> + Clone + Send + Sync + 'static,
Req: Send + 'static + serde::Serialize + for<'a> serde::Deserialize<'a>,
S::Resp:
Send + std::fmt::Debug + 'static + serde::Serialize + for<'a> serde::Deserialize<'a>,
S::Fut: Send + 'static,
{
let tspt = self.tspt.clone();
2024-03-13 00:15:22 +08:00
let creator = Box::new(move |peer_id: PeerId| {
2023-09-23 01:53:45 +00:00
let mut tasks = JoinSet::new();
2024-04-24 23:12:46 +08:00
let (packet_sender, mut packet_receiver) = mpsc::unbounded_channel();
2023-09-23 01:53:45 +00:00
let (mut client_transport, server_transport) = tarpc::transport::channel::unbounded();
let server = tarpc::server::BaseChannel::with_defaults(server_transport);
let my_peer_id_clone = tspt.my_peer_id();
let peer_id_clone = peer_id.clone();
let o = server.execute(s.clone());
tasks.spawn(o);
let tspt = tspt.clone();
tasks.spawn(async move {
2024-03-13 00:15:22 +08:00
let mut cur_req_peer_id = None;
2024-03-21 23:33:19 +08:00
let mut cur_transact_id = 0;
2023-09-23 01:53:45 +00:00
loop {
tokio::select! {
Some(resp) = client_transport.next() => {
2024-03-18 17:14:43 +08:00
let Some(cur_req_peer_id) = cur_req_peer_id.take() else {
tracing::error!("[PEER RPC MGR] cur_req_peer_id is none, ignore this resp");
continue;
};
2024-04-24 23:12:46 +08:00
tracing::debug!(resp = ?resp, "server recv packet from service provider");
2023-09-23 01:53:45 +00:00
if resp.is_err() {
tracing::warn!(err = ?resp.err(),
"[PEER RPC MGR] client_transport in server side got channel error, ignore it.");
continue;
}
let resp = resp.unwrap();
2024-03-18 17:14:43 +08:00
let serialized_resp = postcard::to_allocvec(&resp);
2023-09-23 01:53:45 +00:00
if serialized_resp.is_err() {
tracing::error!(error = ?serialized_resp.err(), "serialize resp failed");
continue;
}
2024-04-24 23:12:46 +08:00
let msg = Self::build_rpc_packet(
2023-09-23 01:53:45 +00:00
tspt.my_peer_id(),
2024-03-18 17:14:43 +08:00
cur_req_peer_id,
2023-09-23 01:53:45 +00:00
service_id,
2024-03-21 23:33:19 +08:00
cur_transact_id,
2023-09-23 01:53:45 +00:00
false,
serialized_resp.unwrap(),
);
2024-04-24 23:12:46 +08:00
if let Err(e) = tspt.send(msg, peer_id).await {
2023-09-23 01:53:45 +00:00
tracing::error!(error = ?e, peer_id = ?peer_id, service_id = ?service_id, "send resp to peer failed");
}
}
Some(packet) = packet_receiver.recv() => {
let info = Self::parse_rpc_packet(&packet);
2024-04-24 23:12:46 +08:00
tracing::debug!(?info, "server recv packet from peer");
2023-09-23 01:53:45 +00:00
if let Err(e) = info {
tracing::error!(error = ?e, packet = ?packet, "parse rpc packet failed");
continue;
}
let info = info.unwrap();
2024-02-26 21:04:33 +08:00
if info.from_peer != peer_id {
tracing::warn!("recv packet from peer, but peer_id not match, ignore it");
continue;
}
2024-03-18 17:14:43 +08:00
if cur_req_peer_id.is_some() {
tracing::warn!("cur_req_peer_id is not none, ignore this packet");
continue;
}
2023-09-23 01:53:45 +00:00
assert_eq!(info.service_id, service_id);
2024-04-24 23:12:46 +08:00
cur_req_peer_id = Some(info.from_peer);
2024-03-21 23:33:19 +08:00
cur_transact_id = info.transact_id;
2023-09-23 01:53:45 +00:00
tracing::trace!("recv packet from peer, packet: {:?}", packet);
2024-03-18 17:14:43 +08:00
let decoded_ret = postcard::from_bytes(&info.content.as_slice());
2023-09-23 01:53:45 +00:00
if let Err(e) = decoded_ret {
tracing::error!(error = ?e, "decode rpc packet failed");
continue;
}
let decoded: tarpc::ClientMessage<Req> = decoded_ret.unwrap();
if let Err(e) = client_transport.send(decoded).await {
tracing::error!(error = ?e, "send to req to client transport failed");
}
}
else => {
tracing::warn!("[PEER RPC MGR] service runner destroy, peer_id: {}, service_id: {}", peer_id, service_id);
}
}
}
}.instrument(tracing::info_span!("service_runner", my_id = ?my_peer_id_clone, peer_id = ?peer_id_clone, service_id = ?service_id)));
tracing::info!(
"[PEER RPC MGR] create new service endpoint for peer {}, service {}",
peer_id,
service_id
);
return PeerRpcEndPoint {
peer_id,
packet_sender,
tasks,
};
// let resp = client_transport.next().await;
});
if let Some(_) = self.service_registry.insert(service_id, creator) {
panic!(
"[PEER RPC MGR] service {} is already registered",
service_id
);
}
log::info!(
"[PEER RPC MGR] register service {} succeed, my_node_id {}",
service_id,
self.tspt.my_peer_id()
)
}
2024-04-24 23:12:46 +08:00
fn parse_rpc_packet(packet: &ZCPacket) -> Result<TaRpcPacket, Error> {
let payload = packet.payload();
TaRpcPacket::decode(payload).map_err(|e| Error::MessageDecodeError(e.to_string()))
}
/// Wraps an already-serialized tarpc message into a [`ZCPacket`] of type `PacketType::TaRpc`.
///
/// `is_req` marks the packet as a request (`true`) or a response (`false`); `content` is the
/// serialized message body.
fn build_rpc_packet(
    from_peer: PeerId,
    to_peer: PeerId,
    service_id: PeerRpcServiceId,
    transact_id: PeerRpcTransactId,
    is_req: bool,
    content: Vec<u8>,
) -> ZCPacket {
    let packet = TaRpcPacket {
        from_peer,
        to_peer,
        service_id,
        transact_id,
        is_req,
        content,
    };
    let mut buf = Vec::new();
    packet
        .encode(&mut buf)
        .expect("encoding into a growable Vec cannot run out of capacity");

    let mut zc_packet = ZCPacket::new_with_payload(&buf);
    zc_packet.fill_peer_manager_hdr(from_peer, to_peer, PacketType::TaRpc as u8);
    zc_packet
}
pub fn run(&self) {
let tspt = self.tspt.clone();
let service_registry = self.service_registry.clone();
let peer_rpc_endpoints = self.peer_rpc_endpoints.clone();
let client_resp_receivers = self.client_resp_receivers.clone();
tokio::spawn(async move {
loop {
2024-03-06 20:59:17 +08:00
let Ok(o) = tspt.recv().await else {
tracing::warn!("peer rpc transport read aborted, exiting");
break;
};
2024-04-24 23:12:46 +08:00
let info = Self::parse_rpc_packet(&o).unwrap();
tracing::debug!(?info, "recv rpc packet from peer");
2023-09-23 01:53:45 +00:00
if info.is_req {
if !service_registry.contains_key(&info.service_id) {
log::warn!(
"service {} not found, my_node_id: {}",
info.service_id,
tspt.my_peer_id()
);
continue;
}
let endpoint = peer_rpc_endpoints
2024-02-26 21:04:33 +08:00
.entry((info.from_peer, info.service_id))
2023-09-23 01:53:45 +00:00
.or_insert_with(|| {
service_registry.get(&info.service_id).unwrap()(info.from_peer)
});
2024-04-24 23:12:46 +08:00
endpoint.packet_sender.send(o).unwrap();
2023-09-23 01:53:45 +00:00
} else {
2024-03-21 23:33:19 +08:00
if let Some(a) = client_resp_receivers.get(&PeerRpcClientCtxKey(
info.from_peer,
info.service_id,
info.transact_id,
)) {
2024-04-24 23:12:46 +08:00
tracing::trace!("recv resp: {:?}", info);
if let Err(e) = a.send(o) {
2023-09-23 01:53:45 +00:00
tracing::error!(error = ?e, "send resp to client failed");
}
} else {
log::warn!("client resp receiver not found, info: {:?}", info);
}
}
}
});
}
#[tracing::instrument(skip(f))]
pub async fn do_client_rpc_scoped<CM, Req, RpcRet, Fut>(
&self,
service_id: PeerRpcServiceId,
2024-03-13 00:15:22 +08:00
dst_peer_id: PeerId,
2023-09-23 01:53:45 +00:00
f: impl FnOnce(UnboundedChannel<CM, Req>) -> Fut,
) -> RpcRet
where
CM: serde::Serialize + for<'a> serde::Deserialize<'a> + Send + Sync + 'static,
Req: serde::Serialize + for<'a> serde::Deserialize<'a> + Send + Sync + 'static,
Fut: std::future::Future<Output = RpcRet>,
{
let mut tasks = JoinSet::new();
2024-04-24 23:12:46 +08:00
let (packet_sender, mut packet_receiver) = mpsc::unbounded_channel();
2023-09-23 01:53:45 +00:00
let (client_transport, server_transport) =
tarpc::transport::channel::unbounded::<CM, Req>();
let (mut server_s, mut server_r) = server_transport.split();
2024-03-21 23:33:19 +08:00
let transact_id = self
.transact_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2023-09-23 01:53:45 +00:00
let tspt = self.tspt.clone();
tasks.spawn(async move {
while let Some(a) = server_r.next().await {
if a.is_err() {
tracing::error!(error = ?a.err(), "channel error");
continue;
}
2024-03-18 17:14:43 +08:00
let a = postcard::to_allocvec(&a.unwrap());
2023-09-23 01:53:45 +00:00
if a.is_err() {
tracing::error!(error = ?a.err(), "bincode serialize failed");
continue;
}
2024-04-24 23:12:46 +08:00
let packet = Self::build_rpc_packet(
2023-09-23 01:53:45 +00:00
tspt.my_peer_id(),
dst_peer_id,
service_id,
2024-03-21 23:33:19 +08:00
transact_id,
2023-09-23 01:53:45 +00:00
true,
a.unwrap(),
);
2024-04-24 23:12:46 +08:00
tracing::debug!(?packet, "client send rpc packet to peer");
if let Err(e) = tspt.send(packet, dst_peer_id).await {
2023-09-23 01:53:45 +00:00
tracing::error!(error = ?e, dst_peer_id = ?dst_peer_id, "send to peer failed");
}
}
tracing::warn!("[PEER RPC MGR] server trasport read aborted");
});
tasks.spawn(async move {
while let Some(packet) = packet_receiver.recv().await {
tracing::trace!("tunnel recv: {:?}", packet);
2024-04-24 23:12:46 +08:00
let info = Self::parse_rpc_packet(&packet);
2023-09-23 01:53:45 +00:00
if let Err(e) = info {
tracing::error!(error = ?e, "parse rpc packet failed");
continue;
}
2024-04-24 23:12:46 +08:00
tracing::debug!(?info, "client recv rpc packet from peer");
2023-09-23 01:53:45 +00:00
2024-03-18 17:14:43 +08:00
let decoded = postcard::from_bytes(&info.unwrap().content.as_slice());
2023-09-23 01:53:45 +00:00
if let Err(e) = decoded {
tracing::error!(error = ?e, "decode rpc packet failed");
continue;
}
if let Err(e) = server_s.send(decoded.unwrap()).await {
tracing::error!(error = ?e, "send to rpc server channel failed");
}
}
tracing::warn!("[PEER RPC MGR] server packet read aborted");
});
2024-03-21 23:33:19 +08:00
let key = PeerRpcClientCtxKey(dst_peer_id, service_id, transact_id);
2023-09-23 01:53:45 +00:00
let _insert_ret = self
.client_resp_receivers
2024-03-21 23:33:19 +08:00
.insert(key.clone(), packet_sender);
let ret = f(client_transport).await;
2023-09-23 01:53:45 +00:00
2024-03-21 23:33:19 +08:00
self.client_resp_receivers.remove(&key);
ret
2023-09-23 01:53:45 +00:00
}
2024-03-13 00:15:22 +08:00
/// Returns the local peer id as reported by the transport.
pub fn my_peer_id(&self) -> PeerId {
    self.tspt.my_peer_id()
}
}
#[cfg(test)]
mod tests {
2024-04-24 23:12:46 +08:00
use std::{pin::Pin, sync::Arc};
2023-09-23 01:53:45 +00:00
use futures::{SinkExt, StreamExt};
2024-04-24 23:12:46 +08:00
use tokio::sync::Mutex;
2023-09-23 01:53:45 +00:00
use crate::{
2024-03-13 00:15:22 +08:00
common::{error::Error, new_peer_id, PeerId},
2023-09-23 01:53:45 +00:00
peers::{
peer_rpc::PeerRpcManager,
tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
},
2024-04-24 23:12:46 +08:00
tunnel::{
packet_def::ZCPacket, ring::create_ring_tunnel_pair, Tunnel, ZCPacketSink,
ZCPacketStream,
},
2023-09-23 01:53:45 +00:00
};
use super::PeerRpcManagerTransport;
// Minimal tarpc service used by the tests in this module. The tests below use the
// `TestRpcServiceClient` type and the `serve()` adapter that go with this trait.
#[tarpc::service]
pub trait TestRpcService {
// Returns a greeting built from `s`.
async fn hello(s: String) -> String;
}
// Test implementation of `TestRpcService`.
#[derive(Clone)]
struct MockService {
// Text placed in front of the argument in every `hello` reply.
prefix: String,
}
#[tarpc::server]
impl TestRpcService for MockService {
// Replies with "<prefix> <s>"; the tarpc context is ignored.
async fn hello(self, _: tarpc::context::Context, s: String) -> String {
format!("{} {}", self.prefix, s)
}
}
#[tokio::test]
async fn peer_rpc_basic_test() {
struct MockTransport {
2024-04-24 23:12:46 +08:00
sink: Arc<Mutex<Pin<Box<dyn ZCPacketSink>>>>,
stream: Arc<Mutex<Pin<Box<dyn ZCPacketStream>>>>,
2024-03-13 00:15:22 +08:00
my_peer_id: PeerId,
2023-09-23 01:53:45 +00:00
}
#[async_trait::async_trait]
impl PeerRpcManagerTransport for MockTransport {
2024-03-13 00:15:22 +08:00
fn my_peer_id(&self) -> PeerId {
2023-09-23 01:53:45 +00:00
self.my_peer_id
}
2024-04-24 23:12:46 +08:00
async fn send(&self, msg: ZCPacket, _dst_peer_id: PeerId) -> Result<(), Error> {
2023-09-23 01:53:45 +00:00
println!("rpc mgr send: {:?}", msg);
2024-04-24 23:12:46 +08:00
self.sink.lock().await.send(msg).await.unwrap();
2023-09-23 01:53:45 +00:00
Ok(())
}
2024-04-24 23:12:46 +08:00
async fn recv(&self) -> Result<ZCPacket, Error> {
let ret = self.stream.lock().await.next().await.unwrap();
2023-09-23 01:53:45 +00:00
println!("rpc mgr recv: {:?}", ret);
2024-04-24 23:12:46 +08:00
return ret.map_err(|e| e.into());
2023-09-23 01:53:45 +00:00
}
}
let (ct, st) = create_ring_tunnel_pair();
2024-04-24 23:12:46 +08:00
let (cts, ctsr) = ct.split();
let (sts, stsr) = st.split();
2023-09-23 01:53:45 +00:00
let server_rpc_mgr = PeerRpcManager::new(MockTransport {
2024-04-24 23:12:46 +08:00
sink: Arc::new(Mutex::new(ctsr)),
stream: Arc::new(Mutex::new(cts)),
2024-03-13 00:15:22 +08:00
my_peer_id: new_peer_id(),
2023-09-23 01:53:45 +00:00
});
server_rpc_mgr.run();
let s = MockService {
prefix: "hello".to_owned(),
};
server_rpc_mgr.run_service(1, s.serve());
let client_rpc_mgr = PeerRpcManager::new(MockTransport {
2024-04-24 23:12:46 +08:00
sink: Arc::new(Mutex::new(stsr)),
stream: Arc::new(Mutex::new(sts)),
2024-03-13 00:15:22 +08:00
my_peer_id: new_peer_id(),
2023-09-23 01:53:45 +00:00
});
client_rpc_mgr.run();
let ret = client_rpc_mgr
.do_client_rpc_scoped(1, server_rpc_mgr.my_peer_id(), |c| async {
let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn();
let ret = c.hello(tarpc::context::current(), "abc".to_owned()).await;
ret
})
.await;
println!("ret: {:?}", ret);
assert_eq!(ret.unwrap(), "hello abc");
}
#[tokio::test]
async fn test_rpc_with_peer_manager() {
let peer_mgr_a = create_mock_peer_manager().await;
let peer_mgr_b = create_mock_peer_manager().await;
2024-02-26 21:04:33 +08:00
let peer_mgr_c = create_mock_peer_manager().await;
2023-09-23 01:53:45 +00:00
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
2024-02-26 21:04:33 +08:00
connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await;
2024-03-18 17:16:33 +08:00
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone())
2023-09-23 01:53:45 +00:00
.await
.unwrap();
2024-03-18 17:16:33 +08:00
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.clone())
2024-02-26 21:04:33 +08:00
.await
.unwrap();
2023-09-23 01:53:45 +00:00
assert_eq!(peer_mgr_a.get_peer_map().list_peers().await.len(), 1);
assert_eq!(
peer_mgr_a.get_peer_map().list_peers().await[0],
2024-03-13 00:15:22 +08:00
peer_mgr_b.my_peer_id()
2023-09-23 01:53:45 +00:00
);
2024-02-26 21:04:33 +08:00
assert_eq!(peer_mgr_c.get_peer_map().list_peers().await.len(), 1);
assert_eq!(
peer_mgr_c.get_peer_map().list_peers().await[0],
2024-03-13 00:15:22 +08:00
peer_mgr_b.my_peer_id()
2024-02-26 21:04:33 +08:00
);
2023-09-23 01:53:45 +00:00
let s = MockService {
prefix: "hello".to_owned(),
};
peer_mgr_b.get_peer_rpc_mgr().run_service(1, s.serve());
let ip_list = peer_mgr_a
.get_peer_rpc_mgr()
2024-03-13 00:15:22 +08:00
.do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async {
2023-09-23 01:53:45 +00:00
let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn();
let ret = c.hello(tarpc::context::current(), "abc".to_owned()).await;
ret
})
.await;
println!("ip_list: {:?}", ip_list);
assert_eq!(ip_list.as_ref().unwrap(), "hello abc");
2024-02-26 21:04:33 +08:00
2024-03-21 23:33:19 +08:00
// call again
let ip_list = peer_mgr_a
.get_peer_rpc_mgr()
.do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async {
let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn();
let ret = c.hello(tarpc::context::current(), "abcd".to_owned()).await;
ret
})
.await;
println!("ip_list: {:?}", ip_list);
assert_eq!(ip_list.as_ref().unwrap(), "hello abcd");
2024-02-26 21:04:33 +08:00
let ip_list = peer_mgr_c
.get_peer_rpc_mgr()
2024-03-13 00:15:22 +08:00
.do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async {
2024-02-26 21:04:33 +08:00
let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn();
let ret = c.hello(tarpc::context::current(), "bcd".to_owned()).await;
ret
})
.await;
println!("ip_list: {:?}", ip_list);
assert_eq!(ip_list.as_ref().unwrap(), "hello bcd");
2023-09-23 01:53:45 +00:00
}
#[tokio::test]
async fn test_multi_service_with_peer_manager() {
let peer_mgr_a = create_mock_peer_manager().await;
let peer_mgr_b = create_mock_peer_manager().await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
2024-03-18 17:16:33 +08:00
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone())
2023-09-23 01:53:45 +00:00
.await
.unwrap();
assert_eq!(peer_mgr_a.get_peer_map().list_peers().await.len(), 1);
assert_eq!(
peer_mgr_a.get_peer_map().list_peers().await[0],
2024-03-13 00:15:22 +08:00
peer_mgr_b.my_peer_id()
2023-09-23 01:53:45 +00:00
);
let s = MockService {
prefix: "hello_a".to_owned(),
};
peer_mgr_b.get_peer_rpc_mgr().run_service(1, s.serve());
let b = MockService {
prefix: "hello_b".to_owned(),
};
peer_mgr_b.get_peer_rpc_mgr().run_service(2, b.serve());
let ip_list = peer_mgr_a
.get_peer_rpc_mgr()
2024-03-13 00:15:22 +08:00
.do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async {
2023-09-23 01:53:45 +00:00
let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn();
let ret = c.hello(tarpc::context::current(), "abc".to_owned()).await;
ret
})
.await;
assert_eq!(ip_list.as_ref().unwrap(), "hello_a abc");
let ip_list = peer_mgr_a
.get_peer_rpc_mgr()
2024-03-13 00:15:22 +08:00
.do_client_rpc_scoped(2, peer_mgr_b.my_peer_id(), |c| async {
2023-09-23 01:53:45 +00:00
let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn();
let ret = c.hello(tarpc::context::current(), "abc".to_owned()).await;
ret
})
.await;
assert_eq!(ip_list.as_ref().unwrap(), "hello_b abc");
}
}