2024-09-18 21:55:28 +08:00
|
|
|
include!(concat!(env!("OUT_DIR"), "/tests.rs"));
|
|
|
|
|
|
|
|
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
|
|
|
|
|
|
use futures::StreamExt as _;
|
|
|
|
|
use tokio::task::JoinSet;
|
|
|
|
|
|
|
|
|
|
use super::rpc_impl::RpcController;
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
pub struct GreetingService {
|
|
|
|
|
pub delay_ms: u64,
|
|
|
|
|
pub prefix: String,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[async_trait::async_trait]
|
|
|
|
|
impl Greeting for GreetingService {
|
|
|
|
|
type Controller = RpcController;
|
|
|
|
|
async fn say_hello(
|
|
|
|
|
&self,
|
|
|
|
|
_ctrl: Self::Controller,
|
|
|
|
|
input: SayHelloRequest,
|
|
|
|
|
) -> crate::proto::rpc_types::error::Result<SayHelloResponse> {
|
|
|
|
|
let resp = SayHelloResponse {
|
|
|
|
|
greeting: format!("{} {}!", self.prefix, input.name),
|
|
|
|
|
};
|
|
|
|
|
tokio::time::sleep(std::time::Duration::from_millis(self.delay_ms)).await;
|
|
|
|
|
Ok(resp)
|
|
|
|
|
}
|
|
|
|
|
/// Generates a "goodbye" greeting based on the supplied info.
|
|
|
|
|
async fn say_goodbye(
|
|
|
|
|
&self,
|
|
|
|
|
_ctrl: Self::Controller,
|
|
|
|
|
input: SayGoodbyeRequest,
|
|
|
|
|
) -> crate::proto::rpc_types::error::Result<SayGoodbyeResponse> {
|
|
|
|
|
let resp = SayGoodbyeResponse {
|
|
|
|
|
greeting: format!("Goodbye, {}!", input.name),
|
|
|
|
|
};
|
|
|
|
|
tokio::time::sleep(std::time::Duration::from_millis(self.delay_ms)).await;
|
|
|
|
|
Ok(resp)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
use crate::proto::rpc_impl::client::Client;
|
|
|
|
|
use crate::proto::rpc_impl::server::Server;
|
|
|
|
|
|
|
|
|
|
struct TestContext {
|
|
|
|
|
client: Client,
|
|
|
|
|
server: Server,
|
|
|
|
|
tasks: Arc<Mutex<JoinSet<()>>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl TestContext {
|
|
|
|
|
fn new() -> Self {
|
|
|
|
|
let rpc_server = Server::new();
|
|
|
|
|
rpc_server.run();
|
|
|
|
|
|
|
|
|
|
let client = Client::new();
|
|
|
|
|
client.run();
|
|
|
|
|
|
|
|
|
|
let tasks = Arc::new(Mutex::new(JoinSet::new()));
|
|
|
|
|
let (mut rx, tx) = (
|
|
|
|
|
rpc_server.get_transport_stream(),
|
|
|
|
|
client.get_transport_sink(),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
tasks.lock().unwrap().spawn(async move {
|
|
|
|
|
while let Some(Ok(packet)) = rx.next().await {
|
|
|
|
|
if let Err(err) = tx.send(packet).await {
|
|
|
|
|
println!("{:?}", err);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let (mut rx, tx) = (
|
|
|
|
|
client.get_transport_stream(),
|
|
|
|
|
rpc_server.get_transport_sink(),
|
|
|
|
|
);
|
|
|
|
|
tasks.lock().unwrap().spawn(async move {
|
|
|
|
|
while let Some(Ok(packet)) = rx.next().await {
|
|
|
|
|
if let Err(err) = tx.send(packet).await {
|
|
|
|
|
println!("{:?}", err);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
Self {
|
|
|
|
|
client,
|
|
|
|
|
server: rpc_server,
|
|
|
|
|
tasks,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn random_string(len: usize) -> String {
|
|
|
|
|
use rand::distributions::Alphanumeric;
|
|
|
|
|
use rand::Rng;
|
|
|
|
|
let mut rng = rand::thread_rng();
|
|
|
|
|
let s: Vec<u8> = std::iter::repeat(())
|
|
|
|
|
.map(|()| rng.sample(Alphanumeric))
|
|
|
|
|
.take(len)
|
|
|
|
|
.collect();
|
|
|
|
|
String::from_utf8(s).unwrap()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn rpc_basic_test() {
|
|
|
|
|
let ctx = TestContext::new();
|
|
|
|
|
|
|
|
|
|
let server = GreetingServer::new(GreetingService {
|
|
|
|
|
delay_ms: 0,
|
|
|
|
|
prefix: "Hello".to_string(),
|
|
|
|
|
});
|
|
|
|
|
ctx.server.registry().register(server, "");
|
|
|
|
|
|
|
|
|
|
let out = ctx
|
|
|
|
|
.client
|
|
|
|
|
.scoped_client::<GreetingClientFactory<RpcController>>(1, 1, "".to_string());
|
|
|
|
|
|
|
|
|
|
// small size req and resp
|
|
|
|
|
|
2024-10-06 22:49:18 +08:00
|
|
|
let ctrl = RpcController::default();
|
2024-09-18 21:55:28 +08:00
|
|
|
let input = SayHelloRequest {
|
|
|
|
|
name: "world".to_string(),
|
|
|
|
|
};
|
|
|
|
|
let ret = out.say_hello(ctrl, input).await;
|
|
|
|
|
assert_eq!(ret.unwrap().greeting, "Hello world!");
|
|
|
|
|
|
2024-10-06 22:49:18 +08:00
|
|
|
let ctrl = RpcController::default();
|
2024-09-18 21:55:28 +08:00
|
|
|
let input = SayGoodbyeRequest {
|
|
|
|
|
name: "world".to_string(),
|
|
|
|
|
};
|
|
|
|
|
let ret = out.say_goodbye(ctrl, input).await;
|
|
|
|
|
assert_eq!(ret.unwrap().greeting, "Goodbye, world!");
|
|
|
|
|
|
|
|
|
|
// large size req and resp
|
2024-10-06 22:49:18 +08:00
|
|
|
let ctrl = RpcController::default();
|
2024-09-18 21:55:28 +08:00
|
|
|
let name = random_string(20 * 1024 * 1024);
|
|
|
|
|
let input = SayGoodbyeRequest { name: name.clone() };
|
|
|
|
|
let ret = out.say_goodbye(ctrl, input).await;
|
|
|
|
|
assert_eq!(ret.unwrap().greeting, format!("Goodbye, {}!", name));
|
|
|
|
|
|
|
|
|
|
assert_eq!(0, ctx.client.inflight_count());
|
|
|
|
|
assert_eq!(0, ctx.server.inflight_count());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn rpc_timeout_test() {
|
|
|
|
|
let ctx = TestContext::new();
|
|
|
|
|
|
|
|
|
|
let server = GreetingServer::new(GreetingService {
|
|
|
|
|
delay_ms: 10000,
|
|
|
|
|
prefix: "Hello".to_string(),
|
|
|
|
|
});
|
|
|
|
|
ctx.server.registry().register(server, "test");
|
|
|
|
|
|
|
|
|
|
let out = ctx
|
|
|
|
|
.client
|
|
|
|
|
.scoped_client::<GreetingClientFactory<RpcController>>(1, 1, "test".to_string());
|
|
|
|
|
|
2024-10-06 22:49:18 +08:00
|
|
|
let ctrl = RpcController::default();
|
2024-09-18 21:55:28 +08:00
|
|
|
let input = SayHelloRequest {
|
|
|
|
|
name: "world".to_string(),
|
|
|
|
|
};
|
|
|
|
|
let ret = out.say_hello(ctrl, input).await;
|
|
|
|
|
assert!(ret.is_err());
|
|
|
|
|
assert!(matches!(
|
|
|
|
|
ret.unwrap_err(),
|
|
|
|
|
crate::proto::rpc_types::error::Error::Timeout(_)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
assert_eq!(0, ctx.client.inflight_count());
|
|
|
|
|
assert_eq!(0, ctx.server.inflight_count());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn standalone_rpc_test() {
|
|
|
|
|
use crate::proto::rpc_impl::standalone::{StandAloneClient, StandAloneServer};
|
|
|
|
|
use crate::tunnel::tcp::{TcpTunnelConnector, TcpTunnelListener};
|
|
|
|
|
|
|
|
|
|
let mut server = StandAloneServer::new(TcpTunnelListener::new(
|
|
|
|
|
"tcp://0.0.0.0:33455".parse().unwrap(),
|
|
|
|
|
));
|
|
|
|
|
let service = GreetingServer::new(GreetingService {
|
|
|
|
|
delay_ms: 0,
|
|
|
|
|
prefix: "Hello".to_string(),
|
|
|
|
|
});
|
|
|
|
|
server.registry().register(service, "test");
|
|
|
|
|
server.serve().await.unwrap();
|
|
|
|
|
|
|
|
|
|
let mut client = StandAloneClient::new(TcpTunnelConnector::new(
|
|
|
|
|
"tcp://127.0.0.1:33455".parse().unwrap(),
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
let out = client
|
|
|
|
|
.scoped_client::<GreetingClientFactory<RpcController>>("test".to_string())
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
2024-10-06 22:49:18 +08:00
|
|
|
let ctrl = RpcController::default();
|
2024-09-18 21:55:28 +08:00
|
|
|
let input = SayHelloRequest {
|
|
|
|
|
name: "world".to_string(),
|
|
|
|
|
};
|
|
|
|
|
let ret = out.say_hello(ctrl, input).await;
|
|
|
|
|
assert_eq!(ret.unwrap().greeting, "Hello world!");
|
|
|
|
|
|
|
|
|
|
let out = client
|
|
|
|
|
.scoped_client::<GreetingClientFactory<RpcController>>("test".to_string())
|
|
|
|
|
.await
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
2024-10-06 22:49:18 +08:00
|
|
|
let ctrl = RpcController::default();
|
2024-09-18 21:55:28 +08:00
|
|
|
let input = SayGoodbyeRequest {
|
|
|
|
|
name: "world".to_string(),
|
|
|
|
|
};
|
|
|
|
|
let ret = out.say_goodbye(ctrl, input).await;
|
|
|
|
|
assert_eq!(ret.unwrap().greeting, "Goodbye, world!");
|
|
|
|
|
|
|
|
|
|
drop(client);
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
|
|
|
assert_eq!(0, server.inflight_server());
|
|
|
|
|
}
|