Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 1ca0472

Browse files
committedFeb 24, 2025·
perf: switch store from RwLock to Mutex 1.74x performance increase ~98.8% error rate decrease
1 parent f58a329 commit 1ca0472

File tree

14 files changed

+51
-54
lines changed

14 files changed

+51
-54
lines changed
 

‎crates/portalnet/src/overlay/protocol.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use ethportal_api::{
3131
OverlayContentKey, RawContentKey, RawContentValue,
3232
};
3333
use futures::channel::oneshot;
34-
use parking_lot::RwLock;
34+
use parking_lot::Mutex;
3535
use tokio::sync::{broadcast, mpsc::UnboundedSender};
3636
use tracing::{debug, error, info, warn};
3737
use trin_metrics::{overlay::OverlayMetricsReporter, portalnet::PORTALNET_METRICS};
@@ -71,7 +71,7 @@ pub struct OverlayProtocol<TContentKey, TMetric, TValidator, TStore, TPingExtens
7171
/// Reference to the underlying discv5 protocol
7272
pub discovery: Arc<Discovery>,
7373
/// The data store.
74-
pub store: Arc<RwLock<TStore>>,
74+
pub store: Arc<Mutex<TStore>>,
7575
/// The overlay routing table of the local node.
7676
kbuckets: SharedKBucketsTable,
7777
/// The subnetwork protocol of the overlay.
@@ -106,7 +106,7 @@ impl<
106106
config: OverlayConfig,
107107
discovery: Arc<Discovery>,
108108
utp_socket: Arc<UtpSocket<UtpPeer>>,
109-
store: Arc<RwLock<TStore>>,
109+
store: Arc<Mutex<TStore>>,
110110
protocol: Subnetwork,
111111
validator: Arc<TValidator>,
112112
ping_extensions: Arc<TPingExtensions>,
@@ -178,7 +178,7 @@ impl<
178178

179179
/// Returns the data radius of the local node.
180180
pub fn data_radius(&self) -> Distance {
181-
self.store.read().radius()
181+
self.store.lock().radius()
182182
}
183183

