Skip to content

Commit

Permalink
fix bug that SharedQueryBlockInputStream may loss block randomly (#2759
Browse files Browse the repository at this point in the history
…) (#2762)
  • Loading branch information
ti-chi-bot authored Sep 16, 2021
1 parent a5eec5f commit c97dc9d
Showing 1 changed file with 8 additions and 15 deletions.
23 changes: 8 additions & 15 deletions dbms/src/DataStreams/SharedQueryBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#pragma once

#include <thread>

#include <Common/ConcurrentBoundedQueue.h>
#include <common/logger_useful.h>
#include <Common/typeid_cast.h>

#include <DataStreams/IProfilingBlockInputStream.h>

#include <thread>

namespace DB
{

Expand Down Expand Up @@ -37,15 +36,9 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
}
}

String getName() const override
{
return "SharedQuery";
}
String getName() const override { return "SharedQuery"; }

Block getHeader() const override
{
return children.back()->getHeader();
}
Block getHeader() const override { return children.back()->getHeader(); }

void readPrefix() override
{
Expand Down Expand Up @@ -102,7 +95,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
in->readPrefix();
while (!isCancelled())
{
Block block;
Block block = in->read();
do
{
if (isCancelled() || read_suffixed)
Expand All @@ -111,7 +104,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
queue.tryEmplace(0);
break;
}
} while (!queue.tryPush(block = in->read(), try_action_millisecionds));
} while (!queue.tryPush(block, try_action_millisecionds));

if (!block)
break;
Expand Down Expand Up @@ -141,11 +134,11 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
bool read_suffixed = false;

std::thread thread;
std::mutex mutex;
std::mutex mutex;

std::string exception_msg;

Logger * log;
BlockInputStreamPtr in;
};
}
} // namespace DB

0 comments on commit c97dc9d

Please sign in to comment.