From c97dc9d56516f23d3dbd6831c93c6a5214f6064b Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 16 Sep 2021 11:30:42 +0800 Subject: [PATCH] fix bug that SharedQueryBlockInputStream may loss block randomly (#2759) (#2762) --- .../DataStreams/SharedQueryBlockInputStream.h | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/dbms/src/DataStreams/SharedQueryBlockInputStream.h b/dbms/src/DataStreams/SharedQueryBlockInputStream.h index f2690c74de2..437f699528a 100644 --- a/dbms/src/DataStreams/SharedQueryBlockInputStream.h +++ b/dbms/src/DataStreams/SharedQueryBlockInputStream.h @@ -1,13 +1,12 @@ #pragma once -#include - #include #include #include - #include +#include + namespace DB { @@ -37,15 +36,9 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream } } - String getName() const override - { - return "SharedQuery"; - } + String getName() const override { return "SharedQuery"; } - Block getHeader() const override - { - return children.back()->getHeader(); - } + Block getHeader() const override { return children.back()->getHeader(); } void readPrefix() override { @@ -102,7 +95,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream in->readPrefix(); while (!isCancelled()) { - Block block; + Block block = in->read(); do { if (isCancelled() || read_suffixed) @@ -111,7 +104,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream queue.tryEmplace(0); break; } - } while (!queue.tryPush(block = in->read(), try_action_millisecionds)); + } while (!queue.tryPush(block, try_action_millisecionds)); if (!block) break; @@ -141,11 +134,11 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream bool read_suffixed = false; std::thread thread; - std::mutex mutex; + std::mutex mutex; std::string exception_msg; Logger * log; BlockInputStreamPtr in; }; -} +} // namespace DB