Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/discv5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable,
NodeStatus, UpdateResult,
},
node_info::NodeContact,
node_info::{NodeAddress, NodeContact},
packet::ProtocolIdentity,
service::{QueryKind, Service, ServiceRequest, TalkRequest},
Config, DefaultProtocolId, Enr, IpMode,
Expand Down Expand Up @@ -74,6 +74,8 @@ pub enum Event {
},
/// A new session has been established with a node.
SessionEstablished(Enr, SocketAddr),
/// A session has been removed from our cache due to inactivity.
SessionsExpired(Vec<NodeAddress>),
/// Our local ENR IP address has been updated.
SocketUpdated(SocketAddr),
/// A node has initiated a talk request.
Expand Down
24 changes: 24 additions & 0 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ pub enum HandlerOut {
socket: SocketAddr,
node_id: NodeId,
},
/// These sessions have expired from the cache.
ExpiredSessions(Vec<NodeAddress>),
}

/// How we connected to the node.
Expand Down Expand Up @@ -1231,6 +1233,9 @@ impl Handler {
// handshake to re-establish a session, if applicable.
message_nonce: Option<MessageNonce>,
) {
// Clear the session cache and report expired elements
self.remove_expired_sessions().await;

if let Some(current_session) = self.sessions.get_mut(&node_address) {
current_session.update(session);
// If a session is re-established, due to a new handshake during an ongoing
Expand Down Expand Up @@ -1286,6 +1291,9 @@ impl Handler {
remove_session: bool,
) {
if remove_session {
// Remove expired sessions
self.remove_expired_sessions().await;

self.sessions.remove(node_address);
METRICS
.active_sessions
Expand Down Expand Up @@ -1357,6 +1365,22 @@ impl Handler {
.retain(|_, time| time.is_none() || Some(Instant::now()) < *time);
}

/// Removes expired sessions and report them back to the service.
async fn remove_expired_sessions(&mut self) {
// Purge any expired sessions
let expired_sessions = self.sessions.remove_expired_values();

if !expired_sessions.is_empty() {
if let Err(e) = self
.service_send
.send(HandlerOut::ExpiredSessions(expired_sessions))
.await
{
warn!(error = %e, "Failed to inform app of expired sessions")
}
}
}

/// Returns whether a session with this node does not exist and a request that initiates
/// a session has been sent.
fn is_awaiting_session_to_be_established(&mut self, node_address: &NodeAddress) -> bool {
Expand Down
14 changes: 8 additions & 6 deletions src/lru_time_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,14 @@ impl<K: Clone + Eq + Hash, V> LruTimeCache<K, V> {
}

/// Retrieves a reference to the value stored under `key`, or `None` if the key doesn't exist.
/// Also removes expired elements and updates the time.
#[allow(dead_code)]
pub fn get(&mut self, key: &K) -> Option<&V> {
self.get_mut(key).map(|value| &*value)
}

/// Retrieves a mutable reference to the value stored under `key`, or `None` if the key doesn't exist.
/// Also removes expired elements and updates the time.
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
let now = Instant::now();
self.remove_expired_values(now);

match self.map.raw_entry_mut().from_key(key) {
hashlink::linked_hash_map::RawEntryMut::Occupied(mut occupied) => {
Expand Down Expand Up @@ -76,7 +73,6 @@ impl<K: Clone + Eq + Hash, V> LruTimeCache<K, V> {

/// Returns the size of the cache, i.e. the number of cached non-expired key-value pairs.
pub fn len(&mut self) -> usize {
self.remove_expired_values(Instant::now());
self.map.len()
}

Expand All @@ -87,13 +83,19 @@ impl<K: Clone + Eq + Hash, V> LruTimeCache<K, V> {
}

/// Removes expired items from the cache.
fn remove_expired_values(&mut self, now: Instant) {
pub fn remove_expired_values(&mut self) -> Vec<K> {
let mut expired_elements = Vec::new();
let now = Instant::now();
while let Some((_front, (_value, time))) = self.map.front() {
if *time + self.ttl >= now {
break;
}
self.map.pop_front();
// Store the expired key
if let Some((k, _v)) = self.map.pop_front() {
expired_elements.push(k);
}
}
expired_elements
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ impl Service {
}
self.send_event(Event::UnverifiableEnr{enr, socket, node_id});
}
HandlerOut::ExpiredSessions(expired_sessions) => {
self.send_event(Event::SessionsExpired(expired_sessions));
}
}
}
event = Service::bucket_maintenance_poll(&self.kbuckets) => {
Expand Down
Loading