Skip to content

Commit 85910d2

Browse files
committed
Persist: Add new API for run based spine replacement
1 parent a547087 commit 85910d2

File tree

8 files changed

+1584
-370
lines changed

8 files changed

+1584
-370
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Seeds for failure cases proptest has generated in the past. It is
2+
# automatically read and these particular cases re-run before any
3+
# novel cases are generated.
4+
#
5+
# It is recommended to check this file in to source control so that
6+
# everyone who runs the test benefits from these saved cases.
7+
cc 324dd3dcd5c90ff25fdf69fc623350147ed15818c46f6181664574cededc8228 # shrinks to (batch, to_replace, range) = (HollowBatch { desc: ([0], [0], [0]), parts: [Single(Hollow(HollowBatchPart { key: PartialBatchKey("&cዀ𝅏=፭\\ሏ{ౄ@Ѩ𑁞𞺒Ⱥ��\u{11d3a}𐭰:]𑖠%%ᅦ\"{ΈѨΊ"), encoded_size_bytes: 17159905101681073050, key_lower: [128], structured_key_lower: None, stats: Some(LazyPartStats { key: ProtoStructStats { len: 6688095797385753307, cols: {"": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 8739577643014117486, cols: {"\u{2008}{¥ৎ𑍐.&}Ⱥv<𖩣v$𞋿Ѩ": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 13721996691348023136, cols: {"U\"Ѩ𞹟&8ச\\�Í🕴:": ProtoDynStats { option: None, kind: Some(Primitive(ProtoPrimitiveStats { lower: Some(LowerI64(3442531055666647167)), upper: Some(UpperI64(6540975407828514330)) })) }, "q": ProtoDynStats { option: None, kind: Some(Primitive(ProtoPrimitiveStats { lower: Some(LowerString("𑅟¥")), upper: Some(UpperString("𗐅`{𐏉꠰𐡎Yਵ]�🫧Ѩá")) })) }} })) }} })) }, "$'ந¥Ѩ𞹋𝔂ᣠ?I𜵷(ڄਵ𐬺*IEෞ": ProtoDynStats { option: None, kind: Some(Primitive(ProtoPrimitiveStats { lower: Some(LowerString("{¥ੜ:}fUQ𐁆hà*𑵨/𐮫£Ⱥ{")), upper: Some(UpperString("ㄷ`á\u{c3e}jv.")) })) }, "C[{v}'\\𒑳ᅱㄗ&,/s{/⅖`Q$>,𐕏": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 15325414661794872962, cols: {"?Ѩ9[ꟐF𐖹𞅁nȺ[🕴{𐠈�": ProtoDynStats { option: None, kind: Some(Bytes(ProtoBytesStats { kind: Some(Primitive(ProtoPrimitiveBytesStats { lower: [21, 176, 220, 118, 44, 88, 27, 242, 198, 0, 62, 194, 75, 238, 111, 138, 227, 34, 250, 74, 29, 56, 190, 127, 148, 96, 221, 212, 47, 118, 66, 243, 4, 183, 125, 57, 35, 72, 24, 34, 212, 73, 131, 135, 165, 37, 51, 87, 191, 213, 34, 118, 160, 76, 43, 65, 25, 70, 85, 135], upper: [208, 187, 160, 84, 157, 234, 88, 157, 213, 91, 105, 19, 52, 177, 180, 51, 214, 23, 217, 72, 58, 107, 152, 66, 63, 221, 88, 16, 51, 2, 116, 83, 238, 20, 170, 172, 166, 123, 246, 38, 189, 167, 69, 166, 21, 35, 197, 195, 234, 34, 210, 105, 237, 97, 130, 83, 93, 57, 91, 125, 0, 94, 98, 107, 249, 225, 61, 59, 4, 72, 27, 244, 1, 162, 110, 114, 108, 78, 76, 9, 124, 198] })) })) }, "|{ଡ০'\\Y/gꤊ𑙦Ѩ": ProtoDynStats { option: None, kind: Some(Primitive(ProtoPrimitiveStats { lower: Some(LowerBool(false)), upper: Some(UpperBool(false)) })) }} })) }, "f𑎎MFJѨ&𐞺🂭ÿ𡲷%?හ𞟠¥WL{𐠫0k": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 15440363192230073212, cols: {} })) }} } }), ts_rewrite: None, diffs_sum: Some([130, 33, 18, 213, 85, 47, 132, 83]), format: Some(Row), schema_id: Some(SchemaId(15276162640790439026)), deprecated_schema_id: Some(SchemaId(145574519964482152)) })), Single(Hollow(HollowBatchPart { key: PartialBatchKey("7ף=Y𞥞\u{11d43}&M(=൚ொ&$\\𝼩/!ú'𐾸K࡞?¥qȺ𐒧𛅑¥"), encoded_size_bytes: 16221637005735549288, key_lower: [61, 125, 10, 36, 150, 198, 184, 63, 22, 47, 246, 48, 40, 189, 127, 142, 173, 25, 79, 60, 24, 96, 28, 246, 17, 205, 150, 30, 91, 69, 30, 234, 89, 139, 104, 202, 64, 37, 167, 109, 70, 203, 145, 166, 230, 144, 96, 151, 11, 101, 204, 233, 81], structured_key_lower: None, stats: Some(LazyPartStats { key: ProtoStructStats { len: 4001924578348347085, cols: {"Ὑhp\"d½ڧ𑗅z/N%🕴𐿳Ⴧ": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 848674279897532819, cols: {"d\u{ecd}🩑?\"=𘠼.Ⴭଳ{/I": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 15989136208187322796, cols: {} })) }} })) }, "𞤳𑊈f6ꔠ<O+ਸ਼u𑾰𞹇`&§x⿲𝙩¢\"v<*𐖰𑨲𚿾🕴¥": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 13118251564013457809, cols: {} })) }} } }), ts_rewrite: None, diffs_sum: Some([143, 104, 1, 105, 234, 31, 102, 89]), format: Some(Row), schema_id: Some(SchemaId(1623169680363454099)), deprecated_schema_id: Some(SchemaId(16047067416504752114)) })), Single(Hollow(HollowBatchPart { key: PartialBatchKey("{¥ல$〳𐋈é:\\.y\u{16b34}y[\u{a4c}𐨕\u{afe}`�Ὼ𑛞𑤉"), encoded_size_bytes: 12335937890353768456, key_lower: [155, 156, 191, 35, 148, 123], structured_key_lower: None, stats: Some(LazyPartStats { key: ProtoStructStats { len: 18290375779633485551, cols: {"`:`{/`?Tಷ$𐣱.^꠴*D�:𑾰/𐮛Ѩ:.&𑏔B?�ףּ": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 5875023596634024771, cols: {} })) }} } }), ts_rewrite: None, diffs_sum: Some([63, 212, 227, 106, 240, 217, 183, 25]), format: None, schema_id: None, deprecated_schema_id: None }))], len: 4, runs: [1], run_meta: [RunMeta { order: None, schema: None, deprecated_schema: None, id: Some(RunId(aa07d41b-4170-2dcc-24ec-215c6bf0c513)), len: None }, RunMeta { order: None, schema: None, deprecated_schema: None, id: Some(RunId(37ead899-8703-bb12-4c34-88914c768d0b)), len: None }] }, HollowBatch { desc: ([226592977566134188], [10000479806158488660], [15948108891800766332]), parts: [Single(Hollow(HollowBatchPart { key: PartialBatchKey("'/C𑦦𑊀ৡ/𐤿7\\`ά\u{1171f}¥Ⱥꠄ🆒ୀe5<*,`G:ﮀ𐰸Y"), encoded_size_bytes: 2414666504526312933, key_lower: [5, 195, 38, 247, 195, 34, 185, 100, 228, 34, 223, 194, 132, 70, 60, 85, 63, 96, 186, 7, 80, 188, 204, 129, 105, 45, 72, 62, 183, 206, 135, 188, 191, 173, 176, 131, 184, 5, 118, 246, 96, 108, 122, 217, 190, 182, 228, 181, 40, 149, 231, 9, 127, 110, 97, 186, 182, 248, 219, 214, 137, 80, 36, 72, 79, 91, 223, 137, 73, 46, 171, 71, 84, 56, 230, 131, 243, 179, 20, 254, 97, 145, 232, 87, 209, 74, 151, 142, 233, 183], structured_key_lower: None, stats: Some(LazyPartStats { key: ProtoStructStats { len: 9238180354612834502, cols: {"'𐓎𑤷\u{1e01d}j᪒": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 10529964197857170512, cols: {} })) }, "/j𑓒w\u{10f47}5=Ⱥ=B𐐄??ᇷp.𚿵g.e/🠄𑊨 𑌏-%=~Ѩ]<": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 11007995775904703669, cols: {} })) }, "{4¥=:𘐎?\\𑱂o'_R�:~.🢲Uw\u{b55}\u{a01}r": ProtoDynStats { option: None, kind: Some(Struct(ProtoStructStats { len: 16260740473980028200, cols: {"ⶠࡓ}": ProtoDynStats { option: None, kind: Some(Primitive(ProtoPrimitiveStats { lower: Some(LowerString("9𓺥e\u{598}¥¥=f$ꖉU(:&Y൘Ѩ%𝔸𐁚ૠ𑎎ை:ਲ਼")), upper: Some(UpperString("⳽ힱ< '{Ѩ6{k\\")) })) }} })) }} } }), ts_rewrite: Some(Antichain { elements: [5889030889590607230] }), diffs_sum: Some([75, 171, 24, 197, 54, 139, 128, 237]), format: None, schema_id: Some(SchemaId(4828515181703063863)), deprecated_schema_id: None })), Single(Inline { updates: LazyInlineBatchPart(ProtoInlineBatchPart { desc: None, index: 0, updates: None }), ts_rewrite: Some(Antichain { elements: [7419603467876392213] }), schema_id: None, deprecated_schema_id: None })], len: 5, runs: [], run_meta: [RunMeta { order: None, schema: None, deprecated_schema: None, id: Some(RunId(524fbb8b-5285-40d7-9d15-b9d201412eaf)), len: None }] }, 0..1)

