Skip to content

Commit 037ec2c

Browse files
committed
combine sketches across workers for more precise bloom filters
1 parent 49abc74 commit 037ec2c

File tree

2 files changed

+37
-1
lines changed

2 files changed

+37
-1
lines changed

crates/core/src/entrypoint/ampc/shortest_path/coordinator.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::ampc::{Coordinator, DefaultDhtTable, DhtConn};
2828
use crate::config::ShortestPathCoordinatorConfig;
2929
use crate::distributed::cluster::Cluster;
3030
use crate::distributed::member::{Member, Service, ShardId};
31+
use crate::hyperloglog::HyperLogLog;
3132
use crate::webpage::url_ext::UrlExt;
3233
use crate::{webgraph, Result};
3334

@@ -174,6 +175,21 @@ pub fn run(config: ShortestPathCoordinatorConfig) -> Result<()> {
174175
.build()?
175176
.block_on(setup_gossip(tokio_conf))?;
176177

178+
let sketch = cluster
179+
.workers
180+
.iter()
181+
.map(|worker| worker.get_node_sketch())
182+
.fold(HyperLogLog::default(), |mut acc, sketch| {
183+
acc.merge(&sketch);
184+
acc
185+
});
186+
187+
let num_nodes = sketch.size() as u64;
188+
189+
for worker in cluster.workers.iter() {
190+
worker.update_changed_nodes_precision(num_nodes);
191+
}
192+
177193
let jobs: Vec<_> = cluster
178194
.workers
179195
.iter()

crates/core/src/entrypoint/ampc/shortest_path/worker.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ impl ShortestPathWorker {
7575
pub fn nodes_sketch(&self) -> &HyperLogLog<4096> {
7676
&self.nodes_sketch
7777
}
78+
79+
pub fn update_changed_nodes_precision(&self, num_nodes: u64) {
80+
let mut changed_nodes = self.changed_nodes().lock().unwrap();
81+
*changed_nodes = U64BloomFilter::new(num_nodes, 0.01);
82+
}
7883
}
7984

8085
#[derive(serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode, Debug, Clone)]
@@ -88,6 +93,17 @@ impl Message<ShortestPathWorker> for GetNodeSketch {
8893
}
8994
}
9095

96+
#[derive(serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode, Debug, Clone)]
97+
pub struct UpdateChangedNodesPrecision(u64);
98+
99+
impl Message<ShortestPathWorker> for UpdateChangedNodesPrecision {
100+
type Response = ();
101+
102+
fn handle(self, worker: &ShortestPathWorker) -> Self::Response {
103+
worker.update_changed_nodes_precision(self.0);
104+
}
105+
}
106+
91107
#[derive(serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode, Debug, Clone)]
92108
pub struct BatchId2Node(Vec<webgraph::NodeID>);
93109

@@ -109,7 +125,7 @@ impl Message<ShortestPathWorker> for BatchId2Node {
109125
}
110126
}
111127

112-
impl_worker!(ShortestPathJob, RemoteShortestPathWorker => ShortestPathWorker, [BatchId2Node, GetNodeSketch]);
128+
impl_worker!(ShortestPathJob, RemoteShortestPathWorker => ShortestPathWorker, [BatchId2Node, GetNodeSketch, UpdateChangedNodesPrecision]);
113129

114130
#[derive(Clone)]
115131
pub struct RemoteShortestPathWorker {
@@ -143,6 +159,10 @@ impl RemoteShortestPathWorker {
143159
pub fn get_node_sketch(&self) -> HyperLogLog<4096> {
144160
self.send(GetNodeSketch)
145161
}
162+
163+
pub fn update_changed_nodes_precision(&self, num_nodes: u64) {
164+
self.send(UpdateChangedNodesPrecision(num_nodes));
165+
}
146166
}
147167

148168
impl RemoteWorker for RemoteShortestPathWorker {

0 commit comments

Comments
 (0)