Skip to content

Commit e7388bb

Browse files
committed
Persist: plumb incremental apply APIs through
Depends on #33044
1 parent 00d1bb7 commit e7388bb

File tree

5 files changed

+492
-169
lines changed

5 files changed

+492
-169
lines changed

src/persist-client/src/cli/admin.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
4040
use crate::internal::encoding::Schemas;
4141
use crate::internal::gc::{GarbageCollector, GcReq};
4242
use crate::internal::machine::Machine;
43-
use crate::internal::trace::{CompactionInput, FueledMergeRes};
43+
use crate::internal::trace::FueledMergeRes;
4444
use crate::rpc::{NoopPubSubSender, PubSubSender};
4545
use crate::write::{WriteHandle, WriterId};
4646
use crate::{
@@ -455,17 +455,17 @@ where
455455
let req = CompactReq {
456456
shard_id,
457457
desc: req.desc,
458-
inputs: req
459-
.inputs
460-
.into_iter()
461-
.map(|b| Arc::unwrap_or_clone(b.batch))
462-
.collect(),
458+
inputs: req.inputs,
463459
};
464-
let parts = req.inputs.iter().map(|x| x.part_count()).sum::<usize>();
460+
let parts = req
461+
.inputs
462+
.iter()
463+
.map(|x| x.batch.part_count())
464+
.sum::<usize>();
465465
let bytes = req
466466
.inputs
467467
.iter()
468-
.map(|x| x.encoded_size_bytes())
468+
.map(|x| x.batch.encoded_size_bytes())
469469
.sum::<usize>();
470470
let start = Instant::now();
471471
info!(
@@ -514,7 +514,7 @@ where
514514
let (apply_res, maintenance) = machine
515515
.merge_res(&FueledMergeRes {
516516
output: res.output,
517-
input: CompactionInput::Legacy,
517+
input: res.input,
518518
new_active_compaction: None,
519519
})
520520
.await;
@@ -749,14 +749,18 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
749749
let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
750750
for req in reqs {
751751
info!(
752-
"force_compaction {} {} compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
752+
"force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}",
753753
machine.applier.shard_metrics.name,
754754
machine.applier.shard_metrics.shard_id,
755755
req.inputs.len(),
756-
req.inputs.iter().flat_map(|x| &x.parts).count(),
756+
req.inputs.iter().flat_map(|x| &x.batch.parts).count(),
757+
req.inputs
758+
.iter()
759+
.map(|x| x.batch.runs().count())
760+
.sum::<usize>(),
757761
req.inputs
758762
.iter()
759-
.flat_map(|x| &x.parts)
763+
.flat_map(|x| &x.batch.parts)
760764
.map(|x| x.encoded_size_bytes())
761765
.sum::<usize>(),
762766
req.desc.lower().elements(),
@@ -801,13 +805,18 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
801805

802806
// NB: This check is intentionally at the end so that it's safe to call
803807
// this method in a loop.
804-
let num_batches = machine.applier.all_batches().len();
805-
if num_batches < 2 {
808+
let num_runs: usize = machine
809+
.applier
810+
.all_batches()
811+
.iter()
812+
.map(|x| x.runs().count())
813+
.sum();
814+
if num_runs <= 1 {
806815
info!(
807-
"force_compaction {} {} exiting with {} batches",
816+
"force_compaction {} {} exiting with {} runs",
808817
machine.applier.shard_metrics.name,
809818
machine.applier.shard_metrics.shard_id,
810-
num_batches
819+
num_runs
811820
);
812821
return;
813822
}

0 commit comments

Comments
 (0)