Skip to content

Commit 5e45e0e

Browse files
committed
Parallelize ChannelMonitorUpdate loading
When reading `ChannelMonitor`s from a `MonitorUpdatingPersister` on startup, we have to make sure to load any `ChannelMonitorUpdate`s and re-apply them as well. Now that we know which `ChannelMonitorUpdate`s to load from `list`ing the entries from the `KVStore` we can parallelize the reads themselves, which we do here. Now, loading all `ChannelMonitor`s from an async `KVStore` requires only three full RTTs - one to list the set of `ChannelMonitor`s, one to both fetch the `ChanelMonitor` and list the set of `ChannelMonitorUpdate`s, and one to fetch all the `ChannelMonitorUpdate`s (with the last one skipped when there are no `ChannelMonitorUpdate`s to read).
1 parent 7b2d93d commit 5e45e0e

File tree

1 file changed

+22
-18
lines changed

1 file changed

+22
-18
lines changed

lightning/src/util/persist.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,28 +1095,32 @@ where
10951095
Some(res) => res,
10961096
None => return Ok(None),
10971097
};
1098-
let mut current_update_id = monitor.get_latest_update_id();
1098+
let current_update_id = monitor.get_latest_update_id();
10991099
let updates: Result<Vec<_>, _> =
11001100
list_res?.into_iter().map(|name| UpdateName::new(name)).collect();
11011101
let mut updates = updates?;
11021102
updates.sort_unstable();
1103-
// TODO: Parallelize this loop
1104-
for update_name in updates {
1105-
if update_name.0 > current_update_id {
1106-
let update = self.read_monitor_update(monitor_key, &update_name).await?;
1107-
monitor
1108-
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1109-
.map_err(|e| {
1110-
log_error!(
1111-
self.logger,
1112-
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1113-
monitor_key,
1114-
update_name.as_str(),
1115-
e
1116-
);
1117-
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1118-
})?;
1119-
}
1103+
let updates_to_load = updates.iter().filter(|update| update.0 > current_update_id);
1104+
let mut update_futures = Vec::with_capacity(updates_to_load.clone().count());
1105+
for update_name in updates_to_load {
1106+
update_futures.push(ResultFuture::Pending(Box::pin(async move {
1107+
(update_name, self.read_monitor_update(monitor_key, update_name).await)
1108+
})));
1109+
}
1110+
for (update_name, update_res) in MultiResultFuturePoller::new(update_futures).await {
1111+
let update = update_res?;
1112+
monitor
1113+
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
1114+
.map_err(|e| {
1115+
log_error!(
1116+
self.logger,
1117+
"Monitor update failed. monitor: {} update: {} reason: {:?}",
1118+
monitor_key,
1119+
update_name.as_str(),
1120+
e
1121+
);
1122+
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
1123+
})?;
11201124
}
11211125
Ok(Some((block_hash, monitor)))
11221126
}

0 commit comments

Comments
 (0)