11use std:: {
2- collections:: { BTreeSet , HashSet , VecDeque } ,
2+ collections:: { BTreeSet , VecDeque } ,
33 net:: SocketAddr ,
44 pin:: Pin ,
55 sync:: Arc ,
@@ -25,7 +25,7 @@ use tracing::{Instrument, Level, debug, error, event, info_span, instrument, tra
2525
2626use self :: {
2727 guarded_channel:: { GuardedReceiver , GuardedSender , guarded_channel} ,
28- path_state:: { PathState , RemotePathState } ,
28+ path_state:: RemotePathState ,
2929} ;
3030use super :: Source ;
3131use crate :: {
@@ -44,16 +44,6 @@ use crate::{
4444mod guarded_channel;
4545mod path_state;
4646
47- /// Number of addresses that are not active that we keep around per endpoint.
48- ///
49- /// See [`EndpointState::prune_ip_addresses`].
50- pub ( super ) const MAX_INACTIVE_IP_ADDRESSES : usize = 20 ;
51-
52- /// Max duration of how long ago we learned about this source before we are willing
53- /// to prune it, if the path for this ip address is inactive.
54- /// TODO(ramfox): fix this comment it's not clear enough
55- const LAST_SOURCE_PRUNE_DURATION : Duration = Duration :: from_secs ( 120 ) ;
56-
5747// TODO: use this
5848// /// The latency at or under which we don't try to upgrade to a better path.
5949// const GOOD_ENOUGH_LATENCY: Duration = Duration::from_millis(5);
@@ -443,7 +433,7 @@ impl RemoteStateActor {
443433 . insert ( path_remote, Source :: Connection { _0 : Private } ) ;
444434 self . select_path ( ) ;
445435 // TODO(ramfox): do we need to prune paths here?
446- self . prune_paths ( ) ;
436+ self . prune_ip_paths ( ) ;
447437
448438 if path_remote_is_ip {
449439 // We may have raced this with a relay address. Try and add any
@@ -496,7 +486,7 @@ impl RemoteStateActor {
496486 }
497487 // prune any unused/inactive paths, now that we have added potential
498488 // new ones
499- self . prune_paths ( )
489+ self . prune_ip_paths ( )
500490 }
501491
502492 /// Handles [`RemoteStateMessage::PingReceived`].
@@ -902,7 +892,7 @@ impl RemoteStateActor {
902892 }
903893
904894 self . select_path ( ) ;
905- self . prune_paths ( ) ;
895+ self . prune_ip_paths ( ) ;
906896 }
907897 PathEvent :: Abandoned { id, path_stats } => {
908898 trace ! ( ?path_stats, "path abandoned" ) ;
@@ -1055,19 +1045,19 @@ impl RemoteStateActor {
10551045 }
10561046 }
10571047
1058- fn prune_paths ( & mut self ) {
1048+ /// TODO: fix up docs once review indicates this is actually
1049+ /// the criteria for pruning.
1050+ fn prune_ip_paths ( & mut self ) {
10591051 // if the total number of paths, relay or ip, is less
10601052 // than the max inactive ip addrs we allow, bail early
1061- if self . paths . len ( ) < MAX_INACTIVE_IP_ADDRESSES {
1053+ if self . paths . len ( ) < path_state :: MAX_INACTIVE_IP_ADDRESSES {
10621054 return ;
10631055 }
10641056 let open_paths = self
10651057 . connections
10661058 . values ( )
1067- . map ( |state| state. open_paths . values ( ) )
1068- . flatten ( ) ;
1069- prune_paths (
1070- & mut self . paths ,
1059+ . flat_map ( |state| state. open_paths . values ( ) ) ;
1060+ self . paths . prune_ip_paths (
10711061 & self . pending_open_paths ,
10721062 & self . selected_path . get ( ) ,
10731063 open_paths,
@@ -1454,252 +1444,3 @@ fn to_transports_addr(
14541444 }
14551445 } )
14561446}
1457-
1458- fn prune_paths < ' a > (
1459- paths : & mut FxHashMap < transports:: Addr , PathState > ,
1460- pending : & VecDeque < transports:: Addr > ,
1461- selected_path : & Option < transports:: Addr > ,
1462- open_paths : impl Iterator < Item = & ' a transports:: Addr > ,
1463- ) {
1464- let ip_count = paths. keys ( ) . filter ( |p| p. is_ip ( ) ) . count ( ) ;
1465- // if the total number of ip paths is less than the allowed number of inactive
1466- // paths, just return early;
1467- if ip_count < MAX_INACTIVE_IP_ADDRESSES {
1468- return ;
1469- }
1470-
1471- let ip_paths: HashSet < _ > = paths. keys ( ) . filter ( |p| p. is_ip ( ) ) . collect ( ) ;
1472-
1473- let mut protected_paths = HashSet :: new ( ) ;
1474- for addr in pending {
1475- protected_paths. insert ( addr) ;
1476- }
1477- if let Some ( path) = selected_path {
1478- protected_paths. insert ( path) ;
1479- }
1480- for path in open_paths {
1481- protected_paths. insert ( path) ;
1482- }
1483-
1484- let inactive_paths: HashSet < _ > = ip_paths
1485- . difference ( & protected_paths)
1486- // cloned here so we can use `paths.retain` later
1487- . map ( |& addr| addr. clone ( ) )
1488- . collect ( ) ;
1489-
1490- if inactive_paths. len ( ) < MAX_INACTIVE_IP_ADDRESSES {
1491- return ;
1492- }
1493-
1494- let now = Instant :: now ( ) ;
1495-
1496- paths. retain ( |addr, state| {
1497- if inactive_paths. contains ( addr) {
1498- keep_path ( state, & now)
1499- } else {
1500- // keep all active paths
1501- true
1502- }
1503- } ) ;
1504- }
1505-
1506- /// Based on the [`PathState`], returns true if we should keep this path.
1507- ///
1508- /// Currently we have two criteria:
1509- /// 1) This path has sent a Ping
1510- /// 2) The last time we learned about this address was greater than LAST_SOURCE_PRUNE_DURATION
1511- fn keep_path ( state : & PathState , now : & Instant ) -> bool {
1512- // if we have never sent a ping, don't remove it
1513- state. ping_sent . is_none ( )
1514- || state
1515- . sources
1516- . values ( )
1517- // only keep it if this path contains recent sources
1518- . any ( |instant| * instant + LAST_SOURCE_PRUNE_DURATION > * now)
1519- }
1520-
1521- #[ cfg( test) ]
1522- mod tests {
1523- use super :: super :: Source ;
1524- use super :: { PathState , prune_paths} ;
1525- use crate :: disco:: TransactionId ;
1526- use crate :: magicsock:: { endpoint_map:: Private , transports} ;
1527- use n0_error:: Result ;
1528- use n0_future:: time:: { Duration , Instant } ;
1529- use rustc_hash:: FxHashMap ;
1530- use std:: collections:: VecDeque ;
1531- use std:: net:: { Ipv4Addr , SocketAddr } ;
1532-
1533- /// Create a test IP address with specific port
1534- fn test_ip_addr ( port : u16 ) -> transports:: Addr {
1535- transports:: Addr :: Ip ( SocketAddr :: new (
1536- std:: net:: IpAddr :: V4 ( Ipv4Addr :: LOCALHOST ) ,
1537- port,
1538- ) )
1539- }
1540-
1541- /// Create a PathState with sources at a specific time offset
1542- fn test_path_state ( time_offset : Duration , sent_ping : bool ) -> PathState {
1543- let mut state = PathState :: default ( ) ;
1544- if sent_ping {
1545- state. ping_sent = Some ( TransactionId :: default ( ) ) ;
1546- }
1547- state. sources . insert (
1548- Source :: Connection { _0 : Private } ,
1549- Instant :: now ( ) - time_offset,
1550- ) ;
1551- state
1552- }
1553-
1554- #[ test]
1555- fn test_prune_paths_too_few_total_paths ( ) -> Result {
1556- // create fewer than MAX_INACTIVE_IP_ADDRESSES paths
1557- let mut paths = FxHashMap :: default ( ) ;
1558- for i in 0 ..15 {
1559- paths. insert (
1560- test_ip_addr ( i) ,
1561- test_path_state ( Duration :: from_secs ( 0 ) , false ) ,
1562- ) ;
1563- }
1564-
1565- let pending = VecDeque :: new ( ) ;
1566- let selected_path = None ;
1567- let open_paths = Vec :: new ( ) ;
1568-
1569- let initial_len = paths. len ( ) ;
1570- // should not prune because we have fewer than MAX_INACTIVE_IP_ADDRESSES paths
1571- prune_paths ( & mut paths, & pending, & selected_path, open_paths. iter ( ) ) ;
1572- assert_eq ! (
1573- paths. len( ) ,
1574- initial_len,
1575- "Expected no paths to be pruned when total IP paths < MAX_INACTIVE_IP_ADDRESSES"
1576- ) ;
1577-
1578- Ok ( ( ) )
1579- }
1580-
1581- #[ test]
1582- fn test_prune_paths_too_few_inactive_paths ( ) -> Result {
1583- // create MAX_INACTIVE_IP_ADDRESSES + 5 paths
1584- let mut paths = FxHashMap :: default ( ) ;
1585- for i in 0 ..25 {
1586- paths. insert (
1587- test_ip_addr ( i) ,
1588- test_path_state ( Duration :: from_secs ( 0 ) , false ) ,
1589- ) ;
1590- }
1591-
1592- // mark 10 of them as "active" by adding them to open_paths
1593- let open_paths: Vec < transports:: Addr > = ( 0 ..10 ) . map ( |i| test_ip_addr ( i) ) . collect ( ) ;
1594-
1595- let pending = VecDeque :: new ( ) ;
1596- let selected_path = None ;
1597-
1598- let initial_len = paths. len ( ) ;
1599- // now we have 25 total paths, but only 15 inactive paths (25 - 10 = 15)
1600- // which is less than MAX_INACTIVE_IP_ADDRESSES (20)
1601- prune_paths ( & mut paths, & pending, & selected_path, open_paths. iter ( ) ) ;
1602- assert_eq ! (
1603- paths. len( ) ,
1604- initial_len,
1605- "Expected no paths to be pruned when inactive paths < MAX_INACTIVE_IP_ADDRESSES"
1606- ) ;
1607-
1608- Ok ( ( ) )
1609- }
1610-
1611- #[ test]
1612- fn test_prune_paths_prunes_old_inactive_paths ( ) -> Result {
1613- // create MAX_INACTIVE_IP_ADDRESSES + 10 paths
1614- let mut paths = FxHashMap :: default ( ) ;
1615-
1616- // add 20 paths with recent sources (within 2 minutes)
1617- for i in 0 ..20 {
1618- paths. insert (
1619- test_ip_addr ( i) ,
1620- test_path_state ( Duration :: from_secs ( 60 ) , true ) , // 1 minute ago
1621- ) ;
1622- }
1623-
1624- // add 10 paths with old sources (more than 2 minutes ago)
1625- for i in 20 ..30 {
1626- paths. insert (
1627- test_ip_addr ( i) ,
1628- test_path_state ( Duration :: from_secs ( 180 ) , true ) , // 3 minutes ago
1629- ) ;
1630- }
1631-
1632- let pending = VecDeque :: new ( ) ;
1633- let selected_path = None ;
1634- let open_paths = Vec :: new ( ) ;
1635-
1636- // we have 30 total paths, all inactive
1637- // paths with sources older than LAST_SOURCE_PRUNE_DURATION should be pruned
1638- prune_paths ( & mut paths, & pending, & selected_path, open_paths. iter ( ) ) ;
1639-
1640- // we should have kept the 20 recent paths
1641- assert_eq ! (
1642- paths. len( ) ,
1643- 20 ,
1644- "Expected to keep 20 paths with recent sources"
1645- ) ;
1646-
1647- // verify that the kept paths are the ones with recent sources
1648- for i in 0 ..20 {
1649- let addr = test_ip_addr ( i) ;
1650- assert ! (
1651- paths. contains_key( & addr) ,
1652- "Expected to keep path with recent source: {:?}" ,
1653- addr
1654- ) ;
1655- }
1656-
1657- // verify that the old paths were removed
1658- for i in 20 ..30 {
1659- let addr = test_ip_addr ( i) ;
1660- assert ! (
1661- !paths. contains_key( & addr) ,
1662- "Expected to prune path with old source: {:?}" ,
1663- addr
1664- ) ;
1665- }
1666-
1667- Ok ( ( ) )
1668- }
1669-
1670- #[ test]
1671- fn test_prune_paths_protects_selected_and_open_paths ( ) -> Result {
1672- // create MAX_INACTIVE_IP_ADDRESSES + 10 paths, all with old sources
1673- let mut paths = FxHashMap :: default ( ) ;
1674- for i in 0 ..30 {
1675- paths. insert (
1676- test_ip_addr ( i) ,
1677- test_path_state ( Duration :: from_secs ( 180 ) , true ) , // 3 minutes ago
1678- ) ;
1679- }
1680-
1681- let pending = VecDeque :: new ( ) ;
1682- // mark one path as selected
1683- let selected_path = Some ( test_ip_addr ( 0 ) ) ;
1684- // mark a few paths as open
1685- let open_paths = vec ! [ test_ip_addr( 1 ) , test_ip_addr( 2 ) ] ;
1686-
1687- prune_paths ( & mut paths, & pending, & selected_path, open_paths. iter ( ) ) ;
1688-
1689- // protected paths should still be in the result even though they have old sources
1690- assert ! (
1691- paths. contains_key( & test_ip_addr( 0 ) ) ,
1692- "Expected to keep selected path even with old source"
1693- ) ;
1694- assert ! (
1695- paths. contains_key( & test_ip_addr( 1 ) ) ,
1696- "Expected to keep open path even with old source"
1697- ) ;
1698- assert ! (
1699- paths. contains_key( & test_ip_addr( 2 ) ) ,
1700- "Expected to keep open path even with old source"
1701- ) ;
1702-
1703- Ok ( ( ) )
1704- }
1705- }
0 commit comments