src/persist-client/src/cli/admin.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
4040
use crate::internal::encoding::Schemas;
4141
use crate::internal::gc::{GarbageCollector, GcReq};
4242
use crate::internal::machine::Machine;
43-
use crate::internal::trace::FueledMergeRes;
43+
use crate::internal::trace::{CompactionInput, FueledMergeRes};
4444
use crate::rpc::{NoopPubSubSender, PubSubSender};
4545
use crate::write::{WriteHandle, WriterId};
4646
use crate::{
@@ -509,7 +509,11 @@ where
509509
start.elapsed(),
510510
);
511511
let (apply_res, maintenance) = machine
512-
.merge_res(&FueledMergeRes { output: res.output })
512+
.merge_res(&FueledMergeRes {
513+
output: res.output,
514+
input: CompactionInput::Legacy,
515+
new_active_compaction: None,
516+
})
513517
.await;
514518
if !maintenance.is_empty() {
515519
info!("ignoring non-empty requested maintenance: {maintenance:?}")

src/persist-client/src/internal/compact.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use crate::internal::metrics::ShardMetrics;
4949
use crate::internal::state::{
5050
ENABLE_INCREMENTAL_COMPACTION, HollowBatch, RunMeta, RunOrder, RunPart,
5151
};
52-
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
52+
use crate::internal::trace::{ApplyMergeResult, CompactionInput, FueledMergeRes};
5353
use crate::iter::{Consolidator, StructuredSort};
5454
use crate::{Metrics, PersistConfig, ShardId};
5555

@@ -396,7 +396,11 @@ where
396396
);
397397
let res = Self::compact_all(stream, req.clone()).await?;
398398
let maintenance = Self::apply(
399-
FueledMergeRes { output: res.output },
399+
FueledMergeRes {
400+
output: res.output,
401+
input: CompactionInput::Legacy,
402+
new_active_compaction: None,
403+
},
400404
&metrics_clone,
401405
&machine_clone,
402406
)

