Skip to content

Commit

Permalink
Clean remaining non_joined code and tests (#7317)
Browse files Browse the repository at this point in the history
close #7308
  • Loading branch information
yibin87 authored Apr 19, 2023
1 parent fa4bd14 commit 4ee1412
Show file tree
Hide file tree
Showing 21 changed files with 171 additions and 221 deletions.
24 changes: 12 additions & 12 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace DB
HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const JoinPtr & join_,
size_t non_joined_stream_index,
size_t scan_hash_map_after_probe_stream_index,
const String & req_id,
UInt64 max_block_size_)
: log(Logger::get(req_id))
Expand All @@ -34,7 +34,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
RUNTIME_CHECK_MSG(original_join != nullptr, "join ptr should not be null.");
RUNTIME_CHECK_MSG(original_join->getProbeConcurrency() > 0, "Join probe concurrency must be greater than 0");

probe_exec.set(HashJoinProbeExec::build(original_join, input, non_joined_stream_index, max_block_size_));
probe_exec.set(HashJoinProbeExec::build(original_join, input, scan_hash_map_after_probe_stream_index, max_block_size_));
probe_exec->setCancellationHook([&]() { return isCancelledOrThrowIfKilled(); });

ProbeProcessInfo header_probe_process_info(0);
Expand All @@ -44,7 +44,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(

void HashJoinProbeBlockInputStream::readSuffixImpl()
{
LOG_DEBUG(log, "Finish join probe, total output rows {}, joined rows {}, non joined rows {}", joined_rows + non_joined_rows, joined_rows, non_joined_rows);
LOG_DEBUG(log, "Finish join probe, total output rows {}, joined rows {}, scan hash map rows {}", joined_rows + scan_hash_map_rows, joined_rows, scan_hash_map_rows);
}

Block HashJoinProbeBlockInputStream::getHeader() const
Expand Down Expand Up @@ -75,9 +75,9 @@ void HashJoinProbeBlockInputStream::onCurrentProbeDone()
switchStatus(probe_exec->onProbeFinish() ? ProbeStatus::FINISHED : ProbeStatus::WAIT_PROBE_FINISH);
}

void HashJoinProbeBlockInputStream::onCurrentReadNonJoinedDataDone()
void HashJoinProbeBlockInputStream::onCurrentScanHashMapDone()
{
switchStatus(probe_exec->onNonJoinedFinish() ? ProbeStatus::FINISHED : ProbeStatus::GET_RESTORE_JOIN);
switchStatus(probe_exec->onScanHashMapAfterProbeFinish() ? ProbeStatus::FINISHED : ProbeStatus::GET_RESTORE_JOIN);
}

