Skip to content

Commit

Permalink
commnd
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Mar 5, 2025
1 parent fd64cf9 commit fc0ebec
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
20 changes: 16 additions & 4 deletions be/src/pipeline/exec/local_merge_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,25 @@ Status LocalMergeSortLocalState::init(RuntimeState* state, LocalStateInfo& info)
return Status::OK();
}

// The dependency process of LocalMergeSort:
// 1. The main_source's dependencies include its own source dependency and all other_source_deps (created in LocalMergeSortSourceOperatorX).
// 2. The other_source's dependencies include only its own source dependency.
// 3. The sort sink sets the corresponding source dependency to ready.
// 4. At this point, other_source will execute, but main_source will not.
// 5. After other_source executes, it sets the corresponding other_source_deps to ready.
// 6. Now, main_source will execute.
std::vector<Dependency*> LocalMergeSortLocalState::dependencies() const {
if (_task_idx == 0) {
std::vector<Dependency*> deps;
deps.push_back(_dependency);
auto& p = _parent->cast<LocalMergeSortSourceOperatorX>();
for (auto& [id, dep] : p._other_source_deps) {
for (int i = 1; i < p._parallel_tasks; ++i) {
auto dep = p._other_source_deps[i];
deps.push_back(dep.get());
}
return deps;
} else {
return {_dependency};
return Base::dependencies();
}
}

Expand Down Expand Up @@ -93,9 +101,10 @@ Status LocalMergeSortSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Base::prepare(state));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
_other_source_deps.resize(_parallel_tasks);
for (int i = 1; i < _parallel_tasks; ++i) {
auto dep = Dependency::create_shared(operator_id(), node_id(),
"LocalMergeSortSourceOperatorX", false);
fmt::format("LocalMergeOtherDependency{}", i), false);
_other_source_deps[i] = dep;
}
_sorters.resize(_parallel_tasks);
Expand All @@ -121,6 +130,7 @@ Status LocalMergeSortSourceOperatorX::main_source_get_block(RuntimeState* state,
vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
if (local_state._merger == nullptr) {
// Since we cannot control the initialization order of different local states, we set the sorter to the operator during execution.
_sorters[local_state._task_idx] = local_state._shared_state->sorter;
RETURN_IF_ERROR(local_state.build_merger(state));
}
Expand All @@ -131,7 +141,9 @@ Status LocalMergeSortSourceOperatorX::main_source_get_block(RuntimeState* state,
Status LocalMergeSortSourceOperatorX::other_source_get_block(RuntimeState* state,
vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
DCHECK(_other_source_deps.contains(local_state._task_idx));
DCHECK(_other_source_deps[local_state._task_idx] != nullptr);
// Since we cannot control the initialization order of different local states, we set the sorter to the operator during execution.
// Wake up main_source.
_sorters[local_state._task_idx] = local_state._shared_state->sorter;
_other_source_deps[local_state._task_idx]->set_ready();
*eos = true;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/local_merge_sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class LocalMergeSortSourceOperatorX final : public OperatorX<LocalMergeSortLocal
vectorized::VSortExecExprs _vsort_exec_exprs;
const int64_t _offset;

std::map<int, DependencySPtr> _other_source_deps;
std::vector<DependencySPtr> _other_source_deps;
// The sorters of all instances are used in the main source.
std::vector<std::shared_ptr<vectorized::Sorter>> _sorters;
};
Expand Down

0 comments on commit fc0ebec

Please sign in to comment.