diff --git a/engines/ep/src/kv_bucket.cc b/engines/ep/src/kv_bucket.cc index 18b1653462..7ef1b68f20 100644 --- a/engines/ep/src/kv_bucket.cc +++ b/engines/ep/src/kv_bucket.cc @@ -3098,6 +3098,13 @@ std::chrono::seconds KVBucket::getHistoryRetentionSeconds() const { void KVBucket::setHistoryRetentionBytes(size_t bytes) { historyRetentionBytes = bytes; + for (auto& i : vbMap.shards) { + KVShard* shard = i.get(); + // The KVStore needs to know the per vbucket size + shard->getRWUnderlying()->setHistoryRetentionBytes(bytes / + vbMap.getSize()); + } + } size_t KVBucket::getHistoryRetentionBytes() const { diff --git a/engines/ep/src/kvstore/kvstore.h b/engines/ep/src/kvstore/kvstore.h index 3425e59d62..a7a140af6d 100644 --- a/engines/ep/src/kvstore/kvstore.h +++ b/engines/ep/src/kvstore/kvstore.h @@ -862,6 +862,13 @@ class KVStore : public KVStoreIface { */ void checkIfInTransaction(Vbid vbid, std::string_view caller); + void setHistoryRetentionBytes(size_t size) override { + // no-op. + // Only supported by backends which report + // StorageProperties::HistoryRetentionAvailable::Yes + // For all other backends this function is allowed, but does nothing. + } + /** * Check if the specified document metadata is /potentially/ affected * by a datatype corruption issue (MB-52793) - a deleted document with diff --git a/engines/ep/src/kvstore/kvstore_iface.h b/engines/ep/src/kvstore/kvstore_iface.h index dedb8832ba..0f75ed6295 100644 --- a/engines/ep/src/kvstore/kvstore_iface.h +++ b/engines/ep/src/kvstore/kvstore_iface.h @@ -763,6 +763,11 @@ class KVStoreIface { * @param vbid ID of the vbucket being created */ virtual void prepareToCreateImpl(Vbid vbid) = 0; + + /** + * Method to configure the amount of history a vbucket should retain. 
+ */ + virtual void setHistoryRetentionBytes(size_t size) = 0; }; std::string to_string(KVStoreIface::ReadVBStateStatus status); diff --git a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc index e6bab24457..67a4546439 100644 --- a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc +++ b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc @@ -869,7 +869,7 @@ StorageProperties MagmaKVStore::getStorageProperties() const { StorageProperties::AutomaticDeduplication::No, StorageProperties::PrepareCounting::No, StorageProperties::CompactionStaleItemCallbacks::Yes, - StorageProperties::HistoryRetentionAvailable::No); + StorageProperties::HistoryRetentionAvailable::Yes); return rv; } @@ -1695,8 +1695,7 @@ std::unique_ptr MagmaKVStore::initBySeqnoScanContext( getDroppedStatus.String()); } - // @todo:assign this using magma->GetOldestHistorySeqno(snapshot); - auto historyStartSeqno = 0; + auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot); if (logger->should_log(spdlog::level::info)) { logger->info( "MagmaKVStore::initBySeqnoScanContext {} seqno:{} endSeqno:{}" @@ -1809,8 +1808,7 @@ std::unique_ptr MagmaKVStore::initByIdScanContext( return nullptr; } - // @todo:assign this using magma->GetOldestHistorySeqno(snapshot); - auto historyStartSeqno = 0; + auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot); logger->info( "MagmaKVStore::initByIdScanContext {} historyStartSeqno:{} " "KeyIterator:{}", @@ -1830,13 +1828,16 @@ std::unique_ptr MagmaKVStore::initByIdScanContext( historyStartSeqno); } +scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const { + return scan(ctx, magma::Magma::SeqIterator::Mode::Snapshot); +} + scan_error_t MagmaKVStore::scanAllVersions(BySeqnoScanContext& ctx) const { - // @todo use magma's mode - // return scan(ctx, magma::Magma::SeqIterator::Mode::History); - return scan(ctx); + return scan(ctx, magma::Magma::SeqIterator::Mode::History); } -scan_error_t 
MagmaKVStore::scan(BySeqnoScanContext& ctx) const { +scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx, + magma::Magma::SeqIterator::Mode mode) const { if (ctx.lastReadSeqno == ctx.maxSeqno) { logger->TRACE("MagmaKVStore::scan {} lastReadSeqno:{} == maxSeqno:{}", ctx.vbid, @@ -1849,7 +1850,8 @@ scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const { startSeqno = ctx.lastReadSeqno + 1; } auto& mctx = dynamic_cast(ctx); - for (mctx.itr->Seek(startSeqno, ctx.maxSeqno); mctx.itr->Valid(); + for (mctx.itr->Initialize(startSeqno, ctx.maxSeqno, mode); + mctx.itr->Valid(); mctx.itr->Next()) { Slice keySlice, metaSlice, valSlice; uint64_t seqno; @@ -3722,3 +3724,7 @@ std::pair MagmaKVStore::getOldestRollbackableHighSeqno( return {status, seqno}; } + +void MagmaKVStore::setHistoryRetentionBytes(size_t size) { + magma->SetHistoryRetentionSize(size); +} diff --git a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h index b46bdbc676..7cce128f19 100644 --- a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h +++ b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h @@ -568,6 +568,12 @@ class MagmaKVStore : public KVStore { std::unique_ptr begin( Vbid vbid, std::unique_ptr pcb) override; + /** + * Informs magma of how much history must be retained using + * Magma::SetHistoryRetentionSize + */ + void setHistoryRetentionBytes(size_t size) override; + // Magma uses a unique logger with a prefix of magma so that all logging // calls from the wrapper thru magma will be prefixed with magma. 
std::shared_ptr logger; @@ -782,6 +788,9 @@ class MagmaKVStore : public KVStore { folly::assume_unreachable(); } + scan_error_t scan(BySeqnoScanContext& ctx, + magma::Magma::SeqIterator::Mode mode) const; + MagmaKVStoreConfig& configuration; /** diff --git a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h index f7d86eccab..ee9c16194d 100644 --- a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h +++ b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h @@ -143,6 +143,9 @@ class NexusKVStore : public KVStoreIface { void delSystemEvent(TransactionContext& txnCtx, const queued_item item) override; void endTransaction(Vbid vbid) override; + void setHistoryRetentionBytes(size_t size) override { + // not supported on Nexus + } /** * Unit test only hook called before we compact the first KVStore. Public as diff --git a/engines/ep/tests/mock/mock_kvstore.h b/engines/ep/tests/mock/mock_kvstore.h index 406d4da360..7d31011fe8 100644 --- a/engines/ep/tests/mock/mock_kvstore.h +++ b/engines/ep/tests/mock/mock_kvstore.h @@ -225,6 +225,7 @@ class MockKVStore : public KVStore { prepareToRollback, (Vbid vbid), (override)); + MOCK_METHOD(void, setHistoryRetentionBytes, (size_t size), (override)); /** * Helper function to replace the existing read-write KVStore in the given diff --git a/engines/ep/tests/module_tests/collections/collections_dcp_test.h b/engines/ep/tests/module_tests/collections/collections_dcp_test.h index f5ac61b4c7..e1c52f7c54 100644 --- a/engines/ep/tests/module_tests/collections/collections_dcp_test.h +++ b/engines/ep/tests/module_tests/collections/collections_dcp_test.h @@ -103,17 +103,27 @@ class CollectionsDcpTest : virtual public SingleThreadedKVBucketTest { const CollectionEntry::Entry& entry, uint64_t seqno); + /** + * This function (created for OSO tests) creates two collections (fruit + * and vegetable) and calls writeTwoCollections + * + * @param endOnVegetable true and the last item written 
will be for the + * vegetable collection + * @return current manifest and vbucket (::vbid) high-seqno + */ + std::pair setupTwoCollections( + bool endOnVegetable = false); + /** * This function (created for OSO tests) writes to two collections (fruit * and vegetable). The keys are "a", "b", "c" and "d" to demonstrate the * lexicographical ordering of an OSO snapshot. * * @param endOnVegetable true and the last item written will be for the - * vegetable collection - * @return manifest and high-seqno + * vegetable collection + * @return vbucket (::vbid) high-seqno */ - std::pair setupTwoCollections( - bool endOnVegetable = false); + uint64_t writeTwoCollectios(bool endOnTarget); static cb::engine_errc dcpAddFailoverLog( const std::vector&); diff --git a/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc b/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc index 0441f7a80c..6756db6e07 100644 --- a/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc +++ b/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc @@ -92,7 +92,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { void applyEvents(TransactionContext& txnCtx, VB::Commit& commitData, - const CollectionsManifest& cm) { + const CollectionsManifest& cm, + bool writeEventNow = true) { manifest.update(*vbucket, makeManifest(cm)); std::vector events; @@ -101,17 +102,44 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { for (auto& ev : events) { commitData.collections.recordSystemEvent(*ev); + if (writeEventNow) { + if (ev->isDeleted()) { + kvstore->delSystemEvent(txnCtx, ev); + } else { + kvstore->setSystemEvent(txnCtx, ev); + } + } + } + if (!writeEventNow) { + std::move(events.begin(), + events.end(), + std::back_inserter(allEvents)); + } + } + + void applyEvents(TransactionContext& txnCtx, + const CollectionsManifest& cm, + bool writeEventNow = true) { + return applyEvents(txnCtx, 
flush, cm, writeEventNow); + } + + // This function is to be used in conjunction with applyEvents when + // writeEventNow=false allowing a test to better emulate the flusher and + // write keys in a sorted batch. Tests can applyEvents so that collection + // metadata management does updates, but defer the system event writing + // until ready to commit + void sortAndWriteAllEvents(TransactionContext& txnCtx) { + std::sort(allEvents.begin(), + allEvents.end(), + OrderItemsForDeDuplication{}); + for (auto& ev : allEvents) { if (ev->isDeleted()) { kvstore->delSystemEvent(txnCtx, ev); } else { kvstore->setSystemEvent(txnCtx, ev); } } - } - - void applyEvents(TransactionContext& txnCtx, - const CollectionsManifest& cm) { - applyEvents(txnCtx, flush, cm); + allEvents.clear(); } void checkUid(const Collections::KVStore::Manifest& md, @@ -224,7 +252,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { VB::Commit commitData(manifest); auto ctx = kvstore->begin(vbucket->getId(), std::make_unique()); - applyEvents(*ctx, commitData, cm); + applyEvents(*ctx, commitData, cm, false); + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), commitData); auto [status, md] = kvstore->getCollectionsManifest(Vbid(0)); EXPECT_TRUE(status); @@ -240,6 +269,7 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { VBucketPtr vbucket; WriteCallback wc; DeleteCallback dc; + std::vector allEvents; }; class CollectionsKVStoreTest @@ -583,19 +613,21 @@ class CollectionRessurectionKVStoreTest auto ctx = kvstore->begin(vbucket->getId(), std::make_unique()); cm.add(targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); cm.add(target, targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), flush); } // runs a flush batch that will leave the target collection in dropped state void dropScope() { openScopeOpenCollection(); - 
cm.remove(targetScope); auto ctx = kvstore->begin(vbucket->getId(), std::make_unique()); - applyEvents(*ctx, cm); + cm.remove(targetScope); + applyEvents(*ctx, cm, false); + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), flush); } @@ -709,9 +741,9 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() { std::make_unique()); if (!cm.exists(targetScope)) { cm.add(targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); cm.add(target, targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); } std::string expectedName = target.name; @@ -720,22 +752,23 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() { // iterate cycles of remove/add for (int ii = 0; ii < getCycles(); ii++) { cm.remove(scope); - applyEvents(*ctx, cm); - + applyEvents(*ctx, cm, false); if (resurectWithNewName()) { expectedName = target.name + "_" + std::to_string(ii); scope.name = targetScope.name + "_" + std::to_string(ii); } cm.add(scope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); cm.add({expectedName, target.uid}, scope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); } if (dropCollectionAtEnd()) { cm.remove(scope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); } + + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), flush); // Now validate diff --git a/engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc b/engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc index ebc0756c62..4f0c6cd3b6 100644 --- a/engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc +++ b/engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc @@ -45,11 +45,15 @@ class CollectionsOSODcpTest : public CollectionsDcpParameterizedTest { std::pair CollectionsDcpTest::setupTwoCollections(bool endOnTarget) { - VBucketPtr vb = store->getVBucket(vbid); CollectionsManifest cm(CollectionEntry::fruit); setCollections(cookie, cm.add(CollectionEntry::vegetable)); + 
flush_vbucket_to_disk(vbid, 2); + return {cm, writeTwoCollectios(endOnTarget)}; +} - // Interleave the writes to two collections and then OSO backfill one +uint64_t CollectionsDcpTest::writeTwoCollectios(bool endOnTarget) { + // Interleave the writes to two collections, this is linked to expectations + // in CollectionsOSODcpTest test harness store_item(vbid, makeStoredDocKey("b", CollectionEntry::fruit), "q"); store_item(vbid, makeStoredDocKey("b", CollectionEntry::vegetable), "q"); store_item(vbid, makeStoredDocKey("d", CollectionEntry::fruit), "a"); @@ -66,8 +70,8 @@ CollectionsDcpTest::setupTwoCollections(bool endOnTarget) { vbid, makeStoredDocKey("c", CollectionEntry::vegetable), "q"); store_item(vbid, makeStoredDocKey("c", CollectionEntry::fruit), "y"); } - flush_vbucket_to_disk(vbid, 10); // 8 keys + 2 events - return {cm, vb->getHighSeqno()}; + flush_vbucket_to_disk(vbid, 8); + return store->getVBucket(vbid)->getHighSeqno(); } // Run through how we expect OSO to work, this is a minimal test which will @@ -674,7 +678,6 @@ TEST_P(CollectionsOSODcpTest, MB_43700) { // snapshots class CollectionsOSOEphemeralTest : public CollectionsDcpParameterizedTest { public: - std::pair setupTwoCollections(); }; // Run through how we expect OSO to work, this is a minimal test which will diff --git a/engines/ep/tests/module_tests/history_scan_test.cc b/engines/ep/tests/module_tests/history_scan_test.cc index cbc345fa2d..1ae26a5487 100644 --- a/engines/ep/tests/module_tests/history_scan_test.cc +++ b/engines/ep/tests/module_tests/history_scan_test.cc @@ -12,11 +12,13 @@ #include "../mock/mock_synchronous_ep_engine.h" #include "checkpoint_manager.h" #include "collections/collections_dcp_test.h" +#include "dcp/backfill_by_seqno_disk.h" #include "kv_bucket.h" #include "tests/mock/mock_dcp.h" #include "tests/mock/mock_dcp_consumer.h" #include "tests/mock/mock_dcp_producer.h" #include "tests/mock/mock_magma_kvstore.h" +#include "tests/mock/mock_stream.h" #include 
"tests/module_tests/test_helpers.h" #include "vbucket.h" @@ -34,7 +36,11 @@ class HistoryScanTest : public CollectionsDcpParameterizedTest { CollectionsDcpParameterizedTest::SetUp(); // To allow tests to set where history begins, use MockMagmaKVStore replaceMagmaKVStore(); - // @todo: Setup to retain history using setHistoryRetentionBytes + // For all tests, use big history window - all tests here will use a + // combination of magma's history retention + setHistoryStartSeqno to + // configure the test. + store->getRWUnderlying(vbid)->setHistoryRetentionBytes(100 * 1024 * + 1024); } void setHistoryStartSeqno(uint64_t seqno) { @@ -109,9 +115,6 @@ void HistoryScanTest::validateSnapshot( // Validate that 1 disk snapshot is produced and that it is marked as history // and duplicates TEST_P(HistoryScanTest, basic_unique) { - // The entire disk is "history", from seqno 1 - setHistoryStartSeqno(1); - std::vector items; items.emplace_back(store_item( vbid, makeStoredDocKey("a", CollectionID::Default), "val-a")); @@ -143,23 +146,22 @@ TEST_P(HistoryScanTest, basic_unique) { items); } -// Following test cannot be enabled until magma history support exists -TEST_P(HistoryScanTest, DISABLED_basic_duplicates) { +TEST_P(HistoryScanTest, basic_duplicates) { CollectionsManifest cm; setCollections(cookie, cm.add(CollectionEntry::vegetable, {}, true)); std::vector items; // Create a "dummy" Item that marks where the system-event is expected - items.emplace_back(makeStoredDocKey("a", CollectionEntry::vegetable), + items.emplace_back(makeStoredDocKey("ignored", CollectionEntry::vegetable), vbid, queue_op::system_event, 0, 1); items.emplace_back(store_item( - vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v0")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v0")); // temp flush in two batches as dedup is still on in the flusher flush_vbucket_to_disk(vbid, 1 + 1); items.emplace_back(store_item( - vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v1")); 
+ vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v1")); flush_vbucket_to_disk(vbid, 1); ensureDcpWillBackfill(); @@ -196,26 +198,26 @@ TEST_P(HistoryScanTest, TwoSnapshots) { std::vector items1, items2; // items1 represents the first snapshot, only the crate of vegetable will - // exist in this snapshot. The second history snapshot will have the 'a' + // exist in this snapshot. The second history snapshot will have the 'k0' // keys (both versions). items1.emplace_back(makeStoredDocKey("", CollectionEntry::vegetable), vbid, queue_op::system_event, 0, 1); - store_item(vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v0"); + store_item(vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v0"); flush_vbucket_to_disk(vbid, 1 + 1); - store_item(vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v1"); + store_item(vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v1"); flush_vbucket_to_disk(vbid, 1); // Now we must force history to begin from the next flush items2.emplace_back(store_item( - vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v2")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v2")); flush_vbucket_to_disk(vbid, 1); // @todo: switch to a using key 'a' when magma history support is available // then we can verify two versions of 'a' are returned items2.emplace_back(store_item( - vbid, makeStoredDocKey("b", CollectionEntry::vegetable), "v3")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v3")); flush_vbucket_to_disk(vbid, 1); ensureDcpWillBackfill(); @@ -272,10 +274,10 @@ TEST_P(HistoryScanTest, TwoSnapshots) { // Test OSO switches to history TEST_P(HistoryScanTest, OSOThenHistory) { - setHistoryStartSeqno(1); - - // This writes to fruit and vegetable - auto highSeqno = setupTwoCollections().second; + // Setup (which calls writeTwoCollections), then call writeTwoCollectios + // to generate some duplicates (history) + setupTwoCollections(); + auto highSeqno = 
writeTwoCollectios(true); ensureDcpWillBackfill(); @@ -304,7 +306,6 @@ TEST_P(HistoryScanTest, OSOThenHistory) { EXPECT_EQ(mcbp::systemevent::id::CreateCollection, producers->last_system_event); - uint64_t txHighSeqno = 0; std::array keys = {{"a", "b", "c", "d"}}; for (auto& k : keys) { @@ -314,7 +315,6 @@ TEST_P(HistoryScanTest, OSOThenHistory) { EXPECT_EQ(ClientOpcode::DcpMutation, producers->last_op); EXPECT_EQ(k, producers->last_key) << producers->last_byseqno; EXPECT_EQ(CollectionUid::vegetable, producers->last_collection_id); - txHighSeqno = std::max(txHighSeqno, producers->last_byseqno.load()); } // Now we get the end message @@ -323,26 +323,95 @@ TEST_P(HistoryScanTest, OSOThenHistory) { EXPECT_EQ(uint32_t(DcpOsoSnapshotFlags::End), producers->last_oso_snapshot_flags); - // Now we get the second snapshot + auto vb = store->getVBucket(vbid); + // Now we get the second snapshot, which is history stepAndExpect(ClientOpcode::DcpSnapshotMarker); - + EXPECT_EQ(vbid, producers->last_vbucket); + EXPECT_EQ(0, producers->last_snap_start_seqno); + EXPECT_EQ(vb->getPersistenceSeqno(), producers->last_snap_end_seqno); EXPECT_EQ(MARKER_FLAG_DISK | MARKER_FLAG_CHK | MARKER_FLAG_HISTORY | MARKER_FLAG_MAY_CONTAIN_DUPLICATE_KEYS, producers->last_flags); + stepAndExpect(ClientOpcode::DcpSystemEvent); + EXPECT_EQ(mcbp::systemevent::id::CreateCollection, + producers->last_system_event); + EXPECT_EQ(CollectionUid::vegetable, producers->last_collection_id); - // And all keys in seq order. Setup created in order b, d, a, c + // And all keys in seq order. writeTwoCollectios created in order b, d, a, c std::array keySeqnoOrder = {{"b", "d", "a", "c"}}; for (auto& k : keySeqnoOrder) { - // Now we get the mutations, they aren't guaranteed to be in seqno - // order, but we know that for now they will be in key order. stepAndExpect(ClientOpcode::DcpMutation); EXPECT_EQ(k, producers->last_key); EXPECT_EQ(CollectionUid::vegetable, producers->last_collection_id); } + // twice.. 
as we wrote them twice + for (auto& k : keySeqnoOrder) { + stepAndExpect(ClientOpcode::DcpMutation); + EXPECT_EQ(k, producers->last_key); + EXPECT_EQ(CollectionUid::vegetable, producers->last_collection_id); + } +} + +// Tests which don't need executing in two eviction modes +class HistoryScanTestSingleEvictionMode : public HistoryScanTest {}; + +// Test covers state machine transitions when the history scan gets false +// from markDiskSnapshot +TEST_P(HistoryScanTestSingleEvictionMode, HistoryScanFailMarkDiskSnapshot) { + // Store an item, create new checkpoint and flush so we have something to + // backfill from disk + setVBucketStateAndRunPersistTask(vbid, vbucket_state_active); + store_item(vbid, makeStoredDocKey("key1"), "value"); + flushAndRemoveCheckpoints(vbid); + + // Create producer now we have items only on disk. + auto producer = std::make_shared( + *engine, cookie, "test-producer", 0 /*flags*/, false /*startTask*/); + ASSERT_EQ(cb::engine_errc::success, + producer->control(0, DcpControlKeys::ChangeStreams, "true")); + + auto vb = engine->getVBucket(vbid); + ASSERT_TRUE(vb); + auto stream = + std::make_shared(engine.get(), + producer, + DCP_ADD_STREAM_FLAG_DISKONLY, + 0, + *vb, + 0, + 1, + 0, + 0, + 0, + IncludeValue::Yes, + IncludeXattrs::Yes, + IncludeDeletedUserXattrs::No, + std::string{}); + + ASSERT_TRUE(stream->areChangeStreamsEnabled()); + stream->setActive(); + + // Create our own backfill to test + auto backfill = std::make_unique( + *engine->getKVBucket(), stream, 1, vb->getPersistenceSeqno()); + EXPECT_EQ(backfill_state_init, backfill->getState()); + EXPECT_EQ(backfill_success, backfill->run()); + EXPECT_EQ(backfill_state_scanning_history_snapshot, backfill->getState()); + // stream will error markDiskSnapshot + stream->setDead(cb::mcbp::DcpStreamEndStatus::Ok); + EXPECT_EQ(backfill_finished, backfill->run()); } INSTANTIATE_TEST_SUITE_P(HistoryScanTests, HistoryScanTest, STParameterizedBucketTest::magmaConfigValues(), -
STParameterizedBucketTest::PrintToStringParamName); \ No newline at end of file + STParameterizedBucketTest::PrintToStringParamName); + +INSTANTIATE_TEST_SUITE_P( + HistoryScanTests, + HistoryScanTestSingleEvictionMode, + ::testing::Values("bucket_type=persistent:" + "backend=magma:" + "item_eviction_policy=full_eviction"), + STParameterizedBucketTest::PrintToStringParamName); \ No newline at end of file diff --git a/engines/ep/tests/module_tests/magma-kvstore_test.cc b/engines/ep/tests/module_tests/magma-kvstore_test.cc index bbcd77c3bf..a0fb4d48de 100644 --- a/engines/ep/tests/module_tests/magma-kvstore_test.cc +++ b/engines/ep/tests/module_tests/magma-kvstore_test.cc @@ -20,6 +20,7 @@ #include "test_helpers.h" #include "thread_gate.h" #include +#include using namespace std::string_literals; using namespace testing; @@ -94,7 +95,8 @@ TEST_F(MagmaKVStoreRollbackTest, Rollback) { auto ctx = kvstore->begin(vbid, std::make_unique()); for (int j = 0; j < 5; j++) { - auto key = makeStoredDocKey("key" + std::to_string(seqno)); + // pad the key so key09 < key10 + auto key = makeStoredDocKey(fmt::format("key_{:02}", seqno)); auto qi = makeCommittedItem(key, "value"); flush.proposedVBState.lastSnapStart = seqno; flush.proposedVBState.lastSnapEnd = seqno; @@ -104,22 +106,22 @@ TEST_F(MagmaKVStoreRollbackTest, Rollback) { kvstore->commit(std::move(ctx), flush); } - auto rv = kvstore->get(makeDiskDocKey("key5"), Vbid(0)); + auto rv = kvstore->get(makeDiskDocKey("key_05"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); - rv = kvstore->get(makeDiskDocKey("key6"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_06"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); auto rollbackResult = kvstore->rollback(Vbid(0), 5, std::make_unique()); ASSERT_TRUE(rollbackResult.success); - rv = kvstore->get(makeDiskDocKey("key1"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_01"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); - rv = 
kvstore->get(makeDiskDocKey("key5"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_05"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); - rv = kvstore->get(makeDiskDocKey("key6"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_06"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::no_such_key); - rv = kvstore->get(makeDiskDocKey("key10"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_10"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::no_such_key); auto vbs = kvstore->getCachedVBucketState(Vbid(0)); @@ -139,7 +141,8 @@ TEST_F(MagmaKVStoreRollbackTest, RollbackNoValidCheckpoint) { auto ctx = kvstore->begin(vbid, std::make_unique()); for (int j = 0; j < 5; j++) { - auto key = makeStoredDocKey("key" + std::to_string(seqno)); + // pad the key so key09 < key10 + auto key = makeStoredDocKey(fmt::format("key_{:02}", seqno)); auto qi = makeCommittedItem(key, "value"); qi->setBySeqno(seqno++); kvstore->set(*ctx, qi); @@ -619,17 +622,6 @@ class MockDirectory : public magma::Directory { TEST_F(MagmaKVStoreTest, readOnlyMode) { initialize_kv_store(kvstore.get(), vbid); - auto doWrite = [this](uint64_t seqno, bool expected) { - auto ctx = - kvstore->begin(vbid, std::make_unique()); - auto qi = makeCommittedItem(makeStoredDocKey("key"), "value"); - qi->setBySeqno(seqno); - flush.proposedVBState.lastSnapStart = seqno; - flush.proposedVBState.lastSnapEnd = seqno; - kvstore->set(*ctx, qi); - EXPECT_EQ(expected, kvstore->commit(std::move(ctx), flush)); - }; - // Add an item to test that we can read it doWrite(1, true /*success*/); @@ -715,11 +707,17 @@ TEST_F(MagmaKVStoreTest, makeFileHandleSyncFailed) { EXPECT_FALSE(fileHandle); } -// @todo: This is a basic test that will be expanded to cover scanning history -// at the moment this test is equivalent to "scan" -TEST_F(MagmaKVStoreTest, scanAllVersions) { +// Test scanAllVersions returns the expected number of keys and the expected +// history start seqno. 
+TEST_F(MagmaKVStoreTest, scanAllVersions1) { initialize_kv_store(kvstore.get(), vbid); + kvstore->setHistoryRetentionBytes(1024 * 1024); + + // History is enabled + flush.historical = CheckpointHistorical::Yes; + std::vector expectedItems; + expectedItems.push_back(doWrite(1, true, "k1")); expectedItems.push_back(doWrite(2, true, "k2")); auto validate = [&expectedItems](GetValue gv) { @@ -736,8 +734,7 @@ TEST_F(MagmaKVStoreTest, scanAllVersions) { ValueFilter::VALUES_COMPRESSED, SnapshotSource::Head); ASSERT_TRUE(bySeq); - // @todo: This must be the expected seqno where history begins - EXPECT_EQ(0, bySeq->historyStartSeqno); + EXPECT_EQ(1, bySeq->historyStartSeqno); EXPECT_EQ(scan_success, kvstore->scanAllVersions(*bySeq)); auto& cb = @@ -770,4 +767,83 @@ TEST_F(MagmaKVStoreTest, preparePendingRequests) { EXPECT_EQ(itr->second, req->getItem().getKey().c_str()); ++itr; } +} + +// Test scanAllVersions returns the expected number of keys and the expected +// history start seqno. This test uses the same key for all mutations. 
+TEST_F(MagmaKVStoreTest, scanAllVersions2) { + initialize_kv_store(kvstore.get(), vbid); + kvstore->setHistoryRetentionBytes(1024 * 1024); + flush.historical = CheckpointHistorical::Yes; + + std::vector expectedItems; + // doWrite writes the same key + expectedItems.push_back(doWrite(1, true)); + expectedItems.push_back(doWrite(2, true)); + auto validate = [&expectedItems](GetValue gv) { + ASSERT_TRUE(gv.item); + ASSERT_GE(expectedItems.size(), size_t(gv.item->getBySeqno())); + EXPECT_EQ(*expectedItems[gv.item->getBySeqno() - 1], *gv.item); + }; + auto bySeq = kvstore->initBySeqnoScanContext( + std::make_unique>(validate), + std::make_unique>(), + vbid, + 1, + DocumentFilter::ALL_ITEMS, + ValueFilter::VALUES_COMPRESSED, + SnapshotSource::Head); + ASSERT_TRUE(bySeq); + EXPECT_EQ(1, bySeq->historyStartSeqno); + EXPECT_EQ(scan_success, kvstore->scanAllVersions(*bySeq)); + + auto& cb = + static_cast&>(bySeq->getValueCallback()); + EXPECT_EQ(2, cb.getProcessedCount()); +} + +// ScanContext now exposes historyStartSeqno which is tracked by magma provided +// it is retaining history. 
+TEST_F(MagmaKVStoreTest, historyStartSeqno) { + initialize_kv_store(kvstore.get(), vbid); + kvstore->setHistoryRetentionBytes(1024 * 1024); + + auto validate = [this](uint64_t expectedSeqno) { + auto bySeq = kvstore->initBySeqnoScanContext( + std::make_unique(true /*expectcompressed*/), + std::make_unique(1, 5, Vbid(0)), + vbid, + 1, + DocumentFilter::ALL_ITEMS, + ValueFilter::VALUES_COMPRESSED, + SnapshotSource::Head); + auto byId = kvstore->initByIdScanContext( + std::make_unique(true /*expectcompressed*/), + std::make_unique(1, 5, Vbid(0)), + vbid, + {}, + DocumentFilter::ALL_ITEMS, + ValueFilter::VALUES_COMPRESSED); + ASSERT_TRUE(bySeq); + ASSERT_TRUE(byId); + + EXPECT_EQ(expectedSeqno, bySeq->historyStartSeqno); + EXPECT_EQ(bySeq->historyStartSeqno, byId->historyStartSeqno); + }; + + flush.historical = CheckpointHistorical::No; + validate(0); // no flush yet - and no history + doWrite(2, true); // write seqno 2 + validate(0); + + // Now enable history + flush.historical = CheckpointHistorical::Yes; + doWrite(3, true); // write seqno 3 + validate(3); + doWrite(4, true); // write seqno 4 + validate(3); // history still starts at 3 + + flush.historical = CheckpointHistorical::No; + doWrite(5, true); // write seqno 5 + validate(0); // back to no history } \ No newline at end of file