fix peer rpc send response error (#19)

This commit is contained in:
Sijie.Sun
2024-02-26 21:04:33 +08:00
committed by GitHub
parent 756d498b90
commit e5b3fb09e6
+28 -2
View File
@@ -146,6 +146,11 @@ impl PeerRpcManager {
} }
let info = info.unwrap(); let info = info.unwrap();
if info.from_peer != peer_id {
tracing::warn!("recv packet from peer, but peer_id not match, ignore it");
continue;
}
assert_eq!(info.service_id, service_id); assert_eq!(info.service_id, service_id);
cur_req_uuid = Some(packet.from_peer.clone().into()); cur_req_uuid = Some(packet.from_peer.clone().into());
@@ -233,7 +238,7 @@ impl PeerRpcManager {
} }
let endpoint = peer_rpc_endpoints let endpoint = peer_rpc_endpoints
.entry((info.to_peer, info.service_id)) .entry((info.from_peer, info.service_id))
.or_insert_with(|| { .or_insert_with(|| {
service_registry.get(&info.service_id).unwrap()(info.from_peer) service_registry.get(&info.service_id).unwrap()(info.from_peer)
}); });
@@ -431,10 +436,15 @@ mod tests {
async fn test_rpc_with_peer_manager() { async fn test_rpc_with_peer_manager() {
let peer_mgr_a = create_mock_peer_manager().await; let peer_mgr_a = create_mock_peer_manager().await;
let peer_mgr_b = create_mock_peer_manager().await; let peer_mgr_b = create_mock_peer_manager().await;
let peer_mgr_c = create_mock_peer_manager().await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await;
wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_node_id()) wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_node_id())
.await .await
.unwrap(); .unwrap();
wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_node_id())
.await
.unwrap();
assert_eq!(peer_mgr_a.get_peer_map().list_peers().await.len(), 1); assert_eq!(peer_mgr_a.get_peer_map().list_peers().await.len(), 1);
assert_eq!( assert_eq!(
@@ -442,6 +452,12 @@ mod tests {
peer_mgr_b.my_node_id() peer_mgr_b.my_node_id()
); );
assert_eq!(peer_mgr_c.get_peer_map().list_peers().await.len(), 1);
assert_eq!(
peer_mgr_c.get_peer_map().list_peers().await[0],
peer_mgr_b.my_node_id()
);
let s = MockService { let s = MockService {
prefix: "hello".to_owned(), prefix: "hello".to_owned(),
}; };
@@ -455,9 +471,19 @@ mod tests {
ret ret
}) })
.await; .await;
println!("ip_list: {:?}", ip_list); println!("ip_list: {:?}", ip_list);
assert_eq!(ip_list.as_ref().unwrap(), "hello abc"); assert_eq!(ip_list.as_ref().unwrap(), "hello abc");
let ip_list = peer_mgr_c
.get_peer_rpc_mgr()
.do_client_rpc_scoped(1, peer_mgr_b.my_node_id(), |c| async {
let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn();
let ret = c.hello(tarpc::context::current(), "bcd".to_owned()).await;
ret
})
.await;
println!("ip_list: {:?}", ip_list);
assert_eq!(ip_list.as_ref().unwrap(), "hello bcd");
} }
#[tokio::test] #[tokio::test]