void HashJoinProbeBlockInputStream::tryGetRestoreJoin()
Expand All @@ -96,10 +96,10 @@ void HashJoinProbeBlockInputStream::tryGetRestoreJoin()
void HashJoinProbeBlockInputStream::onAllProbeDone()
{
auto & cur_probe_exec = *probe_exec;
if (cur_probe_exec.needOutputNonJoinedData())
if (cur_probe_exec.needScanHashMap())
{
cur_probe_exec.onNonJoinedStart();
switchStatus(ProbeStatus::READ_NON_JOINED_DATA);
cur_probe_exec.onScanHashMapAfterProbeStart();
switchStatus(ProbeStatus::READ_SCAN_HASH_MAP_DATA);
}
else
{
Expand Down Expand Up @@ -147,13 +147,13 @@ Block HashJoinProbeBlockInputStream::getOutputBlock()
onAllProbeDone();
break;
}
case ProbeStatus::READ_NON_JOINED_DATA:
case ProbeStatus::READ_SCAN_HASH_MAP_DATA:
{
auto block = probe_exec->fetchNonJoined();
non_joined_rows += block.rows();
auto block = probe_exec->fetchScanHashMapData();
scan_hash_map_rows += block.rows();
if (!block)
{
onCurrentReadNonJoinedDataDone();
onCurrentScanHashMapDone();
break;
}
return block;
Expand Down
72 changes: 36 additions & 36 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const JoinPtr & join_,
size_t non_joined_stream_index,
size_t scan_hash_map_after_probe_stream_index,
const String & req_id,
UInt64 max_block_size_);

Expand All @@ -60,47 +60,47 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
* |
* ▼
* -----------------
* has non_joined data | | no non_joined data
* has scan_after_probe data | | no scan_after_probe data
* ▼ ▼
* WAIT_PROBE_FINISH FINISHED
* |
* ▼
* READ_NON_JOINED_DATA
* READ_SCAN_HASH_MAP_DATA
* |
* ▼
* FINISHED
*
* spill enabled:
* |-------------------> WAIT_BUILD_FINISH
* | |
* | ▼
* | PROBE
* | |
* | ▼
* | WAIT_PROBE_FINISH
* | |
* | ▼
* | ---------------
* | has non_joined data | | no non_joined data
* | ▼ |
* | READ_NON_JOINED_DATA |
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | ▼
* | GET_RESTORE_JOIN
* | |
* | ▼
* | ---------------
* | has restored join | | no restored join
* | ▼ ▼
* | RESTORE_BUILD FINISHED
* | |
* -----------------------|
* |-------------------> WAIT_BUILD_FINISH
* | |
* |
* | PROBE
* | |
* |
* | WAIT_PROBE_FINISH
* | |
* |
* | ---------------
* |has scan_hash_map data | | no scan_hash_map data
* | ▼ |
* | READ_SCAN_HASH_MAP_DATA |
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* | \ /
* |
* | GET_RESTORE_JOIN
* | |
* |
* | ---------------
* | has restored join | | no restored join
* | ▼ ▼
* | RESTORE_BUILD FINISHED
* | |
* ------------------------|
*
*/
enum class ProbeStatus
Expand All @@ -110,7 +110,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
WAIT_PROBE_FINISH, /// wait probe finish
GET_RESTORE_JOIN, /// try to get restore join
RESTORE_BUILD, /// build for restore join
READ_NON_JOINED_DATA, /// output non joined data
READ_SCAN_HASH_MAP_DATA, /// output scan hash map after probe data
FINISHED, /// the final state
};

Expand All @@ -119,7 +119,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
std::tuple<size_t, Block> getOneProbeBlock();
void onCurrentProbeDone();
void onAllProbeDone();
void onCurrentReadNonJoinedDataDone();
void onCurrentScanHashMapDone();
void tryGetRestoreJoin();

private:
Expand All @@ -132,7 +132,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
HashJoinProbeExecHolder probe_exec;
ProbeStatus status{ProbeStatus::WAIT_BUILD_FINISH};
size_t joined_rows = 0;
size_t non_joined_rows = 0;
size_t scan_hash_map_rows = 0;

Block header;
};
Expand Down
60 changes: 30 additions & 30 deletions dbms/src/DataStreams/HashJoinProbeExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,38 @@ namespace DB
HashJoinProbeExecPtr HashJoinProbeExec::build(
const JoinPtr & join,
const BlockInputStreamPtr & probe_stream,
size_t non_joined_stream_index,
size_t scan_hash_map_after_probe_stream_index,
size_t max_block_size)
{
bool need_scan_hash_map_after_probe = needScanHashMapAfterProbe(join->getKind());
BlockInputStreamPtr non_joined_stream = nullptr;
BlockInputStreamPtr scan_hash_map_stream = nullptr;
if (need_scan_hash_map_after_probe)
non_joined_stream = join->createStreamWithNonJoinedRows(probe_stream->getHeader(), non_joined_stream_index, join->getProbeConcurrency(), max_block_size);
scan_hash_map_stream = join->createScanHashMapAfterProbeStream(probe_stream->getHeader(), scan_hash_map_after_probe_stream_index, join->getProbeConcurrency(), max_block_size);

return std::make_shared<HashJoinProbeExec>(
join,
nullptr,
probe_stream,
need_scan_hash_map_after_probe,
non_joined_stream_index,
non_joined_stream,
scan_hash_map_after_probe_stream_index,
scan_hash_map_stream,
max_block_size);
}

