Skip to content

Commit 80ed856

Browse files
authored
Inline consolidate fast-path (#629)
* Inline consolidate fast-path

  Gives the compiler the opportunity to inline `consolidate` calls for small amounts of data by moving the complex consolidation logic for more than one update into a separate function. We don't mark that function as `#[cold]` or `#[inline(never)]` because we want to leave the inlining decision to the compiler.

  In local testing, this shaved a few percent off the runtime of the spines example.

  With the patch:
  ```
  Running ["new"] arrangement
  5.682871841s loading complete
  14.032609636s queries complete
  14.037398497s shut down
  ```

  With current master:
  ```
  Running ["new"] arrangement
  6.010566878s loading complete
  14.673966926s queries complete
  14.678923984s shut down
  ```

  Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

* Make function private

  Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

---------

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
1 parent d15bf32 commit 80ed856

File tree

1 file changed

+66
-53
lines changed

1 file changed

+66
-53
lines changed

differential-dataflow/src/consolidation.rs

Lines changed: 66 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::difference::{IsZero, Semigroup};
2222
/// This method will sort `vec` and then consolidate runs of more than one entry with
2323
/// identical first elements by accumulating the second elements of the pairs. Should the final
2424
/// accumulation be zero, the element is discarded.
25+
#[inline]
2526
pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
2627
consolidate_from(vec, 0);
2728
}
@@ -31,55 +32,61 @@ pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
3132
/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
3233
/// identical first elements by accumulating the second elements of the pairs. Should the final
3334
/// accumulation be zero, the element is discarded.
35+
#[inline]
3436
pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
3537
let length = consolidate_slice(&mut vec[offset..]);
3638
vec.truncate(offset + length);
3739
}
3840

3941
/// Sorts and consolidates a slice, returning the valid prefix length.
42+
#[inline]
4043
pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
41-
4244
if slice.len() > 1 {
45+
consolidate_slice_slow(slice)
46+
}
47+
else {
48+
slice.iter().filter(|x| !x.1.is_zero()).count()
49+
}
50+
}
4351

44-
// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
45-
// In a world where there are not many results, we may never even need to call in to merge sort.
46-
slice.sort_by(|x,y| x.0.cmp(&y.0));
52+
/// Part of `consolidate_slice` that handles slices of length greater than 1.
53+
fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
54+
// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
55+
// In a world where there are not many results, we may never even need to call in to merge sort.
56+
slice.sort_by(|x,y| x.0.cmp(&y.0));
4757

48-
// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
49-
let mut offset = 0;
50-
let mut accum = slice[offset].1.clone();
58+
// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
59+
let mut offset = 0;
60+
let mut accum = slice[offset].1.clone();
5161

52-
for index in 1 .. slice.len() {
53-
if slice[index].0 == slice[index-1].0 {
54-
accum.plus_equals(&slice[index].1);
55-
}
56-
else {
57-
if !accum.is_zero() {
58-
slice.swap(offset, index-1);
59-
slice[offset].1.clone_from(&accum);
60-
offset += 1;
61-
}
62-
accum.clone_from(&slice[index].1);
63-
}
62+
for index in 1 .. slice.len() {
63+
if slice[index].0 == slice[index-1].0 {
64+
accum.plus_equals(&slice[index].1);
6465
}
65-
if !accum.is_zero() {
66-
slice.swap(offset, slice.len()-1);
67-
slice[offset].1 = accum;
68-
offset += 1;
66+
else {
67+
if !accum.is_zero() {
68+
slice.swap(offset, index-1);
69+
slice[offset].1.clone_from(&accum);
70+
offset += 1;
71+
}
72+
accum.clone_from(&slice[index].1);
6973
}
70-
71-
offset
7274
}
73-
else {
74-
slice.iter().filter(|x| !x.1.is_zero()).count()
75+
if !accum.is_zero() {
76+
slice.swap(offset, slice.len()-1);
77+
slice[offset].1 = accum;
78+
offset += 1;
7579
}
80+
81+
offset
7682
}
7783

7884
/// Sorts and consolidates `vec`.
7985
///
8086
/// This method will sort `vec` and then consolidate runs of more than one entry with
8187
/// identical first two elements by accumulating the third elements of the triples. Should the final
8288
/// accumulation be zero, the element is discarded.
89+
#[inline]
8390
pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
8491
consolidate_updates_from(vec, 0);
8592
}
@@ -89,48 +96,54 @@ pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)
8996
/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
9097
/// identical first two elements by accumulating the third elements of the triples. Should the final
9198
/// accumulation be zero, the element is discarded.
99+
#[inline]
92100
pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
93101
let length = consolidate_updates_slice(&mut vec[offset..]);
94102
vec.truncate(offset + length);
95103
}
96104

97105
/// Sorts and consolidates a slice, returning the valid prefix length.
106+
#[inline]
98107
pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
99108

100109
if slice.len() > 1 {
110+
consolidate_updates_slice_slow(slice)
111+
}
112+
else {
113+
slice.iter().filter(|x| !x.2.is_zero()).count()
114+
}
115+
}
101116

102-
// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
103-
// In a world where there are not many results, we may never even need to call in to merge sort.
104-
slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
117+
/// Part of `consolidate_updates_slice` that handles slices of length greater than 1.
118+
fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
119+
// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
120+
// In a world where there are not many results, we may never even need to call in to merge sort.
121+
slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
105122

106-
// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
107-
let mut offset = 0;
108-
let mut accum = slice[offset].2.clone();
123+
// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
124+
let mut offset = 0;
125+
let mut accum = slice[offset].2.clone();
109126

110-
for index in 1 .. slice.len() {
111-
if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
112-
accum.plus_equals(&slice[index].2);
113-
}
114-
else {
115-
if !accum.is_zero() {
116-
slice.swap(offset, index-1);
117-
slice[offset].2.clone_from(&accum);
118-
offset += 1;
119-
}
120-
accum.clone_from(&slice[index].2);
121-
}
127+
for index in 1 .. slice.len() {
128+
if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
129+
accum.plus_equals(&slice[index].2);
122130
}
123-
if !accum.is_zero() {
124-
slice.swap(offset, slice.len()-1);
125-
slice[offset].2 = accum;
126-
offset += 1;
131+
else {
132+
if !accum.is_zero() {
133+
slice.swap(offset, index-1);
134+
slice[offset].2.clone_from(&accum);
135+
offset += 1;
136+
}
137+
accum.clone_from(&slice[index].2);
127138
}
128-
129-
offset
130139
}
131-
else {
132-
slice.iter().filter(|x| !x.2.is_zero()).count()
140+
if !accum.is_zero() {
141+
slice.swap(offset, slice.len()-1);
142+
slice[offset].2 = accum;
143+
offset += 1;
133144
}
145+
146+
offset
134147
}
135148

136149

0 commit comments

Comments
 (0)