Skip to content

Commit d4b02cc

Browse files
committed
temp
1 parent a18837c commit d4b02cc

File tree

4 files changed

+180
-3
lines changed

4 files changed

+180
-3
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#pragma once
2+
3+
#include <Interpreters/Streaming/HashJoin.h>
4+
5+
namespace DB
6+
{
7+
namespace Streaming
8+
{
9+
/// Constructs an ASOF streaming hash join on top of the generic HashJoin.
///
/// @param table_join_              join description, handed over to the HashJoin base constructor
/// @param left_join_stream_desc_   description of the left stream, handed over to HashJoin
/// @param right_join_stream_desc_  description of the right stream, handed over to HashJoin
///
/// All three arguments are received by value, so they are moved (not copied) into the base class.
/// `asof_type` and `asof_inequality` are then read through `table_join` - the name without the
/// trailing underscore, i.e. NOT the moved-from parameter; presumably the member kept by HashJoin
/// (its declaration is not visible from here).
AsofHashJoin::AsofHashJoin(
    std::shared_ptr<TableJoin> table_join_,
    JoinStreamDescriptionPtr left_join_stream_desc_,
    JoinStreamDescriptionPtr right_join_stream_desc_)
    : HashJoin(std::move(table_join_), std::move(left_join_stream_desc_), std::move(right_join_stream_desc_))
    /// NOTE(review): the result of getAsofType() is dereferenced unconditionally;
    /// confirm it always holds a value when this class is instantiated.
    , asof_type(*table_join->getAsofType())
    , asof_inequality(table_join->getAsofInequality())
{
    /// NOTE(review): `buffered_hash_data` is not created here although insertRightBlock() dereferences it.
}
18+
19+
/// Joins one block of the left stream against the buffered hash blocks.
/// `block` is passed by non-const reference to doJoinBlockWithHashTable(), so the callee may modify it.
void AsofHashJoin::joinLeftBlock(Block & block)
{
    /// NOTE(review): the meaning of the `true` template argument is not visible from this file.
    this->doJoinBlockWithHashTable<true>(block, this->hash_blocks);
}
23+
24+
/// Buffers one block of the right stream and indexes its rows in the ASOF hash map.
///
/// Steps, in order:
///  1. materialize the right-side key columns of `block`;
///  2. build `block_to_save` with prepareBlockToSave();
///  3. extract the nested key columns and the NULL map of the keys;
///  4. append `block_to_save` to `buffered_hash_data` and insert its rows into the hash map variant
///     selected by `hash_method_type`;
///  5. for RIGHT / FULL joins, remember the NULL map if at least one key is NULL;
///  6. checkLimits().
///
/// NOTE(review): this function still uses names that are not declared in the visible code:
///  - `join` (first argument of insertFromBlockImplType) is not declared in this function;
///  - `buffered_hash_data->maps`, `addOrConcatDataBlock()` and `blocks_nullmaps` are not among the
///    members BufferedHashData declares in joinData.h (it has `map`, `insert()` and a commented-out
///    `blocks_nullmaps`).
/// Confirm the intended API before relying on this code.
void AsofHashJoin::insertRightBlock(Block block)
{
    /// FIXME, there are quite some block copies
    /// FIXME, all_key_columns shall hold shared_ptr to columns instead of raw ptr
    /// then we can update `source_block` in place
    /// key columns are from source `block`
    ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(block, table_join->getAllNames(JoinTableSide::Right));

    /// `block_to_save` is a separate Block object produced by prepareBlockToSave(), so it can be moved
    /// into the buffered data below.
    Block block_to_save = prepareBlockToSave(block, right_data.buffered_data->sample_block);

    /// FIXME, multiple disjuncts OR clause: only the first clause is used
    ColumnRawPtrs key_columns;
    const Names & key_names = table_join->getClauses().front().key_names_right;
    key_columns.reserve(key_names.size());
    for (const auto & name : key_names)
        key_columns.push_back(all_key_columns[name]);

    /// We will insert to the map only keys, where all components are not NULL.
    ConstNullMapPtr null_map{};
    ColumnPtr null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map);

    /// If RIGHT or FULL, save blocks with nulls for NotJoinedBlocks
    UInt8 save_nullmap = 0;
    if (isRightOrFull(table_join->kind()) && null_map)
    {
        /// Stop scanning at the first row with a NULL key
        for (size_t i = 0; !save_nullmap && i < null_map->size(); ++i)
            save_nullmap |= (*null_map)[i];
    }

    /// Add `block_to_save` to target stream data
    /// Note `block_to_save` may be empty for cases in which the query doesn't care other non-key columns.
    /// For example, SELECT count() FROM stream_a JOIN stream_b ON i=ii;
    auto start_row = buffered_hash_data->addOrConcatDataBlock(std::move(block_to_save));
    auto rows = buffered_hash_data->lastDataBlock().rows();

    /// Dispatch on the hash method. The cases must `break` and not `return`:
    /// the NULL-map bookkeeping and checkLimits() below have to run after the insertion.
    switch (hash_method_type)
    {
#define M(TYPE) \
    case HashType::TYPE: \
        insertFromBlockImplType< \
            Strictness::Asof, \
            typename KeyGetterForType<HashType::TYPE, std::remove_reference_t<decltype(*(buffered_hash_data->maps->TYPE))>>::Type>( \
            join, \
            *(buffered_hash_data->maps->TYPE), \
            rows, \
            key_columns, \
            key_sizes[0], \
            &buffered_hash_data->blocks, \
            start_row, \
            null_map, \
            buffered_hash_data->pool); \
        break;
        APPLY_FOR_HASH_KEY_VARIANTS(M)
#undef M
    }

    if (save_nullmap)
    {
        /// FIXME, we will need account the allocated bytes for null_map_holder / not_joined_map as well
        buffered_hash_data->blocks_nullmaps.emplace_back(&buffered_hash_data->lastDataBlock(), null_map_holder);
    }

    checkLimits();
}
98+
}
99+
}

