Skip to content

Commit b92b6c9

Browse files
authored
Report expired sessions (#281)
* Report expired sessions * Sneaky clippy lint * Remove old dead-code * Correct tests
1 parent c020f3d commit b92b6c9

File tree

4 files changed

+45
-8
lines changed

4 files changed

+45
-8
lines changed

src/discv5.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::{
1818
self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable,
1919
NodeStatus, UpdateResult,
2020
},
21-
node_info::NodeContact,
21+
node_info::{NodeAddress, NodeContact},
2222
service::{QueryKind, Service, ServiceRequest, TalkRequest},
2323
Config, Enr, IpMode,
2424
};
@@ -72,6 +72,8 @@ pub enum Event {
7272
},
7373
/// A new session has been established with a node.
7474
SessionEstablished(Enr, SocketAddr),
75+
/// A session has been removed from our cache due to inactivity.
76+
SessionsExpired(Vec<NodeAddress>),
7577
/// Our local ENR IP address has been updated.
7678
SocketUpdated(SocketAddr),
7779
/// A node has initiated a talk request.

src/handler/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ pub enum HandlerOut {
139139
socket: SocketAddr,
140140
node_id: NodeId,
141141
},
142+
/// These sessions have expired from the cache.
143+
ExpiredSessions(Vec<NodeAddress>),
142144
}
143145

144146
/// How we connected to the node.
@@ -1227,6 +1229,9 @@ impl Handler {
12271229
// handshake to re-establish a session, if applicable.
12281230
message_nonce: Option<MessageNonce>,
12291231
) {
1232+
// Clear the session cache and report expired elements
1233+
self.remove_expired_sessions().await;
1234+
12301235
if let Some(current_session) = self.sessions.get_mut(&node_address) {
12311236
current_session.update(session);
12321237
// If a session is re-established, due to a new handshake during an ongoing
@@ -1282,6 +1287,9 @@ impl Handler {
12821287
remove_session: bool,
12831288
) {
12841289
if remove_session {
1290+
// Remove expired sessions
1291+
self.remove_expired_sessions().await;
1292+
12851293
self.sessions.remove(node_address);
12861294
METRICS
12871295
.active_sessions
@@ -1353,6 +1361,22 @@ impl Handler {
13531361
.retain(|_, time| time.is_none() || Some(Instant::now()) < *time);
13541362
}
13551363

1364+
/// Removes expired sessions and report them back to the service.
1365+
async fn remove_expired_sessions(&mut self) {
1366+
// Purge any expired sessions
1367+
let expired_sessions = self.sessions.remove_expired_values();
1368+
1369+
if !expired_sessions.is_empty() {
1370+
if let Err(e) = self
1371+
.service_send
1372+
.send(HandlerOut::ExpiredSessions(expired_sessions))
1373+
.await
1374+
{
1375+
warn!(error = %e, "Failed to inform app of expired sessions")
1376+
}
1377+
}
1378+
}
1379+
13561380
/// Returns whether a session with this node does not exist and a request that initiates
13571381
/// a session has been sent.
13581382
fn is_awaiting_session_to_be_established(&mut self, node_address: &NodeAddress) -> bool {

src/lru_time_cache.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,13 @@ impl<K: Clone + Eq + Hash, V> LruTimeCache<K, V> {
3737
}
3838

3939
/// Retrieves a reference to the value stored under `key`, or `None` if the key doesn't exist.
40-
/// Also removes expired elements and updates the time.
41-
#[allow(dead_code)]
4240
pub fn get(&mut self, key: &K) -> Option<&V> {
4341
self.get_mut(key).map(|value| &*value)
4442
}
4543

4644
/// Retrieves a mutable reference to the value stored under `key`, or `None` if the key doesn't exist.
47-
/// Also removes expired elements and updates the time.
4845
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
4946
let now = Instant::now();
50-
self.remove_expired_values(now);
5147

5248
match self.map.raw_entry_mut().from_key(key) {
5349
hashlink::linked_hash_map::RawEntryMut::Occupied(mut occupied) => {
@@ -76,7 +72,6 @@ impl<K: Clone + Eq + Hash, V> LruTimeCache<K, V> {
7672

7773
/// Returns the size of the cache, i.e. the number of cached non-expired key-value pairs.
7874
pub fn len(&mut self) -> usize {
79-
self.remove_expired_values(Instant::now());
8075
self.map.len()
8176
}
8277

@@ -87,13 +82,19 @@ impl<K: Clone + Eq + Hash, V> LruTimeCache<K, V> {
8782
}
8883

8984
/// Removes expired items from the cache.
90-
fn remove_expired_values(&mut self, now: Instant) {
85+
pub fn remove_expired_values(&mut self) -> Vec<K> {
86+
let mut expired_elements = Vec::new();
87+
let now = Instant::now();
9188
while let Some((_front, (_value, time))) = self.map.front() {
9289
if *time + self.ttl >= now {
9390
break;
9491
}
95-
self.map.pop_front();
92+
// Store the expired key
93+
if let Some((k, _v)) = self.map.pop_front() {
94+
expired_elements.push(k);
95+
}
9696
}
97+
expired_elements
9798
}
9899
}
99100

@@ -204,6 +205,7 @@ mod tests {
204205
assert_eq!(Some(&10), cache.get(&1));
205206

206207
sleep(TTL);
208+
cache.remove_expired_values();
207209
assert_eq!(None, cache.get(&1));
208210
}
209211

@@ -214,6 +216,7 @@ mod tests {
214216
assert_eq!(Some(&10), cache.peek(&1));
215217

216218
sleep(TTL);
219+
cache.remove_expired_values();
217220
assert_eq!(None, cache.peek(&1));
218221
}
219222

@@ -224,6 +227,7 @@ mod tests {
224227
assert_eq!(1, cache.len());
225228

226229
sleep(TTL);
230+
cache.remove_expired_values();
227231
assert_eq!(0, cache.len());
228232
}
229233

@@ -232,12 +236,16 @@ mod tests {
232236
let mut cache = LruTimeCache::new(TTL, None);
233237
cache.insert(1, 10);
234238
sleep(TTL / 4);
239+
cache.remove_expired_values();
235240
cache.insert(2, 20);
236241
sleep(TTL / 4);
242+
cache.remove_expired_values();
237243
cache.insert(3, 30);
238244
sleep(TTL / 4);
245+
cache.remove_expired_values();
239246
cache.insert(4, 40);
240247
sleep(TTL / 4);
248+
cache.remove_expired_values();
241249

242250
assert_eq!(3, cache.len());
243251
assert_eq!(None, cache.get(&1));

src/service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,9 @@ impl Service {
416416
}
417417
self.send_event(Event::UnverifiableEnr{enr, socket, node_id});
418418
}
419+
HandlerOut::ExpiredSessions(expired_sessions) => {
420+
self.send_event(Event::SessionsExpired(expired_sessions));
421+
}
419422
}
420423
}
421424
event = Service::bucket_maintenance_poll(&self.kbuckets) => {

0 commit comments

Comments
 (0)