Skip to content

Commit 6f74d0b

Browse files
committed
Persist: plumb incremental apply APIs through
Depends on #33044
1 parent f035a90 commit 6f74d0b

File tree

7 files changed

+453
-162
lines changed

7 files changed

+453
-162
lines changed

WORKSPACE

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,14 +353,14 @@ rust_toolchains(
353353
"clippy": "a1ebfd7eac1dd1e879cde7d4a775689e5bd7d80c576930604c7589576d4fe2fe",
354354
"llvm-tools": "8d864504e0f4e5e37ad81840f5a353f5029128a713e2907d8c13da2f3cfba4c3",
355355
"rust-std": "00c1bb3ddb383c9aaace344332f2e523552464abf5133645a49764b3c02b0421",
356-
"rustc": "d278edbf97698d63779dfe8116c252cd957d25f284ae282a2d435a49710b6a35"
356+
"rustc": "d278edbf97698d63779dfe8116c252cd957d25f284ae282a2d435a49710b6a35",
357357
},
358358
"nightly": {
359359
"cargo": "efe388459b9cb6dc2d0bf3cd75ccd6acc9dbfa61928691af3bd46d892e078ae4",
360360
"clippy": "8fc88c6421e02c2b9afb7b9ab6dfd981a5a9df189f80fd6f580ee8fbd053aa66",
361361
"llvm-tools": "c19f937183373639e0cd5f69f784a889cb3e0424943c20c90ee486b95f1e5777",
362362
"rust-std": "71745eb4d21138fa4e1fd900101423887e32f1ac8c4464c1e09068a04e668486",
363-
"rustc": "e77d89b08325f8e48b444887a63685be5cfba007aa421468cdaafc7ad11ccadf"
363+
"rustc": "e77d89b08325f8e48b444887a63685be5cfba007aa421468cdaafc7ad11ccadf",
364364
},
365365
},
366366
"aarch64-unknown-linux-gnu": {

misc/bazel/c_deps/rust-sys/BUILD.librdkafka.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ cmake(
2626
name = "librdkafka",
2727
build_args = ["-j8"],
2828
build_data = ["@openssl//:openssl_root"],
29+
copts = ["-UNDEBUG"],
2930
env = {
3031
# Note: casing here is important.
3132
"OpenSSL_ROOT": "$(execpath @openssl//:openssl_root)",
3233
},
33-
copts = ["-UNDEBUG"],
3434
generate_args = [
3535
# Features enabled by the Rust `rdkafka-sys` crate.
3636
"-DRDKAFKA_BUILD_STATIC=1",

src/persist-client/src/cli/admin.rs

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -455,17 +455,17 @@ where
455455
let req = CompactReq {
456456
shard_id,
457457
desc: req.desc,
458-
inputs: req
459-
.inputs
460-
.into_iter()
461-
.map(|b| Arc::unwrap_or_clone(b.batch))
462-
.collect(),
458+
inputs: req.inputs,
463459
};
464-
let parts = req.inputs.iter().map(|x| x.part_count()).sum::<usize>();
460+
let parts = req
461+
.inputs
462+
.iter()
463+
.map(|x| x.batch.part_count())
464+
.sum::<usize>();
465465
let bytes = req
466466
.inputs
467467
.iter()
468-
.map(|x| x.encoded_size_bytes())
468+
.map(|x| x.batch.encoded_size_bytes())
469469
.sum::<usize>();
470470
let start = Instant::now();
471471
info!(
@@ -743,14 +743,18 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
743743
let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
744744
for req in reqs {
745745
info!(
746-
"force_compaction {} {} compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
746+
"force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}",
747747
machine.applier.shard_metrics.name,
748748
machine.applier.shard_metrics.shard_id,
749749
req.inputs.len(),
750-
req.inputs.iter().flat_map(|x| &x.parts).count(),
750+
req.inputs.iter().flat_map(|x| &x.batch.parts).count(),
751+
req.inputs
752+
.iter()
753+
.map(|x| x.batch.runs().count())
754+
.sum::<usize>(),
751755
req.inputs
752756
.iter()
753-
.flat_map(|x| &x.parts)
757+
.flat_map(|x| &x.batch.parts)
754758
.map(|x| x.encoded_size_bytes())
755759
.sum::<usize>(),
756760
req.desc.lower().elements(),
@@ -795,13 +799,18 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
795799

796800
// NB: This check is intentionally at the end so that it's safe to call
797801
// this method in a loop.
798-
let num_batches = machine.applier.all_batches().len();
799-
if num_batches < 2 {
802+
let num_runs: usize = machine
803+
.applier
804+
.all_batches()
805+
.iter()
806+
.map(|x| x.runs().count())
807+
.sum();
808+
if num_runs <= 1 {
800809
info!(
801-
"force_compaction {} {} exiting with {} batches",
810+
"force_compaction {} {} exiting with {} runs",
802811
machine.applier.shard_metrics.name,
803812
machine.applier.shard_metrics.shard_id,
804-
num_batches
813+
num_runs
805814
);
806815
return;
807816
}

0 commit comments

Comments
 (0)