HashJoinProbeExec::HashJoinProbeExec(
const JoinPtr & join_,
const BlockInputStreamPtr & restore_build_stream_,
const BlockInputStreamPtr & probe_stream_,
bool need_output_non_joined_data_,
size_t non_joined_stream_index_,
const BlockInputStreamPtr & non_joined_stream_,
bool need_scan_hash_map_after_probe_,
size_t scan_hash_map_after_probe_stream_index_,
const BlockInputStreamPtr & scan_hash_map_after_probe_stream_,
size_t max_block_size_)
: join(join_)
, restore_build_stream(restore_build_stream_)
, probe_stream(probe_stream_)
, need_output_non_joined_data(need_output_non_joined_data_)
, non_joined_stream_index(non_joined_stream_index_)
, non_joined_stream(non_joined_stream_)
, need_scan_hash_map_after_probe(need_scan_hash_map_after_probe_)
, scan_hash_map_after_probe_stream_index(scan_hash_map_after_probe_stream_index_)
, scan_hash_map_after_probe_stream(scan_hash_map_after_probe_stream_)
, max_block_size(max_block_size_)
, probe_process_info(max_block_size_)
{}
Expand Down Expand Up @@ -155,19 +155,19 @@ HashJoinProbeExecPtr HashJoinProbeExec::doTryGetRestoreExec()
{
/// restored join should always enable spill
assert(restore_info->join && restore_info->join->isEnableSpill());
size_t non_joined_stream_index = 0;
if (need_output_non_joined_data)
size_t scan_hash_map_stream_index = 0;
if (need_scan_hash_map_after_probe)
{
assert(restore_info->non_joined_stream);
non_joined_stream_index = dynamic_cast<ScanHashMapAfterProbeBlockInputStream *>(restore_info->non_joined_stream.get())->getNonJoinedIndex();
assert(restore_info->scan_hash_map_stream);
scan_hash_map_stream_index = dynamic_cast<ScanHashMapAfterProbeBlockInputStream *>(restore_info->scan_hash_map_stream.get())->getIndex();
}
auto restore_probe_exec = std::make_shared<HashJoinProbeExec>(
restore_info->join,
restore_info->build_stream,
restore_info->probe_stream,
need_output_non_joined_data,
non_joined_stream_index,
restore_info->non_joined_stream,
need_scan_hash_map_after_probe,
scan_hash_map_stream_index,
restore_info->scan_hash_map_stream,
max_block_size);
restore_probe_exec->parent = shared_from_this();
restore_probe_exec->setCancellationHook(is_cancelled);
Expand Down Expand Up @@ -197,9 +197,9 @@ void HashJoinProbeExec::cancel()
/// - for threads reading joined data: will return empty block if build is not finished yet
/// - for threads reading non joined data: will return empty block if build or probe is not finished yet
join->wakeUpAllWaitingThreads();
if (non_joined_stream != nullptr)
if (scan_hash_map_after_probe_stream != nullptr)
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(non_joined_stream.get()); p_stream != nullptr)
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(scan_hash_map_after_probe_stream.get()); p_stream != nullptr)
p_stream->cancel(false);
}
if (probe_stream != nullptr)
Expand Down Expand Up @@ -230,31 +230,31 @@ bool HashJoinProbeExec::onProbeFinish()
if (join->isRestoreJoin())
probe_stream->readSuffix();
join->finishOneProbe();
return !need_output_non_joined_data && !join->isEnableSpill();
return !need_scan_hash_map_after_probe && !join->isEnableSpill();
}

void HashJoinProbeExec::onNonJoinedStart()
void HashJoinProbeExec::onScanHashMapAfterProbeStart()
{
assert(non_joined_stream != nullptr);
non_joined_stream->readPrefix();
assert(scan_hash_map_after_probe_stream != nullptr);
scan_hash_map_after_probe_stream->readPrefix();
}

Block HashJoinProbeExec::fetchNonJoined()
Block HashJoinProbeExec::fetchScanHashMapData()
{
assert(non_joined_stream != nullptr);
return non_joined_stream->read();
assert(scan_hash_map_after_probe_stream != nullptr);
return scan_hash_map_after_probe_stream->read();
}

