From 61348e865b1b4b46013233aa3e210ea158a364ce Mon Sep 17 00:00:00 2001
From: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Date: Wed, 29 May 2024 14:31:20 +0800
Subject: [PATCH] Storages: Fix cloning delta index when there are duplicated
 tuples (#9000) (#9016)

close pingcap/tiflash#8845

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>

Co-authored-by: jinhelin <linjinhe33@gmail.com>
---
 .../Storages/DeltaMerge/Delta/Snapshot.cpp    |   4 +-
 dbms/src/Storages/DeltaMerge/DeltaIndex.h     |  28 ++-
 dbms/src/Storages/DeltaMerge/DeltaPlace.h     |   3 +
 dbms/src/Storages/DeltaMerge/DeltaTree.h      |   4 +
 .../ReadThread/UnorderedInputStream.h         |   6 +
 dbms/src/Storages/DeltaMerge/Segment.cpp      |   3 +-
 dbms/src/Storages/DeltaMerge/Segment.h        |   7 +
 .../tests/gtest_dm_delta_merge_store.cpp      | 180 +++++++++++++++
 .../gtest_dm_delta_merge_store_test_basic.h   |   4 +
 .../DeltaMerge/tests/gtest_dm_segment.cpp     | 214 +++++++++++++++++-
 10 files changed, 433 insertions(+), 20 deletions(-)

diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
index 758e0321e55..bfaf53311ea 100644
--- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
+++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
@@ -95,8 +95,8 @@ size_t DeltaValueReader::readRows(MutableColumns & output_cols, size_t offset, s
     //
     // So here, we should filter out those out-of-range rows.
 
-    auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
-    auto total_delta_rows = delta_snap->getRows();
+    const auto mem_table_rows_offset = delta_snap->getMemTableSetRowsOffset();
+    const auto total_delta_rows = delta_snap->getRows();
 
     auto persisted_files_start = std::min(offset, mem_table_rows_offset);
     auto persisted_files_end = std::min(offset + limit, mem_table_rows_offset);
diff --git a/dbms/src/Storages/DeltaMerge/DeltaIndex.h b/dbms/src/Storages/DeltaMerge/DeltaIndex.h
index 18119a6f7bb..bd7f43cf8b4 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaIndex.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaIndex.h
@@ -84,17 +84,17 @@ class DeltaIndex
         }
     }
 
-    DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
+    DeltaIndexPtr tryCloneInner(size_t rows_limit, size_t placed_deletes_limit, const Updates * updates = nullptr)
     {
         DeltaTreePtr delta_tree_copy;
         size_t placed_rows_copy = 0;
         size_t placed_deletes_copy = 0;
-        // Make sure the delta index do not place more deletes than `placed_deletes_limit`.
-        // Because delete ranges can break MVCC view.
         {
             std::scoped_lock lock(mutex);
-            // Safe to reuse the copy of the existing DeltaIndex
-            if (placed_deletes <= placed_deletes_limit)
+            // Make sure the MVCC view will not be broken by the mismatch of delta index and snapshot:
+            // - First, make sure the delta index do not place more deletes than `placed_deletes_limit`.
+            // - Second, make sure the snapshot includes all duplicated tuples in the delta index.
+            if (placed_deletes <= placed_deletes_limit && delta_tree->maxDupTupleID() < static_cast<Int64>(rows_limit))
             {
                 delta_tree_copy = delta_tree;
                 placed_rows_copy = placed_rows;
@@ -186,8 +186,9 @@ class DeltaIndex
     {
         std::scoped_lock lock(mutex);
 
-        if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes)
-            && !(maybe_advanced.placed_rows == placed_rows && maybe_advanced.placed_deletes == placed_deletes))
+        if ((maybe_advanced.placed_rows >= placed_rows && maybe_advanced.placed_deletes >= placed_deletes) // advance
+            // not excatly the same
+            && (maybe_advanced.placed_rows != placed_rows || maybe_advanced.placed_deletes != placed_deletes))
         {
             delta_tree = maybe_advanced.delta_tree;
             placed_rows = maybe_advanced.placed_rows;
@@ -197,14 +198,17 @@ class DeltaIndex
         return false;
     }
 
-    DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes) { return tryCloneInner(deletes); }
+    /**
+     * Try to get a clone of current instance.
+     * Return an empty DeltaIndex if `deletes < this->placed_deletes` because the advanced delta-index will break
+     * the MVCC view.
+     */
+    DeltaIndexPtr tryClone(size_t rows, size_t deletes) { return tryCloneInner(rows, deletes); }
 
     DeltaIndexPtr cloneWithUpdates(const Updates & updates)
     {
-        if (unlikely(updates.empty()))
-            throw Exception("Unexpected empty updates");
-
-        return tryCloneInner(updates.front().delete_ranges_offset, &updates);
+        RUNTIME_CHECK_MSG(!updates.empty(), "Unexpected empty updates");
+        return tryCloneInner(updates.front().rows_offset, updates.front().delete_ranges_offset, &updates);
     }
 };
 
