Skip to content

Commit 774dbd8

Browse files
committed
Optimise shortest path to track the exact changed nodes (stored in a hash set) when only very few nodes were updated
1 parent 037ec2c commit 774dbd8

File tree

5 files changed

+306
-58
lines changed

5 files changed

+306
-58
lines changed

crates/core/src/ampc/dht/value.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ impl ValueTrait for HarmonicMeta {}
5353
type ShortestPathMeta = crate::entrypoint::ampc::shortest_path::Meta;
5454
impl ValueTrait for ShortestPathMeta {}
5555

56+
// Alias for the shortest-path job's `UpdatedNodes` type; the `Value` enum in this
// file wraps it in a variant of the same name.
type ShortestPathChangedNodes = crate::entrypoint::ampc::shortest_path::UpdatedNodes;
57+
impl ValueTrait for ShortestPathChangedNodes {}
58+
5659
impl ValueTrait for U64BloomFilter {}
5760

5861
type Unit = ();
@@ -75,6 +78,7 @@ pub enum Value {
7578
HyperLogLog128(HyperLogLog128),
7679
HarmonicMeta(HarmonicMeta),
7780
ShortestPathMeta(ShortestPathMeta),
81+
ShortestPathChangedNodes(ShortestPathChangedNodes),
7882
U64BloomFilter(U64BloomFilter),
7983
Unit(Unit),
8084
}
@@ -113,5 +117,6 @@ impl_from_to_value!(HyperLogLog64, HyperLogLog64);
113117
impl_from_to_value!(HyperLogLog128, HyperLogLog128);
114118
impl_from_to_value!(HarmonicMeta, HarmonicMeta);
115119
impl_from_to_value!(ShortestPathMeta, ShortestPathMeta);
120+
impl_from_to_value!(ShortestPathChangedNodes, ShortestPathChangedNodes);
116121
impl_from_to_value!(U64BloomFilter, U64BloomFilter);
117122
impl_from_to_value!(Unit, Unit);

crates/core/src/entrypoint/ampc/shortest_path/mapper.rs

Lines changed: 127 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@
1616

1717
use std::sync::{atomic::AtomicBool, Arc, Mutex};
1818

19-
use bloom::U64BloomFilter;
2019
use rustc_hash::FxHashMap;
2120

22-
use super::{DhtTable as _, Mapper, Meta, ShortestPathJob, ShortestPathTables};
21+
use super::{
22+
updated_nodes::{UpdatedNodes, UpdatedNodesKind},
23+
worker::ShortestPathWorker,
24+
DhtTable as _, Mapper, Meta, ShortestPathJob, ShortestPathTables,
25+
};
2326
use crate::{
2427
ampc::{
2528
dht::{U64Min, UpsertAction},
2629
DhtConn,
2730
},
28-
webgraph,
31+
webgraph::{self, query},
2932
webpage::html::links::RelFlags,
3033
};
3134

