Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions iceberg/tea_scan.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "iceberg/tea_scan.h"

#include <chrono>
#include <deque>
#include <exception>
#include <iterator>
#include <memory>
#include <queue>
Expand Down Expand Up @@ -372,6 +374,123 @@ class EntryStream : public IcebergEntriesStream {
std::shared_ptr<IcebergEntriesStream> all_stream_;
};

// reference:
// https://github.com/ClickHouse/ClickHouse/blob/86228891ee33324d714cf87152e487db5144961c/src/Common/ConcurrentBoundedQueue.h
template <typename T>
class ConcurrentBoundedQueue {
public:
explicit ConcurrentBoundedQueue(uint32_t queue_capacity) : max_capacity_(queue_capacity) {}

std::optional<T> Pop() {
std::unique_lock guard(mutex_);
can_pop_cv_.wait(guard, [&]() { return is_stopped_ || !queue_.empty(); });
if (queue_.empty()) {
return std::nullopt;
}
T result = std::move(queue_.front());
queue_.pop();
can_push_cv_.notify_one();
return result;
}

void Push(T value) {
std::unique_lock guard(mutex_);
can_push_cv_.wait(guard, [&]() { return is_stopped_ || queue_.size() < max_capacity_; });
if (is_stopped_) {
return;
}
queue_.emplace(std::move(value));
can_pop_cv_.notify_one();
}

void Stop() {
std::lock_guard guard(mutex_);
is_stopped_ = true;
can_pop_cv_.notify_all();
can_push_cv_.notify_all();
}

private:
std::mutex mutex_;
bool is_stopped_;
// consider making it an array with fixed size, so `Push` cannot throw an exception
std::queue<T> queue_;
std::condition_variable can_pop_cv_;
std::condition_variable can_push_cv_;
const uint32_t max_capacity_ = 0;
};

class MergeStream : public IcebergEntriesStream {
public:
MergeStream(std::vector<std::shared_ptr<IcebergEntriesStream>> streams, uint32_t threads, uint32_t queue_capacity)
: pool_(threads), entries_(queue_capacity) {
stream_is_finished_.resize(streams.size());

tasks_.reserve(streams.size());
for (size_t i = 0; i < streams.size(); ++i) {
auto stream = streams[i];
auto future = pool_.Submit([s = stream, entries = &entries_, stream_is_finished = &stream_is_finished_[i]]() {
try {
// TOOD(gmusya): consider make task more granular
while (true) {
auto maybe_result = s->ReadNext();
if (!maybe_result) {
return;
}
entries->Push(std::move(*maybe_result));
}
} catch (std::exception& e) {
entries->Push(arrow::Status::ExecutionError(e.what()));
} catch (arrow::Status& e) {
entries->Push(arrow::Status::ExecutionError(e.message()));
} catch (...) {
entries->Push(arrow::Status::ExecutionError(std::string((__PRETTY_FUNCTION__)) + ": unexpected exception"));
}

*stream_is_finished = true;
});
tasks_.emplace_back(std::move(future));
}
}

std::optional<ManifestEntry> ReadNext() override {
if (!pool_is_stopped_) {
while (first_not_finished_stream_ < tasks_.size() && stream_is_finished_[first_not_finished_stream_]) {
++first_not_finished_stream_;
}
if (first_not_finished_stream_ == tasks_.size()) {
pool_.Stop(true);
pool_is_stopped_ = true;
}
}

auto maybe_result = entries_.Pop();
if (!maybe_result.has_value()) {
return std::nullopt;
}

arrow::Result<ManifestEntry> result = *std::move(maybe_result);
if (!result.ok()) {
throw std::runtime_error(result.status().message());
}
return result.MoveValueUnsafe();
}

~MergeStream() {
entries_.Stop();
pool_.Stop(false);
}

private:
ThreadPool pool_;
bool pool_is_stopped_ = false;

ConcurrentBoundedQueue<arrow::Result<ManifestEntry>> entries_;
std::vector<std::future<void>> tasks_;
std::vector<uint8_t> stream_is_finished_; // we want to pass references to tasks, so we cannot use bool
uint32_t first_not_finished_stream_ = 0;
};

std::optional<ManifestEntry> AllEntriesStream::ReadNext() {
while (true) {
if (!current_manifest_stream_) {
Expand Down
Loading