diff --git a/dbms/src/Storages/DeltaMerge/DeltaPlace.h b/dbms/src/Storages/DeltaMerge/DeltaPlace.h
index 76802091d20..2df65adafab 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaPlace.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaPlace.h
@@ -248,7 +248,10 @@ bool placeInsert(const SkippableBlockInputStreamPtr & stable, //
             tuple_id = delta_value_space_offset + (offset + i);
 
         if (dup)
+        {
             delta_tree.addDelete(rid);
+            delta_tree.setMaxDupTupleID(tuple_id);
+        }
         delta_tree.addInsert(rid, tuple_id);
     }
 
diff --git a/dbms/src/Storages/DeltaMerge/DeltaTree.h b/dbms/src/Storages/DeltaMerge/DeltaTree.h
index a7e238aef5b..6fa2f2ada50 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaTree.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaTree.h
@@ -778,6 +778,7 @@ class DeltaTree
     size_t num_inserts = 0;
     size_t num_deletes = 0;
     size_t num_entries = 0;
+    Int64 max_dup_tuple_id = -1;
 
     std::unique_ptr<Allocator> allocator;
     size_t bytes = 0;
@@ -989,6 +990,8 @@ class DeltaTree
     size_t numEntries() const { return num_entries; }
     size_t numInserts() const { return num_inserts; }
     size_t numDeletes() const { return num_deletes; }
+    Int64 maxDupTupleID() const { return max_dup_tuple_id; }
+    void setMaxDupTupleID(Int64 tuple_id) { max_dup_tuple_id = std::max(tuple_id, max_dup_tuple_id); }
 
     void addDelete(UInt64 rid);
     void addInsert(UInt64 rid, UInt64 tuple_id);
@@ -1005,6 +1008,7 @@ DT_CLASS::DeltaTree(const DT_CLASS::Self & o)
     , num_inserts(o.num_inserts)
     , num_deletes(o.num_deletes)
     , num_entries(o.num_entries)
