Skip to content

Commit

Permalink
Fix bug for remote reading using "double lock" strategy (#1926)
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored May 19, 2021
1 parent 0f924c0 commit 11ca3d7
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 80 deletions.
39 changes: 27 additions & 12 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace DB
{
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;

#define APPLY_FOR_FAILPOINTS(M) \
#define APPLY_FOR_FAILPOINTS_ONCE(M) \
M(exception_between_drop_meta_and_data) \
M(exception_between_alter_data_and_meta) \
M(exception_drop_table_during_remove_meta) \
Expand Down Expand Up @@ -41,19 +41,26 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_set_sst_to_dtfile_block_size) \
M(force_set_sst_decode_rand)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
#define APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
M(hang_in_execution) \
M(pause_before_dt_background_delta_merge) \
M(pause_until_dt_background_delta_merge) \
M(pause_before_apply_raft_cmd) \
M(pause_before_apply_raft_snapshot) \
M(pause_until_apply_raft_snapshot)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \
M(pause_when_reading_from_dt_stream) \
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store)

namespace FailPoints
{
#define M(NAME) extern const char NAME[] = #NAME "";
APPLY_FOR_FAILPOINTS(M)
APPLY_FOR_FAILPOINTS_ONCE(M)
APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M)
APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
#undef M
} // namespace FailPoints
Expand Down Expand Up @@ -92,20 +99,28 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name)
return; \
}

APPLY_FOR_FAILPOINTS(M)
APPLY_FOR_FAILPOINTS_ONCE(M)
#undef M

#define M(NAME) \
#define SUB_M(NAME, flags) \
if (fail_point_name == FailPoints::NAME) \
{ \
/* FIU_ONETIME -- Only fail once; the point of failure will be automatically disabled afterwards.*/ \
fiu_enable(FailPoints::NAME, 1, nullptr, FIU_ONETIME); \
fiu_enable(FailPoints::NAME, 1, nullptr, flags); \
fail_point_wait_channels.try_emplace(FailPoints::NAME, std::make_shared<FailPointChannel>()); \
return; \
}

#define M(NAME) SUB_M(NAME, FIU_ONETIME)

APPLY_FOR_FAILPOINTS_ONCE_WITH_CHANNEL(M)
#undef M

#define M(NAME) SUB_M(NAME, 0)
APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M)
#undef M
#undef SUB_M

throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR);
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ DBGInvoker::DBGInvoker()

regSchemalessFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regSchemalessFunc("refresh_schemas", dbgFuncRefreshSchemas);
regSchemalessFunc("gc_schemas", dbgFuncGcSchemas);
regSchemalessFunc("reset_schemas", dbgFuncResetSchemas);
regSchemalessFunc("is_tombstone", dbgFuncIsTombstone);

Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer
output(ss.str());
}

// Trigger gc on all databases / tables.
// Usage:
// ./storage-client.sh "DBGInvoke gc_schemas([gc_safe_point])"
void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
auto & service = context.getSchemaSyncService();
Timestamp gc_safe_point = 0;
if (args.size() == 0)
gc_safe_point = PDClientHelper::getGCSafePointWithRetry(context.getTMTContext().getPDClient());
else
gc_safe_point = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[0]).value);
service->gc(gc_safe_point);

std::stringstream ss;
ss << "schemas gc done";
output(ss.str());
}

void dbgFuncResetSchemas(Context & context, const ASTs &, DBGInvoker::Printer output)
{
TMTContext & tmt = context.getTMTContext();
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Debug/dbgFuncSchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInv
// ./storage-client.sh "DBGInvoke refresh_schemas()"
void dbgFuncRefreshSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Trigger gc on all databases / tables.
// Usage:
// ./storage-client.sh "DBGInvoke gc_schemas([gc_safe_point])"
void dbgFuncGcSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Reset schemas.
// Usage:
// ./storages-client.sh "DBGInvoke reset_schemas()"
Expand Down
124 changes: 72 additions & 52 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,65 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(Context & context_, const std
}
}

static std::tuple<std::optional<::tipb::DAGRequest>, std::optional<DAGSchema>> //
buildRemoteTS(const std::unordered_map<RegionID, const RegionInfo &> & region_retry, const DAGQueryBlock & query_block,
const tipb::TableScan & ts, const String & handle_column_name, const TableStructureLockHolder &, const ManageableStoragePtr & storage,
Context & context, Poco::Logger * log)
{
if (region_retry.empty())
return std::make_tuple(std::nullopt, std::nullopt);

for (auto it : region_retry)
{
context.getQueryContext().getDAGContext()->retry_regions.push_back(it.second);
}
LOG_DEBUG(log, ({
std::stringstream ss;
ss << "Start to retry " << region_retry.size() << " regions (";
for (auto & r : region_retry)
ss << r.first << ",";
ss << ")";
ss.str();
}));

DAGSchema schema;
::tipb::DAGRequest dag_req;

{
const auto & table_info = storage->getTableInfo();
tipb::Executor * ts_exec = dag_req.add_executors();
ts_exec->set_tp(tipb::ExecType::TypeTableScan);
ts_exec->set_executor_id(query_block.source->executor_id());
*(ts_exec->mutable_tbl_scan()) = ts;

for (int i = 0; i < ts.columns().size(); ++i)
{
const auto & col = ts.columns(i);
auto col_id = col.column_id();

if (col_id == DB::TiDBPkColumnID)
{
ColumnInfo ci;
ci.tp = TiDB::TypeLongLong;
ci.setPriKeyFlag();
ci.setNotNullFlag();
schema.emplace_back(std::make_pair(handle_column_name, std::move(ci)));
}
else
{
auto & col_info = table_info.getColumnInfo(col_id);
schema.emplace_back(std::make_pair(col_info.name, col_info));
}
dag_req.add_output_offsets(i);
}
dag_req.set_encode_type(tipb::EncodeType::TypeCHBlock);
}
/// do not collect execution summaries because in this case because the execution summaries
/// will be collected by CoprocessorBlockInputStream
dag_req.set_collect_execution_summaries(false);
return std::make_tuple(dag_req, schema);
}

