Skip to content

Commit

Permalink
[feat](spill) spill and reserve
Browse files Browse the repository at this point in the history
Origin commit: 0e7e42d
  • Loading branch information
mrhhsg committed Jan 1, 2025
1 parent 266a224 commit bf480c5
Show file tree
Hide file tree
Showing 428 changed files with 6,553 additions and 2,124 deletions.
4 changes: 4 additions & 0 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "agent/workload_group_listener.h"

#include <thrift/protocol/TDebugProtocol.h>

#include "runtime/exec_env.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
Expand All @@ -33,6 +35,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
if (!topic_info.__isset.workload_group_info) {
continue;
}
LOG(INFO) << "Received publish workload group info request: "
<< apache::thrift::ThriftDebugString(topic_info).c_str();
is_set_workload_group_info = true;

// 1 parse topic info to group info
Expand Down
6 changes: 5 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <fmt/core.h>
#include <gflags/gflags.h>
#include <stdint.h>

#include <algorithm>
Expand Down Expand Up @@ -118,7 +119,7 @@ DEFINE_String(mem_limit, "90%");
DEFINE_Double(soft_mem_limit_frac, "0.9");

// Cache capacity reduce mem limit as a fraction of soft mem limit.
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7");

// Schema change memory limit as a fraction of soft memory limit.
DEFINE_Double(schema_change_mem_limit_frac, "0.6");
Expand Down Expand Up @@ -1270,6 +1271,9 @@ DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool {
});
DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");

// paused query in queue timeout(ms) will be resumed or canceled
DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");

DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

