Skip to content

Commit d6d8eb4

Browse files
Merge pull request #115 from frankmcsherry/logging
Logging
2 parents 5902213 + acfca41 commit d6d8eb4

26 files changed

Lines changed: 1193 additions & 777 deletions

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22

33
name = "differential-dataflow"
4-
version = "0.6.0"
4+
version = "0.7.0"
55
authors = ["Frank McSherry <fmcsherry@me.com>"]
66

77
description = "An incremental data-parallel dataflow platform"
@@ -24,7 +24,7 @@ graph_map = { git = "https://github.com/frankmcsherry/graph-map.git" }
2424
abomonation = { git = "https://github.com/frankmcsherry/abomonation" }
2525
abomonation_derive = "0.3"
2626
timely_sort="0.1.6"
27-
timely = { git = "https://github.com/frankmcsherry/timely-dataflow" }
27+
timely = { git = "https://github.com/frankmcsherry/timely-dataflow", tag = "v0.7.0" }
2828
fnv="1.0.2"
2929

3030
[profile.release]

examples/arrange.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,12 @@ use rand::{Rng, SeedableRng, StdRng};
88

99
use differential_dataflow::input::Input;
1010
use differential_dataflow::AsCollection;
11-
use differential_dataflow::operators::arrange::{Arrange, ArrangeByKey};
11+
use differential_dataflow::operators::arrange::ArrangeByKey;
1212
use differential_dataflow::operators::group::Group;
1313
use differential_dataflow::operators::join::JoinCore;
1414
use differential_dataflow::operators::Iterate;
1515
use differential_dataflow::operators::Consolidate;
1616

