Skip to content

Commit 3208aec

Browse files
Merge pull request #163 from TimelyDataflow/v0.9-release
V0.9 release
2 parents 7523ce6 + 142d38a commit 3208aec

28 files changed

Lines changed: 759 additions & 440 deletions

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22

33
name = "differential-dataflow"
4-
version = "0.8.0"
4+
version = "0.9.0"
55
authors = ["Frank McSherry <fmcsherry@me.com>"]
66

77
description = "An incremental data-parallel dataflow platform"
@@ -12,6 +12,7 @@ homepage = "https://github.com/TimelyDataflow/differential-dataflow"
1212
repository = "https://github.com/TimelyDataflow/differential-dataflow.git"
1313
keywords = ["differential", "dataflow"]
1414
license = "MIT"
15+
readme = "README.md"
1516

1617
[dev-dependencies]
1718
indexmap = "1.0.1"
@@ -27,8 +28,8 @@ serde_derive = "1.0"
2728
abomonation = "0.7"
2829
abomonation_derive = "0.3"
2930
timely_sort="0.1.6"
30-
#timely = "0.8"
31-
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
31+
timely = "0.9"
32+
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
3233
#timely = { path = "../timely-dataflow/" }
3334
fnv="1.0.2"
3435

examples/pagerank.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,19 @@ where
117117
.map(|(node, _degr)| node);
118118

119119
// Propagate surfers along links, blend in reset surfers.
120-
let pushed =
120+
let mut pushed =
121121
edges.semijoin(&to_push)
122122
.map(|(_node, dest)| dest)
123123
.concat(&reset)
124-
.consolidate()
124+
.consolidate();
125+
126+
if iters > 0 {
127+
pushed =
128+
pushed
125129
.inner
126130
.filter(move |(_d,t,_r)| t.inner < iters)
127131
.as_collection();
132+
}
128133

129134
// Bind the recursive variable, return its limit.
130135
ranks.set(&pushed);

interactive/src/bin/server.rs

Lines changed: 35 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ extern crate interactive;
55
use timely::synchronization::Sequencer;
66
use interactive::{Manager, Command, Value};
77

8-
use timely::logging::TimelyEvent;
9-
use differential_dataflow::logging::DifferentialEvent;
10-
118
fn main() {
129

1310
let mut args = std::env::args();
@@ -19,77 +16,6 @@ fn main() {
1916
let command_queue = Arc::new(Mutex::new(VecDeque::<Command<Value>>::new()));
2017
let command_queue2 = command_queue.clone();
2118

22-
let guards =
23-
timely::execute_from_args(args, move |worker| {
24-
25-
let timer = ::std::time::Instant::now();
26-
let mut manager = Manager::<Value>::new();
27-
28-
let recv = command_queue.clone();
29-
30-
use std::rc::Rc;
31-
use timely::dataflow::operators::capture::event::link::EventLink;
32-
use timely::logging::BatchLogger;
33-
34-
let timely_events = Rc::new(EventLink::new());
35-
let differential_events = Rc::new(EventLink::new());
36-
37-
manager.publish_timely_logging(worker, Some(timely_events.clone()));
38-
manager.publish_differential_logging(worker, Some(differential_events.clone()));
39-
40-
let mut timely_logger = BatchLogger::new(timely_events.clone());
41-
worker
42-
.log_register()
43-
.insert::<TimelyEvent,_>("timely", move |time, data| timely_logger.publish_batch(time, data));
44-
45-
let mut differential_logger = BatchLogger::new(differential_events.clone());
46-
worker
47-
.log_register()
48-
.insert::<DifferentialEvent,_>("differential/arrange", move |time, data| differential_logger.publish_batch(time, data));
49-
50-
let mut sequencer = Sequencer::new(worker, timer);
51-
52-
let mut done = false;
53-
while !done {
54-
55-
{ // Check out channel status.
56-
let mut lock = recv.lock().expect("Mutex poisoned");
57-
while let Some(command) = lock.pop_front() {
58-
sequencer.push(command);
59-
}
60-
}
61-
62-
// Dequeue and act on commands.
63-
// One at a time, so that Shutdown works.
64-
if let Some(command) = sequencer.next() {
65-
println!("{:?}\tExecuting {:?}", timer.elapsed(), command);
66-
if command == Command::Shutdown {
67-
done = true;
68-
}
69-
command.execute(&mut manager, worker);
70-
}
71-
72-
worker.step();
73-
}
74-
75-
println!("Shutting down");
76-
77-
// Disable sequencer for shut down.
78-
drop(sequencer);
79-
80-
// Deregister loggers, so that the logging dataflows can shut down.
81-
worker
82-
.log_register()
83-
.insert::<TimelyEvent,_>("timely", move |_time, _data| { });
84-
85-
worker
86-
.log_register()
87-
.insert::<DifferentialEvent,_>("differential/arrange", move |_time, _data| { });
88-
89-
}).expect("Timely computation did not initialize cleanly");
90-
91-
println!("Now accepting commands");
92-
9319
// Detached thread for client connections.
9420
std::thread::Builder::new()
9521
.name("Listener".to_string())
@@ -111,7 +37,41 @@ fn main() {
11137
})
11238
.expect("failed to create thread");
11339
}
114-
11540
})
11641
.expect("Failed to spawn listen thread");
42+
43+
// Initiate timely computation.
44+
timely::execute_from_args(args, move |worker| {
45+
46+
let timer = ::std::time::Instant::now();
47+
let recv = command_queue.clone();
48+
49+
let mut manager = Manager::<Value>::new();
50+
let mut sequencer = Some(Sequencer::new(worker, timer));
51+
52+
while sequencer.is_some() {
53+
54+
// Check out channel status.
55+
while let Some(command) = recv.lock().expect("Mutex poisoned").pop_front() {
56+
sequencer
57+
.as_mut()
58+
.map(|s| s.push(command));
59+
}
60+
61+
// Dequeue and act on commands.
62+
// Once per iteration, so that Shutdown works "immediately".
63+
if let Some(command) = sequencer.as_mut().and_then(|s| s.next()) {
64+
println!("{:?}\tExecuting {:?}", timer.elapsed(), command);
65+
if command == Command::Shutdown {
66+
sequencer = None;
67+
}
68+
command.execute(&mut manager, worker);
69+
}
70+
71+
worker.step();
72+
}
73+
74+
println!("Shutting down");
75+
76+
}).expect("Timely computation did not initialize cleanly");
11777
}