src/Interpreters/Streaming/AsofHashJoin.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,23 @@ namespace Streaming
99
class AsofHashJoin final : public HashJoin
1010
{
1111
public:
12-
using HashJoin::HashJoin;
12+
AsofHashJoin(
13+
std::shared_ptr<TableJoin> table_join_,
14+
JoinStreamDescriptionPtr left_join_stream_desc_,
15+
JoinStreamDescriptionPtr right_join_stream_desc_);
16+
1317
HashJoinType type() const override { return HashJoinType::Asof; }
18+
19+
void joinLeftBlock(Block & block) override;
20+
void insertRightBlock(Block block) override;
21+
22+
private:
23+
TypeIndex asof_type;
24+
ASOFJoinInequality asof_inequality;
25+
26+
using DataBlock = LightChunkWithTimestamp;
27+
using BufferedAsofHashData = BufferedHashData<DataBlock, AsofRowRefs<DataBlock>>;
28+
SERDE std::unique_ptr<BufferedAsofHashData> buffered_hash_data;
1429
};
1530

1631
}

src/Interpreters/Streaming/HashJoin.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ class HashJoin : public IHashJoin
255255

256256
private:
257257
/// For non-bidirectional hash join
258-
void insertRightBlock(Block right_block);
259-
void joinLeftBlock(Block & left_block);
258+
virtual void insertRightBlock(Block right_block);
259+
virtual void joinLeftBlock(Block & left_block);
260260

261261
/// For bidirectional hash join
262262
/// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is

src/Interpreters/Streaming/joinData.h

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,5 +229,68 @@ SERDE struct BufferedStreamData
229229
using BucketBlocks = std::vector<BufferedStreamData::BucketBlock>;
230230

231231
using BufferedStreamDataPtr = std::unique_ptr<BufferedStreamData>;
232+
233+
template <typename JoinDataBlock, typename MappedRowRef>
234+
struct BufferedHashData
235+
{
236+
using JoinDataBlockList = RefCountDataBlockList<JoinDataBlock>;
237+
using JoinDataBlockRawPtr = const JoinDataBlock *;
238+
using BlockNullmapList = std::deque<std::pair<JoinDataBlockRawPtr, ColumnPtr>>;
239+
using Map = HashMapsTemplate<MappedRowRef>;
240+
241+
BufferedHashData(size_t data_block_size, CachedBlockMetrics & metrics);
242+
243+
~BufferedHashData();
244+
245+
template <typename RowRefHandler>
246+
void insert(JoinDataBlock && block, RowRefHandler && handler)
247+
{
248+
/// Add block into `blocks`
249+
auto start_row = blocks.pushBackOrConcat(std::move(block));
250+
251+
/// Insert MappedRowRef into hash map (which is referenced to `blocks`)
252+
const IColumn * asof_column [[maybe_unused]] = nullptr;
253+
if constexpr (is_range_asof_join || is_asof_join)
254+
asof_column = key_columns.back();
255+
256+
auto key_columns
257+
if constexpr (is_asof_join)
258+
{
259+
auto key_column_copy = key_columns;
260+
auto key_size_copy = key_sizes;
261+
key_column_copy.pop_back();
262+
key_size_copy.pop_back();
263+
return KeyGetter(key_column_copy, key_size_copy, nullptr);
264+
}
265+
else
266+
return KeyGetter(key_columns, key_sizes, nullptr);
267+
268+
auto key_getter = createKeyGetter < KeyGetter, is_range_asof_join || is_asof_join > (key_columns, key_sizes);
269+
270+
for (size_t i = start_row; i < rows; ++i)
271+
{
272+
auto emplace_result = key_getter.emplaceKey(map, i, pool);
273+
handler(emplace_result.getMapped(), emplace_result.isInserted());
274+
}
275+
}
276+
277+
const JoinDataBlock & lastDataBlock() const { return blocks.lastDataBlock(); }
278+
279+
HashMapSizes hashMapSizes(const HashJoin * hash_join) const;
280+
281+
/// Buffered data
282+
JoinDataBlockList blocks;
283+
284+
/// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
285+
Arena pool;
286+
287+
/// Hash maps variants are attached to original source blocks, and will be garbage collected
288+
/// automatically along with the source blocks. Hence put it here instead of BufferedStreamData
289+
Map map;
290+
291+
/// FIXME: support nullable type
292+
// BlockNullmapList blocks_nullmaps; /// Nullmaps for blocks of "right" table (if needed)
293+
};
294+
232295
}
233296
}

0 commit comments

Comments
 (0)