|
|
|
|
@@ -1,5 +1,5 @@
|
|
|
|
|
use std::{
|
|
|
|
|
collections::{BTreeMap, BTreeSet, HashMap},
|
|
|
|
|
collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
|
|
|
|
|
fmt::Debug,
|
|
|
|
|
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
|
|
|
|
sync::{
|
|
|
|
|
@@ -13,6 +13,8 @@ use arc_swap::ArcSwap;
|
|
|
|
|
use cidr::{IpCidr, Ipv4Cidr, Ipv6Cidr};
|
|
|
|
|
use crossbeam::atomic::AtomicCell;
|
|
|
|
|
use dashmap::DashMap;
|
|
|
|
|
use ordered_hash_map::OrderedHashMap;
|
|
|
|
|
use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock};
|
|
|
|
|
use petgraph::{
|
|
|
|
|
algo::dijkstra,
|
|
|
|
|
graph::{Graph, NodeIndex},
|
|
|
|
|
@@ -41,7 +43,7 @@ use crate::{
|
|
|
|
|
route_foreign_network_infos, route_foreign_network_summary,
|
|
|
|
|
sync_route_info_request::ConnInfo, ForeignNetworkRouteInfoEntry,
|
|
|
|
|
ForeignNetworkRouteInfoKey, OspfRouteRpc, OspfRouteRpcClientFactory,
|
|
|
|
|
OspfRouteRpcServer, PeerIdVersion, RouteForeignNetworkInfos,
|
|
|
|
|
OspfRouteRpcServer, PeerGroupInfo, PeerIdVersion, RouteForeignNetworkInfos,
|
|
|
|
|
RouteForeignNetworkSummary, RoutePeerInfo, RoutePeerInfos, SyncRouteInfoError,
|
|
|
|
|
SyncRouteInfoRequest, SyncRouteInfoResponse,
|
|
|
|
|
},
|
|
|
|
|
@@ -70,6 +72,12 @@ static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660);
|
|
|
|
|
static AVOID_RELAY_COST: usize = i32::MAX as usize;
|
|
|
|
|
static FORCE_USE_CONN_LIST: AtomicBool = AtomicBool::new(true);
|
|
|
|
|
|
|
|
|
|
// if a peer is unreachable for `REMOVE_UNREACHABLE_PEER_INFO_AFTER` time, we can remove it because
|
|
|
|
|
// 1. all the ospf sessions between two zone are already destroy, new created session will resend the peer info.
|
|
|
|
|
// 2. all the dst_saved_peer_info_version in all sessions already remove the peer info, the peer info will be propagated
|
|
|
|
|
// in another zone when two zone restore the conneciton.
|
|
|
|
|
static REMOVE_UNREACHABLE_PEER_INFO_AFTER: Duration = Duration::from_secs(90);
|
|
|
|
|
|
|
|
|
|
type Version = u32;
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
|
@@ -124,7 +132,9 @@ impl RoutePeerInfo {
|
|
|
|
|
proxy_cidrs: Vec::new(),
|
|
|
|
|
hostname: None,
|
|
|
|
|
udp_stun_info: 0,
|
|
|
|
|
last_update: Some(SystemTime::now().into()),
|
|
|
|
|
// ensure this is updated when the peer_infos/conn_info/foreign_network lock is acquired.
|
|
|
|
|
// else we may assign a older timestamp than iterate time.
|
|
|
|
|
last_update: None,
|
|
|
|
|
version: 0,
|
|
|
|
|
easytier_version: EASYTIER_VERSION.to_string(),
|
|
|
|
|
feature_flag: None,
|
|
|
|
|
@@ -136,13 +146,12 @@ impl RoutePeerInfo {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn update_self(
|
|
|
|
|
&self,
|
|
|
|
|
pub fn new_updated_self(
|
|
|
|
|
my_peer_id: PeerId,
|
|
|
|
|
peer_route_id: u64,
|
|
|
|
|
global_ctx: &ArcGlobalCtx,
|
|
|
|
|
) -> Self {
|
|
|
|
|
let mut new = Self {
|
|
|
|
|
Self {
|
|
|
|
|
peer_id: my_peer_id,
|
|
|
|
|
inst_id: Some(global_ctx.get_id().into()),
|
|
|
|
|
cost: 0,
|
|
|
|
|
@@ -160,9 +169,10 @@ impl RoutePeerInfo {
|
|
|
|
|
.get_stun_info_collector()
|
|
|
|
|
.get_stun_info()
|
|
|
|
|
.udp_nat_type,
|
|
|
|
|
// following fields do not participate in comparison.
|
|
|
|
|
last_update: self.last_update,
|
|
|
|
|
version: self.version,
|
|
|
|
|
|
|
|
|
|
// these two fields should not participate in comparison.
|
|
|
|
|
last_update: None,
|
|
|
|
|
version: 0,
|
|
|
|
|
|
|
|
|
|
easytier_version: EASYTIER_VERSION.to_string(),
|
|
|
|
|
feature_flag: Some(global_ctx.get_feature_flags()),
|
|
|
|
|
@@ -176,22 +186,28 @@ impl RoutePeerInfo {
|
|
|
|
|
ipv6_addr: global_ctx.get_ipv6().map(|x| x.into()),
|
|
|
|
|
|
|
|
|
|
groups: global_ctx.get_acl_groups(my_peer_id),
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn try_update_new_peer_info(old: &RoutePeerInfo, new: &mut RoutePeerInfo) -> bool {
|
|
|
|
|
let need_update_periodically = if let Ok(Ok(d)) =
|
|
|
|
|
SystemTime::try_from(new.last_update.unwrap_or_default()).map(|x| x.elapsed())
|
|
|
|
|
SystemTime::try_from(old.last_update.unwrap_or_default()).map(|x| x.elapsed())
|
|
|
|
|
{
|
|
|
|
|
d > UPDATE_PEER_INFO_PERIOD
|
|
|
|
|
} else {
|
|
|
|
|
true
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if new != *self || need_update_periodically {
|
|
|
|
|
new.last_update = Some(SystemTime::now().into());
|
|
|
|
|
new.version += 1;
|
|
|
|
|
}
|
|
|
|
|
// these two fields should not participate in comparison.
|
|
|
|
|
new.version = old.version;
|
|
|
|
|
new.last_update = old.last_update;
|
|
|
|
|
|
|
|
|
|
new
|
|
|
|
|
if *new != *old || need_update_periodically {
|
|
|
|
|
new.version += 1;
|
|
|
|
|
true
|
|
|
|
|
} else {
|
|
|
|
|
false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -261,7 +277,7 @@ type Error = SyncRouteInfoError;
|
|
|
|
|
|
|
|
|
|
// constructed with all infos synced from all peers.
|
|
|
|
|
struct SyncedRouteInfo {
|
|
|
|
|
peer_infos: DashMap<PeerId, RoutePeerInfo>,
|
|
|
|
|
peer_infos: RwLock<OrderedHashMap<PeerId, RoutePeerInfo>>,
|
|
|
|
|
// prost doesn't support unknown fields, so we use DynamicMessage to store raw infos and progate them to other peers.
|
|
|
|
|
raw_peer_infos: DashMap<PeerId, DynamicMessage>,
|
|
|
|
|
conn_map: DashMap<PeerId, (BTreeSet<PeerId>, AtomicVersion)>,
|
|
|
|
|
@@ -293,14 +309,13 @@ impl SyncedRouteInfo {
|
|
|
|
|
|
|
|
|
|
fn remove_peer(&self, peer_id: PeerId) {
|
|
|
|
|
tracing::warn!(?peer_id, "remove_peer from synced_route_info");
|
|
|
|
|
self.peer_infos.remove(&peer_id);
|
|
|
|
|
self.peer_infos.write().remove(&peer_id);
|
|
|
|
|
self.raw_peer_infos.remove(&peer_id);
|
|
|
|
|
self.conn_map.remove(&peer_id);
|
|
|
|
|
self.foreign_network.retain(|k, _| k.peer_id != peer_id);
|
|
|
|
|
self.group_trust_map.remove(&peer_id);
|
|
|
|
|
self.group_trust_map_cache.remove(&peer_id);
|
|
|
|
|
|
|
|
|
|
shrink_dashmap(&self.peer_infos, None);
|
|
|
|
|
shrink_dashmap(&self.raw_peer_infos, None);
|
|
|
|
|
shrink_dashmap(&self.conn_map, None);
|
|
|
|
|
shrink_dashmap(&self.foreign_network, None);
|
|
|
|
|
@@ -313,10 +328,16 @@ impl SyncedRouteInfo {
|
|
|
|
|
fn fill_empty_peer_info(&self, peer_ids: &BTreeSet<PeerId>) {
|
|
|
|
|
let mut need_inc_version = false;
|
|
|
|
|
for peer_id in peer_ids {
|
|
|
|
|
self.peer_infos.entry(*peer_id).or_insert_with(|| {
|
|
|
|
|
let guard = self.peer_infos.upgradable_read();
|
|
|
|
|
if !guard.contains_key(peer_id) {
|
|
|
|
|
let mut peer_info = RoutePeerInfo::new();
|
|
|
|
|
let mut guard = RwLockUpgradableReadGuard::upgrade(guard);
|
|
|
|
|
peer_info.last_update = Some(SystemTime::now().into());
|
|
|
|
|
guard.insert(*peer_id, peer_info);
|
|
|
|
|
need_inc_version = true;
|
|
|
|
|
RoutePeerInfo::new()
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
drop(guard);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.conn_map.entry(*peer_id).or_insert_with(|| {
|
|
|
|
|
need_inc_version = true;
|
|
|
|
|
@@ -330,6 +351,7 @@ impl SyncedRouteInfo {
|
|
|
|
|
|
|
|
|
|
fn get_peer_info_version_with_default(&self, peer_id: PeerId) -> Version {
|
|
|
|
|
self.peer_infos
|
|
|
|
|
.read()
|
|
|
|
|
.get(&peer_id)
|
|
|
|
|
.map(|x| x.version)
|
|
|
|
|
.unwrap_or(0)
|
|
|
|
|
@@ -338,8 +360,9 @@ impl SyncedRouteInfo {
|
|
|
|
|
fn get_avoid_relay_data(&self, peer_id: PeerId) -> bool {
|
|
|
|
|
// if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST.
|
|
|
|
|
self.peer_infos
|
|
|
|
|
.read()
|
|
|
|
|
.get(&peer_id)
|
|
|
|
|
.and_then(|x| x.value().feature_flag)
|
|
|
|
|
.and_then(|x| x.feature_flag)
|
|
|
|
|
.map(|x| x.avoid_relay_data)
|
|
|
|
|
.unwrap_or_default()
|
|
|
|
|
}
|
|
|
|
|
@@ -395,7 +418,10 @@ impl SyncedRouteInfo {
|
|
|
|
|
my_peer_route_id,
|
|
|
|
|
dst_peer_id,
|
|
|
|
|
if route_info.peer_id == dst_peer_id {
|
|
|
|
|
self.peer_infos.get(&dst_peer_id).map(|x| x.peer_route_id)
|
|
|
|
|
self.peer_infos
|
|
|
|
|
.read()
|
|
|
|
|
.get(&dst_peer_id)
|
|
|
|
|
.map(|x| x.peer_route_id)
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
},
|
|
|
|
|
@@ -409,26 +435,19 @@ impl SyncedRouteInfo {
|
|
|
|
|
.unwrap();
|
|
|
|
|
assert_eq!(peer_id_raw, route_info.peer_id);
|
|
|
|
|
|
|
|
|
|
let mut guard = self.peer_infos.write();
|
|
|
|
|
// time between peers may not be synchronized, so update last_update to local now.
|
|
|
|
|
// note only last_update with larger version will be updated to local saved peer info.
|
|
|
|
|
route_info.last_update = Some(SystemTime::now().into());
|
|
|
|
|
|
|
|
|
|
self.peer_infos
|
|
|
|
|
.entry(route_info.peer_id)
|
|
|
|
|
.and_modify(|old_entry| {
|
|
|
|
|
if route_info.version > old_entry.version {
|
|
|
|
|
if guard
|
|
|
|
|
.get_mut(&route_info.peer_id)
|
|
|
|
|
.is_none_or(|old| route_info.version > old.version)
|
|
|
|
|
{
|
|
|
|
|
self.raw_peer_infos
|
|
|
|
|
.insert(route_info.peer_id, raw_route_info.clone());
|
|
|
|
|
*old_entry = route_info.clone();
|
|
|
|
|
guard.insert(route_info.peer_id, route_info);
|
|
|
|
|
need_inc_version = true;
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.or_insert_with(|| {
|
|
|
|
|
need_inc_version = true;
|
|
|
|
|
self.raw_peer_infos
|
|
|
|
|
.insert(route_info.peer_id, raw_route_info.clone());
|
|
|
|
|
route_info.clone()
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
if need_inc_version {
|
|
|
|
|
self.version.inc();
|
|
|
|
|
@@ -535,15 +554,34 @@ impl SyncedRouteInfo {
|
|
|
|
|
my_peer_route_id: u64,
|
|
|
|
|
global_ctx: &ArcGlobalCtx,
|
|
|
|
|
) -> bool {
|
|
|
|
|
let mut old = self.peer_infos.entry(my_peer_id).or_default();
|
|
|
|
|
let new = old.update_self(my_peer_id, my_peer_route_id, global_ctx);
|
|
|
|
|
let new_version = new.version;
|
|
|
|
|
let old_version = old.version;
|
|
|
|
|
*old = new;
|
|
|
|
|
drop(old);
|
|
|
|
|
let mut new = RoutePeerInfo::new_updated_self(my_peer_id, my_peer_route_id, global_ctx);
|
|
|
|
|
let mut guard = self.peer_infos.upgradable_read();
|
|
|
|
|
let old = guard.get(&my_peer_id);
|
|
|
|
|
let new_version = old.map(|x| x.version).unwrap_or(0) + 1;
|
|
|
|
|
let need_insert_new = if let Some(old) = old {
|
|
|
|
|
RoutePeerInfo::try_update_new_peer_info(old, &mut new)
|
|
|
|
|
} else {
|
|
|
|
|
true
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if need_insert_new {
|
|
|
|
|
let acl_groups = if old.map(|x| x.groups != new.groups).unwrap_or(true) {
|
|
|
|
|
Some(new.groups.clone())
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
guard.with_upgraded(|peer_infos| {
|
|
|
|
|
new.last_update = Some(SystemTime::now().into());
|
|
|
|
|
new.version = new_version;
|
|
|
|
|
peer_infos.insert(my_peer_id, new)
|
|
|
|
|
});
|
|
|
|
|
drop(guard);
|
|
|
|
|
|
|
|
|
|
if let Some(acl_groups) = acl_groups {
|
|
|
|
|
self.update_my_group_trusts(my_peer_id, &acl_groups);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if new_version != old_version {
|
|
|
|
|
self.update_my_group_trusts(my_peer_id);
|
|
|
|
|
self.version.inc();
|
|
|
|
|
true
|
|
|
|
|
} else {
|
|
|
|
|
@@ -634,6 +672,28 @@ impl SyncedRouteInfo {
|
|
|
|
|
updated
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get_next_last_sync_succ_timestamp(&self) -> SystemTime {
|
|
|
|
|
let _peer_info_lock = self.peer_infos.read();
|
|
|
|
|
// TODO: add conn and foreign network lock
|
|
|
|
|
|
|
|
|
|
SystemTime::now()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn check_peer_info_last_update_monotonic_increasing(&self) {
|
|
|
|
|
let mut last_update: Option<prost_types::Timestamp> = None;
|
|
|
|
|
for peer_info in self.peer_infos.read().values() {
|
|
|
|
|
if let Some(last_update) = last_update {
|
|
|
|
|
let cur_last_update = peer_info.last_update.unwrap();
|
|
|
|
|
assert!(
|
|
|
|
|
cur_last_update.seconds > last_update.seconds
|
|
|
|
|
|| cur_last_update.nanos >= last_update.nanos,
|
|
|
|
|
"peer info last update not monotonic increasing"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
last_update = peer_info.last_update;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn is_peer_bidirectly_connected(&self, src_peer_id: PeerId, dst_peer_id: PeerId) -> bool {
|
|
|
|
|
self.conn_map
|
|
|
|
|
.get(&src_peer_id)
|
|
|
|
|
@@ -725,13 +785,15 @@ impl SyncedRouteInfo {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn update_my_group_trusts(&self, my_peer_id: PeerId) {
|
|
|
|
|
fn update_my_group_trusts(&self, my_peer_id: PeerId, groups: &[PeerGroupInfo]) {
|
|
|
|
|
let mut my_group_map = HashMap::new();
|
|
|
|
|
let mut my_group_names = Vec::new();
|
|
|
|
|
for group in self.peer_infos.entry(my_peer_id).or_default().groups.iter() {
|
|
|
|
|
|
|
|
|
|
for group in groups.iter() {
|
|
|
|
|
my_group_map.insert(group.group_name.clone(), group.group_proof.clone());
|
|
|
|
|
my_group_names.push(group.group_name.clone());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.group_trust_map.insert(my_peer_id, my_group_map);
|
|
|
|
|
self.group_trust_map_cache
|
|
|
|
|
.insert(my_peer_id, Arc::new(my_group_names));
|
|
|
|
|
@@ -749,21 +811,16 @@ struct NextHopInfo {
|
|
|
|
|
}
|
|
|
|
|
// dst_peer_id -> (next_hop_peer_id, cost, path_len)
|
|
|
|
|
type NextHopMap = DashMap<PeerId, NextHopInfo>;
|
|
|
|
|
#[derive(Debug, Clone, Copy)]
|
|
|
|
|
struct PeerIdAndVersion {
|
|
|
|
|
peer_id: PeerId,
|
|
|
|
|
version: Version,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// computed with SyncedRouteInfo. used to get next hop.
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
struct RouteTable {
|
|
|
|
|
peer_infos: DashMap<PeerId, RoutePeerInfo>,
|
|
|
|
|
next_hop_map: NextHopMap,
|
|
|
|
|
ipv4_peer_id_map: DashMap<Ipv4Addr, PeerIdAndVersion>,
|
|
|
|
|
ipv6_peer_id_map: DashMap<Ipv6Addr, PeerIdAndVersion>,
|
|
|
|
|
cidr_peer_id_map: ArcSwap<PrefixMap<Ipv4Cidr, PeerIdAndVersion>>,
|
|
|
|
|
cidr_v6_peer_id_map: ArcSwap<PrefixMap<Ipv6Cidr, PeerIdAndVersion>>,
|
|
|
|
|
ipv4_peer_id_map: DashMap<Ipv4Addr, PeerIdVersion>,
|
|
|
|
|
ipv6_peer_id_map: DashMap<Ipv6Addr, PeerIdVersion>,
|
|
|
|
|
cidr_peer_id_map: ArcSwap<PrefixMap<Ipv4Cidr, PeerIdVersion>>,
|
|
|
|
|
cidr_v6_peer_id_map: ArcSwap<PrefixMap<Ipv6Cidr, PeerIdVersion>>,
|
|
|
|
|
next_hop_map_version: AtomicVersion,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -811,18 +868,17 @@ impl RouteTable {
|
|
|
|
|
|
|
|
|
|
let mut start_node_idx = None;
|
|
|
|
|
let peer_id_to_node_index: PeerIdToNodexIdxMap = DashMap::new();
|
|
|
|
|
for item in synced_info.peer_infos.iter() {
|
|
|
|
|
let peer_id = item.key();
|
|
|
|
|
let info = item.value();
|
|
|
|
|
for (peer_id, info) in synced_info.peer_infos.read().iter() {
|
|
|
|
|
let peer_id = *peer_id;
|
|
|
|
|
|
|
|
|
|
if info.version == 0 {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let node_idx = graph.add_node(*peer_id);
|
|
|
|
|
let node_idx = graph.add_node(peer_id);
|
|
|
|
|
|
|
|
|
|
peer_id_to_node_index.insert(*peer_id, node_idx);
|
|
|
|
|
if *peer_id == my_peer_id {
|
|
|
|
|
peer_id_to_node_index.insert(peer_id, node_idx);
|
|
|
|
|
if peer_id == my_peer_id {
|
|
|
|
|
start_node_idx = Some(node_idx);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -991,18 +1047,18 @@ impl RouteTable {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let peer_id = item.key();
|
|
|
|
|
let Some(info) = synced_info.peer_infos.get(peer_id) else {
|
|
|
|
|
let Some(info) = synced_info.peer_infos.read().get(peer_id).cloned() else {
|
|
|
|
|
continue;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
self.peer_infos.insert(*peer_id, info.clone());
|
|
|
|
|
|
|
|
|
|
let peer_id_and_version = PeerIdAndVersion {
|
|
|
|
|
let peer_id_and_version = PeerIdVersion {
|
|
|
|
|
peer_id: *peer_id,
|
|
|
|
|
version,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let is_new_peer_better = |old_peer: &PeerIdAndVersion| -> bool {
|
|
|
|
|
let is_new_peer_better = |old_peer: &PeerIdVersion| -> bool {
|
|
|
|
|
if peer_id_and_version.version > old_peer.version {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
@@ -1194,6 +1250,11 @@ struct SyncRouteSession {
|
|
|
|
|
dst_saved_conn_info_version: DashMap<PeerId, VersionAndTouchTime>,
|
|
|
|
|
dst_saved_foreign_network_versions: DashMap<ForeignNetworkRouteInfoKey, VersionAndTouchTime>,
|
|
|
|
|
|
|
|
|
|
// we don't want to send unreachable peer infos to peer, so we keep track of them.
|
|
|
|
|
unreachable_peers: parking_lot::Mutex<VecDeque<PeerIdVersion>>,
|
|
|
|
|
|
|
|
|
|
last_sync_succ_timestamp: AtomicCell<Option<SystemTime>>,
|
|
|
|
|
|
|
|
|
|
my_session_id: AtomicSessionId,
|
|
|
|
|
dst_session_id: AtomicSessionId,
|
|
|
|
|
|
|
|
|
|
@@ -1218,6 +1279,10 @@ impl SyncRouteSession {
|
|
|
|
|
dst_saved_conn_info_version: DashMap::new(),
|
|
|
|
|
dst_saved_foreign_network_versions: DashMap::new(),
|
|
|
|
|
|
|
|
|
|
unreachable_peers: parking_lot::Mutex::new(VecDeque::new()),
|
|
|
|
|
|
|
|
|
|
last_sync_succ_timestamp: AtomicCell::new(None),
|
|
|
|
|
|
|
|
|
|
my_session_id: AtomicSessionId::new(rand::random()),
|
|
|
|
|
dst_session_id: AtomicSessionId::new(0),
|
|
|
|
|
|
|
|
|
|
@@ -1363,12 +1428,15 @@ impl SyncRouteSession {
|
|
|
|
|
self.need_sync_initiator_info.store(true, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// return whether session id is updated
|
|
|
|
|
fn update_dst_session_id(&self, session_id: SessionId) {
|
|
|
|
|
if session_id != self.dst_session_id.load(Ordering::Relaxed) {
|
|
|
|
|
tracing::warn!(?self, ?session_id, "session id mismatch, clear saved info.");
|
|
|
|
|
self.dst_session_id.store(session_id, Ordering::Relaxed);
|
|
|
|
|
self.dst_saved_conn_info_version.clear();
|
|
|
|
|
self.dst_saved_peer_info_versions.clear();
|
|
|
|
|
self.last_sync_succ_timestamp.store(None);
|
|
|
|
|
self.unreachable_peers.lock().clear();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1386,6 +1454,16 @@ impl SyncRouteSession {
|
|
|
|
|
self.dst_saved_foreign_network_versions.shrink_to_fit();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn update_last_sync_succ_timestamp(&self, next_last_sync_succ_timestamp: SystemTime) {
|
|
|
|
|
let _ = self.last_sync_succ_timestamp.fetch_update(|x| {
|
|
|
|
|
if x.is_none_or(|old| old < next_last_sync_succ_timestamp) {
|
|
|
|
|
Some(Some(next_last_sync_succ_timestamp))
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn short_debug_string(&self) -> String {
|
|
|
|
|
format!(
|
|
|
|
|
"session_dst_peer: {:?}, my_session_id: {:?}, dst_session_id: {:?}, we_are_initiator: {:?}, dst_is_initiator: {:?}, rpc_tx_count: {:?}, rpc_rx_count: {:?}, task: {:?}",
|
|
|
|
|
@@ -1470,7 +1548,7 @@ impl PeerRouteServiceImpl {
|
|
|
|
|
foreign_network_my_peer_id_map: DashMap::new(),
|
|
|
|
|
|
|
|
|
|
synced_route_info: SyncedRouteInfo {
|
|
|
|
|
peer_infos: DashMap::new(),
|
|
|
|
|
peer_infos: RwLock::new(OrderedHashMap::new()),
|
|
|
|
|
raw_peer_infos: DashMap::new(),
|
|
|
|
|
conn_map: DashMap::new(),
|
|
|
|
|
foreign_network: DashMap::new(),
|
|
|
|
|
@@ -1714,21 +1792,72 @@ impl PeerRouteServiceImpl {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn build_route_info(&self, session: &SyncRouteSession) -> Option<Vec<RoutePeerInfo>> {
|
|
|
|
|
self.synced_route_info
|
|
|
|
|
.check_peer_info_last_update_monotonic_increasing();
|
|
|
|
|
let mut route_infos = Vec::new();
|
|
|
|
|
for item in self.synced_route_info.peer_infos.iter() {
|
|
|
|
|
let peer_infos = self.synced_route_info.peer_infos.read();
|
|
|
|
|
let last_sync_succ_timestamp = session.last_sync_succ_timestamp.load();
|
|
|
|
|
for (peer_id, peer_info) in peer_infos.iter().rev() {
|
|
|
|
|
// stop iter if last_update of peer info is older than session.latest_scanned_peer_info_timestamp
|
|
|
|
|
if let Some(last_update) = peer_info.last_update {
|
|
|
|
|
let last_update = TryInto::<SystemTime>::try_into(last_update).unwrap();
|
|
|
|
|
if last_sync_succ_timestamp.is_some_and(|t| last_update < t) {
|
|
|
|
|
tracing::debug!(
|
|
|
|
|
"ignore peer_info {:?} because last_update: {:?} is older than last_sync_succ_timestamp: {:?}, peer_infos_count: {}, my_peer_id: {:?}, session: {:?}",
|
|
|
|
|
peer_info,
|
|
|
|
|
last_update,
|
|
|
|
|
last_sync_succ_timestamp,
|
|
|
|
|
peer_infos.len(),
|
|
|
|
|
self.my_peer_id,
|
|
|
|
|
session
|
|
|
|
|
);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if session.check_saved_peer_info_update_to_date(peer_info.peer_id, peer_info.version) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// do not send unreachable peer info to dst peer.
|
|
|
|
|
if !self.route_table.peer_reachable(*item.key()) {
|
|
|
|
|
if !self.route_table.peer_reachable(*peer_id) {
|
|
|
|
|
session.unreachable_peers.lock().push_front(PeerIdVersion {
|
|
|
|
|
peer_id: *peer_id,
|
|
|
|
|
version: peer_info.version,
|
|
|
|
|
});
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if session
|
|
|
|
|
.check_saved_peer_info_update_to_date(item.value().peer_id, item.value().version)
|
|
|
|
|
{
|
|
|
|
|
continue;
|
|
|
|
|
route_infos.push(peer_info.clone());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
route_infos.push(item.value().clone());
|
|
|
|
|
let mut unreachable_peers = session.unreachable_peers.lock();
|
|
|
|
|
let cur_len = unreachable_peers.len();
|
|
|
|
|
for _ in 0..cur_len {
|
|
|
|
|
let peer_id_version = unreachable_peers.pop_back().unwrap();
|
|
|
|
|
let peer_id = peer_id_version.peer_id;
|
|
|
|
|
let version = peer_id_version.version;
|
|
|
|
|
if session.check_saved_peer_info_update_to_date(peer_id, version) {
|
|
|
|
|
// already up-to-date, skip
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
let reachable = self.route_table.peer_reachable(peer_id);
|
|
|
|
|
match self.synced_route_info.peer_infos.read().get(&peer_id) {
|
|
|
|
|
Some(route_info) => {
|
|
|
|
|
if reachable {
|
|
|
|
|
route_infos.push(route_info.clone());
|
|
|
|
|
}
|
|
|
|
|
// this round rpc may fail, so keep it and remove the id only when it's in dst_saved_map
|
|
|
|
|
unreachable_peers.push_front(peer_id_version);
|
|
|
|
|
}
|
|
|
|
|
None => {
|
|
|
|
|
// if not found in peer info map, forget this peer id.
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
unreachable_peers.shrink_to_fit();
|
|
|
|
|
|
|
|
|
|
if route_infos.is_empty() {
|
|
|
|
|
None
|
|
|
|
|
@@ -1860,6 +1989,7 @@ impl PeerRouteServiceImpl {
|
|
|
|
|
let dst_supports_peer_list = self
|
|
|
|
|
.synced_route_info
|
|
|
|
|
.peer_infos
|
|
|
|
|
.read()
|
|
|
|
|
.get(&dst_peer_id)
|
|
|
|
|
.and_then(|p| p.feature_flag)
|
|
|
|
|
.map(|x| x.support_conn_list_sync)
|
|
|
|
|
@@ -1888,11 +2018,13 @@ impl PeerRouteServiceImpl {
|
|
|
|
|
fn clear_expired_peer(&self) {
|
|
|
|
|
let now = SystemTime::now();
|
|
|
|
|
let mut to_remove = Vec::new();
|
|
|
|
|
for item in self.synced_route_info.peer_infos.iter() {
|
|
|
|
|
if let Ok(d) = now.duration_since(item.value().last_update.unwrap().try_into().unwrap())
|
|
|
|
|
for (peer_id, peer_info) in self.synced_route_info.peer_infos.read().iter() {
|
|
|
|
|
if let Ok(d) = now.duration_since(peer_info.last_update.unwrap().try_into().unwrap()) {
|
|
|
|
|
if d > REMOVE_DEAD_PEER_INFO_AFTER
|
|
|
|
|
|| (d > REMOVE_UNREACHABLE_PEER_INFO_AFTER
|
|
|
|
|
&& !self.route_table.peer_reachable(*peer_id))
|
|
|
|
|
{
|
|
|
|
|
if d > REMOVE_DEAD_PEER_INFO_AFTER {
|
|
|
|
|
to_remove.push(*item.key());
|
|
|
|
|
to_remove.push(*peer_id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1971,6 +2103,8 @@ impl PeerRouteServiceImpl {
|
|
|
|
|
|
|
|
|
|
let my_peer_id = self.my_peer_id;
|
|
|
|
|
|
|
|
|
|
let next_last_sync_succ_timestamp =
|
|
|
|
|
self.synced_route_info.get_next_last_sync_succ_timestamp();
|
|
|
|
|
let (peer_infos, conn_info, foreign_network) =
|
|
|
|
|
self.build_sync_request(&session, dst_peer_id);
|
|
|
|
|
if peer_infos.is_none()
|
|
|
|
|
@@ -2020,6 +2154,11 @@ impl PeerRouteServiceImpl {
|
|
|
|
|
.sync_route_info(ctrl, SyncRouteInfoRequest::default())
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
tracing::debug!(
|
|
|
|
|
"sync_route_info resp: {:?}, req: {:?}, session: {:?}, my_info: {:?}, next_last_sync_succ_timestamp: {:?}",
|
|
|
|
|
ret, sync_route_info_req, session, self.global_ctx.network, next_last_sync_succ_timestamp
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if let Err(e) = &ret {
|
|
|
|
|
tracing::error!(
|
|
|
|
|
?ret,
|
|
|
|
|
@@ -2066,6 +2205,8 @@ impl PeerRouteServiceImpl {
|
|
|
|
|
if let Some(foreign_network) = &foreign_network {
|
|
|
|
|
session.update_dst_saved_foreign_network_version(foreign_network, dst_peer_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
session.update_last_sync_succ_timestamp(next_last_sync_succ_timestamp);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
false
|
|
|
|
|
@@ -2840,7 +2981,7 @@ mod tests {
|
|
|
|
|
peers::{
|
|
|
|
|
create_packet_recv_chan,
|
|
|
|
|
peer_manager::{PeerManager, RouteAlgoType},
|
|
|
|
|
peer_ospf_route::{PeerIdAndVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST},
|
|
|
|
|
peer_ospf_route::{PeerIdVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST},
|
|
|
|
|
route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface},
|
|
|
|
|
tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
|
|
|
|
|
},
|
|
|
|
|
@@ -2923,8 +3064,14 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_secs(3)).await;
|
|
|
|
|
|
|
|
|
|
assert_eq!(2, r_a.service_impl.synced_route_info.peer_infos.len());
|
|
|
|
|
assert_eq!(2, r_b.service_impl.synced_route_info.peer_infos.len());
|
|
|
|
|
assert_eq!(
|
|
|
|
|
2,
|
|
|
|
|
r_a.service_impl.synced_route_info.peer_infos.read().len()
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
2,
|
|
|
|
|
r_b.service_impl.synced_route_info.peer_infos.read().len()
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
for s in r_a.service_impl.sessions.iter() {
|
|
|
|
|
assert!(s.value().task.is_running());
|
|
|
|
|
@@ -2934,6 +3081,7 @@ mod tests {
|
|
|
|
|
r_a.service_impl
|
|
|
|
|
.synced_route_info
|
|
|
|
|
.peer_infos
|
|
|
|
|
.read()
|
|
|
|
|
.get(&p_a.my_peer_id())
|
|
|
|
|
.unwrap()
|
|
|
|
|
.version,
|
|
|
|
|
@@ -2990,7 +3138,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
for r in [r_a.clone(), r_b.clone(), r_c.clone()].iter() {
|
|
|
|
|
wait_for_condition(
|
|
|
|
|
|| async { r.service_impl.synced_route_info.peer_infos.len() == 3 },
|
|
|
|
|
|| async { r.service_impl.synced_route_info.peer_infos.read().len() == 3 },
|
|
|
|
|
Duration::from_secs(5),
|
|
|
|
|
)
|
|
|
|
|
.await;
|
|
|
|
|
@@ -3095,7 +3243,9 @@ mod tests {
|
|
|
|
|
// check peer infos
|
|
|
|
|
let peer_info = synced_info
|
|
|
|
|
.peer_infos
|
|
|
|
|
.read()
|
|
|
|
|
.get(&routable_peer.my_peer_id())
|
|
|
|
|
.cloned()
|
|
|
|
|
.unwrap();
|
|
|
|
|
assert_eq!(peer_info.peer_id, routable_peer.my_peer_id());
|
|
|
|
|
}
|
|
|
|
|
@@ -3125,7 +3275,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
for r in [r_a.clone(), r_b.clone(), r_c.clone()].iter() {
|
|
|
|
|
wait_for_condition(
|
|
|
|
|
|| async { r.service_impl.synced_route_info.peer_infos.len() == 3 },
|
|
|
|
|
|| async { r.service_impl.synced_route_info.peer_infos.read().len() == 3 },
|
|
|
|
|
Duration::from_secs(5),
|
|
|
|
|
)
|
|
|
|
|
.await;
|
|
|
|
|
@@ -3392,10 +3542,10 @@ mod tests {
|
|
|
|
|
let proxy_cidr: Ipv4Cidr = "192.168.100.0/24".parse().unwrap();
|
|
|
|
|
let test_ip = proxy_cidr.first_address();
|
|
|
|
|
|
|
|
|
|
let mut cidr_peer_id_map: PrefixMap<Ipv4Cidr, PeerIdAndVersion> = PrefixMap::new();
|
|
|
|
|
let mut cidr_peer_id_map: PrefixMap<Ipv4Cidr, PeerIdVersion> = PrefixMap::new();
|
|
|
|
|
cidr_peer_id_map.insert(
|
|
|
|
|
proxy_cidr,
|
|
|
|
|
PeerIdAndVersion {
|
|
|
|
|
PeerIdVersion {
|
|
|
|
|
peer_id: p_c.my_peer_id(),
|
|
|
|
|
version: 0,
|
|
|
|
|
},
|
|
|
|
|
|