Skip to content

Commit 23e21c8

Browse files
committed
Persist: Add new API for run based spine replacement
1 parent a547087 commit 23e21c8

File tree

7 files changed

+1561
-366
lines changed

7 files changed

+1561
-366
lines changed

src/persist-client/src/cli/admin.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,11 @@ where
509509
start.elapsed(),
510510
);
511511
let (apply_res, maintenance) = machine
512-
.merge_res(&FueledMergeRes { output: res.output })
512+
.merge_res(&FueledMergeRes {
513+
output: res.output,
514+
inputs: vec![],
515+
new_active_compaction: None,
516+
})
513517
.await;
514518
if !maintenance.is_empty() {
515519
info!("ignoring non-empty requested maintenance: {maintenance:?}")

src/persist-client/src/internal/compact.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,11 @@ where
396396
);
397397
let res = Self::compact_all(stream, req.clone()).await?;
398398
let maintenance = Self::apply(
399-
FueledMergeRes { output: res.output },
399+
FueledMergeRes {
400+
output: res.output,
401+
inputs: vec![],
402+
new_active_compaction: None,
403+
},
400404
&metrics_clone,
401405
&machine_clone,
402406
)

src/persist-client/src/internal/machine.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2464,7 +2464,11 @@ pub mod datadriven {
24642464
.clone();
24652465
let (merge_res, maintenance) = datadriven
24662466
.machine
2467-
.merge_res(&FueledMergeRes { output: batch })
2467+
.merge_res(&FueledMergeRes {
2468+
output: batch,
2469+
inputs: vec![],
2470+
new_active_compaction: None,
2471+
})
24682472
.await;
24692473
datadriven.routine.push(maintenance);
24702474
Ok(format!(

src/persist-client/src/internal/state.rs

Lines changed: 93 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,6 +1053,29 @@ impl<T> HollowBatch<T> {
10531053
}
10541054
}
10551055

1056+
#[cfg(test)]
1057+
pub(crate) fn new_run_for_test(
1058+
desc: Description<T>,
1059+
parts: Vec<RunPart<T>>,
1060+
len: usize,
1061+
run_id: RunId,
1062+
) -> Self {
1063+
let run_meta = if parts.is_empty() {
1064+
vec![]
1065+
} else {
1066+
let mut meta = RunMeta::default();
1067+
meta.id = Some(run_id);
1068+
vec![meta]
1069+
};
1070+
Self {
1071+
desc,
1072+
len,
1073+
parts,
1074+
run_splits: vec![],
1075+
run_meta,
1076+
}
1077+
}
1078+
10561079
/// An empty hollow batch, representing no updates over the given desc.
10571080
pub(crate) fn empty(desc: Description<T>) -> Self {
10581081
Self {
@@ -1810,7 +1833,9 @@ where
18101833
return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
18111834
}
18121835

1813-
let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1836+
let apply_merge_result = self
1837+
.trace
1838+
.apply_merge_res_checked_classic::<D>(res, metrics);
18141839
Continue(apply_merge_result)
18151840
}
18161841

@@ -2162,10 +2187,7 @@ where
21622187
// We have a nonempty batch: replace it with an empty batch and return.
21632188
// This should not produce an excessively large diff: if it did, we wouldn't have been
21642189
// able to append that batch in the first place.
2165-
let fake_merge = FueledMergeRes {
2166-
output: HollowBatch::empty(desc),
2167-
};
2168-
let result = self.trace.apply_tombstone_merge(&fake_merge);
2190+
let result = self.trace.apply_tombstone_merge(&desc);
21692191
assert!(
21702192
result.matched(),
21712193
"merge with a matching desc should always match"
@@ -2833,34 +2855,92 @@ pub(crate) mod tests {
28332855
}
28342856
}
28352857

2858+
pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2859+
num_runs: usize,
2860+
) -> impl Strategy<Value = HollowBatch<T>> {
2861+
(
2862+
any::<T>(),
2863+
any::<T>(),
2864+
any::<T>(),
2865+
proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2866+
any::<usize>(),
2867+
)
2868+
.prop_map(move |(t0, t1, since, parts, len)| {
2869+
let (lower, upper) = if t0 <= t1 {
2870+
(Antichain::from_elem(t0), Antichain::from_elem(t1))
2871+
} else {
2872+
(Antichain::from_elem(t1), Antichain::from_elem(t0))
2873+
};
2874+
let since = Antichain::from_elem(since);
2875+
2876+
let run_splits = (1..num_runs)
2877+
.map(|i| i * parts.len() / num_runs)
2878+
.collect::<Vec<_>>();
2879+
2880+
let run_meta = (0..num_runs)
2881+
.map(|_| {
2882+
let mut meta = RunMeta::default();
2883+
meta.id = Some(RunId::new());
2884+
meta
2885+
})
2886+
.collect::<Vec<_>>();
2887+
2888+
HollowBatch::new(
2889+
Description::new(lower, upper, since),
2890+
parts,
2891+
len % 10,
2892+
run_meta,
2893+
run_splits,
2894+
)
2895+
})
2896+
}
2897+
28362898
pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
28372899
Strategy::prop_map(
28382900
(
28392901
any::<T>(),
28402902
any::<T>(),
28412903
any::<T>(),
2842-
proptest::collection::vec(any_run_part::<T>(), 0..3),
2904+
proptest::collection::vec(any_run_part::<T>(), 0..20),
28432905
any::<usize>(),
2844-
any::<bool>(),
2906+
0..=10usize,
2907+
proptest::collection::vec(any::<RunId>(), 10),
28452908
),
2846-
|(t0, t1, since, parts, len, runs)| {
2909+
|(t0, t1, since, parts, len, num_runs, run_ids)| {
28472910
let (lower, upper) = if t0 <= t1 {
28482911
(Antichain::from_elem(t0), Antichain::from_elem(t1))
28492912
} else {
28502913
(Antichain::from_elem(t1), Antichain::from_elem(t0))
28512914
};
28522915
let since = Antichain::from_elem(since);
2853-
if runs && parts.len() > 2 {
2854-
let split_at = parts.len() / 2;
2916+
if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2917+
let run_splits = (1..num_runs)
2918+
.map(|i| i * parts.len() / num_runs)
2919+
.collect::<Vec<_>>();
2920+
2921+
let run_meta = (0..num_runs)
2922+
.enumerate()
2923+
.map(|(i, _)| {
2924+
let mut meta = RunMeta::default();
2925+
meta.id = Some(run_ids[i]);
2926+
meta
2927+
})
2928+
.collect::<Vec<_>>();
2929+
28552930
HollowBatch::new(
28562931
Description::new(lower, upper, since),
28572932
parts,
28582933
len % 10,
2859-
vec![RunMeta::default(), RunMeta::default()],
2860-
vec![split_at],
2934+
run_meta,
2935+
run_splits,
28612936
)
28622937
} else {
2863-
HollowBatch::new_run(Description::new(lower, upper, since), parts, len % 10)
2938+
HollowBatch::new_run_for_test(
2939+
Description::new(lower, upper, since),
2940+
parts,
2941+
len % 10,
2942+
run_ids[0],
2943+
)
28642944
}
28652945
},
28662946
)

src/persist-client/src/internal/state_diff.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use mz_persist::location::{SeqNo, VersionedData};
2121
use mz_persist_types::Codec64;
2222
use mz_persist_types::schema::SchemaId;
2323
use mz_proto::TryFromProtoError;
24+
use std::collections::BTreeSet;
2425
use timely::PartialOrder;
2526
use timely::progress::{Antichain, Timestamp};
2627
use tracing::debug;
@@ -32,6 +33,7 @@ use crate::internal::state::{
3233
LeasedReaderState, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, RunPart,
3334
State, StateCollections, WriterState,
3435
};
36+
use crate::internal::trace::CompactionInput;
3537
use crate::internal::trace::{FueledMergeRes, SpineId, ThinMerge, ThinSpineBatch, Trace};
3638
use crate::read::LeasedReaderId;
3739
use crate::write::WriterId;
@@ -877,7 +879,11 @@ fn apply_diffs_spine<T: Timestamp + Lattice + Codec64>(
877879

878880
// Fast-path: compaction
879881
if let Some((_inputs, output)) = sniff_compaction(&diffs) {
880-
let res = FueledMergeRes { output };
882+
let res = FueledMergeRes {
883+
output,
884+
input: CompactionInput::IdRange(BTreeSet::new()),
885+
new_active_compaction: None,
886+
};
881887
// We can't predict how spine will arrange the batches when it's
882888
// hydrated. This means that something that is maintaining a Spine
883889
// starting at some seqno may not exactly match something else
@@ -1444,7 +1450,11 @@ mod tests {
14441450
leader
14451451
.collections
14461452
.trace
1447-
.apply_merge_res_unchecked(&FueledMergeRes { output });
1453+
.apply_merge_res_unchecked(&FueledMergeRes {
1454+
output,
1455+
input: CompactionInput::IdRange(BTreeSet::new()),
1456+
new_active_compaction: None,
1457+
});
14481458
}
14491459
}
14501460
}

0 commit comments

Comments
 (0)