184184
/// Processes a single Discovery v5 TALKREQ message.
@@ -223,7 +223,7 @@ impl<
223223
) -> PutContentInfo {
224224
let should_we_store = match self
225225
.store
226-
.read()
226+
.lock()
227227
.is_key_within_radius_and_unavailable(&content_key)
228228
{
229229
Ok(should_we_store) => matches!(should_we_store, ShouldWeStoreContent::Store),
@@ -240,7 +240,7 @@ impl<
240240
if should_we_store {
241241
let _ = self
242242
.store
243-
.write()
243+
.lock()
244244
.put(content_key.clone(), content_value.clone());
245245
}
246246

‎crates/portalnet/src/overlay/service/manager.rs

+16-19
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use ethportal_api::{
3535
OverlayContentKey, RawContentKey, RawContentValue,
3636
};
3737
use futures::{channel::oneshot, future::join_all, prelude::*};
38-
use parking_lot::RwLock;
38+
use parking_lot::{Mutex, RwLock};
3939
use rand::Rng;
4040
use smallvec::SmallVec;
4141
use ssz::Encode;
@@ -121,7 +121,7 @@ impl<
121121
#[allow(clippy::too_many_arguments)]
122122
pub async fn spawn(
123123
discovery: Arc<Discovery>,
124-
store: Arc<RwLock<TStore>>,
124+
store: Arc<Mutex<TStore>>,
125125
kbuckets: SharedKBucketsTable,
126126
bootnode_enrs: Vec<Enr>,
127127
ping_queue_interval: Option<Duration>,
@@ -376,7 +376,7 @@ impl<
376376
/// This requires store lock and can block the thread, so it shouldn't be called other lock is
377377
/// already held.
378378
pub(super) fn data_radius(&self) -> Distance {
379-
self.store.read().radius()
379+
self.store.lock().radius()
380380
}
381381

382382
/// Maintains the routing table.
@@ -928,7 +928,7 @@ impl<
928928
}
929929
};
930930
match (
931-
self.store.read().get(&content_key),
931+
self.store.lock().get(&content_key),
932932
self.utp_controller.get_outbound_semaphore(),
933933
) {
934934
(Ok(Some(content)), Some(permit)) => {
@@ -1045,7 +1045,7 @@ impl<
10451045
// Accept content if within radius and not already present in the data store.
10461046
let mut accept = self
10471047
.store
1048-
.read()
1048+
.lock()
10491049
.is_key_within_radius_and_unavailable(key)
10501050
.map(|value| matches!(value, ShouldWeStoreContent::Store))
10511051
.map_err(|err| {
@@ -1552,15 +1552,11 @@ impl<
15521552
// already stored.
15531553
let key_desired = utp_processing
15541554
.store
1555-
.read()
1555+
.lock()
15561556
.is_key_within_radius_and_unavailable(&key);
15571557
match key_desired {
15581558
Ok(ShouldWeStoreContent::Store) => {
1559-
match utp_processing
1560-
.store
1561-
.write()
1562-
.put(key.clone(), &content_value)
1563-
{
1559+
match utp_processing.store.lock().put(key.clone(), &content_value) {
15641560
Ok(dropped_content) => {
15651561
if !dropped_content.is_empty() && utp_processing.gossip_dropped {
15661562
// add dropped content to validation result, so it will be propagated
@@ -1755,7 +1751,7 @@ impl<
17551751
) {
17561752
let mut content = content;
17571753
// Operate under assumption that all content in the store is valid
1758-
let local_value = utp_processing.store.read().get(&content_key);
1754+
let local_value = utp_processing.store.lock().get(&content_key);
17591755
if let Ok(Some(val)) = local_value {
17601756
// todo validate & replace content value if different & punish bad peer
17611757
content = val;
@@ -1796,7 +1792,7 @@ impl<
17961792
let should_store = validation_result.valid_for_storing
17971793
&& utp_processing
17981794
.store
1799-
.read()
1795+
.lock()
18001796
.is_key_within_radius_and_unavailable(&content_key)
18011797
.map_or_else(
18021798
|err| {
@@ -1808,7 +1804,7 @@ impl<
18081804
if should_store {
18091805
match utp_processing
18101806
.store
1811-
.write()
1807+
.lock()
18121808
.put(content_key.clone(), content.clone())
18131809
{
18141810
Ok(dropped_content) => {
@@ -1889,7 +1885,7 @@ impl<
18891885

18901886
/// Provide the requested content key and content value for the acceptor
18911887
fn provide_requested_content(
1892-
store: Arc<RwLock<TStore>>,
1888+
store: Arc<Mutex<TStore>>,
18931889
accept_message: &Accept,
18941890
content_keys_offered: Vec<RawContentKey>,
18951891
) -> anyhow::Result<Vec<RawContentValue>> {
@@ -1910,7 +1906,7 @@ impl<
19101906
.zip(content_keys_offered.iter())
19111907
{
19121908
if i {
1913-
match store.read().get(key) {
1909+
match store.lock().get(key) {
19141910
Ok(content) => match content {
19151911
Some(content) => content_items.push(content),
19161912
None => return Err(anyhow!("Unable to read offered content!")),
@@ -2198,7 +2194,7 @@ impl<
21982194
debug!("Starting query for content key: {}", target);
21992195

22002196
// Lookup content locally before querying the network.
2201-
if let Ok(Some(content)) = self.store.read().get(&target) {
2197+
if let Ok(Some(content)) = self.store.lock().get(&target) {
22022198
let local_enr = self.local_enr();
22032199
let mut query_trace = QueryTrace::new(&local_enr, target.content_id().into());
22042200
query_trace.node_responded_with_content(&local_enr);
@@ -2369,7 +2365,7 @@ where
23692365
TStore: ContentStore<Key = TContentKey>,
23702366
{
23712367
validator: Arc<TValidator>,
2372-
store: Arc<RwLock<TStore>>,
2368+
store: Arc<Mutex<TStore>>,
23732369
metrics: OverlayMetricsReporter,
23742370
kbuckets: SharedKBucketsTable,
23752371
command_tx: UnboundedSender<OverlayCommand<TContentKey>>,
@@ -2462,6 +2458,7 @@ mod tests {
24622458
portal_wire::{Ping, Pong, MAINNET},
24632459
};
24642460
use kbucket::KBucketsTable;
2461+
use parking_lot::lock_api::Mutex;
24652462
use rstest::*;
24662463
use serial_test::serial;
24672464
use tokio::{
@@ -2524,7 +2521,7 @@ mod tests {
25242521

25252522
let node_id = discovery.local_enr().node_id();
25262523
let store = MemoryContentStore::new(node_id, DistanceFunction::Xor);
2527-
let store = Arc::new(RwLock::new(store));
2524+
let store = Arc::new(Mutex::new(store));
25282525

25292526
let overlay_config = OverlayConfig::default();
25302527
let kbuckets = SharedKBucketsTable::new(KBucketsTable::new(

‎crates/portalnet/src/overlay/service/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use delay_map::HashSetDelay;
77
use discv5::enr::NodeId;
88
use ethportal_api::{types::network::Subnetwork, OverlayContentKey};
99
use manager::QueryTraceEvent;
10-
use parking_lot::RwLock;
10+
use parking_lot::{Mutex, RwLock};
1111
use tokio::sync::{
1212
broadcast,
1313
mpsc::{UnboundedReceiver, UnboundedSender},
@@ -38,7 +38,8 @@ where
3838
/// The underlying Discovery v5 protocol.
3939
discovery: Arc<Discovery>,
4040
/// The content database of the local node.
41-
store: Arc<RwLock<TStore>>,
41+
store: Arc<Mutex<TStore>>,
42+
4243
/// The routing table of the local node.
4344
kbuckets: SharedKBucketsTable,
4445
/// The protocol identifier.

‎crates/portalnet/tests/overlay.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use ethportal_api::{
1515
utils::bytes::hex_encode_upper,
1616
OverlayContentKey,
1717
};
18-
use parking_lot::RwLock;
18+
use parking_lot::Mutex;
1919
use portalnet::{
2020
config::PortalnetConfig,
2121
discovery::{Discovery, Discv5UdpSocket},
@@ -47,7 +47,7 @@ async fn init_overlay(
4747

4848
let node_id = discovery.local_enr().node_id();
4949
let store = MemoryContentStore::new(node_id, DistanceFunction::Xor);
50-
let store = Arc::new(RwLock::new(store));
50+
let store = Arc::new(Mutex::new(store));
5151

5252
let header_oracle = HeaderOracle::default();
5353
let header_oracle = Arc::new(TokioRwLock::new(header_oracle));
@@ -256,7 +256,7 @@ async fn overlay() {
256256
let content = vec![0xef];
257257
overlay_three
258258
.store
259-
.write()
259+
.lock()
260260
.put(content_key.clone(), &content)
261261
.expect("Unable to store content");
262262
let (found_content, utp_transfer, _) = overlay_one

‎crates/subnetworks/beacon/src/jsonrpc.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ async fn local_content(
215215
network: Arc<BeaconNetwork>,
216216
content_key: BeaconContentKey,
217217
) -> Result<Value, String> {
218-
let response = match network.overlay.store.read().get(&content_key)
218+
let response = match network.overlay.store.lock().get(&content_key)
219219
{
220220
Ok(val) => match val {
221221
Some(val) => {
@@ -236,7 +236,7 @@ async fn paginate_local_content_keys(
236236
offset: u64,
237237
limit: u64,
238238
) -> Result<Value, String> {
239-
let response = match network.overlay.store.read().paginate(&offset, &limit)
239+
let response = match network.overlay.store.lock().paginate(&offset, &limit)
240240
{
241241
Ok(val) => Ok(json!(val)),
242242
Err(err) => Err(format!(
@@ -256,7 +256,7 @@ async fn store(
256256
let response = match network
257257
.overlay
258258
.store
259-
.write()
259+
.lock()
260260
.put::<Vec<u8>>(content_key, data)
261261
{
262262
Ok(_) => Ok(Value::Bool(true)),

‎crates/subnetworks/beacon/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ pub fn spawn_beacon_heartbeat(network: Arc<BeaconNetwork>) {
119119
// this wait at the top. Otherwise, we get two log lines immediately on startup.
120120
heart_interval.tick().await;
121121

122-
let storage_log = network.overlay.store.read().get_summary_info();
122+
let storage_log = network.overlay.store.lock().get_summary_info();
123123
let message_log = network.overlay.get_message_summary();
124124
let utp_log = network.overlay.get_utp_summary();
125125
info!("reports~ data: {storage_log}; msgs: {message_log}");

‎crates/subnetworks/beacon/src/network.rs

+6-7
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use ethportal_api::{
77
BeaconContentKey,
88
};
99
use light_client::{consensus::rpc::portal_rpc::PortalRpc, database::FileDB, Client};
10-
use parking_lot::RwLock as PLRwLock;
10+
use parking_lot::Mutex as PLMutex;
1111
use portalnet::{
1212
config::PortalnetConfig,
1313
discovery::{Discovery, UtpPeer},
@@ -58,8 +58,8 @@ impl BeaconNetwork {
5858
gossip_dropped: GOSSIP_DROPPED,
5959
..Default::default()
6060
};
61-
let storage = Arc::new(PLRwLock::new(BeaconStorage::new(storage_config)?));
62-
storage.write().spawn_pruning_task(); // Spawn pruning task to clean up expired content.
61+
let storage = Arc::new(PLMutex::new(BeaconStorage::new(storage_config)?));
62+
storage.lock().spawn_pruning_task(); // Spawn pruning task to clean up expired content.
6363
let storage_clone = Arc::clone(&storage);
6464
let validator = Arc::new(BeaconValidator::new(header_oracle));
6565
let ping_extensions = Arc::new(BeaconPingExtensions {});
@@ -106,15 +106,15 @@ impl BeaconNetwork {
106106
/// Get the trusted block root to start syncing light client from.
107107
fn get_trusted_block_root(
108108
portal_config: &PortalnetConfig,
109-
storage: Arc<PLRwLock<BeaconStorage>>,
109+
storage: Arc<PLMutex<BeaconStorage>>,
110110
) -> anyhow::Result<B256> {
111111
// 1) Check if a trusted block root was provided via config
112112
if let Some(block_root) = portal_config.trusted_block_root {
113113
return Ok(block_root);
114114
}
115115

116116
// 2) Otherwise, try to read the latest block root from storage
117-
let maybe_db_block_root = storage.read().lookup_latest_block_root()?;
117+
let maybe_db_block_root = storage.lock().lookup_latest_block_root()?;
118118
if let Some(db_block_root) = maybe_db_block_root {
119119
info!(
120120
block_root = %db_block_root,
@@ -140,7 +140,6 @@ mod tests {
140140

141141
use discv5::enr::NodeId;
142142
use ethportal_api::{types::distance::Distance, utils::bytes::hex_decode};
143-
use parking_lot::RwLock;
144143
use tempfile::TempDir;
145144
use trin_storage::{config::StorageCapacityConfig, PortalStorageConfigFactory};
146145

@@ -182,7 +181,7 @@ mod tests {
182181
.create(&Subnetwork::Beacon, Distance::MAX)
183182
.unwrap();
184183
// A mock storage that always returns None
185-
let storage_clone = Arc::new(RwLock::new(BeaconStorage::new(storage_config).unwrap()));
184+
let storage_clone = Arc::new(PLMutex::new(BeaconStorage::new(storage_config).unwrap()));
186185

187186
let result = get_trusted_block_root(&portal_config, storage_clone)
188187
.expect("Function should not fail with an Err");

‎crates/subnetworks/history/src/jsonrpc.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ async fn local_content(
150150
network: Arc<HistoryNetwork>,
151151
content_key: HistoryContentKey,
152152
) -> Result<Value, String> {
153-
let response = match network.overlay.store.read().get(&content_key)
153+
let response = match network.overlay.store.lock().get(&content_key)
154154
{
155155
Ok(val) => match val {
156156
Some(val) => {
@@ -176,7 +176,7 @@ async fn paginate_local_content_keys(
176176
offset: u64,
177177
limit: u64,
178178
) -> Result<Value, String> {
179-
let response = match network.overlay.store.read().paginate(offset, limit)
179+
let response = match network.overlay.store.lock().paginate(offset, limit)
180180
{
181181
Ok(val) => Ok(json!(val)),
182182
Err(err) => Err(format!(
@@ -196,7 +196,7 @@ async fn store(
196196
let response = match network
197197
.overlay
198198
.store
199-
.write()
199+
.lock()
200200
.put::<Vec<u8>>(content_key, data)
201201
{
202202
Ok(_) => Ok(Value::Bool(true)),

‎crates/subnetworks/history/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ pub fn spawn_history_heartbeat(network: Arc<HistoryNetwork>) {
118118
// this wait at the top. Otherwise, we get two log lines immediately on startup.
119119
heart_interval.tick().await;
120120

121-
let storage_log = network.overlay.store.read().get_summary_info();
121+
let storage_log = network.overlay.store.lock().get_summary_info();
122122
let message_log = network.overlay.get_message_summary();
123123
let utp_log = network.overlay.get_utp_summary();
124124
info!("reports~ data: {storage_log}; msgs: {message_log}");

‎crates/subnetworks/history/src/network.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use ethportal_api::{
44
types::{distance::XorMetric, network::Subnetwork},
55
HistoryContentKey,
66
};
7-
use parking_lot::RwLock as PLRwLock;
7+
use parking_lot::Mutex;
88
use portalnet::{
99
config::PortalnetConfig,
1010
discovery::{Discovery, UtpPeer},
@@ -55,7 +55,7 @@ impl HistoryNetwork {
5555
utp_transfer_limit: portal_config.utp_transfer_limit,
5656
..Default::default()
5757
};
58-
let storage = Arc::new(PLRwLock::new(HistoryStorage::new(
58+
let storage = Arc::new(Mutex::new(HistoryStorage::new(
5959
storage_config,
6060
disable_history_storage,
6161
)?));

‎crates/subnetworks/state/src/jsonrpc.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ fn local_storage_lookup(
172172
network
173173
.overlay
174174
.store
175-
.read()
175+
.lock()
176176
.get(content_key)
177177
.map_err(|err| err.to_string())
178178
}
@@ -285,7 +285,7 @@ async fn store(
285285
network
286286
.overlay
287287
.store
288-
.write()
288+
.lock()
289289
.put(content_key, content_value.encode())
290290
.map(|_| true),
291291
)
@@ -352,7 +352,7 @@ async fn put_content(
352352
fn paginate(network: Arc<StateNetwork>, offset: u64, limit: u64) -> Result<Value, String> {
353353
to_json_result(
354354
"PaginateLocalContentKeys",
355-
network.overlay.store.read().paginate(offset, limit),
355+
network.overlay.store.lock().paginate(offset, limit),
356356
)
357357
}
358358

‎crates/subnetworks/state/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ pub fn spawn_state_heartbeat(network: Arc<StateNetwork>) {
127127
let cpu_percent =
128128
100.0 * process_time.elapsed().as_secs_f64() / clock_time.elapsed().as_secs_f64();
129129

130-
let storage_log = network.overlay.store.read().get_summary_info();
130+
let storage_log = network.overlay.store.lock().get_summary_info();
131131
let message_log = network.overlay.get_message_summary();
132132
let utp_log = network.overlay.get_utp_summary();
133133
info!("reports~ data: {storage_log}; msgs: {message_log}; cpu={cpu_percent:.1}%");

‎crates/subnetworks/state/src/network.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use ethportal_api::{
44
types::{distance::XorMetric, network::Subnetwork},
55
StateContentKey,
66
};
7-
use parking_lot::RwLock as PLRwLock;
7+
use parking_lot::Mutex;
88
use portalnet::{
99
config::PortalnetConfig,
1010
discovery::{Discovery, UtpPeer},
@@ -60,7 +60,7 @@ impl StateNetwork {
6060
utp_transfer_limit: portal_config.utp_transfer_limit,
6161
..Default::default()
6262
};
63-
let storage = Arc::new(PLRwLock::new(StateStorage::new(storage_config)?));
63+
let storage = Arc::new(Mutex::new(StateStorage::new(storage_config)?));
6464
let validator = Arc::new(StateValidator { header_oracle });
6565
let ping_extensions = Arc::new(StatePingExtensions {});
6666
let overlay = OverlayProtocol::new(

‎portal-spec-tests

Submodule portal-spec-tests updated 27 files

0 commit comments

Comments
 (0)
Please sign in to comment.