Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f2e81f0
Implemented Bloom Filter's interface, and main methods
AestheticAkhmad Jul 30, 2025
9eca168
Merge branch 'stable-23.10' into MCOL-5758-BLOOM-FILTER-PRE-JOIN-GSOC…
mariadb-LeonidFedorov Aug 1, 2025
cf7c1ee
Change vector to array
AestheticAkhmad Jul 31, 2025
7589dea
Update vec to array due to fixed size, update datatypes of BF to 32bit
AestheticAkhmad Aug 3, 2025
b556725
Add Bloom filter to TupleHashJoinStep
AestheticAkhmad Aug 4, 2025
8ddfe83
Merge branch 'stable-23.10' into MCOL-5758-BLOOM-FILTER-PRE-JOIN-GSOC…
AestheticAkhmad Aug 4, 2025
8fd70a0
Add BF and populate in TupleJoiner
AestheticAkhmad Aug 4, 2025
5306a3b
use consistent names for BFs in TupleJoiner
AestheticAkhmad Aug 6, 2025
5a10d7e
Implemented Bloom filter's serialize/deserialize methods
AestheticAkhmad Aug 6, 2025
3383a46
Pass Bloom filter from TupleHashJoinStep to BatchPrimitiveProcessorJL
AestheticAkhmad Aug 6, 2025
3eecc51
Merge branch 'stable-23.10' into MCOL-5758-BLOOM-FILTER-PRE-JOIN-GSOC…
AestheticAkhmad Aug 6, 2025
2d54d55
Revert "Pass Bloom filter from TupleHashJoinStep to BatchPrimitivePro…
AestheticAkhmad Aug 6, 2025
8ca6269
Merge branch 'stable-23.10' into MCOL-5758-BLOOM-FILTER-PRE-JOIN-GSOC…
AestheticAkhmad Aug 9, 2025
389311c
Pass BFs to TupleBPS for further steps
AestheticAkhmad Aug 9, 2025
6ea996f
Pass BF to BPP through TBPS->BPPSeeder
AestheticAkhmad Aug 11, 2025
4895f26
Serialize BFs when in UM, type-alias structure of BFs
AestheticAkhmad Aug 13, 2025
071db76
Merge branch 'stable-23.10' into MCOL-5758-BLOOM-FILTER-PRE-JOIN-GSOC…
AestheticAkhmad Aug 13, 2025
0c01743
Add namespace joblist before BFs
AestheticAkhmad Aug 15, 2025
fd5d459
Serialize BFs size, so BPP is aware of it
AestheticAkhmad Aug 17, 2025
aff3a67
Merge branch 'stable-23.10' into MCOL-5758-BLOOM-FILTER-PRE-JOIN-GSOC…
AestheticAkhmad Aug 28, 2025
ab91611
Synchronize BF and BPP::execute, filter rows with BF
AestheticAkhmad Sep 16, 2025
ad805c1
Merge branch 'MCOL-5758-BLOOM-FILTER-PRE-JOIN-GSOC-2025-REDESIGNED' o…
AestheticAkhmad Sep 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbcon/joblist/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ include_directories(${ENGINE_COMMON_INCLUDES} ${ENGINE_SRC_DIR}/tools/passwd)
set(joblist_LIB_SRCS
anydatalist.cpp
batchprimitiveprocessor-jl.cpp
blockedbloomfilter.cpp
columncommand-jl.cpp
command-jl.cpp
crossenginestep.cpp
Expand Down
78 changes: 78 additions & 0 deletions dbcon/joblist/blockedbloomfilter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#include "blockedbloomfilter.h"

namespace joblist
{

void BlockedBloomFilter::insert(uint32_t hash)
{
uint32_t blockIdx = hash % BLOOM_FILTER_BLOCK_COUNT;
uint64_t bitmask = 0;

for (const auto& salt : SALTS)
{
uint32_t mixed = mix32(hash ^ salt);
uint8_t bitIdx = mixed % 64;

bitmask |= (1ULL << bitIdx);
}

bloomFilter[blockIdx].fetch_or(bitmask, std::memory_order_relaxed);
}

bool BlockedBloomFilter::probe(uint32_t hash) const
{
uint32_t blockIdx = hash % BLOOM_FILTER_BLOCK_COUNT;
uint64_t block = bloomFilter[blockIdx].load(std::memory_order_relaxed);

for (const auto& salt : SALTS)
{
uint32_t mixed = mix32(hash ^ salt);
uint8_t bitIdx = mixed % 64;

if ((block & (1ULL << bitIdx)) == 0)
{
return false;
}

}

return true;
}

// 32-bit avalanche mixer used to derive per-salt bit positions.
//
// The xor-shift amounts (16, 13, 16) and the multipliers 0x85ebca6b / 0xc2b2ae35 are those
// of MurmurHash3's fmix32 finalizer, not SplitMix. The function reads no object state; its
// result depends only on `hash`, and all arithmetic wraps modulo 2^32.
//
// NOTE(review): the function is declared `inline` in the header but defined only in this
// .cpp, so it can be called only from this translation unit. That holds for the callers
// shown here (insert/probe) -- confirm before calling it from elsewhere.
inline uint32_t BlockedBloomFilter::mix32(uint32_t hash) const
{
  hash ^= hash >> 16;
  hash *= 0x85ebca6b;
  hash ^= hash >> 13;
  hash *= 0xc2b2ae35;
  hash ^= hash >> 16;

  return hash;
}

// Appends every block of the filter to `bs` as a uint64_t, in index order.
// No length prefix or header is written; the reader must know the block count.
void BlockedBloomFilter::serialize(messageqcpp::ByteStream& bs) const
{
  for (size_t i = 0; i < bloomFilter.size(); ++i)
  {
    const uint64_t word = bloomFilter[i].load(std::memory_order_relaxed);
    bs << word;
  }
}

// Overwrites every block of the filter with consecutive uint64_t values read from `bs`,
// in index order. This is the inverse of serialize(); previous contents are replaced.
void BlockedBloomFilter::deserialize(messageqcpp::ByteStream& bs)
{
  for (size_t i = 0; i < bloomFilter.size(); ++i)
  {
    uint64_t word = 0;
    bs >> word;
    bloomFilter[i].store(word, std::memory_order_relaxed);
  }
}

// Returns the number of 64-bit blocks in the filter (not bytes and not bits).
size_t BlockedBloomFilter::getSize() const
{
  const size_t blockCount = bloomFilter.size();
  return blockCount;
}

} // namespace joblist

67 changes: 67 additions & 0 deletions dbcon/joblist/blockedbloomfilter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include <array>
#include <atomic>
#include <cmath>

// Debug
#include <string>
#include <bitset>

#include "bytestream.h"

namespace joblist
{

/// Fixed-size blocked Bloom filter keyed by 32-bit hashes.
///
/// Storage is an array of 64-bit blocks. A key maps to exactly one block (chosen by the raw
/// hash) and sets up to HASH_FUNC_COUNT bits inside it, one per salt. The size is fixed at
/// compile time from EXTENT_SIZE and a 1% target false-positive rate using
/// bits = n * |ln(p)| / ln(2)^2.
///
/// With the constants below this is roughly 1.2 million blocks, i.e. about 9 MiB assuming
/// 8-byte atomics, all stored inside the object. Instances should therefore live on the heap.
/// Because the blocks are std::atomic the class is neither copyable nor movable.
class BlockedBloomFilter
{
 public:
  BlockedBloomFilter() = default;

  /// Records the key whose hash is @p hash.
  void insert(uint32_t hash);

  /// @return false if no key with this hash was inserted; true if one may have been.
  bool probe(uint32_t hash) const;

  /// Writes all blocks to @p bs as consecutive uint64_t values (no length prefix).
  void serialize(messageqcpp::ByteStream& bs) const;

  /// Replaces all blocks with consecutive uint64_t values read from @p bs.
  void deserialize(messageqcpp::ByteStream& bs);

  /// @return the number of 64-bit blocks (not bytes).
  size_t getSize() const;

 private:
  // One salt per hash function; each is xor-ed into the key hash before mixing.
  static constexpr uint8_t HASH_FUNC_COUNT = 8;
  static constexpr uint32_t SALTS[HASH_FUNC_COUNT] =
  {
    0x47b6137b,
    0x44974d91,
    0x8824ad5b,
    0xa2b7289d,
    0x705495c7,
    0x2df1424b,
    0x9efc4947,
    0x5c6bfb31
  };

  // Filter geometry, computed at compile time.
  static constexpr uint8_t BLOCK_SIZE = 64;  // bits per block
  static_assert(BLOCK_SIZE == 8 * sizeof(uint64_t), "a block must be exactly one uint64_t word");

  static constexpr uint32_t EXTENT_SIZE = 8'000'000UL;  // number of keys the filter is sized for
  // NOTE(review): DOUBLE_EXTENT_SIZE and FALSE_POSITIVE_RATE are not referenced by the code
  // shown here; lnFP below is the hand-computed |ln(FALSE_POSITIVE_RATE)|.
  static constexpr uint32_t DOUBLE_EXTENT_SIZE = 2*8'000'000UL;
  static constexpr double FALSE_POSITIVE_RATE = 0.01;
  static constexpr double lnFP = 4.605170186;    // |ln(FALSE_POSITIVE_RATE)|
  static constexpr double ln2sqr = 0.4804530139; // ln(2)^2

  // Guard the double -> uint32_t conversion below against silently overflowing if the
  // sizing constants are ever changed.
  static_assert((EXTENT_SIZE * lnFP) / ln2sqr < 4294967296.0, "NUMBER_OF_BITS must fit in uint32_t");
  static constexpr uint32_t NUMBER_OF_BITS = static_cast<uint32_t>((EXTENT_SIZE * lnFP) / ln2sqr);

  // Round up so a partial trailing block is still allocated.
  static constexpr uint32_t BLOOM_FILTER_BLOCK_COUNT = (NUMBER_OF_BITS + BLOCK_SIZE - 1) / BLOCK_SIZE;
  static_assert(BLOOM_FILTER_BLOCK_COUNT > 0, "block index is computed with a modulo by the block count");

  // All blocks start zeroed (empty filter).
  std::array<std::atomic<uint64_t>, BLOOM_FILTER_BLOCK_COUNT> bloomFilter = {};

  // Scrambles a salted hash before it is reduced to a bit position.
  inline uint32_t mix32(uint32_t hash) const;

};




} // namespace joblist



1 change: 1 addition & 0 deletions dbcon/joblist/distributedenginecomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,7 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
case BATCH_PRIMITIVE_ADD_JOINER:
case BATCH_PRIMITIVE_END_JOINER:
case BATCH_PRIMITIVE_ABORT:
case BATCH_PRIMITIVE_BLOOM_FILTER:
case DICT_CREATE_EQUALITY_FILTER:
case DICT_DESTROY_EQUALITY_FILTER:
/* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
Expand Down
1 change: 1 addition & 0 deletions dbcon/joblist/primitivemsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ enum ISMPACKETCOMMAND
BATCH_PRIMITIVE_END_JOINER = PRIM_LOCALBASE + 11,
BATCH_PRIMITIVE_ACK = PRIM_LOCALBASE + 12,
BATCH_PRIMITIVE_ABORT = PRIM_LOCALBASE + 13,
BATCH_PRIMITIVE_BLOOM_FILTER = PRIM_LOCALBASE + 14,

// max of 100-50=50 commands
COL_RESULTS = PRIM_COLBASE + 0,
Expand Down
8 changes: 8 additions & 0 deletions dbcon/joblist/primitivestep.h
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,14 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
bool compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const;
bool hasPCFilter, hasPMFilter, hasRIDFilter, hasSegmentFilter, hasDBRootFilter, hasSegmentDirFilter,
hasPartitionFilter, hasMaxFilter, hasMinFilter, hasLBIDFilter, hasExtentIDFilter;

// Blocked Bloom filter
std::vector<std::shared_ptr<std::array<std::optional<BlockedBloomFilter>, 2>>> bloomFilters;
void serializeBloomFilters();

public:
void setBloomFilters(std::vector<std::shared_ptr<std::array<std::optional<BlockedBloomFilter>, 2>>>&& bloomFilters);

};

/** @brief class FilterStep
Expand Down
63 changes: 63 additions & 0 deletions dbcon/joblist/tuple-bps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,10 @@ void TupleBPS::run()
BPPIsAllocated = true;

if (doJoin && tjoiners[0]->inPM())
{
serializeBloomFilters();
serializeJoiner();
}

prepCasualPartitioning();
startPrimitiveThread();
Expand Down Expand Up @@ -3394,6 +3397,66 @@ void TupleBPS::abort()
abort_nolock();
}

// Takes ownership of the per-joiner Bloom filter slots by moving the caller's vector into
// this step's member of the same name.
void TupleBPS::setBloomFilters(std::vector<std::shared_ptr<std::array<std::optional<BlockedBloomFilter>, 2>>>&& bloomFilters)
{
  auto& ownFilters = this->bloomFilters;
  ownFilters = std::move(bloomFilters);
}

void TupleBPS::serializeBloomFilters()
{
messageqcpp::ByteStream bs;
ISMPacketHeader ism{};

ism.Interleave = 0;
ism.Flags = 0;
ism.Command = BATCH_PRIMITIVE_BLOOM_FILTER;
ism.Type = 2;

uint32_t messageSize = sizeof(ISMPacketHeader);

// How to get rid of iterating over BFs 2 times?
for (const auto& joinerBF : bloomFilters)
{
for (size_t i = 0; i < joinerBF->size(); ++i)
{
messageSize += sizeof(uint8_t);

if ((*joinerBF)[i].has_value())
{
messageSize += (*joinerBF)[i]->getSize() * sizeof(uint64_t);
}

}
}

ism.Size = messageSize;
bs.append((uint8_t*)&ism, sizeof(ism));

bs << txnId();
bs << sessionId();
bs << static_cast<uint32_t>(stepId());
bs << uniqueID;

for (const auto& joinerBF : bloomFilters)
{
for (size_t i = 0; i < joinerBF->size(); ++i)
{
uint8_t hasBloomFilter = (*joinerBF)[i].has_value() ? 1 : 0;
bs << hasBloomFilter;

if (hasBloomFilter)
{
(*joinerBF)[i]->serialize(bs);
}

}
}

SBS sbs(new messageqcpp::ByteStream(bs));
fDec->write(uniqueID, sbs);

}

template bool TupleBPS::processOneFilterType<int64_t>(int8_t colWidth, int64_t value, uint32_t type) const;
template bool TupleBPS::processOneFilterType<int128_t>(int8_t colWidth, int128_t value, uint32_t type) const;

Expand Down
10 changes: 9 additions & 1 deletion dbcon/joblist/tuplehashjoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ void TupleHashJoinStep::startSmallRunners(uint index)
if (typelessJoin[index])
{
joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index], largeSideKeys[index],
jt, &jobstepThreadPool, resourceManager, numCores));
jt, &jobstepThreadPool, resourceManager, numCores, bloomFilters[index]));
}
else
{
Expand Down Expand Up @@ -632,8 +632,14 @@ void TupleHashJoinStep::hjRunner()
rgData.reset(new vector<RGData>[smallDLs.size()]);
memUsedByEachJoin.reset(new ssize_t[smallDLs.size()]);


bloomFilters.resize(smallDLs.size());
for (i = 0; i < smallDLs.size(); i++)
{
bloomFilters[i] = std::make_shared<std::array<std::optional<BlockedBloomFilter>, 2>>();
(*(bloomFilters[i]))[0].emplace();
atomicops::atomicZero(&memUsedByEachJoin[i]);
}

try
{
Expand Down Expand Up @@ -682,6 +688,8 @@ void TupleHashJoinStep::hjRunner()
jobstepThreadPool.join(smallRunners);
smallRunners.clear();

largeBPS->setBloomFilters(std::move(bloomFilters));

for (i = 0; i < feIndexes.size() && joiners.size() > 0; i++)
joiners[feIndexes[i]]->setFcnExpFilter(fe[i]);

Expand Down
5 changes: 5 additions & 0 deletions dbcon/joblist/tuplehashjoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <utility>
#include "resourcemanager.h"
#include "exceptclasses.h"
#include "blockedbloomfilter.h"

namespace joblist
{
Expand Down Expand Up @@ -658,6 +659,10 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
void outOfMemoryHandler(std::shared_ptr<joiner::TupleJoiner> joiner);

friend class DiskJoinStep;

// Blocked Bloom Filter
std::vector<std::shared_ptr<std::array<std::optional<BlockedBloomFilter>, 2>>> bloomFilters;

};

} // namespace joblist
30 changes: 30 additions & 0 deletions primitives/primproc/batchprimitiveprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2695,4 +2695,34 @@ void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount)
vssCache.insert(make_pair(lbidList[i], vssData[i]));
}

void BatchPrimitiveProcessor::addBloomFilters([[maybe_unused]] messageqcpp::ByteStream& bs)
{
bs.advance(sizeof(ISMPacketHeader) + 4 * sizeof(uint32_t));
if (bloomFilters.empty() && doJoin)
{
bloomFilters.resize(joinerCount);
}

for (size_t j = 0; j < joinerCount; ++j)
{
for (size_t i = 0; i < 2; ++i)
{
uint8_t hasBloomFilter = 0;
bs >> hasBloomFilter;

if (hasBloomFilter)
{
if (!bloomFilters[j])
bloomFilters[j] = std::make_shared<std::array<std::optional<BlockedBloomFilter>, 2>>();

(*bloomFilters[j]).at(i).emplace();
(*bloomFilters[j]).at(i)->deserialize(bs);

}

}
}

}

} // namespace primitiveprocessor
6 changes: 6 additions & 0 deletions primitives/primproc/batchprimitiveprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ class BatchPrimitiveProcessor
bool initiatedByEM_;
uint32_t weight_;

// Blocked Bloom Filter
std::vector<std::shared_ptr<std::array<std::optional<joblist::BlockedBloomFilter>, 2>>> bloomFilters;
public:
void addBloomFilters(messageqcpp::ByteStream& bs);
private:

uint32_t maxPmJoinResultCount = 1048576;
friend class Command;
friend class ColumnCommand;
Expand Down
Loading