Skip to content

Commit d559d28

Browse files
authored
Bugfix/issue 3662 fix hash map key mapped memory issue (#3667) (#628)
1 parent e87fc44 commit d559d28

File tree

2 files changed

+15
-10
lines changed

2 files changed

+15
-10
lines changed

src/Interpreters/Streaming/Aggregator.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1821,6 +1821,9 @@ void NO_INLINE Aggregator::mergeDataImpl(
18211821
{
18221822
if (inserted)
18231823
{
1824+
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
1825+
dst = nullptr;
1826+
18241827
/// If there are multiple sources, there are more than one AggregatedDataVariant. Aggregator always creates a new AggregatedDataVariant and merge all other
18251828
/// AggregatedDataVariants to the new created one. After finalize(), it does not clean up aggregate state except the new create AggregatedDataVariant.
18261829
/// If it does not alloc new memory for the 'dst' (i.e. aggregate state of the new AggregatedDataVariant which get destroyed after finalize()) but reuse
@@ -3064,17 +3067,17 @@ void Aggregator::serializeAggregateStates(const AggregateDataPtr & place, WriteB
30643067

30653068
void Aggregator::deserializeAggregateStates(AggregateDataPtr & place, ReadBuffer & rb, Arena * arena) const
30663069
{
3070+
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
3071+
place = nullptr;
3072+
30673073
UInt8 has_states;
30683074
readIntBinary(has_states, rb);
30693075
if (has_states)
30703076
{
3071-
if (!place)
3072-
{
3073-
/// Allocate states for all aggregate functions
3074-
AggregateDataPtr aggregate_data = arena->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
3075-
createAggregateStates(aggregate_data);
3076-
place = aggregate_data;
3077-
}
3077+
/// Allocate states for all aggregate functions
3078+
AggregateDataPtr aggregate_data = arena->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
3079+
createAggregateStates(aggregate_data);
3080+
place = aggregate_data;
30783081

30793082
for (size_t i = 0; i < params.aggregates_size; ++i)
30803083
aggregate_functions[i]->deserialize(place + offsets_of_aggregate_states[i], rb, std::nullopt, arena);

src/Processors/Transforms/Streaming/ChangelogConvertTransform.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ void ChangelogConvertTransform::work()
163163
const auto & columns = chunk.getColumns();
164164
for (auto key_col_pos : key_column_positions)
165165
{
166-
materialized_columns.push_back(columns[key_col_pos]->convertToFullColumnIfConst());
166+
/// Matierlize Sparse/Const/LowCardinality columns
167+
materialized_columns.push_back(columns[key_col_pos]->convertToFullIfNeeded());
167168
key_columns.push_back(materialized_columns.back().get());
168169
}
169170

@@ -433,8 +434,9 @@ void ChangelogConvertTransform::recover(CheckpointContextPtr ckpt_ctx)
433434
index.deserialize(
434435
/*MappedDeserializer*/
435436
[&](std::unique_ptr<RowRefWithRefCount<LightChunk>> & mapped_, Arena &, ReadBuffer & rb_) {
436-
mapped_ = std::make_unique<RowRefWithRefCount<LightChunk>>();
437-
mapped_->deserialize(&source_chunks, deserialized_indices_to_blocks, rb_);
437+
auto new_mapped = std::make_unique<RowRefWithRefCount<LightChunk>>();
438+
new_mapped->deserialize(&source_chunks, deserialized_indices_to_blocks, rb_);
439+
mapped_ = std::move(new_mapped);
438440
},
439441
pool,
440442
rb);

0 commit comments

Comments
 (0)