+    , max_dup_tuple_id(o.max_dup_tuple_id)
     , allocator(std::make_unique<Allocator>())
 {
     // If exception is thrown before clear copying_nodes, all nodes will be destroyed.
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
index 0f27d9db0ce..2a6944abe14 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h
@@ -26,6 +26,10 @@ extern const char pause_when_reading_from_dt_stream[];
 
 namespace DB::DM
 {
+namespace tests
+{
+class DeltaMergeStoreRWTest;
+}
 class UnorderedInputStream : public IProfilingBlockInputStream
 {
     static constexpr auto NAME = "UnorderedInputStream";
@@ -141,5 +145,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
     int64_t ref_no;
     size_t total_rows = 0;
     bool task_pool_added;
+
+    friend tests::DeltaMergeStoreRWTest;
 };
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp
index 8ac746c02b3..bdfe045d409 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -1998,7 +1998,8 @@ std::pair<DeltaIndexPtr, bool> Segment::ensurePlace(const DMContext & dm_context
                                                     UInt64 max_version) const
 {
     auto delta_snap = delta_reader->getDeltaSnap();
-    // Clone a new delta index.
+    // Try to clone from the sahred delta index, if it fails to reuse the shared delta index,
+    // it will return an empty delta index and we should place it in the following branch.
     auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
     auto my_delta_tree = my_delta_index->getDeltaTree();
 
diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h
index 7470703c4f7..db6f1534fed 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.h
+++ b/dbms/src/Storages/DeltaMerge/Segment.h
@@ -31,6 +31,11 @@
 
 namespace DB::DM
 {
+namespace tests
+{
+class DeltaMergeStoreRWTest;
+}
+
 class Segment;
 struct SegmentSnapshot;
 using SegmentSnapshotPtr = std::shared_ptr<SegmentSnapshot>;
@@ -616,6 +621,8 @@ class Segment
 
     const LoggerPtr parent_log; // Used when constructing new segments in split
     const LoggerPtr log;
+
+    friend tests::DeltaMergeStoreRWTest;
 };
 
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
index 29a6545d066..512b1f3013a 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
@@ -36,6 +36,7 @@
 #include <algorithm>
 #include <future>
 #include <iterator>
+#include <memory>
 #include <random>
 
 namespace DB
@@ -3460,6 +3461,185 @@ try
 CATCH
 
 
+void DeltaMergeStoreRWTest::dupHandleVersionAndDeltaIndexAdvancedThanSnapshot()
+{
+    auto table_column_defines = DMTestEnv::getDefaultColumns();
+    store = reload(table_column_defines);
+
+    auto create_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
+        auto block = DMTestEnv::prepareSimpleWriteBlock(beg, end, false, ts);
+        block.checkNumberOfRows();
+        return block;
+    };
+
+    auto write_block = [&](UInt64 beg, UInt64 end, UInt64 ts) {
+        auto block = create_block(beg, end, ts);
+        store->write(*db_context, db_context->getSettingsRef(), block);
+    };
+
+    auto create_stream = [&]() {
+        return store->read(
+            *db_context,
+            db_context->getSettingsRef(),
+            store->getTableColumns(),
+            {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            /* num_streams= */ 1,
+            /* start_ts= */ std::numeric_limits<UInt64>::max(),
+            EMPTY_FILTER,
+            TRACING_NAME,
+            /* keep_order= */ false,
+            /* is_fast_scan= */ false,
+            DEFAULT_BLOCK_SIZE)[0];
+    };
+
+    auto count_rows = [](BlockInputStreamPtr stream) {
+        std::size_t count = 0;
+        stream->readPrefix();
+        for (;;)
+        {
+            auto block = stream->read();
+            if (!block)
+            {
+                break;
+            }
+            count += block.rows();
+        }
+        stream->readSuffix();
+        return count;
+    };
+
+    auto get_seg_read_task = [&](BlockInputStreamPtr stream) {
+        auto unordered_stream = std::dynamic_pointer_cast<UnorderedInputStream>(stream);
+        const auto & tasks = unordered_stream->task_pool->getTasks();
+        RUNTIME_CHECK(tasks.size() == 1, tasks.size());
+        return tasks.begin()->second;
+    };
+
+    auto clone_delta_index = [](SegmentReadTaskPtr seg_read_task) {
+        auto delta_snap = seg_read_task->read_snapshot->delta;
+        return delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes());
+    };
+
+    auto check_delta_index
+        = [](DeltaIndexPtr delta_index, size_t expect_rows, size_t expect_deletes, Int64 expect_max_dup_tuple_id) {
+              auto [placed_rows, placed_deletes] = delta_index->getPlacedStatus();
+              ASSERT_EQ(placed_rows, expect_rows);
+              ASSERT_EQ(placed_deletes, expect_deletes);
+              ASSERT_EQ(delta_index->getDeltaTree()->maxDupTupleID(), expect_max_dup_tuple_id);
+          };
+
+    auto ensure_place = [&](SegmentReadTaskPtr seg_read_task) {
+        auto pk_ver_col_defs = std::make_shared<ColumnDefines>(
+            ColumnDefines{getExtraHandleColumnDefine(dm_context->is_common_handle), getVersionColumnDefine()});
+        auto delta_reader = std::make_shared<DeltaValueReader>(
+            *dm_context,
+            seg_read_task->read_snapshot->delta,
+            pk_ver_col_defs,
+            RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
+        return seg_read_task->segment->ensurePlace(
+            *dm_context,
+            seg_read_task->read_snapshot->stable,
+            delta_reader,
+            {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
+            std::numeric_limits<UInt64>::max());
+    };
+
+    // Write [0, 128) with ts 1 for initializing stable.
+    write_block(0, 128, 1);
+    store->mergeDeltaAll(*db_context);
+
+    // Write [50, 60) with ts 2 for initializing delta.
+    write_block(50, 60, 2);
+
+    // Scan table normally.
+    {
+        auto stream = create_stream();
+        auto count = count_rows(stream);
+        ASSERT_EQ(count, 128);
+    }
+
+    // The snapshot does not include all the duplicated tuples of the delta index.
+    // This snapshot should rebuild delta index for itself.
+    // https://github.com/pingcap/tiflash/issues/8845
+    {
+        // Create snapshot but not place index
+        auto stream1 = create_stream();
+
+        // !!!Duplicated!!!: Write [50, 60) with ts 2
+        write_block(50, 60, 2);
+
+        // Place index with newest data.
+        auto stream2 = create_stream();
+        auto count2 = count_rows(stream2);
+        ASSERT_EQ(count2, 128);
+
+        // stream1 should not resue delta index of stream2
+
+        // Check cloning delta index
+        {
+            auto seg_read_task = get_seg_read_task(stream1);
+
+            // Shared delta index has been placed to the newest by `count_rows(stream2)`.
+            auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
+            check_delta_index(shared_delta_index, 20, 0, 19);
+
+            // Cannot clone delta index because it contains duplicated records in the gap of snapshot and the shared delta index.
+            auto cloned_delta_index = clone_delta_index(seg_read_task);
+            check_delta_index(cloned_delta_index, 0, 0, -1);
+        }
+        // Check scanning result of stream1
+        auto count1 = count_rows(stream1);
+        ASSERT_EQ(count1, count2);
+    }
+
+    // Make sure shared delta index can be reused by new snapshot
+    {
+        auto stream = create_stream();
+        auto seg_read_task = get_seg_read_task(stream);
+        auto cloned_delta_index = clone_delta_index(seg_read_task);
+        check_delta_index(cloned_delta_index, 20, 0, 19);
+    }
+
+    // The snapshot includes all the duplicated tuples of the delta index.
+    // Delta index can be reused safely.
+    {
+        write_block(70, 80, 2);
+        auto stream = create_stream();
+        auto seg_read_task = get_seg_read_task(stream);
+        auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
+        check_delta_index(shared_delta_index, 20, 0, 19);
+        auto cloned_delta_index = clone_delta_index(seg_read_task);
+        check_delta_index(cloned_delta_index, 20, 0, 19);
+        auto [placed_delta_index, fully_indexed] = ensure_place(seg_read_task);
+        ASSERT_TRUE(fully_indexed);
+        check_delta_index(placed_delta_index, 30, 0, 19);
+        auto count = count_rows(stream);
+        ASSERT_EQ(count, 128);
+    }
+
+    {
+        write_block(75, 85, 2);
+        auto stream = create_stream();
+        auto seg_read_task = get_seg_read_task(stream);
+        auto shared_delta_index = seg_read_task->read_snapshot->delta->getSharedDeltaIndex();
+        check_delta_index(shared_delta_index, 30, 0, 19);
+        auto cloned_delta_index = clone_delta_index(seg_read_task);
+        check_delta_index(cloned_delta_index, 30, 0, 19);
+        auto [placed_delta_index, fully_indexed] = ensure_place(seg_read_task);
+        ASSERT_TRUE(fully_indexed);
+        check_delta_index(placed_delta_index, 40, 0, 34);
+        auto count = count_rows(stream);
+        ASSERT_EQ(count, 128);
+    }
+}
+
+TEST_P(DeltaMergeStoreRWTest, DupHandleVersionAndDeltaIndexAdvancedThanSnapshot)
+try
+{
+    dupHandleVersionAndDeltaIndexAdvancedThanSnapshot();
+}
+CATCH
+
 } // namespace tests
 } // namespace DM
 } // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h
index 35131b8c1fe..62c0c217fa6 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h
@@ -124,6 +124,7 @@ class DeltaMergeStoreRWTest
     {
         TiFlashStorageTestBasic::SetUp();
         store = reload();
+        dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
     }
 
     DeltaMergeStorePtr
@@ -182,8 +183,11 @@ class DeltaMergeStoreRWTest
 protected:
     TestMode mode;
     DeltaMergeStorePtr store;
+    DMContextPtr dm_context;
 
     constexpr static const char * TRACING_NAME = "DeltaMergeStoreRWTest";
+
+    void dupHandleVersionAndDeltaIndexAdvancedThanSnapshot();
 };
 } // namespace tests
 } // namespace DM
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
index a33544acff1..e2c8c759543 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
@@ -18,6 +18,8 @@
 #include <Storages/DeltaMerge/DMContext.h>
 #include <Storages/DeltaMerge/DeltaMergeStore.h>
 #include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