interactive/src/command.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl<Value: Data+Hash> Command<Value> {
9595

9696
Command::Shutdown => {
9797
println!("Shutdown received");
98-
manager.shutdown();
98+
manager.shutdown(worker);
9999
}
100100
}
101101

interactive/src/manager.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,49 @@ impl<Value: Data+Hash> Manager<Value> {
5656
}
5757
}
5858

59+
/// Enables logging of timely and differential events.
60+
pub fn enable_logging<A: Allocate>(&mut self, worker: &mut Worker<A>)
61+
where
62+
TimelyEvent: AsVector<Value>,
63+
DifferentialEvent: AsVector<Value>,
64+
{
65+
66+
use std::rc::Rc;
67+
use timely::dataflow::operators::capture::event::link::EventLink;
68+
use timely::logging::BatchLogger;
69+
70+
let timely_events = Rc::new(EventLink::new());
71+
let differential_events = Rc::new(EventLink::new());
72+
73+
self.publish_timely_logging(worker, Some(timely_events.clone()));
74+
self.publish_differential_logging(worker, Some(differential_events.clone()));
75+
76+
let mut timely_logger = BatchLogger::new(timely_events.clone());
77+
worker
78+
.log_register()
79+
.insert::<TimelyEvent,_>("timely", move |time, data| timely_logger.publish_batch(time, data));
80+
81+
let mut differential_logger = BatchLogger::new(differential_events.clone());
82+
worker
83+
.log_register()
84+
.insert::<DifferentialEvent,_>("differential/arrange", move |time, data| differential_logger.publish_batch(time, data));
85+
86+
}
87+
5988
/// Clear the managed inputs and traces.
60-
pub fn shutdown(&mut self) {
89+
pub fn shutdown<A: Allocate>(&mut self, worker: &mut Worker<A>) {
6190
self.inputs.sessions.clear();
6291
self.traces.inputs.clear();
6392
self.traces.arrangements.clear();
93+
94+
// Deregister loggers, so that the logging dataflows can shut down.
95+
worker
96+
.log_register()
97+
.insert::<TimelyEvent,_>("timely", move |_time, _data| { });
98+
99+
worker
100+
.log_register()
101+
.insert::<DifferentialEvent,_>("differential/arrange", move |_time, _data| { });
64102
}
65103

66104
/// Inserts a new input session by name.

interactive/src/plan/sfw.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
//! Multi-way equijoin expression plan.
2+
//!
3+
//! This plan provides us the opportunity to map out a non-trivial differential
4+
//! implementation for a complex join query. In particular, we are able to invoke
5+
//! delta-query and worst-case optimal join plans, which avoid any intermediate
6+
//! materialization.
7+
//!
8+
//! Each `MultiwayJoin` indicates several source collections, equality constraints
9+
//! among their attributes, and then the set of attributes to produce as results.
10+
//!
11+
//! One naive implementation would take each input collection in order, and develop
12+
//! the join restricted to the prefix of relations so far. Ideally the order would
13+
//! be such that joined collections have equality constraints and prevent Cartesian
14+
//! explosion. At each step, a new collection picks out some of the attributes and
15+
//! instantiates a primitive binary join between the accumulated collection and the
16+
//! next collection.
17+
//!
18+
//! A more sophisticated implementation establishes delta queries for each input
19+
//! collection, which responds to changes in that input collection against the
20+
//! current other input collections. For each input collection we may choose very
21+
//! different join orders, as the order must follow equality constraints.
22+
//!
23+
//! A further implementation could develop the results attribute-by-attribute, as
24+
//! opposed to collection-by-collection, which gives us the ability to use column
25+
//! indices rather than whole-collection indices.
26+
27+
use std::hash::Hash;
28+
29+
use timely::dataflow::Scope;
30+
31+
use differential_dataflow::operators::JoinCore;
32+
33+
use differential_dataflow::{Collection, Data};
34+
use plan::{Plan, Render};
35+
use {TraceManager, Time, Diff};
36+
37+
/// A plan stage joining two source relations on the specified
38+
/// symbols. Throws if any of the join symbols isn't bound by both
39+
/// sources.
40+
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
41+
pub struct MultiwayJoin<Value> {
42+
/// Attributes to extract as `(attr, input)`.
43+
pub results: Vec<(usize, usize)>,
44+
/// Source collections.
45+
pub sources: Vec<Box<Plan<Value>>>,
46+
/// Equality constraints (as lists of equal `(attr, input)` pairs).
47+
pub equalities: Vec<Vec<(usize, usize)>>,
48+
}
49+
50+
impl<V: Data+Hash> Render for MultiwayJoin<V> {
51+
52+
type Value = V;
53+
54+
fn render<S: Scope<Timestamp = Time>>(
55+
&self,
56+
scope: &mut S,
57+
arrangements: &mut TraceManager<Self::Value>) -> Collection<S, Vec<Self::Value>, Diff>
58+
{
59+
unimplemented!()
60+
}
61+
}

0 commit comments

Comments
 (0)