Skip to content

Commit 56bde0f

Browse files
committed
offer mutable access to containers through handles
This PR refactors a pattern that is pervasive to pretty much every single timely operator into an easier to use input handle method that also reduces the boilerplate code operator authors need to write. The pattern is receiving a `RefOrMut` container from a handle and immediately swapping it with a local container in order to gain mutable access. In this repo all but one operators exhibited this pattern and associated boilerplate. Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent faf5eb6 commit 56bde0f

File tree

22 files changed

+139
-209
lines changed

22 files changed

+139
-209
lines changed

timely/examples/bfs.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,20 @@ fn main() {
5656
move |input1, input2, output, notify| {
5757

5858
// receive edges, start to sort them
59-
input1.for_each(|time, data| {
59+
while let Some((time, data)) = input1.next_mut() {
6060
notify.notify_at(time.retain());
61-
edge_list.push(data.replace(Vec::new()));
62-
});
61+
edge_list.push(std::mem::take(data));
62+
}
6363

6464
// receive (node, worker) pairs, note any new ones.
65-
input2.for_each(|time, data| {
65+
while let Some((time, data)) = input2.next_mut() {
6666
node_lists.entry(time.time().clone())
6767
.or_insert_with(|| {
6868
notify.notify_at(time.retain());
6969
Vec::new()
7070
})
71-
.push(data.replace(Vec::new()));
72-
});
71+
.push(std::mem::take(data));
72+
}
7373

7474
notify.for_each(|time, _num, _notify| {
7575

timely/examples/distinct.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ fn main() {
2020
scope.input_from(&mut input)
2121
.unary(Exchange::new(|x| *x), "Distinct", move |_, _|
2222
move |input, output| {
23-
input.for_each(|time, data| {
23+
while let Some((time, data)) = input.next() {
2424
let counts =
2525
counts_by_time
2626
.entry(time.time().clone())
@@ -33,7 +33,7 @@ fn main() {
3333
}
3434
*count += 1;
3535
}
36-
})
36+
}
3737
})
3838
.inspect(move |x| println!("worker {}:\tvalue {}", index, x))
3939
.probe_with(&mut probe);

timely/examples/hashjoin.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,12 @@ fn main() {
4040
let mut map1 = HashMap::<u64, Vec<u64>>::new();
4141
let mut map2 = HashMap::<u64, Vec<u64>>::new();
4242

43-
let mut vector1 = Vec::new();
44-
let mut vector2 = Vec::new();
45-
4643
move |input1, input2, output| {
4744

4845
// Drain first input, check second map, update first map.
49-
input1.for_each(|time, data| {
50-
data.swap(&mut vector1);
46+
while let Some((time, data)) = input1.next_mut() {
5147
let mut session = output.session(&time);
52-
for (key, val1) in vector1.drain(..) {
48+
for (key, val1) in data.drain(..) {
5349
if let Some(values) = map2.get(&key) {
5450
for val2 in values.iter() {
5551
session.give((val1.clone(), val2.clone()));
@@ -58,13 +54,12 @@ fn main() {
5854

5955
map1.entry(key).or_insert(Vec::new()).push(val1);
6056
}
61-
});
57+
}
6258

6359
// Drain second input, check first map, update second map.
64-
input2.for_each(|time, data| {
65-
data.swap(&mut vector2);
60+
while let Some((time, data)) = input2.next_mut() {
6661
let mut session = output.session(&time);
67-
for (key, val2) in vector2.drain(..) {
62+
for (key, val2) in data.drain(..) {
6863
if let Some(values) = map1.get(&key) {
6964
for val1 in values.iter() {
7065
session.give((val1.clone(), val2.clone()));
@@ -73,7 +68,7 @@ fn main() {
7368

7469
map2.entry(key).or_insert(Vec::new()).push(val2);
7570
}
76-
});
71+
}
7772
}
7873
})
7974
.probe_with(&mut probe);

timely/examples/pagerank.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,19 @@ fn main() {
4242
let mut diffs = Vec::new(); // for received but un-acted upon deltas.
4343
let mut delta = Vec::new();
4444

45-
let mut edge_vec = Vec::new();
46-
let mut rank_vec = Vec::new();
47-
4845
let timer = ::std::time::Instant::now();
4946

5047
move |input1, input2, output| {
5148

5249
// hold on to edge changes until it is time.
53-
input1.for_each(|time, data| {
54-
data.swap(&mut edge_vec);
55-
edge_stash.entry(time.retain()).or_insert(Vec::new()).extend(edge_vec.drain(..));
56-
});
50+
while let Some((time, data)) = input1.next_mut() {
51+
edge_stash.entry(time.retain()).or_insert(Vec::new()).extend(data.drain(..));
52+
}
5753

5854
// hold on to rank changes until it is time.
59-
input2.for_each(|time, data| {
60-
data.swap(&mut rank_vec);
61-
rank_stash.entry(time.retain()).or_insert(Vec::new()).extend(rank_vec.drain(..));
62-
});
55+
while let Some((time, data)) = input2.next_mut() {
56+
rank_stash.entry(time.retain()).or_insert(Vec::new()).extend(data.drain(..));
57+
}
6358

6459
let frontiers = &[input1.frontier(), input2.frontier()];
6560

timely/src/dataflow/operators/aggregation/aggregate.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,19 +76,17 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for
7676
hash: H) -> Stream<S, R> where S::Timestamp: Eq {
7777

7878
let mut aggregates = HashMap::new();
79-
let mut vector = Vec::new();
8079
self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
8180

8281
// read each input, fold into aggregates
83-
input.for_each(|time, data| {
84-
data.swap(&mut vector);
82+
while let Some((time, data)) = input.next_mut() {
8583
let agg_time = aggregates.entry(time.time().clone()).or_insert_with(HashMap::new);
86-
for (key, val) in vector.drain(..) {
84+
for (key, val) in data.drain(..) {
8785
let agg = agg_time.entry(key.clone()).or_insert_with(Default::default);
8886
fold(&key, val, agg);
8987
}
9088
notificator.notify_at(time.retain());
91-
});
89+
};
9290

9391
// pop completed aggregates, send along whatever
9492
notificator.for_each(|time,_,_| {

timely/src/dataflow/operators/aggregation/state_machine.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
6666
let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
6767
let mut states = HashMap::new(); // keys -> state
6868

69-
let mut vector = Vec::new();
70-
7169
self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
7270

7371
// go through each time with data, process each (key, val) pair.
@@ -86,19 +84,16 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
8684
});
8785

8886
// stash each input and request a notification when ready
89-
input.for_each(|time, data| {
90-
91-
data.swap(&mut vector);
92-
87+
while let Some((time, data)) = input.next_mut() {
9388
// stash if not time yet
9489
if notificator.frontier(0).less_than(time.time()) {
95-
pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(vector.drain(..));
90+
pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
9691
notificator.notify_at(time.retain());
9792
}
9893
else {
9994
// else we can process immediately
10095
let mut session = output.session(&time);
101-
for (key, val) in vector.drain(..) {
96+
for (key, val) in data.drain(..) {
10297
let (remove, output) = {
10398
let state = states.entry(key.clone()).or_insert_with(Default::default);
10499
fold(&key, val, state)
@@ -107,7 +102,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
107102
session.give_iterator(output.into_iter());
108103
}
109104
}
110-
});
105+
}
111106
})
112107
}
113108
}

timely/src/dataflow/operators/branch.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,21 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
4646
let (mut output2, stream2) = builder.new_output();
4747

4848
builder.build(move |_| {
49-
let mut vector = Vec::new();
5049
move |_frontiers| {
5150
let mut output1_handle = output1.activate();
5251
let mut output2_handle = output2.activate();
5352

54-
input.for_each(|time, data| {
55-
data.swap(&mut vector);
53+
while let Some((time, data)) = input.next_mut() {
5654
let mut out1 = output1_handle.session(&time);
5755
let mut out2 = output2_handle.session(&time);
58-
for datum in vector.drain(..) {
56+
for datum in data.drain(..) {
5957
if condition(&time.time(), &datum) {
6058
out2.give(datum);
6159
} else {
6260
out1.give(datum);
6361
}
6462
}
65-
});
63+
}
6664
}
6765
});
6866

@@ -103,21 +101,18 @@ impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
103101
let (mut output2, stream2) = builder.new_output();
104102

105103
builder.build(move |_| {
106-
107-
let mut container = Default::default();
108104
move |_frontiers| {
109105
let mut output1_handle = output1.activate();
110106
let mut output2_handle = output2.activate();
111107

112-
input.for_each(|time, data| {
113-
data.swap(&mut container);
108+
while let Some((time, data)) = input.next_mut() {
114109
let mut out = if condition(&time.time()) {
115110
output2_handle.session(&time)
116111
} else {
117112
output1_handle.session(&time)
118113
};
119-
out.give_container(&mut container);
120-
});
114+
out.give_container(data);
115+
}
121116
}
122117
});
123118

timely/src/dataflow/operators/concat.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,12 @@ impl<G: Scope, D: Container> Concatenate<G, D> for G {
8080

8181
// build an operator that plays out all input data.
8282
builder.build(move |_capability| {
83-
84-
let mut vector = Default::default();
8583
move |_frontier| {
8684
let mut output = output.activate();
8785
for handle in handles.iter_mut() {
88-
handle.for_each(|time, data| {
89-
data.swap(&mut vector);
90-
output.session(&time).give_container(&mut vector);
91-
})
86+
while let Some((time, data)) = handle.next_mut() {
87+
output.session(&time).give_container(data);
88+
}
9289
}
9390
}
9491
});

timely/src/dataflow/operators/count.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ impl<G: Scope, D: Data> Accumulate<G, D> for Stream<G, D> {
5555

5656
let mut accums = HashMap::new();
5757
self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| {
58-
input.for_each(|time, data| {
58+
while let Some((time, data)) = input.next() {
5959
logic(&mut accums.entry(time.time().clone()).or_insert_with(|| default.clone()), data);
6060
notificator.notify_at(time.retain());
61-
});
61+
};
6262

6363
notificator.for_each(|time,_,_| {
6464
if let Some(accum) = accums.remove(&time) {

timely/src/dataflow/operators/delay.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,16 @@ pub trait Delay<G: Scope, D: Data> {
9797
impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D> {
9898
fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
9999
let mut elements = HashMap::new();
100-
let mut vector = Vec::new();
101100
self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
102-
input.for_each(|time, data| {
103-
data.swap(&mut vector);
104-
for datum in vector.drain(..) {
101+
while let Some((time, data)) = input.next_mut() {
102+
for datum in data.drain(..) {
105103
let new_time = func(&datum, &time);
106104
assert!(time.time().less_equal(&new_time));
107105
elements.entry(new_time.clone())
108106
.or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
109107
.push(datum);
110108
}
111-
});
109+
}
112110

113111
// for each available notification, send corresponding set
114112
notificator.for_each(|time,_,_| {
@@ -128,13 +126,13 @@ impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D> {
128126
fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
129127
let mut elements = HashMap::new();
130128
self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
131-
input.for_each(|time, data| {
129+
while let Some((time, data)) = input.next_mut() {
132130
let new_time = func(&time);
133131
assert!(time.time().less_equal(&new_time));
134132
elements.entry(new_time.clone())
135133
.or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
136-
.push(data.replace(Vec::new()));
137-
});
134+
.push(std::mem::take(data));
135+
}
138136

139137
// for each available notification, send corresponding set
140138
notificator.for_each(|time,_,_| {

0 commit comments

Comments
 (0)