17-
use differential_dataflow::trace::Trace;
18-
use differential_dataflow::trace::implementations::ord::OrdValSpineAbom;
19-
20-
// use differential_dataflow::trace::implementations::ord::OrdValSpine;
21-
// use differential_dataflow::trace::{Cursor, Trace};
22-
// use differential_dataflow::trace::Batch;
23-
// use differential_dataflow::hashable::OrdWrapper;
24-
// use differential_dataflow::trace::TraceReader;
25-
2617
fn main() {
2718

2819
let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
@@ -32,7 +23,7 @@ fn main() {
3223
let inspect: bool = std::env::args().nth(5).unwrap() == "inspect";
3324

3425

35-
// define a new timely dataflow computation.
26+
// define a new timely dataflow computation.
3627
timely::execute_from_args(std::env::args().skip(6), move |worker| {
3728

3829
let timer = ::std::time::Instant::now();
@@ -99,8 +90,8 @@ fn main() {
9990
})
10091
.probe_with(&mut probe)
10192
.as_collection()
102-
// .arrange_by_key()
103-
.arrange(OrdValSpineAbom::new())
93+
.arrange_by_key()
94+
// .arrange::<OrdValSpineAbom>()
10495
.trace
10596
});
10697

examples/bfs.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use differential_dataflow::input::Input;
1111
use differential_dataflow::Collection;
1212
use differential_dataflow::operators::*;
1313
use differential_dataflow::lattice::Lattice;
14+
use differential_dataflow::logging::DifferentialEvent;
1415

1516
type Node = u32;
1617
type Edge = (Node, Node);
@@ -26,6 +27,22 @@ fn main() {
2627
// define a new computational scope, in which to run BFS
2728
timely::execute_from_args(std::env::args(), move |worker| {
2829

30+
if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {
31+
32+
eprintln!("enabled DIFFERENTIAL logging to {}", addr);
33+
34+
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
35+
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
36+
let mut logger = ::timely::logging::BatchLogger::new(writer);
37+
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
38+
logger.publish_batch(time, data)
39+
);
40+
}
41+
else {
42+
panic!("Could not connect to differential log address: {:?}", addr);
43+
}
44+
}
45+
2946
let timer = ::std::time::Instant::now();
3047

3148
// define BFS dataflow; return handles to roots and edges inputs

examples/bijkstra.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ fn main() {
2626

2727
// define a new computational scope, in which to run BFS
2828
timely::execute_from_args(std::env::args().skip(6), move |worker| {
29-
29+
3030
let timer = ::std::time::Instant::now();
3131

3232
// define BFS dataflow; return handles to roots and edges inputs
@@ -80,7 +80,7 @@ fn main() {
8080
graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
8181
graph.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
8282
}
83-
graph.advance_to(3 + round * batch + element);
83+
graph.advance_to(3 + round * batch + element);
8484
}
8585
graph.flush();
8686

@@ -100,15 +100,15 @@ fn main() {
100100
fn bidijkstra<G: Scope>(edges: &Collection<G, Edge>, goals: &Collection<G, (Node, Node)>) -> Collection<G, ((Node, Node), u32)>
101101
where G::Timestamp: Lattice+Ord {
102102

103-
edges.scope().scoped(|inner| {
103+
edges.scope().scoped::<u64,_,_>(|inner| {
104104

105-
// Our plan is to start evolving distances from both sources and destinations.
105+
// Our plan is to start evolving distances from both sources and destinations.
106106
// The evolution from a source or destination should continue as long as there
107107
// is a corresponding destination or source that has not yet been reached.
108108

109109
// forward and reverse (node, (root, dist))
110-
let forward = Variable::from(goals.map(|(x,_)| (x,(x,0))).enter(inner));
111-
let reverse = Variable::from(goals.map(|(_,y)| (y,(y,0))).enter(inner));
110+
let forward = Variable::new_from(goals.map(|(x,_)| (x,(x,0))).enter(inner), u64::max_value(), 1);
111+
let reverse = Variable::new_from(goals.map(|(_,y)| (y,(y,0))).enter(inner), u64::max_value(), 1);
112112

113113
let goals = goals.enter(inner);
114114
let edges = edges.enter(inner);
@@ -118,7 +118,7 @@ where G::Timestamp: Lattice+Ord {
118118
// done(src, dst) := forward(src, med), reverse(dst, med), goal(src, dst).
119119
//
120120
// This is a cyclic join, which should scare us a bunch.
121-
let reached =
121+
let reached =
122122
forward
123123
.join_map(&reverse, |_, &(src,d1), &(dst,d2)| ((src, dst), d1 + d2))
124124
.group(|_key, s, t| t.push((*s[0].0, 1)));
@@ -133,7 +133,7 @@ where G::Timestamp: Lattice+Ord {
133133

134134
// Let's expand out forward queries that are active.
135135
let forward_active = active.map(|(x,_y)| x).distinct();
136-
let forward_next =
136+
let forward_next =
137137
forward
138138
.map(|(med, (src, dist))| (src, (med, dist)))
139139
.semijoin(&forward_active)
@@ -148,7 +148,7 @@ where G::Timestamp: Lattice+Ord {
148148

149149
// Let's expand out reverse queries that are active.
150150
let reverse_active = active.map(|(_x,y)| y).distinct();
151-
let reverse_next =
151+
let reverse_next =
152152
reverse
153153
.map(|(med, (rev, dist))| (rev, (med, dist)))
154154
.semijoin(&reverse_active)

examples/graspan.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ use indexmap::IndexMap;
1010
use timely::communication::Allocate;
1111

1212
use timely::dataflow::Scope;
13-
use timely::dataflow::scopes::{ScopeParent, Root, Child};
13+
use timely::dataflow::scopes::{ScopeParent, Child};
1414
use timely::progress::timestamp::RootTimestamp;
1515
use timely::progress::nested::product::Product;
1616
use timely::progress::Timestamp;
17+
use timely::worker::Worker;
1718

1819
use differential_dataflow::Collection;
1920
use differential_dataflow::lattice::Lattice;
@@ -88,7 +89,7 @@ type Arrange<G, K, V, R> = Arranged<G, K, V, R, TraceValHandle<K, V, <G as Scope
8889
/// An edge variable provides arranged representations of its contents, even before they are
8990
/// completely defined, in support of recursively defined productions.
9091
pub struct EdgeVariable<'a, G: Scope> where G::Timestamp : Lattice {
91-
variable: Variable<'a, G, Edge, isize>,
92+
variable: Variable<'a, G, Edge, u64, isize>,
9293
current: Collection<Child<'a, G, u64>, Edge, isize>,
9394
forward: Option<Arrange<Child<'a, G, u64>, Node, Node, isize>>,
9495
reverse: Option<Arrange<Child<'a, G, u64>, Node, Node, isize>>,
@@ -97,7 +98,7 @@ pub struct EdgeVariable<'a, G: Scope> where G::Timestamp : Lattice {
9798
impl<'a, G: Scope> EdgeVariable<'a, G> where G::Timestamp : Lattice {
9899
/// Creates a new variable initialized with `source`.
99100
pub fn from(source: &Collection<Child<'a, G, u64>, Edge>) -> Self {
100-
let variable = Variable::from(source.filter(|_| false));
101+
let variable = Variable::new_from(source.filter(|_| false), u64::max_value(), 1);
101102
EdgeVariable {
102103
variable: variable,
103104
current: source.clone(),
@@ -157,7 +158,7 @@ impl Query {
157158
}
158159

159160
/// Creates a dataflow implementing the query, and returns input and trace handles.
160-
pub fn render_in<'a, A, T>(&self, scope: &mut Child<'a, Root<A>, T>) -> IndexMap<String, RelationHandles<T>>
161+
pub fn render_in<'a, A, T>(&self, scope: &mut Child<'a, Worker<A>, T>) -> IndexMap<String, RelationHandles<T>>
161162
where A: Allocate, T: Timestamp+Lattice {
162163

163164
// Create new input (handle, stream) pairs

0 commit comments

Comments
 (0)