Skip to content

Commit

Permalink
adjust shuffle join
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Jun 23, 2022
1 parent dddcffc commit 918aa32
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 9 deletions.
1 change: 0 additions & 1 deletion programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
}

registerAllStorageDistributedTaskBuilderMakers();
Poco::ThreadPool server_pool(3, config().getUInt("max_connections", 1024));
std::mutex servers_lock;
std::vector<ProtocolServerAdapter> servers;
Expand Down
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \
M(String, use_cluster_for_distributed_shuffle, "", "If you want to run the join and group by in distributed shuffle mode, set it as one of the available cluster.", 0) \
M(Bool, enable_distribute_shuffle, false, "Enable shuffle join", 0) \
M(UInt64, shuffle_storage_session_timeout, 1800, "How long a session can be alive before expired by timeout", 0) \
M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \
M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,10 @@ bool StageQueryDistributedJoinRewriteAnalyzer::isApplicableJoinType()
{
const auto * join_tables = from_query->join();
auto * table_join = join_tables->table_join->as<ASTTableJoin>();
if (table_join->kind == ASTTableJoin::Kind::Cross)

if (table_join->kind != ASTTableJoin::Kind::Left && table_join->kind != ASTTableJoin::Kind::Inner)
return false;
if (table_join->strictness == ASTTableJoin::Strictness::Asof)
return false;

// TODO if right table is dict or special storage, return false;
Expand Down
1 change: 0 additions & 1 deletion src/Interpreters/InterpreterStageQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ BlockIO InterpreterStageQuery::execute(const QueryBlockIO & output_io, const Que
auto pipeline_builder = query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context));
pipeline_builder->addInterpreterContext(context);
BlockIO res;
res.pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder));
return res;
Expand Down
9 changes: 8 additions & 1 deletion src/Interpreters/StorageDistributedTasksBuilder.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <mutex>
#include <Interpreters/StorageDistributedTasksBuilder.h>
#include <Common/ErrorCodes.h>

Expand All @@ -7,9 +8,13 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

static std::once_flag init_builder_flag;
void registerAllStorageDistributedTaskBuilderMakers();
StorageDistributedTaskBuilderFactory & StorageDistributedTaskBuilderFactory::getInstance()
{
static StorageDistributedTaskBuilderFactory instance;
std::call_once(init_builder_flag, [](){ registerAllStorageDistributedTaskBuilderMakers(instance); });
return instance;
}

Expand All @@ -31,7 +36,9 @@ StorageDistributedTaskBuilderPtr StorageDistributedTaskBuilderFactory::getBuilde
return iter->second();
}

void registerAllStorageDistributedTaskBuilderMakers()
void registerHiveClusterTasksBuilder(StorageDistributedTaskBuilderFactory & instance);
void registerAllStorageDistributedTaskBuilderMakers(StorageDistributedTaskBuilderFactory & instance)
{
registerHiveClusterTasksBuilder(instance);
}
}
2 changes: 1 addition & 1 deletion src/Interpreters/StorageDistributedTasksBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ class StorageDistributedTaskBuilderFactory : boost::noncopyable

};

void registerAllStorageDistributedTaskBuilderMakers();
void registerAllStorageDistributedTaskBuilderMakers(StorageDistributedTaskBuilderFactory & instance);
}
3 changes: 2 additions & 1 deletion src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
select_with_union->set_of_modes.size(),
select_with_union->list_of_selects->getID());
}
if (!context->getSettings().use_cluster_for_distributed_shuffle.value.empty())
if (!context->getSettingsRef().use_cluster_for_distributed_shuffle.value.empty() && context->getSettingsRef().enable_distribute_shuffle)
{
MakeFunctionColumnAliasAction function_alias_action;
ASTDepthFirstVisitor<MakeFunctionColumnAliasAction> function_alias_visitor(function_alias_action, ast);
Expand All @@ -666,6 +666,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ast = add_finish_event_result;
}
}

interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal));

if (context->getCurrentTransaction() && !interpreter->supportsTransactions() &&
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/DistributedShuffle/StorageShuffle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include <Processors/Chunk.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/ISource.h>
#include <QueryPipeline/RemoteInserter.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Storages/DistributedShuffle/ShuffleBlockTable.h>
Expand All @@ -45,11 +45,11 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class StorageShuffleSource : public SourceWithProgress, WithContext
class StorageShuffleSource : public ISource, WithContext
{
public:
StorageShuffleSource(ContextPtr context_, const String & session_id_, const String & table_id_, const Block & header_)
: SourceWithProgress(header_), WithContext(context_), session_id(session_id_), table_id(table_id_), header(header_)
: ISource(header_), WithContext(context_), session_id(session_id_), table_id(table_id_), header(header_)
{
}

Expand Down
7 changes: 7 additions & 0 deletions src/Storages/Hive/StorageHiveCluster.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <memory>
#include <Common/config.h>
#if USE_HIVE
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -66,6 +67,12 @@ class StorageHiveCluster : public IStorage, WithContext
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const override;
void alter(const AlterCommands & params, ContextPtr local_context, AlterLockHolder & alter_lock_holder) override;

std::shared_ptr<HiveSettings> getStorageHiveSettings() { return storage_settings; }
const String & getHiveMetastoreURL() const { return hive_metastore_url; }
const String & getHiveDatabase() const { return hive_database; }
const String & getHiveTableName() const { return hive_table; }
ASTPtr getPartitionByAst() const { return partition_by_ast; }

private:
String cluster_name;
String hive_metastore_url;
Expand Down

0 comments on commit 918aa32

Please sign in to comment.