Skip to content

Commit 27e4894

Browse files
Merge pull request #15 from frankmcsherry/resolution
Resolution (fixes #14)
2 parents 241d48b + 6a4fce0 commit 27e4894

63 files changed

Lines changed: 6416 additions & 3683 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
[package]
33

44
name = "differential-dataflow"
5-
version = "0.0.3"
5+
version = "0.0.4"
66
authors = ["Frank McSherry <fmcsherry@me.com>"]
77

88
description = "An incremental data-parallel dataflow platform"
@@ -22,8 +22,11 @@ getopts="0.2.14"
2222
rand="0.3.13"
2323
byteorder="0.4.2"
2424

25+
[dependencies.timely]
26+
git="http://github.com/frankmcsherry/timely-dataflow"
27+
2528
[dependencies]
26-
timely="0.1.15"
29+
abomonation="0.4.4"
2730
timely_sort="^0.1.1"
2831
timely_communication="^0.1.3"
2932
itertools="0.4"

TODO.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
## Differential dataflow to-do list:
2+
3+
1. Batch builders need an "ordered" option where the keys and vals are already sorted (cf group.output).
4+
2. Several trace implementations need to be fleshed out (e.g. `time` and `constant`).
5+
- Consider ConstantCollection type which can only be constructed from known data, arranged to `constant`.
6+
3. Several trace implementations could benefit from a RHH `keys` field; prototype and test!
7+
- Probably wants a Uniform<T: Unsigned> struct for "node identifiers"; needs tweaks to `Data` trait.
8+
4. Lots of sorting, but no radix-sorting. Historically a big improvement.
9+
- Connects to U: Unsigned output of `hashed`; no point radix-sorting u32 keys as if u64s.
10+
5. Several operators need revision: distinct, threshold, cogroup.
11+
6. The `keys` trace implementation has had zero testing. Important!
12+
7. Progressive merging under-explored; trade-offs in rate of work? (yes, but worth it?)
13+
14+
8. High-resolution times aren't too far away.
15+
- Think up alternate Collection type with new data bits.
16+
- Uncomment `group` implementation and get to work.
17+
18+
9. Join now has "deferred work"; check it out to see if it helps on large graphs.

examples/arrange.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,14 @@ extern crate timely_sort;
44
extern crate differential_dataflow;
55
extern crate vec_map;
66

7-
use vec_map::VecMap;
8-
97
use timely::dataflow::*;
108
use timely::dataflow::operators::*;
119

12-
use timely_sort::Unsigned;
13-
1410
use rand::{Rng, SeedableRng, StdRng};
1511

1612
use differential_dataflow::AsCollection;
17-
use differential_dataflow::operators::*;
18-
use differential_dataflow::collection::Trace;
13+
use differential_dataflow::operators::arrange::ArrangeByKey;
14+
use differential_dataflow::trace::{Cursor, Trace};
1915

2016
fn main() {
2117

@@ -37,7 +33,7 @@ fn main() {
3733

3834
// pull off source, and count.
3935
let arranged = edges.as_collection()
40-
.arrange_by_key(|k: &u32| k.as_u64(), |x| (VecMap::new(), x));
36+
.arrange_by_key();
4137

4238
(input, arranged.stream.probe().0, arranged.trace.clone())
4339
});
@@ -92,11 +88,16 @@ fn main() {
9288

9389
let mut count = 0;
9490
let timer = ::std::time::Instant::now();
95-
let mut borrow = trace.borrow_mut();
96-
for node in 0..nodes {
97-
for _edge in borrow.get_collection(&node, input.time()) {
98-
count += 1;
91+
let mut cursor = trace.borrow().trace.cursor();
92+
while cursor.key_valid() {
93+
while cursor.val_valid() {
94+
let mut sum = 0;
95+
cursor.map_times(|_,d| sum += d);
96+
if sum > 0 { count += 1; }
97+
cursor.step_val();
9998
}
99+
100+
cursor.step_key()
100101
}
101102

102103
println!("count: {} in {:?}", count, timer.elapsed());

examples/bfs.rs

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,16 @@
11
extern crate rand;
22
extern crate timely;
3-
extern crate timely_sort;
43
extern crate differential_dataflow;
5-
extern crate vec_map;
4+
5+
use rand::{Rng, SeedableRng, StdRng};
66

77
use timely::dataflow::*;
88
use timely::dataflow::operators::*;
9-
use timely_sort::Unsigned;
10-
11-
use rand::{Rng, SeedableRng, StdRng};
12-
use vec_map::VecMap;
139

1410
use differential_dataflow::Collection;
1511
use differential_dataflow::operators::*;
16-
use differential_dataflow::operators::join::JoinArranged;
17-
use differential_dataflow::operators::group::GroupArranged;
1812
use differential_dataflow::lattice::Lattice;
1913

20-
use differential_dataflow::operators::iterate::Variable;
21-
2214
type Node = u32;
2315
type Edge = (Node, Node);
2416

@@ -27,27 +19,30 @@ fn main() {
2719
let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
2820
let edges: u32 = std::env::args().nth(2).unwrap().parse().unwrap();
2921
let batch: u32 = std::env::args().nth(3).unwrap().parse().unwrap();
30-
let inspect: bool = std::env::args().nth(4).unwrap() == "inspect";
22+
let rounds: u32 = std::env::args().nth(4).unwrap().parse().unwrap();
23+
let inspect: bool = std::env::args().nth(5).unwrap() == "inspect";
3124

3225
// define a new computational scope, in which to run BFS
33-
timely::execute_from_args(std::env::args().skip(5), move |computation| {
26+
timely::execute_from_args(std::env::args().skip(6), move |computation| {
3427

35-
let timer = ::std::time::Instant::now();
28+
// let timer = ::std::time::Instant::now();
3629

3730
// define BFS dataflow; return handles to roots and edges inputs
3831
let (mut graph, probe) = computation.scoped(|scope| {
3932

40-
let roots = vec![(0,1), (1,1), (2,1)].into_iter().to_stream(scope);
33+
let roots = vec![(1,1)].into_iter().to_stream(scope);
4134
let (edge_input, graph) = scope.new_input();
42-
let mut result = bfs(&Collection::new(graph), &Collection::new(roots));
35+
36+
let mut result = bfs(&Collection::new(graph.clone()), &Collection::new(roots.clone()));
4337

4438
if !inspect {
4539
result = result.filter(|_| false);
4640
}
4741

4842
let probe = result.map(|(_,l)| l)
4943
.consolidate_by(|&x| x)
50-
.inspect(|x| println!("\t{:?}", x)).probe();
44+
.inspect(|x| println!("\t{:?}", x)).probe()
45+
;
5146

5247
(edge_input, probe.0)
5348
});
@@ -56,7 +51,7 @@ fn main() {
5651
let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
5752
let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions
5853

59-
println!("performing BFS on {} nodes, {} edges:", nodes, edges);
54+
// println!("performing BFS on {} nodes, {} edges:", nodes, edges);
6055

6156
if computation.index() == 0 {
6257
// trickle edges in to dataflow
@@ -71,24 +66,24 @@ fn main() {
7166
}
7267
}
7368

74-
println!("loaded; elapsed: {:?}", timer.elapsed());
69+
// println!("loaded; elapsed: {:?}", timer.elapsed());
7570

7671
graph.advance_to(1);
7772
computation.step_while(|| probe.lt(graph.time()));
7873

79-
println!("stable; elapsed: {:?}", timer.elapsed());
74+
// println!("stable; elapsed: {:?}", timer.elapsed());
8075

8176
if batch > 0 {
8277
let mut changes = Vec::new();
83-
for wave in 0.. {
78+
for _wave in 0 .. rounds {
8479
if computation.index() == 0 {
8580
for _ in 0..batch {
8681
changes.push(((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1));
8782
changes.push(((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)),-1));
8883
}
8984
}
9085

91-
let timer = ::std::time::Instant::now();
86+
// let timer = ::std::time::Instant::now();
9287
let round = *graph.epoch();
9388
if computation.index() == 0 {
9489
while let Some(change) = changes.pop() {
@@ -98,17 +93,23 @@ fn main() {
9893
graph.advance_to(round + 1);
9994
computation.step_while(|| probe.lt(&graph.time()));
10095

101-
if computation.index() == 0 {
102-
println!("wave {}: avg {:?}", wave, timer.elapsed() / (batch as u32));
103-
}
96+
// if computation.index() == 0 {
97+
// let elapsed = timer.elapsed();
98+
// println!("{}", elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64));
99+
// // println!()
100+
// // println!("wave {}: avg {:?}", wave, timer.elapsed() / (batch as u32));
101+
// }
104102
}
105103
}
104+
105+
// println!("finished; elapsed: {:?}", timer.elapsed());
106+
106107
}).unwrap();
107108
}
108109

109110
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
110111
fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
111-
where G::Timestamp: Lattice {
112+
where G::Timestamp: Lattice+Ord {
112113

113114
// initialize roots as reaching themselves at distance 0
114115
let nodes = roots.map(|x| (x, 0));
@@ -119,37 +120,36 @@ where G::Timestamp: Lattice {
119120
let edges = edges.enter(&inner.scope());
120121
let nodes = nodes.enter(&inner.scope());
121122

122-
inner.join_map_u(&edges, |_k,l,d| (*d, l+1))
123+
inner.join_map(&edges, |_k,l,d| (*d, l+1))
123124
.concat(&nodes)
124-
.group_u(|_, s, t| t.push((*s.peek().unwrap().0, 1)))
125+
.group(|_, s, t| t.push((s[0].0, 1)))
125126
})
126127
}
127128

128-
// Experimental implementation using arrangement to share the output of group_u with the input to join_map_u.
129-
130-
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
131-
fn _bfs2<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
132-
where G::Timestamp: Lattice {
129+
// // Experimental implementation using arrangement to share the output of group_u with the input to join_map_u.
130+
// // returns pairs (n, s) indicating node n can be reached from a root in s steps.
131+
// fn _bfs2<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
132+
// where G::Timestamp: Lattice {
133133

134-
// initialize roots as reaching themselves at distance 0
135-
let nodes = roots.map(|x| (x, 0));
134+
// // initialize roots as reaching themselves at distance 0
135+
// let nodes = roots.map(|x| (x, 0));
136136

137-
// repeatedly update minimal distances each node can be reached from each root
138-
nodes.scope().scoped(|scope| {
137+
// // repeatedly update minimal distances each node can be reached from each root
138+
// nodes.scope().scoped(|scope| {
139139

140-
let edges = edges.enter(scope);
141-
let nodes2 = nodes.enter(scope);
140+
// let edges = edges.enter(scope);
141+
// let nodes2 = nodes.enter(scope);
142142

143-
let variable = Variable::from(nodes.enter(scope));
143+
// let variable = Variable::from(nodes.enter(scope));
144144

145-
let arranged = variable.concat(&nodes2)
146-
.arrange_by_key(|k| k.as_u64(), |x| (VecMap::new(), x))
147-
.group(|k| k.as_u64(), |x| (VecMap::new(), x), |_, s, t| t.push((*s.peek().unwrap().0, 1)));
145+
// let arranged = variable.concat(&nodes2)
146+
// .arrange_by_key(|k| k.as_u64(), |x| (VecMap::new(), x))
147+
// .group(|k| k.as_u64(), |x| (VecMap::new(), x), |_, s, t| t.push((*s.peek().unwrap().0, 1)));
148148

149-
let result = arranged.as_collection().leave();
149+
// let result = arranged.as_collection().leave();
150150

151-
variable.set(&arranged.join(&edges.arrange_by_key(|k| k.as_u64(), |x| (VecMap::new(), x)), |_k,l,d| (*d, l+1)));
151+
// variable.set(&arranged.join(&edges.arrange_by_key(|k| k.as_u64(), |x| (VecMap::new(), x)), |_k,l,d| (*d, l+1)));
152152

153-
result
154-
})
155-
}
153+
// result
154+
// })
155+
// }

0 commit comments

Comments
 (0)