@@ -84,7 +87,7 @@ impl ShortestPathMapper {
8487

8588
fn map_batch(
8689
batch: &[webgraph::SmallEdge],
87-
new_changed_nodes: &Mutex<U64BloomFilter>,
90+
new_changed_nodes: &Mutex<UpdatedNodes>,
8891
round_had_changes: &AtomicBool,
8992
dht: &DhtConn<ShortestPathTables>,
9093
) {
@@ -93,11 +96,94 @@ impl ShortestPathMapper {
9396

9497
for (node, action) in updates {
9598
if action.is_changed() {
96-
new_changed_nodes.insert_u128(node.as_u128());
99+
new_changed_nodes.add(node);
97100
round_had_changes.store(true, std::sync::atomic::Ordering::Relaxed);
98101
}
99102
}
100103
}
104+
105+
/// Scans every edge returned by `worker.graph().page_edges()` and hands the
/// edges whose source node is in `changed_nodes` to `Self::map_batch`.
///
/// Edges whose `rel_flags` intersect `SKIPPED_REL` are ignored. Matching edges
/// are collected into batches of `BATCH_SIZE`; each full batch is cloned and
/// processed by a task spawned on a rayon scope, and any remaining partial
/// batch is processed inline once the scan finishes.
///
/// * `worker` - provides the graph whose edges are scanned.
/// * `changed_nodes` - membership set used to select edges by `edge.from`.
/// * `new_changed_nodes`, `round_had_changes`, `dht` - forwarded unchanged to
///   `Self::map_batch`.
///
/// # Panics
///
/// Panics if the rayon thread pool cannot be built.
fn relax_all_edges(
106+
worker: &ShortestPathWorker,
107+
changed_nodes: &UpdatedNodes,
108+
new_changed_nodes: &Mutex<UpdatedNodes>,
109+
round_had_changes: &AtomicBool,
110+
dht: &DhtConn<ShortestPathTables>,
111+
) {
112+
// A fresh thread pool with default settings is built on every call.
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
113+
pool.scope(|s| {
114+
let mut batch = Vec::with_capacity(BATCH_SIZE);
115+
116+
for edge in worker.graph().page_edges() {
117+
if edge.rel_flags.intersects(*SKIPPED_REL) {
118+
continue;
119+
}
120+
121+
// Only edges leaving a node recorded in `changed_nodes` are kept.
if changed_nodes.contains(edge.from) {
122+
batch.push(edge);
123+
}
124+
125+
if batch.len() >= BATCH_SIZE {
126+
// The spawned task takes ownership of a copy so `batch` can be reused.
let update_batch = batch.clone();
127+
s.spawn(move |_| {
128+
Self::map_batch(&update_batch, new_changed_nodes, round_had_changes, dht)
129+
});
130+
batch.clear();
131+
}
132+
}
133+
134+
// Process the final, partially filled batch on the current thread.
if !batch.is_empty() {
135+
Self::map_batch(&batch, new_changed_nodes, round_had_changes, dht);
136+
}
137+
});
138+
}
139+
140+
/// Looks up the forward links of each node in `exact_changed_nodes` and hands
/// the resulting edges to `Self::map_batch`, instead of scanning all edges.
///
/// Edges whose `rel_flags` intersect `SKIPPED_REL` are ignored, and an edge is
/// only batched when `changed_nodes` contains its `from` node. Full batches of
/// `BATCH_SIZE` are cloned and processed by tasks spawned on a rayon scope; the
/// remaining partial batch is processed after the scope has returned.
///
/// * `worker` - provides the graph that is queried.
/// * `changed_nodes` - membership set checked against `edge.from`.
/// * `exact_changed_nodes` - the nodes whose forward links are queried.
/// * `new_changed_nodes`, `round_had_changes`, `dht` - forwarded unchanged to
///   `Self::map_batch`.
///
/// # Panics
///
/// Panics if the rayon thread pool cannot be built.
fn relax_exact_edges(
141+
worker: &ShortestPathWorker,
142+
changed_nodes: &UpdatedNodes,
143+
exact_changed_nodes: &[webgraph::NodeID],
144+
new_changed_nodes: &Mutex<UpdatedNodes>,
145+
round_had_changes: &AtomicBool,
146+
dht: &DhtConn<ShortestPathTables>,
147+
) {
148+
let mut batch = Vec::with_capacity(BATCH_SIZE);
149+
150+
// A fresh thread pool with default settings is built on every call.
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
151+
152+
pool.scope(|s| {
153+
for node in exact_changed_nodes {
154+
// NOTE(review): a failed search is replaced by the result type's default
// value (presumably an empty edge list), so query errors are not surfaced
// here - confirm this is intended.
for edge in worker
155+
.graph()
156+
.search(&query::ForwardlinksQuery::new(*node))
157+
.unwrap_or_default()
158+
{
159+
if edge.rel_flags.intersects(*SKIPPED_REL) {
160+
continue;
161+
}
162+
163+
// NOTE(review): presumably every forward link of `node` already has
// `edge.from == *node`, which would make this check redundant - verify.
if changed_nodes.contains(edge.from) {
164+
batch.push(edge);
165+
}
166+
167+
if batch.len() >= BATCH_SIZE {
168+
// The spawned task takes ownership of a copy so `batch` can be reused.
let update_batch = batch.clone();
169+
s.spawn(move |_| {
170+
Self::map_batch(
171+
&update_batch,
172+
new_changed_nodes,
173+
round_had_changes,
174+
dht,
175+
)
176+
});
177+
batch.clear();
178+
}
179+
}
180+
}
181+
});
182+
183+
// Process the final, partially filled batch on the current thread.
if !batch.is_empty() {
184+
Self::map_batch(&batch, new_changed_nodes, round_had_changes, dht);
185+
}
186+
}
101187
}
102188

103189
impl Mapper for ShortestPathMapper {
@@ -112,47 +198,41 @@ impl Mapper for ShortestPathMapper {
112198
match self {
113199
ShortestPathMapper::RelaxEdges => {
114200
let round_had_changes = Arc::new(AtomicBool::new(false));
115-
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
116-
117-
let new_changed_nodes = Arc::new(Mutex::new(U64BloomFilter::empty_from(
118-
&worker.changed_nodes().lock().unwrap(),
119-
)));
120-
121-
pool.scope(|s| {
122-
let mut changed_nodes = worker.changed_nodes().lock().unwrap();
123-
changed_nodes.insert_u128(job.source.as_u128());
124-
125-
let mut batch = Vec::with_capacity(BATCH_SIZE);
126-
127-
for edge in worker.graph().page_edges() {
128-
if edge.rel_flags.intersects(*SKIPPED_REL) {
129-
continue;
130-
}
131-
132-
if changed_nodes.contains_u128(edge.from.as_u128()) {
133-
batch.push(edge);
134-
}
135-
136-
if batch.len() >= BATCH_SIZE {
137-
let update_batch = batch.clone();
138-
let update_new_changed_nodes = new_changed_nodes.clone();
139-
let update_round_had_changes = round_had_changes.clone();
140-
s.spawn(move |_| {
141-
Self::map_batch(
142-
&update_batch,
143-
&update_new_changed_nodes,
144-
&update_round_had_changes,
145-
dht,
146-
)
147-
});
148-
batch.clear();
149-
}
150-
}
151201

152-
if !batch.is_empty() {
153-
Self::map_batch(&batch, &new_changed_nodes, &round_had_changes, dht);
202+
let mut changed_nodes = worker.changed_nodes().lock().unwrap();
203+
changed_nodes.add(job.source);
204+
205+
let new_changed_nodes =
206+
Arc::new(Mutex::new(UpdatedNodes::empty_from(&changed_nodes)));
207+
208+
match changed_nodes.kind() {
209+
UpdatedNodesKind::Exact => {
210+
let exact_changed_nodes: Vec<_> = changed_nodes
211+
.as_exact()
212+
.unwrap()
213+
.clone()
214+
.into_iter()
215+
.collect();
216+
217+
Self::relax_exact_edges(
218+
worker,
219+
&changed_nodes,
220+
&exact_changed_nodes,
221+
&new_changed_nodes,
222+
&round_had_changes,
223+
dht,
224+
);
154225
}
155-
});
226+
UpdatedNodesKind::Sketch => {
227+
Self::relax_all_edges(
228+
worker,
229+
&changed_nodes,
230+
&new_changed_nodes,
231+
&round_had_changes,
232+
dht,
233+
);
234+
}
235+
}
156236

157237
dht.next()
158238
.changed_nodes
@@ -169,10 +249,10 @@ impl Mapper for ShortestPathMapper {
169249
let all_changed_nodes: Vec<_> =
170250
dht.next().changed_nodes.iter().map(|(_, v)| v).collect();
171251
let mut changed_nodes =
172-
U64BloomFilter::empty_from(&worker.changed_nodes().lock().unwrap());
252+
UpdatedNodes::empty_from(&worker.changed_nodes().lock().unwrap());
173253

174-
for bloom in all_changed_nodes {
175-
changed_nodes.union(bloom.clone());
254+
for other in &all_changed_nodes {
255+
changed_nodes = changed_nodes.union(other);
176256
}
177257

178258
*worker.changed_nodes().lock().unwrap() = changed_nodes;

crates/core/src/entrypoint/ampc/shortest_path/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
1919
pub mod coordinator;
2020
mod mapper;
21+
mod updated_nodes;
2122
pub mod worker;
2223

23-
use bloom::U64BloomFilter;
24+
pub use updated_nodes::UpdatedNodes;
2425

2526
use crate::distributed::member::ShardId;
2627
use crate::{
@@ -49,7 +50,7 @@ pub struct Meta {
4950
pub struct ShortestPathTables {
5051
distances: DefaultDhtTable<webgraph::NodeID, u64>,
5152
meta: DefaultDhtTable<(), Meta>,
52-
changed_nodes: DefaultDhtTable<ShardId, U64BloomFilter>,
53+
changed_nodes: DefaultDhtTable<ShardId, UpdatedNodes>,
5354
}
5455

5556
impl_dht_tables!(ShortestPathTables, [distances, meta, changed_nodes]);

0 commit comments

Comments
 (0)