From eb69d2b101910866540f83034d01e526ed300db5 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Sun, 16 Mar 2025 18:24:06 +0200
Subject: [PATCH] chore: prefetch keys in opmget

Also, stop deduplicating keys by default, but keep deduplication available
behind a run-time flag.

All in all, this PR improves MGET (100% miss) throughput by at least 7%:
from 1.23M to 1.32M qps. (100% miss traffic makes the impact of this code
easier to measure.)

Also, finally fix the TieredStorageTest.FlushAll test.

Signed-off-by: Roman Gershman
---
 src/server/string_family.cc       | 41 ++++++++++++++++++++++++-------
 src/server/tiered_storage_test.cc |  6 +++--
 2 files changed, 36 insertions(+), 11 deletions(-)

diff --git a/src/server/string_family.cc b/src/server/string_family.cc
index e3a598d0714d..fbee18389921 100644
--- a/src/server/string_family.cc
+++ b/src/server/string_family.cc
@@ -35,6 +35,10 @@
 #include "server/transaction.h"
 #include "util/fibers/future.h"
 
+ABSL_FLAG(bool, mget_prefetch_keys, true, "If true, MGET will prefetch keys before reading them");
+
+ABSL_FLAG(bool, mget_dedup_keys, false, "If true, MGET will deduplicate keys");
+
 namespace dfly {
 
 namespace {
@@ -547,23 +551,42 @@ MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Tran
 
   absl::InlinedVector items(keys.Size());
 
+  // First, fetch all iterators and count total size ahead
+  size_t total_size = 0;
+  unsigned index = 0;
+  static bool mget_prefetch_keys = absl::GetFlag(FLAGS_mget_prefetch_keys);
+  static bool mget_dedup_keys = absl::GetFlag(FLAGS_mget_dedup_keys);
+
   // We can not make it thread-local because we may preempt during the Find loop due to
   // serialization with the bumpup calls.
 
   // TODO: consider separating BumpUps from finds because it becomes too complicated
   // to reason about.
   absl::flat_hash_map key_index;
+  if (mget_dedup_keys) {
+    key_index.reserve(keys.Size());
+  }
 
-  // First, fetch all iterators and count total size ahead
-  size_t total_size = 0;
-  unsigned index = 0;
-  key_index.reserve(keys.Size());
+  PrimeTable& pt = db_slice.GetDBTable(t->GetDbIndex())->prime;
+
+  constexpr unsigned kPrefetchLimit = 32;
+  if (mget_prefetch_keys) {
+    unsigned prefetched = 0;
+    for (string_view key : keys) {
+      pt.Prefetch(key);
+      if (++prefetched >= kPrefetchLimit) {
+        break;
+      }
+    }
+  }
 
   for (string_view key : keys) {
-    auto [it, inserted] = key_index.try_emplace(key, index);
-    if (!inserted) {  // duplicate -> point to the first occurrence.
-      items[index++].source_index = it->second;
-      continue;
+    if (mget_dedup_keys) {
+      auto [it, inserted] = key_index.try_emplace(key, index);
+      if (!inserted) {  // duplicate -> point to the first occurrence.
+        items[index++].source_index = it->second;
+        continue;
+      }
     }
 
     auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING);
@@ -618,8 +641,8 @@ MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Tran
       }
     }
   }
-  key_index.clear();
 
+  key_index.clear();
   return response;
 }
 
diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc
index ac2be9e633fa..467a8a2e6e0a 100644
--- a/src/server/tiered_storage_test.cc
+++ b/src/server/tiered_storage_test.cc
@@ -276,7 +276,10 @@ TEST_F(TieredStorageTest, FlushAll) {
   Metrics metrics;
   ExpectConditionWithinTimeout([&] {
     metrics = GetMetrics();
-    return metrics.events.hits > 2;
+
+    // Note that metrics.events.hits is not consistent with total_fetches
+    // and it can happen that hits is greater than total_fetches due to in-progress reads.
+    return metrics.tiered_stats.total_fetches > 2;
   });
 
   LOG(INFO) << FormatMetrics(metrics);
@@ -290,7 +293,6 @@ TEST_F(TieredStorageTest, FlushPending) {
   LOG(INFO) << FormatMetrics(metrics);
 
   EXPECT_EQ(metrics.db_stats.front().tiered_entries, 0u);
-  EXPECT_GT(metrics.tiered_stats.total_fetches, 2u);
 }
 
 TEST_F(TieredStorageTest, FlushPending) {
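
Background note (not part of the patch): the speedup comes from splitting the MGET hot path into two passes: a first pass that issues cache prefetches for the hash-table slots a batch of keys will touch, and a second pass that performs the actual lookups once those cache lines are (hopefully) in flight. The sketch below only illustrates that pattern under this assumption; it is not Dragonfly code. Table, Bucket, Insert and MultiGet are made-up names, and it uses GCC/Clang's __builtin_prefetch in place of PrimeTable::Prefetch.

// Illustrative sketch of the prefetch-then-lookup pattern (hypothetical names).
#include <cstddef>
#include <functional>
#include <string>
#include <string_view>
#include <vector>

struct Bucket {
  std::string key;
  std::string value;
  bool used = false;
};

class Table {
 public:
  explicit Table(size_t capacity) : buckets_(capacity) {}

  // Toy insert: no probing, first-come wins on slot collisions.
  void Insert(std::string key, std::string value) {
    Bucket& b = buckets_[SlotOf(key)];
    if (!b.used) {
      b.key = std::move(key);
      b.value = std::move(value);
      b.used = true;
    }
  }

  // Hint the CPU to pull the bucket's cache line before we actually read it.
  void Prefetch(std::string_view key) const {
    __builtin_prefetch(&buckets_[SlotOf(key)]);
  }

  const std::string* Find(std::string_view key) const {
    const Bucket& b = buckets_[SlotOf(key)];
    return (b.used && b.key == key) ? &b.value : nullptr;
  }

 private:
  size_t SlotOf(std::string_view key) const {
    return std::hash<std::string_view>{}(key) % buckets_.size();
  }

  std::vector<Bucket> buckets_;
};

std::vector<const std::string*> MultiGet(const Table& table,
                                         const std::vector<std::string>& keys) {
  constexpr unsigned kPrefetchLimit = 32;  // cap mirrors the one in the patch

  // Pass 1: prefetch the slots of the first kPrefetchLimit keys.
  unsigned prefetched = 0;
  for (const std::string& key : keys) {
    table.Prefetch(key);
    if (++prefetched >= kPrefetchLimit)
      break;
  }

  // Pass 2: the actual lookups, now more likely to hit warm cache lines.
  std::vector<const std::string*> results;
  results.reserve(keys.size());
  for (const std::string& key : keys)
    results.push_back(table.Find(key));
  return results;
}

Capping the first pass (kPrefetchLimit, 32 in the patch) keeps the prefetch window small, so the earliest prefetched lines are less likely to be evicted before the lookup loop reaches them.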