+#include <Storages/DeltaMerge/Range.h>
+#include <Storages/DeltaMerge/RowKeyRange.h>
 #include <Storages/DeltaMerge/Segment.h>
 #include <Storages/DeltaMerge/StoragePool.h>
 #include <Storages/DeltaMerge/WriteBatches.h>
@@ -44,9 +46,7 @@ extern const Metric DT_SnapshotOfDeltaMerge;
 extern const Metric DT_SnapshotOfPlaceIndex;
 } // namespace CurrentMetrics
 
-namespace DB
-{
-namespace DM
+namespace DB::DM
 {
 extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, //
                                     const ColumnDefinesPtr & schema_snap,
@@ -346,6 +346,211 @@ try
 }
 CATCH
 
+TEST_F(SegmentTest, ReadWithMoreAdvacedDeltaIndex2)
+try
+{
+    auto write_rows = [&](size_t offset, size_t rows) {
+        Block block = DMTestEnv::prepareSimpleWriteBlock(offset, offset + rows, false);
+        // write to segment
+        segment->write(dmContext(), block);
+    };
+
+    // Thread A
+    write_rows(0, 100);
+    ASSERT_INPUTSTREAM_NROWS(
+        segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}),
+        100);
+    auto snap = segment->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead);
+
+    {
+        // check segment
+        segment->check(dmContext(), "test");
+    }
+
+    // Thread B
+    write_rows(0, 100);
+    ASSERT_INPUTSTREAM_NROWS(
+        segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}),
+        100);
+
+    // Thread A
+    {
+        auto in = segment->getInputStreamModeNormal(
+            dmContext(),
+            *tableColumns(),
+            snap,
+            {RowKeyRange::newAll(false, 1)},
+            {},
+            std::numeric_limits<UInt64>::max(),
+            DEFAULT_BLOCK_SIZE);
+        ASSERT_INPUTSTREAM_NROWS(in, 100);
+    }
+}
+CATCH
+
+TEST_F(SegmentTest, ReadWithMoreAdvacedDeltaIndexWithDeleteRange01)
+try
+{
+    auto write_rows = [&](size_t offset, size_t rows) {
+        Block block = DMTestEnv::prepareSimpleWriteBlock(offset, offset + rows, false);
+        // write to segment
+        segment->write(dmContext(), block);
+    };
+
+    // Thread A write [0, 100) && [100, 200)
+    write_rows(0, 100);
+    write_rows(100, 100);
+    ASSERT_INPUTSTREAM_NROWS(
+        segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}),
+        200);
+    // check segment
+    segment->check(dmContext(), "test");
+    auto snap_a = segment->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead);
+
+    // Thread B delete range [0, 50)
+    RowKeyRange range = RowKeyRange::fromHandleRange(HandleRange(0, 50));
+    segment->write(dmContext(), range);
+    LOG_INFO(Logger::get(), "Thread B read");
+    ASSERT_INPUTSTREAM_NROWS(
+        segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}),
+        150);
+
+    {
+        // tryClone will return an empty delta-index because `shared_delta_index.placed_deletes != delta->getDeletes()`
+        auto my_delta_index
+            = snap_a->delta->getSharedDeltaIndex()->tryClone(snap_a->delta->getRows(), snap_a->delta->getDeletes());
+        auto [my_placed_rows, my_placed_deletes] = my_delta_index->getPlacedStatus();
+        ASSERT_EQ(my_placed_rows, 0);
+        ASSERT_EQ(my_placed_deletes, 0);
+    }
+
+    // Thread A read [0, 200)
+    {
+        LOG_INFO(Logger::get(), "Thread A read with snap_a");
+        auto in = segment->getInputStreamModeNormal(
+            dmContext(),
+            *tableColumns(),
+            snap_a,
+            {RowKeyRange::newAll(false, 1)},
+            {},
+            std::numeric_limits<UInt64>::max(),
+            DEFAULT_BLOCK_SIZE);
+        ASSERT_INPUTSTREAM_NROWS(in, 200);
+    }
+}
+CATCH
+
+
+TEST_F(SegmentTest, ReadWithMoreAdvacedDeltaIndexWithDeleteRange02)
+try
+{
+    auto write_rows = [&](size_t offset, size_t rows) {
+        Block block = DMTestEnv::prepareSimpleWriteBlock(offset, offset + rows, false);
+        // write to segment
+        segment->write(dmContext(), block);
+    };
+
+    // Thread A write [0, 100)
+    write_rows(0, 100);
+    ASSERT_INPUTSTREAM_NROWS(
+        segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}),
+        100);
+    // check segment
+    segment->check(dmContext(), "test");
+    auto snap_a = segment->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead);
+
+    // Thread B write [100, 200) && delete range [0, 50)
+    write_rows(100, 100);
+    RowKeyRange range = RowKeyRange::fromHandleRange(HandleRange(0, 50));
+    segment->write(dmContext(), range);
+    LOG_INFO(Logger::get(), "Thread B read");
+    ASSERT_INPUTSTREAM_NROWS(
+        segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}),
+        150);
+
+    // Thread A
+    {
+        LOG_INFO(Logger::get(), "Thread A read with snap_a");
+        auto in = segment->getInputStreamModeNormal(
+            dmContext(),
+            *tableColumns(),
+            snap_a,
+            {RowKeyRange::newAll(false, 1)},
+            {},
+            std::numeric_limits<UInt64>::max(),
+            DEFAULT_BLOCK_SIZE);
+        ASSERT_INPUTSTREAM_NROWS(in, 100);
+    }
+}
+CATCH
+
+TEST_F(SegmentTest, ReadWithMoreAdvacedDeltaIndexComplicated)
+try
+{
+    // Test the case that reading rows with an advance DeltaIndex
+    size_t offset = 0;
+    auto write_rows = [&](size_t rows, bool flush) {
+        Block block = DMTestEnv::prepareSimpleWriteBlock(offset, offset + rows, false);
+        offset += rows;
+        // write to segment
+        segment->write(dmContext(), block, flush);
+    };
+
+    // Thread C
+    write_rows(100, false);
+    write_rows(100, false);
+    ASSERT_INPUTSTREAM_NROWS(
+        segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}),
+        200);
+    auto snap_c = segment->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead);
+    segment->check(dmContext(), "test");
+
+    // Thread A write 100 rows to persisted_files and 100 rows to mem_tables
+    write_rows(100, true);
+    write_rows(100, false);
+    ASSERT_INPUTSTREAM_NROWS(
+        segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}),
+        400);
+    auto snap_a = segment->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead);
+    segment->check(dmContext(), "test");
+
+    // Thread B write 100 rows to mem_tables
+    write_rows(100, false);
+    LOG_INFO(Logger::get(), "Thread B read");
+    ASSERT_INPUTSTREAM_NROWS(
+        segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}),
+        500);
+    segment->check(dmContext(), "test");
+
+    // Thread A
+    {
+        LOG_INFO(Logger::get(), "Thread A read with snap_a");
+        auto in = segment->getInputStreamModeNormal(
+            dmContext(),
+            *tableColumns(),
+            snap_a,
+            {RowKeyRange::newAll(false, 1)},
+            {},
+            std::numeric_limits<UInt64>::max(),
+            DEFAULT_BLOCK_SIZE);
+        ASSERT_INPUTSTREAM_NROWS(in, 400);
+    }
+    // Thread C
+    {
+        LOG_INFO(Logger::get(), "Thread C read with snap_c");
+        auto in = segment->getInputStreamModeNormal(
+            dmContext(),
+            *tableColumns(),
+            snap_c,
+            {RowKeyRange::newAll(false, 1)},
+            {},
+            std::numeric_limits<UInt64>::max(),
+            DEFAULT_BLOCK_SIZE);
+        ASSERT_INPUTSTREAM_NROWS(in, 200);
+    }
+}
+CATCH
+
 class SegmentDeletionRelevantPlaceTest
     : public SegmentTest
     , public testing::WithParamInterface<bool>
@@ -1644,5 +1849,4 @@ try
 CATCH
 
 } // namespace tests
-} // namespace DM
-} // namespace DB
+} // namespace DB::DM
\ No newline at end of file