Skip to content

Commit

Permalink
Bugfix: should also do seek for sub stream in DMFileReader::read (#597)…
Browse files Browse the repository at this point in the history
… (#598)
  • Loading branch information
sre-bot authored Apr 1, 2020
1 parent 7e32dae commit 443f4d1
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 13 deletions.
9 changes: 7 additions & 2 deletions dbms/src/Interpreters/InterpreterManageQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ BlockIO InterpreterManageQuery::execute()
{
case ManageOperation::Enum::Flush:
{
manageable_storage->flushDelta();
manageable_storage->flushCache(context);
return {};
}
case ManageOperation::Enum::Status:
Expand All @@ -41,11 +41,16 @@ BlockIO InterpreterManageQuery::execute()
manageable_storage->checkStatus(context);
return {};
}
case ManageOperation::Enum ::DeleteRows:
case ManageOperation::Enum::DeleteRows:
{
manageable_storage->deleteRows(context, ast.rows);
return {};
}
case ManageOperation::Enum::MergeDelta:
{
manageable_storage->mergeDelta(context);
return {};
}
}
return {};
}
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Parsers/ASTManageQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ namespace ManageOperation
Status,
Check,
DeleteRows,
MergeDelta,
};

inline const char * toString(UInt64 op)
{
static const char * data[] = {"Flush", "Status", "Check", "Delete Rows"};
return op < 4 ? data[op] : "Unknown operation";
static const char * data[] = {"Flush", "Status", "Check", "Delete Rows", "Merge Delta"};
return op < 5 ? data[op] : "Unknown operation";
}
}

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Parsers/ParserManageQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_status("STATUS");
ParserKeyword s_check("CHECK");
ParserKeyword s_delete_rows("DELETE ROWS");
ParserKeyword s_merge_delta("MERGE DELTA");

ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p;
Expand Down Expand Up @@ -52,6 +53,8 @@ bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
operation = ManageOperation::Enum::Status;
else if (s_check.ignore(pos, expected))
operation = ManageOperation::Enum::Check;
else if (s_merge_delta.ignore(pos, expected))
operation = ManageOperation::Enum::MergeDelta;
else if (s_delete_rows.ignore(pos, expected))
{
operation = ManageOperation::Enum::DeleteRows;
Expand Down
20 changes: 20 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,26 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const HandleRa
}
}

void DeltaMergeStore::mergeDeltaAll(const Context & context)
{
auto dm_context = newDMContext(context, context.getSettingsRef());

std::vector<SegmentPtr> all_segments;
{
std::shared_lock lock(read_write_mutex);
for (auto & [range_end, segment] : segments)
{
(void)range_end;
all_segments.push_back(segment);
}
}

for (auto & segment : all_segments)
{
segmentMergeDelta(*dm_context, segment, true);
}
}

void DeltaMergeStore::compact(const Context & db_context, const HandleRange & range)
{
auto dm_context = newDMContext(db_context, db_context.getSettingsRef());
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ class DeltaMergeStore : private boost::noncopyable

void flushCache(const DMContextPtr & dm_context, const HandleRange & range);

/// Do merge delta for all segments. Only used for debug.
void mergeDeltaAll(const Context & context);

/// Compact fregment packs into bigger one.
void compact(const Context & context, const HandleRange & range = HandleRange::newAll());

Expand Down
19 changes: 11 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,8 @@ Block DMFileReader::read()
const String stream_name = DMFile::getFileNameBase(cd.id);
if (auto iter = column_streams.find(stream_name); iter != column_streams.end())
{
auto & stream = iter->second;
if (shouldSeek(start_pack_id) || skip_packs_by_column[i] > 0)
{
auto & mark = (*stream->marks)[start_pack_id];
stream->buf->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
}
auto & top_stream = iter->second;
bool should_seek = shouldSeek(start_pack_id) || skip_packs_by_column[i] > 0;

auto data_type = dmfile->getColumnStat(cd.id).type;
auto column = data_type->createColumn();
Expand All @@ -257,13 +253,20 @@ Block DMFileReader::read()
[&](const IDataType::SubstreamPath & substream_path) {
String substream_name = DMFile::getFileNameBase(cd.id, substream_path);
auto & sub_stream = column_streams.at(substream_name);

if (should_seek)
{
auto & mark = (*sub_stream->marks)[start_pack_id];
sub_stream->buf->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
}

return sub_stream->buf.get();
},
read_rows,
stream->avg_size_hint,
top_stream->avg_size_hint,
true,
{});
IDataType::updateAvgValueSizeHint(*column, stream->avg_size_hint);
IDataType::updateAvgValueSizeHint(*column, top_stream->avg_size_hint);

// Cast column's data from DataType in disk to what we need now
auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd);
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class IManageableStorage : public IStorage
explicit IManageableStorage(const ColumnsDescription & columns_) : IStorage(columns_) {}
~IManageableStorage() override = default;

virtual void flushDelta() {}
virtual void flushCache(const Context & /*context*/) {}

virtual void flushCache(const Context & /*context*/, const DB::HandleRange<HandleID> & /* range_to_flush */) {}

Expand All @@ -45,6 +45,8 @@ class IManageableStorage : public IStorage

virtual void deleteRows(const Context &, size_t /*rows*/) { throw Exception("Unsupported"); }

virtual void mergeDelta(const Context &) { throw Exception("Unsupported"); }

virtual BlockInputStreamPtr listSegments(const Context &) { throw Exception("Unsupported"); }

virtual ::TiDB::StorageEngine engineType() const = 0;
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,11 @@ void StorageDeltaMerge::flushCache(const Context & context, const DM::HandleRang
store->flushCache(context, range_to_flush);
}

void StorageDeltaMerge::mergeDelta(const Context & context)
{
store->mergeDeltaAll(context);
}

void StorageDeltaMerge::deleteRange(const DM::HandleRange & range_to_delete, const Settings & settings)
{
return store->deleteRange(global_context, settings, range_to_delete);
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,20 @@ class StorageDeltaMerge : public ext::shared_ptr_helper<StorageDeltaMerge>, publ
/// Write from raft layer.
void write(Block && block, const Settings & settings);

void flushCache(const Context & context) override
{
flushCache(context, DM::HandleRange::newAll());
}

void flushCache(const Context & context, const DB::HandleRange<HandleID> & range_to_flush) override
{
flushCache(context, toDMHandleRange(range_to_flush));
}

void flushCache(const Context & context, const DM::HandleRange & range_to_flush);

void mergeDelta(const Context & context) override;

void deleteRange(const DM::HandleRange & range_to_delete, const Settings & settings);

void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) override;
Expand Down
28 changes: 28 additions & 0 deletions tests/mutable-test/bugs/substream_seek.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
>> set dt_segment_stable_pack_rows = 5;

>> drop table if exists A;
>> create table A (a int, b Nullable(int)) engine = DeltaMerge((a));

>> insert into A (a, b) values (1, NULL), (2, NULL), (3, NULL), (4, NULL), (5, NULL), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10);

>> manage table A flush;

>> manage table A merge delta;

>> select * from A where a >= 6;
# Before fix:
# ┌──a─┬──b─┐
# │ 6 │ \N │
# │ 7 │ \N │
# │ 8 │ \N │
# │ 9 │ \N │
# │ 10 │ \N │
# └────┴────┘

┌──a─┬──b─┐
│ 6 │ 6 │
│ 7 │ 7 │
│ 8 │ 8 │
│ 9 │ 9 │
│ 10 │ 10 │
└────┴────┘

0 comments on commit 443f4d1

Please sign in to comment.