bool HashJoinProbeExec::onNonJoinedFinish()
bool HashJoinProbeExec::onScanHashMapAfterProbeFinish()
{
non_joined_stream->readSuffix();
scan_hash_map_after_probe_stream->readSuffix();
if (!join->isEnableSpill())
{
return true;
}
else
{
join->finishOneNonJoin(non_joined_stream_index);
join->finishOneNonJoin(scan_hash_map_after_probe_stream_index);
return false;
}
}
Expand Down
22 changes: 11 additions & 11 deletions dbms/src/DataStreams/HashJoinProbeExec.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HashJoinProbeExec : public std::enable_shared_from_this<HashJoinProbeExec>
static HashJoinProbeExecPtr build(
const JoinPtr & join,
const BlockInputStreamPtr & probe_stream,
size_t non_joined_stream_index,
size_t scan_hash_map_after_probe_stream_index,
size_t max_block_size);

using CancellationHook = std::function<bool()>;
Expand All @@ -41,9 +41,9 @@ class HashJoinProbeExec : public std::enable_shared_from_this<HashJoinProbeExec>
const JoinPtr & join_,
const BlockInputStreamPtr & restore_build_stream_,
const BlockInputStreamPtr & probe_stream_,
bool need_output_non_joined_data_,
size_t non_joined_stream_index_,
const BlockInputStreamPtr & non_joined_stream_,
bool need_scan_hash_map_after_probe_,
size_t scan_hash_map_after_probe_stream_index,
const BlockInputStreamPtr & scan_hash_map_after_probe_stream_,
size_t max_block_size_);

void waitUntilAllBuildFinished();
Expand All @@ -65,12 +65,12 @@ class HashJoinProbeExec : public std::enable_shared_from_this<HashJoinProbeExec>
// Returns false if the probe_exec continues to execute.
bool onProbeFinish();

bool needOutputNonJoinedData() { return need_output_non_joined_data; }
void onNonJoinedStart();
Block fetchNonJoined();
bool needScanHashMap() { return need_scan_hash_map_after_probe; }
void onScanHashMapAfterProbeStart();
Block fetchScanHashMapData();
// Returns true if the probe_exec ends.
// Returns false if the probe_exec continues to execute.
bool onNonJoinedFinish();
bool onScanHashMapAfterProbeFinish();

void setCancellationHook(CancellationHook cancellation_hook)
{
Expand All @@ -89,9 +89,9 @@ class HashJoinProbeExec : public std::enable_shared_from_this<HashJoinProbeExec>

const BlockInputStreamPtr probe_stream;

const bool need_output_non_joined_data;
const size_t non_joined_stream_index;
const BlockInputStreamPtr non_joined_stream;
const bool need_scan_hash_map_after_probe;
const size_t scan_hash_map_after_probe_stream_index;
const BlockInputStreamPtr scan_hash_map_after_probe_stream;

const size_t max_block_size;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ScanHashMapAfterProbeBlockInputStream : public IProfilingBlockInputStream

Block getHeader() const override { return result_sample_block; };

size_t getNonJoinedIndex() const { return index; }
size_t getIndex() const { return index; }


protected:
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ struct JoinExecuteInfo
{
String build_side_root_executor_id;
JoinPtr join_ptr;
BlockInputStreams non_joined_streams;
BlockInputStreams join_build_streams;
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
for (auto & stream : pipeline.streams)
{
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, join_ptr, probe_index++, log->identifier(), settings.max_block_size);
stream->setExtraInfo(fmt::format("join probe, join_executor_id = {}, has_non_joined_data = {}", query_block.source_name, needScanHashMapAfterProbe(join_ptr->getKind())));
stream->setExtraInfo(fmt::format("join probe, join_executor_id = {}, scan_hash_map_after_probe = {}", query_block.source_name, needScanHashMapAfterProbe(join_ptr->getKind())));
}

/// add a project to remove all the useless column
Expand Down
Loading

0 comments on commit 4ee1412

Please sign in to comment.