// the flow is the same as executeFetchcolumns
void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
{
Expand Down Expand Up @@ -238,9 +297,13 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline &
bool need_local_read = !query_info.mvcc_query_info->regions_query_info.empty();
if (need_local_read)
{
readFromLocalStorage(table_id, required_columns, query_info, max_block_size, learner_read_snapshot, pipeline, region_retry);
readFromLocalStorage(
table_structure_lock, table_id, required_columns, query_info, max_block_size, learner_read_snapshot, pipeline, region_retry);
}

// Should build these vars under protect of `table_structure_lock`.
auto [dag_req, schema] = buildRemoteTS(region_retry, query_block, ts, handle_column_name, table_structure_lock, storage, context, log);
auto null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns));
// The DeltaTree engine ensures that once input streams are created, the caller can get a consistent result
// from those streams even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, keep the drop lock so that the storage not to be dropped during reading.
Expand All @@ -249,68 +312,24 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline &
// For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop mode.
if (!region_retry.empty())
{
for (auto it : region_retry)
{
context.getQueryContext().getDAGContext()->retry_regions.push_back(it.second);
}
LOG_DEBUG(log, ({
std::stringstream ss;
ss << "Start to retry " << region_retry.size() << " regions (";
for (auto & r : region_retry)
ss << r.first << ",";
ss << ")";
ss.str();
}));

DAGSchema schema;
::tipb::DAGRequest dag_req;

{
const auto & table_info = storage->getTableInfo();
tipb::Executor * ts_exec = dag_req.add_executors();
ts_exec->set_tp(tipb::ExecType::TypeTableScan);
ts_exec->set_executor_id(query_block.source->executor_id());
*(ts_exec->mutable_tbl_scan()) = ts;

for (int i = 0; i < ts.columns().size(); ++i)
{
const auto & col = ts.columns(i);
auto col_id = col.column_id();

if (col_id == DB::TiDBPkColumnID)
{
ColumnInfo ci;
ci.tp = TiDB::TypeLongLong;
ci.setPriKeyFlag();
ci.setNotNullFlag();
schema.emplace_back(std::make_pair(handle_column_name, std::move(ci)));
}
else
{
auto & col_info = table_info.getColumnInfo(col_id);
schema.emplace_back(std::make_pair(col_info.name, col_info));
}
dag_req.add_output_offsets(i);
}
dag_req.set_encode_type(tipb::EncodeType::TypeCHBlock);
}
/// do not collect execution summaries because in this case because the execution summaries
/// will be collected by CoprocessorBlockInputStream
dag_req.set_collect_execution_summaries(false);

#ifndef NDEBUG
if (unlikely(!dag_req.has_value() || !schema.has_value()))
throw TiFlashException(
"Try to read from remote but can not build DAG request. Should not happen!", Errors::Coprocessor::Internal);
#endif
std::vector<pingcap::coprocessor::KeyRange> ranges;
for (auto & info : region_retry)
{
for (auto & range : info.second.key_ranges)
ranges.emplace_back(*range.first, *range.second);
}
sort(ranges.begin(), ranges.end());
executeRemoteQueryImpl(pipeline, ranges, dag_req, schema);
executeRemoteQueryImpl(pipeline, ranges, *dag_req, *schema);
}

if (pipeline.streams.empty())
{
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
pipeline.streams.emplace_back(null_stream_if_empty);
}

pipeline.transform([&](auto & stream) { stream->addTableLock(table_drop_lock); });
Expand Down Expand Up @@ -346,6 +365,7 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, Pipeline &
}

void DAGQueryBlockInterpreter::readFromLocalStorage( //
const TableStructureLockHolder &, //
const TableID table_id, const Names & required_columns, SelectQueryInfo & query_info, const size_t max_block_size,
const LearnerReadSnapshot & learner_read_snapshot, //
Pipeline & pipeline, std::unordered_map<RegionID, const RegionInfo &> & region_retry)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class DAGQueryBlockInterpreter
TiDB::TiDBCollators & collators, AggregateDescriptions & aggregate_descriptions);
void executeProject(Pipeline & pipeline, NamesWithAliases & project_cols);

void readFromLocalStorage( //
void readFromLocalStorage( //
const TableStructureLockHolder &, //
const TableID table_id, const Names & required_columns, SelectQueryInfo & query_info, const size_t max_block_size,
const LearnerReadSnapshot & learner_read_snapshot, //
Pipeline & pipeline, std::unordered_map<RegionID, const RegionInfo &> & region_retry);
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
#pragma once

#include <Common/FailPoint.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>

namespace DB
{
namespace FailPoints
{
extern const char pause_when_reading_from_dt_stream[];
} // namespace FailPoints

namespace DM
{

Expand Down Expand Up @@ -80,6 +86,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
}
LOG_TRACE(log, "Start to read segment [" + DB::toString(cur_segment->segmentId()) + "]");
}
FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream);

Block res = cur_stream->read(res_filter, return_filter);

Expand Down Expand Up @@ -122,4 +129,4 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
};

} // namespace DM
} // namespace DB
} // namespace DB
Loading

0 comments on commit 11ca3d7

Please sign in to comment.