src/persist-client/src/internal/machine.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1438,6 +1438,7 @@ pub mod datadriven {
14381438
use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
14391439
use crate::internal::state::{BatchPart, RunOrder, RunPart};
14401440
use crate::internal::state_versions::EncodedRollup;
1441+
use crate::internal::trace::CompactionInput;
14411442
use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
14421443
use crate::rpc::NoopPubSubSender;
14431444
use crate::tests::new_test_client;
@@ -2464,7 +2465,11 @@ pub mod datadriven {
24642465
.clone();
24652466
let (merge_res, maintenance) = datadriven
24662467
.machine
2467-
.merge_res(&FueledMergeRes { output: batch })
2468+
.merge_res(&FueledMergeRes {
2469+
output: batch,
2470+
input: CompactionInput::Legacy,
2471+
new_active_compaction: None,
2472+
})
24682473
.await;
24692474
datadriven.routine.push(maintenance);
24702475
Ok(format!(

src/persist-client/src/internal/state.rs

Lines changed: 93 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,6 +1053,29 @@ impl<T> HollowBatch<T> {
10531053
}
10541054
}
10551055

1056+
#[cfg(test)]
1057+
pub(crate) fn new_run_for_test(
1058+
desc: Description<T>,
1059+
parts: Vec<RunPart<T>>,
1060+
len: usize,
1061+
run_id: RunId,
1062+
) -> Self {
1063+
let run_meta = if parts.is_empty() {
1064+
vec![]
1065+
} else {
1066+
let mut meta = RunMeta::default();
1067+
meta.id = Some(run_id);
1068+
vec![meta]
1069+
};
1070+
Self {
1071+
desc,
1072+
len,
1073+
parts,
1074+
run_splits: vec![],
1075+
run_meta,
1076+
}
1077+
}
1078+
10561079
/// An empty hollow batch, representing no updates over the given desc.
10571080
pub(crate) fn empty(desc: Description<T>) -> Self {
10581081
Self {
@@ -1810,7 +1833,9 @@ where
18101833
return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
18111834
}
18121835

1813-
let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1836+
let apply_merge_result = self
1837+
.trace
1838+
.apply_merge_res_checked_classic::<D>(res, metrics);
18141839
Continue(apply_merge_result)
18151840
}
18161841

@@ -2162,10 +2187,7 @@ where
21622187
// We have a nonempty batch: replace it with an empty batch and return.
21632188
// This should not produce an excessively large diff: if it did, we wouldn't have been
21642189
// able to append that batch in the first place.
2165-
let fake_merge = FueledMergeRes {
2166-
output: HollowBatch::empty(desc),
2167-
};
2168-
let result = self.trace.apply_tombstone_merge(&fake_merge);
2190+
let result = self.trace.apply_tombstone_merge(&desc);
21692191
assert!(
21702192
result.matched(),
21712193
"merge with a matching desc should always match"
@@ -2833,34 +2855,92 @@ pub(crate) mod tests {
28332855
}
28342856
}
28352857

2858+
pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2859+
num_runs: usize,
2860+
) -> impl Strategy<Value = HollowBatch<T>> {
2861+
(
2862+
any::<T>(),
2863+
any::<T>(),
2864+
any::<T>(),
2865+
proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2866+
any::<usize>(),
2867+
)
2868+
.prop_map(move |(t0, t1, since, parts, len)| {
2869+
let (lower, upper) = if t0 <= t1 {
2870+
(Antichain::from_elem(t0), Antichain::from_elem(t1))
2871+
} else {
2872+
(Antichain::from_elem(t1), Antichain::from_elem(t0))
2873+
};
2874+
let since = Antichain::from_elem(since);
2875+
2876+
let run_splits = (1..num_runs)
2877+
.map(|i| i * parts.len() / num_runs)
2878+
.collect::<Vec<_>>();
2879+
2880+
let run_meta = (0..num_runs)
2881+
.map(|_| {
2882+
let mut meta = RunMeta::default();
2883+
meta.id = Some(RunId::new());
2884+
meta
2885+
})
2886+
.collect::<Vec<_>>();
2887+
2888+
HollowBatch::new(
2889+
Description::new(lower, upper, since),
2890+
parts,
2891+
len % 10,
2892+
run_meta,
2893+
run_splits,
2894+
)
2895+
})
2896+
}
2897+
28362898
pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
28372899
Strategy::prop_map(
28382900
(
28392901
any::<T>(),
28402902
any::<T>(),
28412903
any::<T>(),
2842-
proptest::collection::vec(any_run_part::<T>(), 0..3),
2904+
proptest::collection::vec(any_run_part::<T>(), 0..20),
28432905
any::<usize>(),
2844-
any::<bool>(),
2906+
0..=10usize,
2907+
proptest::collection::vec(any::<RunId>(), 10),
28452908
),
2846-
|(t0, t1, since, parts, len, runs)| {
2909+
|(t0, t1, since, parts, len, num_runs, run_ids)| {
28472910
let (lower, upper) = if t0 <= t1 {
28482911
(Antichain::from_elem(t0), Antichain::from_elem(t1))
28492912
} else {
28502913
(Antichain::from_elem(t1), Antichain::from_elem(t0))
28512914
};
28522915
let since = Antichain::from_elem(since);
2853-
if runs && parts.len() > 2 {
2854-
let split_at = parts.len() / 2;
2916+
if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2917+
let run_splits = (1..num_runs)
2918+
.map(|i| i * parts.len() / num_runs)
2919+
.collect::<Vec<_>>();
2920+
2921+
let run_meta = (0..num_runs)
2922+
.enumerate()
2923+
.map(|(i, _)| {
2924+
let mut meta = RunMeta::default();
2925+
meta.id = Some(run_ids[i]);
2926+
meta
2927+
})
2928+
.collect::<Vec<_>>();
2929+
28552930
HollowBatch::new(
28562931
Description::new(lower, upper, since),
28572932
parts,
28582933
len % 10,
2859-
vec![RunMeta::default(), RunMeta::default()],
2860-
vec![split_at],
2934+
run_meta,
2935+
run_splits,
28612936
)
28622937
} else {
2863-
HollowBatch::new_run(Description::new(lower, upper, since), parts, len % 10)
2938+
HollowBatch::new_run_for_test(
2939+
Description::new(lower, upper, since),
2940+
parts,
2941+
len % 10,
2942+
run_ids[0],
2943+
)
28642944
}
28652945
},
28662946
)

src/persist-client/src/internal/state_diff.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::internal::state::{
3232
LeasedReaderState, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, RunPart,
3333
State, StateCollections, WriterState,
3434
};
35+
use crate::internal::trace::CompactionInput;
3536
use crate::internal::trace::{FueledMergeRes, SpineId, ThinMerge, ThinSpineBatch, Trace};
3637
use crate::read::LeasedReaderId;
3738
use crate::write::WriterId;
@@ -877,7 +878,11 @@ fn apply_diffs_spine<T: Timestamp + Lattice + Codec64>(
877878

878879
// Fast-path: compaction
879880
if let Some((_inputs, output)) = sniff_compaction(&diffs) {
880-
let res = FueledMergeRes { output };
881+
let res = FueledMergeRes {
882+
output,
883+
input: CompactionInput::Legacy,
884+
new_active_compaction: None,
885+
};
881886
// We can't predict how spine will arrange the batches when it's
882887
// hydrated. This means that something that is maintaining a Spine
883888
// starting at some seqno may not exactly match something else
@@ -1444,7 +1449,11 @@ mod tests {
14441449
leader
14451450
.collections
14461451
.trace
1447-
.apply_merge_res_unchecked(&FueledMergeRes { output });
1452+
.apply_merge_res_unchecked(&FueledMergeRes {
1453+
output,
1454+
input: CompactionInput::Legacy,
1455+
new_active_compaction: None,
1456+
});
14481457
}
14491458
}
14501459
}

0 commit comments

Comments
 (0)