diff --git a/dbcon/joblist/CMakeLists.txt b/dbcon/joblist/CMakeLists.txt index 3783ccf14e..d6bc794a99 100644 --- a/dbcon/joblist/CMakeLists.txt +++ b/dbcon/joblist/CMakeLists.txt @@ -4,6 +4,7 @@ include_directories(${ENGINE_COMMON_INCLUDES} ${ENGINE_SRC_DIR}/tools/passwd) set(joblist_LIB_SRCS anydatalist.cpp batchprimitiveprocessor-jl.cpp + blockedbloomfilter.cpp columncommand-jl.cpp command-jl.cpp crossenginestep.cpp diff --git a/dbcon/joblist/blockedbloomfilter.cpp b/dbcon/joblist/blockedbloomfilter.cpp new file mode 100644 index 0000000000..5600fd308c --- /dev/null +++ b/dbcon/joblist/blockedbloomfilter.cpp @@ -0,0 +1,78 @@ +#include "blockedbloomfilter.h" + +namespace joblist +{ + +void BlockedBloomFilter::insert(uint32_t hash) +{ + uint32_t blockIdx = hash % BLOOM_FILTER_BLOCK_COUNT; + uint64_t bitmask = 0; + + for (const auto& salt : SALTS) + { + uint32_t mixed = mix32(hash ^ salt); + uint8_t bitIdx = mixed % 64; + + bitmask |= (1ULL << bitIdx); + } + + bloomFilter[blockIdx].fetch_or(bitmask, std::memory_order_relaxed); +} + +bool BlockedBloomFilter::probe(uint32_t hash) const +{ + uint32_t blockIdx = hash % BLOOM_FILTER_BLOCK_COUNT; + uint64_t block = bloomFilter[blockIdx].load(std::memory_order_relaxed); + + for (const auto& salt : SALTS) + { + uint32_t mixed = mix32(hash ^ salt); + uint8_t bitIdx = mixed % 64; + + if ((block & (1ULL << bitIdx)) == 0) + { + return false; + } + + } + + return true; +} + +// SplitMix +inline uint32_t BlockedBloomFilter::mix32(uint32_t hash) const +{ + hash ^= hash >> 16; + hash *= 0x85ebca6b; + hash ^= hash >> 13; + hash *= 0xc2b2ae35; + hash ^= hash >> 16; + + return hash; +} + +void BlockedBloomFilter::serialize(messageqcpp::ByteStream& bs) const +{ + for (const auto& block : bloomFilter) + { + bs << block.load(std::memory_order_relaxed); + } +} + +void BlockedBloomFilter::deserialize(messageqcpp::ByteStream& bs) +{ + for (auto& block : bloomFilter) + { + uint64_t val; + bs >> val; + block.store(val, std::memory_order_relaxed); + } +} + +size_t BlockedBloomFilter::getSize() const +{ + return bloomFilter.size(); +} + +} // namespace joblist + diff --git a/dbcon/joblist/blockedbloomfilter.h b/dbcon/joblist/blockedbloomfilter.h new file mode 100644 index 0000000000..da5674e8fa --- /dev/null +++ b/dbcon/joblist/blockedbloomfilter.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include + +// Debug +#include +#include + +#include "bytestream.h" + +namespace joblist +{ + +class BlockedBloomFilter +{ + public: + BlockedBloomFilter() = default; + + void insert(uint32_t hash); + bool probe(uint32_t hash) const; + + void serialize(messageqcpp::ByteStream& bs) const; + void deserialize(messageqcpp::ByteStream& bs); + + size_t getSize() const; + + private: + // Member variables + static constexpr uint8_t HASH_FUNC_COUNT = 8; + static constexpr uint32_t SALTS[HASH_FUNC_COUNT] = + { + 0x47b6137b, + 0x44974d91, + 0x8824ad5b, + 0xa2b7289d, + 0x705495c7, + 0x2df1424b, + 0x9efc4947, + 0x5c6bfb31 + }; + + // Calculating BF's parameters at compile-time + static constexpr uint8_t BLOCK_SIZE = 64; + static constexpr uint32_t EXTENT_SIZE = 8'000'000UL; + static constexpr uint32_t DOUBLE_EXTENT_SIZE = 2*8'000'000UL; + static constexpr double FALSE_POSITIVE_RATE = 0.01; + static constexpr double lnFP = 4.605170186; // lnFP <- |ln(FPR)| + static constexpr double ln2sqr = 0.4804530139; // pow(ln(2), 2) + static constexpr uint32_t NUMBER_OF_BITS = (EXTENT_SIZE * lnFP) / ln2sqr; + static constexpr uint32_t BLOOM_FILTER_BLOCK_COUNT = (NUMBER_OF_BITS + BLOCK_SIZE - 1) / BLOCK_SIZE; + + std::array, BLOOM_FILTER_BLOCK_COUNT> bloomFilter = {}; + + // Private member functions + inline uint32_t mix32(uint32_t hash) const; + +}; + +using BloomFilters = std::shared_ptr, 2>>; + +} // namespace joblist + + + diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index da3a9b2d36..9c16fd1b4e 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -882,6 +882,7 @@ int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg) case BATCH_PRIMITIVE_ADD_JOINER: case BATCH_PRIMITIVE_END_JOINER: case BATCH_PRIMITIVE_ABORT: + case BATCH_PRIMITIVE_BLOOM_FILTER: case DICT_CREATE_EQUALITY_FILTER: case DICT_DESTROY_EQUALITY_FILTER: /* XXXPAT: This relies on the assumption that the first pmCount "PMS*" diff --git a/dbcon/joblist/primitivemsg.h b/dbcon/joblist/primitivemsg.h index 0d3cdaccc1..b721fdbb3e 100644 --- a/dbcon/joblist/primitivemsg.h +++ b/dbcon/joblist/primitivemsg.h @@ -167,6 +167,7 @@ enum ISMPACKETCOMMAND BATCH_PRIMITIVE_END_JOINER = PRIM_LOCALBASE + 11, BATCH_PRIMITIVE_ACK = PRIM_LOCALBASE + 12, BATCH_PRIMITIVE_ABORT = PRIM_LOCALBASE + 13, + BATCH_PRIMITIVE_BLOOM_FILTER = PRIM_LOCALBASE + 14, // max of 100-50=50 commands COL_RESULTS = PRIM_COLBASE + 0, diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h index 7061d23a2f..61503b80bf 100644 --- a/dbcon/joblist/primitivestep.h +++ b/dbcon/joblist/primitivestep.h @@ -1472,6 +1472,14 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep bool compareRange(uint8_t COP, int64_t min, int64_t max, int64_t val) const; bool hasPCFilter, hasPMFilter, hasRIDFilter, hasSegmentFilter, hasDBRootFilter, hasSegmentDirFilter, hasPartitionFilter, hasMaxFilter, hasMinFilter, hasLBIDFilter, hasExtentIDFilter; + + // Blocked Bloom filter + std::vector bloomFilters; + void serializeBloomFilters(); + + public: + void setBloomFilters(std::vector&& bloomFilters); + }; /** @brief class FilterStep diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index 1e8e0f93ee..931033f926 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -1479,8 +1479,13 @@ void TupleBPS::run() fDec->write(uniqueID, sbs); BPPIsAllocated = true; - if (doJoin && tjoiners[0]->inPM()) - serializeJoiner(); + if (doJoin) + { + if (tjoiners[0]->inPM()) + serializeJoiner(); + + serializeBloomFilters(); + } prepCasualPartitioning(); startPrimitiveThread(); @@ -3394,6 +3399,67 @@ void TupleBPS::abort() abort_nolock(); } +void TupleBPS::setBloomFilters(std::vector&& bloomFilters) +{ + this->bloomFilters = std::move(bloomFilters); +} + +void TupleBPS::serializeBloomFilters() +{ + messageqcpp::ByteStream bs; + ISMPacketHeader ism{}; + + ism.Interleave = 0; + ism.Flags = 0; + ism.Command = BATCH_PRIMITIVE_BLOOM_FILTER; + ism.Type = 2; + + uint32_t messageSize = sizeof(ISMPacketHeader); + + // How to get rid of iterating over BFs 2 times? + for (const auto& joinerBF : bloomFilters) + { + for (size_t i = 0; i < joinerBF->size(); ++i) + { + messageSize += sizeof(uint8_t); + + if ((*joinerBF)[i].has_value()) + { + messageSize += (*joinerBF)[i]->getSize() * sizeof(uint64_t); + } + + } + } + + ism.Size = messageSize; + bs.append((uint8_t*)&ism, sizeof(ism)); + + bs << txnId(); + bs << sessionId(); + bs << static_cast(stepId()); + bs << uniqueID; + bs << bloomFilters.size(); + + for (const auto& joinerBF : bloomFilters) + { + for (size_t i = 0; i < joinerBF->size(); ++i) + { + uint8_t hasBloomFilter = (*joinerBF)[i].has_value() ? 1 : 0; + bs << hasBloomFilter; + + if (hasBloomFilter) + { + (*joinerBF)[i]->serialize(bs); + } + + } + } + + SBS sbs(new messageqcpp::ByteStream(bs)); + fDec->write(uniqueID, sbs); + +} + template bool TupleBPS::processOneFilterType(int8_t colWidth, int64_t value, uint32_t type) const; template bool TupleBPS::processOneFilterType(int8_t colWidth, int128_t value, uint32_t type) const; diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index 368a15fdda..4a5ae1a63c 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -228,7 +228,7 @@ void TupleHashJoinStep::startSmallRunners(uint index) if (typelessJoin[index]) { joiners[index].reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index], largeSideKeys[index], - jt, &jobstepThreadPool, resourceManager, numCores)); + jt, &jobstepThreadPool, resourceManager, numCores, bloomFilters[index])); } else { @@ -632,8 +632,14 @@ void TupleHashJoinStep::hjRunner() rgData.reset(new vector[smallDLs.size()]); memUsedByEachJoin.reset(new ssize_t[smallDLs.size()]); + + bloomFilters.resize(smallDLs.size()); for (i = 0; i < smallDLs.size(); i++) + { + bloomFilters[i] = std::make_shared, 2>>(); + (*(bloomFilters[i]))[0].emplace(); atomicops::atomicZero(&memUsedByEachJoin[i]); + } try { @@ -682,6 +688,8 @@ void TupleHashJoinStep::hjRunner() jobstepThreadPool.join(smallRunners); smallRunners.clear(); + largeBPS->setBloomFilters(std::move(bloomFilters)); + for (i = 0; i < feIndexes.size() && joiners.size() > 0; i++) joiners[feIndexes[i]]->setFcnExpFilter(fe[i]); diff --git a/dbcon/joblist/tuplehashjoin.h b/dbcon/joblist/tuplehashjoin.h index 4f590f2ded..f6c02e6b8d 100644 --- a/dbcon/joblist/tuplehashjoin.h +++ b/dbcon/joblist/tuplehashjoin.h @@ -32,6 +32,7 @@ #include #include "resourcemanager.h" #include "exceptclasses.h" +#include "blockedbloomfilter.h" namespace joblist { @@ -658,6 +659,10 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep void outOfMemoryHandler(std::shared_ptr joiner); friend class DiskJoinStep; + + // Blocked Bloom Filter + std::vector bloomFilters; + }; } // namespace joblist diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 58480e537c..ea604d4403 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -1725,6 +1725,75 @@ void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs) RowGroup largeSideRowGroup = outputRG; largeSideRowGroup.setData(&largeSideRGData); + boost::unique_lock lock(bloomFilterMutex); + bloomFilterCondition.wait(lock, + [this] { return bloomFiltersReady[getUniqueID()]; } + ); + + if (!bloomFilters.empty()) + { + std::vector keepRows(ridCount, false); + uint32_t newRidCount = 0; + Row r; + outputRG.initRow(&r); + + for (uint32_t i = 0; i < ridCount; i++) + { + outputRG.getRow(i, &r); + bool keepRow = true; + + for (uint32_t j = 0; j < joinerCount && keepRow; j++) + { + if (bloomFilters[j] && (*bloomFilters[j])[0].has_value() && typelessJoin[j]) + { + uint32_t hash = r.hashTypeless(tlLargeSideKeyColumns[j], mSmallSideKeyColumnsPtr, + mSmallSideRGPtr ? &mSmallSideRGPtr->getColWidths() : nullptr); + + if (!(*bloomFilters[j])[0]->probe(hash)) + { + keepRow = false; + } + } + } + + if (keepRow) + { + keepRows[i] = true; + newRidCount++; + } + } + + uint32_t writePos = 0; + for (uint32_t i = 0; i < ridCount; i++) + { + if (keepRows[i]) + { + if (writePos != i) + { + relRids[writePos] = relRids[i]; + values[writePos] = values[i]; + if (mJOINHasSkewedKeyColumn) + wide128Values[writePos] = wide128Values[i]; + + Row sourceRow, targetRow; + outputRG.initRow(&sourceRow); + outputRG.initRow(&targetRow); + outputRG.getRow(i, &sourceRow); + outputRG.getRow(writePos, &targetRow); + copyRow(sourceRow, &targetRow); + } + writePos++; + } + } + + ridCount = newRidCount; + outputRG.setRowCount(ridCount); + + largeSideRGData = outputRG.duplicate(); + largeSideRowGroup = outputRG; + largeSideRowGroup.setData(&largeSideRGData); + } + do // while (startRid > 0) { utils::setThreadName("BPPJoin_1"); @@ -2695,4 +2764,39 @@ void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount) vssCache.insert(make_pair(lbidList[i], vssData[i])); } +void BatchPrimitiveProcessor::addBloomFilters([[maybe_unused]] messageqcpp::ByteStream& bs) +{ + bs.advance(sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t)); + uint32_t msgUniqueID; + bs >> msgUniqueID; + + size_t bfSize = 0; + bs >> bfSize; + + bloomFilters.resize(bfSize); + for (size_t j = 0; j < bfSize; ++j) + { + for (size_t i = 0; i < 2; ++i) + { + uint8_t hasBloomFilter = 0; + bs >> hasBloomFilter; + + if (hasBloomFilter) + { + if (!bloomFilters[j]) + bloomFilters[j] = std::make_shared, 2>>(); + + (*bloomFilters[j]).at(i).emplace(); + (*bloomFilters[j]).at(i)->deserialize(bs); + } + } + } + + { + boost::unique_lock lock(bloomFilterMutex); + bloomFiltersReady[msgUniqueID] = true; + } + bloomFilterCondition.notify_all(); +} + } // namespace primitiveprocessor diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index 381fce3d55..d024453b43 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -49,6 +49,7 @@ #include "funcexpwrapper.h" #include "bppsendthread.h" #include "columnwidth.h" +#include "blockedbloomfilter.h" #ifdef PRIMPROC_STOPWATCH #include "stopwatch.h" @@ -436,6 +437,17 @@ class BatchPrimitiveProcessor bool initiatedByEM_; uint32_t weight_; + // Blocked Bloom Filter + private: + std::unordered_map bloomFiltersReady; + boost::mutex bloomFilterMutex; + boost::condition_variable bloomFilterCondition; + + std::vector bloomFilters; + public: + void addBloomFilters(messageqcpp::ByteStream& bs); + private: + uint32_t maxPmJoinResultCount = 1048576; friend class Command; friend class ColumnCommand; diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 8b48452d42..283bb4ad19 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1279,6 +1279,20 @@ struct BPPHandler } }; + struct BloomFilter : public BPPHandlerFunctor + { + BloomFilter(boost::shared_ptr r, SBS b) : BPPHandlerFunctor(std::move(r), std::move(b)) + { + } + + int operator()() override + { + utils::setThreadName("PPHandBloomFilter"); + return rt->addBloomFiltersToBPP(*bs, dieTime); + } + + }; + int doAbort(ByteStream& bs, const posix_time::ptime& dieTime) { uint32_t key; @@ -1486,6 +1500,37 @@ struct BPPHandler } } + int addBloomFiltersToBPP(ByteStream& bs, const posix_time::ptime& dieTime) + { + SBPPV bppv; + uint32_t uniqueID; + const uint8_t* buf; + + buf = bs.buf(); + uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t)]); + + bppv = grabBPPs(uniqueID); + + if (bppv) + { + boost::shared_lock lk(getDJLock(uniqueID)); + bppv->get()[0]->addBloomFilters(bs); + return 0; + } + else + { + if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "addBloomFilterToBPP: job for id " << uniqueID << " has been killed." << endl; + return 0; + } + else + { + return -1; + } + } + } + int lastJoinerMsg(ByteStream& bs, const posix_time::ptime& dieTime) { SBPPV bppv; @@ -2024,6 +2069,29 @@ struct ReadThread fBPPHandler->doAck(*sbs); break; } + + case BATCH_PRIMITIVE_BLOOM_FILTER: + { + const uint8_t* buf = sbs->buf(); + uint32_t pos = sizeof(ISMPacketHeader); + const uint32_t txnId = *((uint32_t*)&buf[pos]); + [[maybe_unused]] const uint32_t sessionID = *((uint32_t*)&buf[pos + 4]); + const uint32_t stepID = *((uint32_t*)&buf[pos + 8]); + const uint32_t uniqueID = *((uint32_t*)&buf[pos + 12]); + + const uint32_t id = 0; + const uint32_t weight = threadpool::MetaJobsInitialWeight; + const uint32_t priority = 0; + + boost::shared_ptr functor; + functor.reset(new BPPHandler::BloomFilter(fBPPHandler, sbs)); + + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + procPool->addJob(job); + + break; + } + default: { std::ostringstream os; diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index 7d3b59114c..3331a5b17e 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -166,7 +166,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, const vector& smallJoinColumns, const vector& largeJoinColumns, JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, - const uint64_t numCores) + const uint64_t numCores, joblist::BloomFilters bloomFilters) : smallRG(smallInput) , largeRG(largeInput) , joinAlg(INSERTING) @@ -182,6 +182,7 @@ TupleJoiner::TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::R , jobstepThreadPool(jsThreadPool) , _convertToDiskJoin(false) , resourceManager_(rm) + , bloomFilters(bloomFilters) { uint i; @@ -314,7 +315,14 @@ void TupleJoiner::um_insertTypeless(uint threadID, uint rowCount, Row& r) td[i] = makeTypelessKey(r, smallKeyColumns, keyLength, alloc, largeRG, largeKeyColumns); if (td[i].len == 0) continue; - uint bucket = bucketPicker((char*)td[i].data, td[i].len, bpSeed) & bucketMask; + + uint32_t hash = bucketPicker((char*)td[i].data, td[i].len, bpSeed); + + if (bloomFilters->at(0).has_value()) { + bloomFilters->at(0)->insert(hash); + } + + uint bucket = hash & bucketMask; v[bucket].emplace_back(pair(td[i], r.getPointer())); } bucketsToTables(&v[0], ht); diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h index bcd08ed1d0..f584bdfe91 100644 --- a/utils/joiner/tuplejoiner.h +++ b/utils/joiner/tuplejoiner.h @@ -38,6 +38,7 @@ #include "threadpool.h" #include "columnwidth.h" #include "mcs_string.h" +#include "blockedbloomfilter.h" namespace joiner { @@ -279,7 +280,7 @@ class TupleJoiner TupleJoiner(const rowgroup::RowGroup& smallInput, const rowgroup::RowGroup& largeInput, const std::vector& smallJoinColumns, const std::vector& largeJoinColumns, joblist::JoinType jt, threadpool::ThreadPool* jsThreadPool, joblist::ResourceManager* rm, - const uint64_t numCores); + const uint64_t numCores, joblist::BloomFilters bloomFilters); ~TupleJoiner(); @@ -567,6 +568,10 @@ class TupleJoiner joblist::ResourceManager* resourceManager_ = nullptr; bool wasAborted_ = false; void initRowsVector(); + + // Blocked Bloom filter + joblist::BloomFilters bloomFilters; + }; } // namespace joiner