DEFINE_mInt32(max_s3_client_retry, "10");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,7 @@ DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_mInt32(spill_gc_work_time_ms);
DECLARE_Int32(spill_io_thread_pool_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int64(spill_in_paused_queue_timeout_ms);

DECLARE_mBool(check_segment_when_build_rowset_meta);

Expand Down
14 changes: 10 additions & 4 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,18 @@ void Daemon::memory_maintenance_thread() {
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();

// step 7. Analyze blocking queries.
// step 7: handle paused queries(caused by memory insufficient)
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();

// step 8. Analyze blocking queries.
// TODO sort the operators that can spill, wake up the pipeline task spill
// or continue execution according to certain rules or cancel query.

// step 8. Flush memtable
// step 9. Flush memtable
doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
// TODO notify flush memtable

// step 9. Jemalloc purge all arena dirty pages
// step 10. Jemalloc purge all arena dirty pages
je_purge_dirty_pages();
}
}
Expand Down Expand Up @@ -486,7 +489,9 @@ void Daemon::cache_adjust_capacity_thread() {
doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
l, std::chrono::seconds(1));
}
double adjust_weighted = GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted;
double adjust_weighted = std::min<double>(
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted,
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted);
if (_stop_background_threads_latch.count() == 0) {
break;
}
Expand All @@ -501,6 +506,7 @@ void Daemon::cache_adjust_capacity_thread() {
LOG(INFO) << fmt::format(
"[MemoryGC] refresh cache capacity end, free memory {}, details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted;
doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store(
false, std::memory_order_relaxed);
} while (true);
Expand Down
10 changes: 10 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ namespace ErrorCode {
E(BAD_CAST, -254, true); \
E(ARITHMETIC_OVERFLOW_ERRROR, -255, false); \
E(PERMISSION_DENIED, -256, false); \
E(QUERY_MEMORY_EXCEEDED, -257, false); \
E(WORKLOAD_GROUP_MEMORY_EXCEEDED, -258, false); \
E(PROCESS_MEMORY_EXCEEDED, -259, false); \
E(CE_CMD_PARAMS_ERROR, -300, true); \
E(CE_BUFFER_TOO_SMALL, -301, true); \
E(CE_CMD_NOT_VALID, -302, true); \
Expand Down Expand Up @@ -381,6 +384,11 @@ class [[nodiscard]] Status {
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::make_unique<ErrMsg>(*rhs._err_msg);
} else {
// If rhs error msg is empty, then should also clear current error msg
// For example, if rhs is OK and current status is error, then copy to current
// status, should clear current error message.
_err_msg.reset();
}
return *this;
}
Expand All @@ -390,6 +398,8 @@ class [[nodiscard]] Status {
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::move(rhs._err_msg);
} else {
_err_msg.reset();
}
return *this;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
{"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
{"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
{"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
Expand All @@ -41,6 +42,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
{"QUERY_TYPE", TYPE_VARCHAR, sizeof(StringRef), false},
{"SPILL_WRITE_BYTES_TO_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
{"SPILL_READ_BYTES_FROM_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
};

SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendWorkloadGroupResourceUsage::
{"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"WRITE_BUFFER_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
};

SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()
Expand Down
21 changes: 20 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
}

MemTable::~MemTable() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
_query_thread_context.query_mem_tracker->write_tracker());
if (_is_flush_success) {
// If the memtable is flush success, then its memtracker's consumption should be 0
if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) {
Expand Down Expand Up @@ -182,6 +183,8 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r

Status MemTable::insert(const vectorized::Block* input_block,
const std::vector<uint32_t>& row_idxs) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
_query_thread_context.query_mem_tracker->write_tracker());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);

if (_is_first_insertion) {
Expand Down Expand Up @@ -579,6 +582,8 @@ void MemTable::_aggregate() {
}

void MemTable::shrink_memtable_by_agg() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
_query_thread_context.query_mem_tracker->write_tracker());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
if (_keys_type == KeysType::DUP_KEYS) {
return;
Expand Down Expand Up @@ -608,6 +613,20 @@ bool MemTable::need_agg() const {
return false;
}

size_t MemTable::get_flush_reserve_memory_size() const {
size_t reserve_size = 0;
if (_keys_type == KeysType::DUP_KEYS) {
if (_tablet_schema->num_key_columns() == 0) {
// no need to reserve
} else {
reserve_size = _input_mutable_block.allocated_bytes();
}
} else {
reserve_size = _input_mutable_block.allocated_bytes();
}
return reserve_size;
}

Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class MemTable {

int64_t tablet_id() const { return _tablet_id; }
size_t memory_usage() const { return _mem_tracker->consumption(); }
size_t get_flush_reserve_memory_size() const;
// insert tuple from (row_pos) to (row_pos+num_rows)
Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);

Expand Down
43 changes: 43 additions & 0 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "common/signal_handler.h"
#include "olap/memtable.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/storage_engine.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
Expand Down Expand Up @@ -140,6 +141,37 @@ Status FlushToken::wait() {
return Status::OK();
}

Status FlushToken::_try_reserve_memory(QueryThreadContext query_thread_context, int64_t size) {
auto* thread_context = doris::thread_context();
auto* memtable_flush_executor =
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
Status st;
do {
// only try to reserve process memory
st = thread_context->try_reserve_process_memory(size);
if (st.ok()) {
memtable_flush_executor->inc_flushing_task();
break;
}
if (_is_shutdown() || query_thread_context.get_memory_tracker()->is_query_cancelled()) {
st = Status::Cancelled("flush memtable already cancelled");
break;
}
// Make sure at least one memtable is flushing even reserve memory failed.
if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
// If there are already any flushing task, Wait for some time and retry.
LOG_EVERY_T(INFO, 60) << fmt::format(
"Failed to reserve memory {} for flush memtable, retry after 100ms",
PrettyPrinter::print_bytes(size));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
st = Status::OK();
break;
}
} while (true);
return st;
}

Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
<< ", memsize: " << memtable->memory_usage()
Expand All @@ -150,8 +182,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
SCOPED_ATTACH_TASK(memtable->query_thread_context());
signal::set_signal_task_id(_rowset_writer->load_id());
signal::tablet_id = memtable->tablet_id();

DEFER_RELEASE_RESERVED();

auto reserve_size = memtable->get_flush_reserve_memory_size();
RETURN_IF_ERROR(_try_reserve_memory(memtable->query_thread_context(), reserve_size));
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
memtable->query_thread_context().query_mem_tracker->write_tracker());
SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());

Defer defer {[&]() {
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
}};
std::unique_ptr<vectorized::Block> block;
RETURN_IF_ERROR(memtable->to_block(&block));
RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
Expand Down
19 changes: 19 additions & 0 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {

Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);

Status _try_reserve_memory(QueryThreadContext query_thread_context, int64_t size);

// Records the current flush status of the tablet.
// Note: Once its value is set to Failed, it cannot return to SUCCESS.
std::shared_mutex _flush_status_lock;
Expand Down Expand Up @@ -140,12 +142,29 @@ class MemTableFlushExecutor {
std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority,
std::shared_ptr<WorkloadGroup> wg_sptr);

// return true if it already has any flushing task
bool check_and_inc_has_any_flushing_task() {
// need to use CAS instead of only `if (0 == _flushing_task_count)` statement,
// to avoid concurrent entries both pass the if statement
int expected_count = 0;
if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) {
return true;
}
DCHECK(expected_count == 0 && _flushing_task_count == 1);
return false;
}

void inc_flushing_task() { _flushing_task_count++; }

void dec_flushing_task() { _flushing_task_count--; }

private:
void _register_metrics();
static void _deregister_metrics();

std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
std::atomic<int> _flushing_task_count = 0;
};

} // namespace doris
Loading

0 comments on commit bf480c5

Please sign in to comment.