Skip to content

Commit e1a25c7

Browse files
committed
offer mutable access to containers through handles
This PR refactors a pattern that is pervasive to pretty much every single timely operator into an easier to use input handle method that also reduces the boilerplate code operator authors need to write. The pattern is receiving a `RefOrMut` container from a handle and immediately swapping it with a local container in order to gain mutable access. In this repo all but one operators exhibited this pattern and associated boilerplate. Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent faf5eb6 commit e1a25c7

File tree

29 files changed

+172
-288
lines changed

29 files changed

+172
-288
lines changed

communication/src/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,4 +235,4 @@ impl<T: Clone> Message<T> {
235235
unreachable!()
236236
}
237237
}
238-
}
238+
}

mdbook/src/chapter_4/chapter_4_3.md

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,18 +77,14 @@ fn main() {
7777
.delay(|number, time| number / 100 )
7878
// Buffer records until all prior timestamps have completed.
7979
.binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
80-
81-
let mut vector = Vec::new();
82-
8380
move |input1, input2, output| {
8481
8582
// Stash received data.
86-
input1.for_each(|time, data| {
87-
data.swap(&mut vector);
83+
while let Some((time, data)) = input1.next_mut() {
8884
stash.entry(time.retain())
8985
.or_insert(Vec::new())
90-
.extend(vector.drain(..));
91-
});
86+
.extend(data.drain(..));
87+
}
9288
9389
// Consider sending stashed data.
9490
for (time, data) in stash.iter_mut() {

timely/examples/bfs.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,20 @@ fn main() {
5656
move |input1, input2, output, notify| {
5757

5858
// receive edges, start to sort them
59-
input1.for_each(|time, data| {
59+
while let Some((time, data)) = input1.next() {
6060
notify.notify_at(time.retain());
61-
edge_list.push(data.replace(Vec::new()));
62-
});
61+
edge_list.push(data.take());
62+
}
6363

6464
// receive (node, worker) pairs, note any new ones.
65-
input2.for_each(|time, data| {
65+
while let Some((time, data)) = input2.next() {
6666
node_lists.entry(time.time().clone())
6767
.or_insert_with(|| {
6868
notify.notify_at(time.retain());
6969
Vec::new()
7070
})
71-
.push(data.replace(Vec::new()));
72-
});
71+
.push(data.take());
72+
}
7373

7474
notify.for_each(|time, _num, _notify| {
7575

timely/examples/distinct.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ fn main() {
2020
scope.input_from(&mut input)
2121
.unary(Exchange::new(|x| *x), "Distinct", move |_, _|
2222
move |input, output| {
23-
input.for_each(|time, data| {
23+
while let Some((time, data)) = input.next() {
2424
let counts =
2525
counts_by_time
2626
.entry(time.time().clone())
@@ -33,7 +33,7 @@ fn main() {
3333
}
3434
*count += 1;
3535
}
36-
})
36+
}
3737
})
3838
.inspect(move |x| println!("worker {}:\tvalue {}", index, x))
3939
.probe_with(&mut probe);

timely/examples/hashjoin.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,12 @@ fn main() {
4040
let mut map1 = HashMap::<u64, Vec<u64>>::new();
4141
let mut map2 = HashMap::<u64, Vec<u64>>::new();
4242

43-
let mut vector1 = Vec::new();
44-
let mut vector2 = Vec::new();
45-
4643
move |input1, input2, output| {
4744

4845
// Drain first input, check second map, update first map.
49-
input1.for_each(|time, data| {
50-
data.swap(&mut vector1);
46+
while let Some((time, data)) = input1.next_mut() {
5147
let mut session = output.session(&time);
52-
for (key, val1) in vector1.drain(..) {
48+
for (key, val1) in data.drain(..) {
5349
if let Some(values) = map2.get(&key) {
5450
for val2 in values.iter() {
5551
session.give((val1.clone(), val2.clone()));
@@ -58,13 +54,12 @@ fn main() {
5854

5955
map1.entry(key).or_insert(Vec::new()).push(val1);
6056
}
61-
});
57+
}
6258

6359
// Drain second input, check first map, update second map.
64-
input2.for_each(|time, data| {
65-
data.swap(&mut vector2);
60+
while let Some((time, data)) = input2.next_mut() {
6661
let mut session = output.session(&time);
67-
for (key, val2) in vector2.drain(..) {
62+
for (key, val2) in data.drain(..) {
6863
if let Some(values) = map1.get(&key) {
6964
for val1 in values.iter() {
7065
session.give((val1.clone(), val2.clone()));
@@ -73,7 +68,7 @@ fn main() {
7368

7469
map2.entry(key).or_insert(Vec::new()).push(val2);
7570
}
76-
});
71+
}
7772
}
7873
})
7974
.probe_with(&mut probe);

timely/examples/pagerank.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,19 @@ fn main() {
4242
let mut diffs = Vec::new(); // for received but un-acted upon deltas.
4343
let mut delta = Vec::new();
4444

45-
let mut edge_vec = Vec::new();
46-
let mut rank_vec = Vec::new();
47-
4845
let timer = ::std::time::Instant::now();
4946

5047
move |input1, input2, output| {
5148

5249
// hold on to edge changes until it is time.
53-
input1.for_each(|time, data| {
54-
data.swap(&mut edge_vec);
55-
edge_stash.entry(time.retain()).or_insert(Vec::new()).extend(edge_vec.drain(..));
56-
});
50+
while let Some((time, data)) = input1.next_mut() {
51+
edge_stash.entry(time.retain()).or_insert(Vec::new()).extend(data.drain(..));
52+
}
5753

5854
// hold on to rank changes until it is time.
59-
input2.for_each(|time, data| {
60-
data.swap(&mut rank_vec);
61-
rank_stash.entry(time.retain()).or_insert(Vec::new()).extend(rank_vec.drain(..));
62-
});
55+
while let Some((time, data)) = input2.next_mut() {
56+
rank_stash.entry(time.retain()).or_insert(Vec::new()).extend(data.drain(..));
57+
}
6358

6459
let frontiers = &[input1.frontier(), input2.frontier()];
6560

timely/examples/wordcount.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ fn main() {
3333
while let Some((time, data)) = input.next() {
3434
queues.entry(time.retain())
3535
.or_insert(Vec::new())
36-
.push(data.replace(Vec::new()));
36+
.push(data.take());
3737
}
3838

3939
for (key, val) in queues.iter_mut() {

timely/src/dataflow/operators/aggregation/aggregate.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,19 +76,17 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for
7676
hash: H) -> Stream<S, R> where S::Timestamp: Eq {
7777

7878
let mut aggregates = HashMap::new();
79-
let mut vector = Vec::new();
8079
self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
8180

8281
// read each input, fold into aggregates
83-
input.for_each(|time, data| {
84-
data.swap(&mut vector);
82+
while let Some((time, data)) = input.next_mut() {
8583
let agg_time = aggregates.entry(time.time().clone()).or_insert_with(HashMap::new);
86-
for (key, val) in vector.drain(..) {
84+
for (key, val) in data.drain(..) {
8785
let agg = agg_time.entry(key.clone()).or_insert_with(Default::default);
8886
fold(&key, val, agg);
8987
}
9088
notificator.notify_at(time.retain());
91-
});
89+
};
9290

9391
// pop completed aggregates, send along whatever
9492
notificator.for_each(|time,_,_| {

timely/src/dataflow/operators/aggregation/state_machine.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
6666
let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state)
6767
let mut states = HashMap::new(); // keys -> state
6868

69-
let mut vector = Vec::new();
70-
7169
self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
7270

7371
// go through each time with data, process each (key, val) pair.
@@ -86,19 +84,16 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
8684
});
8785

8886
// stash each input and request a notification when ready
89-
input.for_each(|time, data| {
90-
91-
data.swap(&mut vector);
92-
87+
while let Some((time, data)) = input.next_mut() {
9388
// stash if not time yet
9489
if notificator.frontier(0).less_than(time.time()) {
95-
pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(vector.drain(..));
90+
pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
9691
notificator.notify_at(time.retain());
9792
}
9893
else {
9994
// else we can process immediately
10095
let mut session = output.session(&time);
101-
for (key, val) in vector.drain(..) {
96+
for (key, val) in data.drain(..) {
10297
let (remove, output) = {
10398
let state = states.entry(key.clone()).or_insert_with(Default::default);
10499
fold(&key, val, state)
@@ -107,7 +102,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
107102
session.give_iterator(output.into_iter());
108103
}
109104
}
110-
});
105+
}
111106
})
112107
}
113108
}

timely/src/dataflow/operators/branch.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,21 @@ impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
4646
let (mut output2, stream2) = builder.new_output();
4747

4848
builder.build(move |_| {
49-
let mut vector = Vec::new();
5049
move |_frontiers| {
5150
let mut output1_handle = output1.activate();
5251
let mut output2_handle = output2.activate();
5352

54-
input.for_each(|time, data| {
55-
data.swap(&mut vector);
53+
while let Some((time, data)) = input.next_mut() {
5654
let mut out1 = output1_handle.session(&time);
5755
let mut out2 = output2_handle.session(&time);
58-
for datum in vector.drain(..) {
56+
for datum in data.drain(..) {
5957
if condition(&time.time(), &datum) {
6058
out2.give(datum);
6159
} else {
6260
out1.give(datum);
6361
}
6462
}
65-
});
63+
}
6664
}
6765
});
6866

@@ -103,21 +101,18 @@ impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
103101
let (mut output2, stream2) = builder.new_output();
104102

105103
builder.build(move |_| {
106-
107-
let mut container = Default::default();
108104
move |_frontiers| {
109105
let mut output1_handle = output1.activate();
110106
let mut output2_handle = output2.activate();
111107

112-
input.for_each(|time, data| {
113-
data.swap(&mut container);
108+
while let Some((time, data)) = input.next_mut() {
114109
let mut out = if condition(&time.time()) {
115110
output2_handle.session(&time)
116111
} else {
117112
output1_handle.session(&time)
118113
};
119-
out.give_container(&mut container);
120-
});
114+
out.give_container(data);
115+
}
121116
}
122117
});
123118

0 commit comments

Comments
 (0)