Skip to content

Commit

Permalink
make SharedQueryInputStream compatible with UnionBlockInputStream (#2656
Browse files Browse the repository at this point in the history
) (#2662)
  • Loading branch information
ti-chi-bot authored Sep 16, 2021
1 parent c56ae2c commit a1debcb
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions dbms/src/DataStreams/SharedQueryBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream

if (thread.joinable())
thread.join();
if (exception)
std::rethrow_exception(exception);
if (!exception_msg.empty())
throw Exception(exception_msg);
}

protected:
Expand All @@ -84,8 +84,10 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
Block block;
do
{
if (exception)
std::rethrow_exception(exception);
if (!exception_msg.empty())
{
throw Exception(exception_msg);
}
if (isCancelled() || read_suffixed)
return {};
} while (!queue.tryPop(block, try_action_millisecionds));
Expand Down Expand Up @@ -116,9 +118,17 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
}
in->readSuffix();
}
catch (Exception & e)
{
exception_msg = e.message();
}
catch (std::exception & e)
{
exception_msg = e.what();
}
catch (...)
{
exception = std::current_exception();
exception_msg = "other error";
}
}

Expand All @@ -133,7 +143,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
std::thread thread;
std::mutex mutex;

std::exception_ptr exception;
std::string exception_msg;

Logger * log;
BlockInputStreamPtr in;
Expand Down

0 comments on commit a1debcb

Please sign in to comment.