Skip to content

Commit

Permalink
Merge remote-tracking branch 'ad-freiburg/master' into refactor-resul…
Browse files Browse the repository at this point in the history
…t-table
  • Loading branch information
RobinTF committed Aug 7, 2024
2 parents 389a9bf + 0b9d26f commit 934015c
Show file tree
Hide file tree
Showing 35 changed files with 578 additions and 334 deletions.
15 changes: 14 additions & 1 deletion src/engine/CartesianProductJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,19 @@ ProtoResult CartesianProductJoin::computeResult(
child.setLimit(limitIfPresent.value());
}
subResults.push_back(child.getResult());

const auto& table = subResults.back()->idTable();
// Early stopping: If one of the results is empty, we can stop early.
if (subResults.back()->idTable().size() == 0) {
if (table.empty()) {
break;
}

// If one of the children is the neutral element (because of a triple with
// zero variables), we can simply ignore it here.
if (table.numRows() == 1 && table.numColumns() == 0) {
subResults.pop_back();
continue;
}
// Example for the following calculation: If we have a LIMIT of 1000 and
// the first child already has a result of size 100, then the second child
// needs to evaluate only its first 10 results. The +1 is because integer
Expand All @@ -169,6 +178,10 @@ ProtoResult CartesianProductJoin::computeResult(
}
}

// TODO<joka921> Find a solution to cheaply handle the case, that only a
// single result is left. This can probably be done by using the
// `ProtoResult`.

auto sizesView = std::views::transform(
subResults, [](const auto& child) { return child->idTable().size(); });
auto totalResultSize = std::accumulate(sizesView.begin(), sizesView.end(),
Expand Down
6 changes: 1 addition & 5 deletions src/engine/CartesianProductJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
// Chair of Algorithms and Data Structures.
// Author: Johannes Kalmbach <[email protected]>

#ifndef QLEVER_CARTESIANPRODUCTJOIN_H
#define QLEVER_CARTESIANPRODUCTJOIN_H

#pragma once
#include "engine/Operation.h"
#include "engine/QueryExecutionTree.h"

Expand Down Expand Up @@ -92,5 +90,3 @@ class CartesianProductJoin : public Operation {
std::span<const Id> inputColumn, size_t groupSize,
size_t offset);
};

#endif // QLEVER_CARTESIANPRODUCTJOIN_H
142 changes: 50 additions & 92 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <absl/strings/str_join.h>

#include <boost/optional.hpp>
#include <sstream>
#include <string>

Expand All @@ -26,17 +27,19 @@ IndexScan::IndexScan(QueryExecutionContext* qec, Permutation::Enum permutation,
numVariables_(static_cast<size_t>(subject_.isVariable()) +
static_cast<size_t>(predicate_.isVariable()) +
static_cast<size_t>(object_.isVariable())) {
// We previously had `nullptr`s here in unit tests. This is no longer
// necessary nor allowed.
AD_CONTRACT_CHECK(qec != nullptr);
for (auto& [idx, variable] : triple.additionalScanColumns_) {
additionalColumns_.push_back(idx);
additionalVariables_.push_back(variable);
}
sizeEstimate_ = computeSizeEstimate();

// Check the following invariant: The permuted input triple must contain at
// least one variable, and all the variables must be at the end of the
// Check the following invariant: All the variables must be at the end of the
// permuted triple. For example in the PSO permutation, either only the O, or
// the S and O, or all three of P, S, O can be variables, all other
// combinations are not supported.
// the S and O, or all three of P, S, O, or none of them can be variables, all
// other combinations are not supported.
auto permutedTriple = getPermutedTriple();
for (size_t i = 0; i < 3 - numVariables_; ++i) {
AD_CONTRACT_CHECK(!permutedTriple.at(i)->isVariable());
Expand All @@ -57,7 +60,7 @@ string IndexScan::getCacheKeyImpl() const {
auto permutationString = Permutation::toString(permutation_);

if (numVariables_ == 3) {
os << "SCAN FOR FULL INDEX " << permutationString << " (DUMMY OPERATION)";
os << "SCAN FOR FULL INDEX " << permutationString;

} else {
os << "SCAN " << permutationString << " with ";
Expand All @@ -66,10 +69,9 @@ string IndexScan::getCacheKeyImpl() const {
const auto& key = getPermutedTriple().at(idx)->toRdfLiteral();
os << keyString << " = \"" << key << "\"";
};
addKey(0);
if (numVariables_ == 1) {
for (size_t i = 0; i < 3 - numVariables_; ++i) {
addKey(i);
os << ", ";
addKey(1);
}
}
if (!additionalColumns_.empty()) {
Expand All @@ -92,16 +94,8 @@ size_t IndexScan::getResultWidth() const {

// _____________________________________________________________________________
vector<ColumnIndex> IndexScan::resultSortedOn() const {
switch (numVariables_) {
case 1:
return {ColumnIndex{0}};
case 2:
return {ColumnIndex{0}, ColumnIndex{1}};
case 3:
return {ColumnIndex{0}, ColumnIndex{1}, ColumnIndex{2}};
default:
AD_FAIL();
}
auto resAsView = ad_utility::integerRange(ColumnIndex{numVariables_});
return std::vector<ColumnIndex>{resAsView.begin(), resAsView.end()};
}

// _____________________________________________________________________________
Expand Down Expand Up @@ -149,12 +143,8 @@ ProtoResult IndexScan::computeResult(bool requestLaziness) {
using enum Permutation::Enum;
idTable.setNumColumns(numVariables_);
const auto& index = _executionContext->getIndex();
const auto permutedTriple = getPermutedTriple();
if (numVariables_ == 2) {
idTable = index.scan(*permutedTriple[0], std::nullopt, permutation_,
additionalColumns(), cancellationHandle_, getLimit());
} else if (numVariables_ == 1) {
idTable = index.scan(*permutedTriple[0], *permutedTriple[1], permutation_,
if (numVariables_ < 3) {
idTable = index.scan(getScanSpecification(), permutation_,
additionalColumns(), cancellationHandle_, getLimit());
} else {
AD_CORRECTNESS_CHECK(numVariables_ == 3);
Expand All @@ -169,44 +159,19 @@ ProtoResult IndexScan::computeResult(bool requestLaziness) {

// _____________________________________________________________________________
size_t IndexScan::computeSizeEstimate() const {
if (_executionContext) {
// Should always be in this branch. Else is only for test cases.

// We have to do a simple scan anyway so might as well do it now
if (numVariables_ == 1) {
// TODO<C++23> Use the monadic operation `std::optional::or_else`.
// Note: we cannot use `optional::value_or()` here, because the else
// case is expensive to compute, and we need it lazily evaluated.
if (auto size = getExecutionContext()->getQueryTreeCache().getPinnedSize(
getCacheKey());
size.has_value()) {
return size.value();
} else {
// This call explicitly has to read two blocks of triples from memory to
// obtain an exact size estimate.
return getIndex().getResultSizeOfScan(
*getPermutedTriple()[0], *getPermutedTriple().at(1), permutation_);
}
} else if (numVariables_ == 2) {
const TripleComponent& firstKey = *getPermutedTriple().at(0);
return getIndex().getCardinality(firstKey, permutation_);
} else {
// The triple consists of three variables.
// TODO<joka921> As soon as all implementations of a full index scan
// (Including the "dummy joins" in Join.cpp) consistently exclude the
// internal triples, this estimate should be changed to only return
// the number of triples in the actual knowledge graph (excluding the
// internal triples).
AD_CORRECTNESS_CHECK(numVariables_ == 3);
return getIndex().numTriples().normalAndInternal_();
}
AD_CORRECTNESS_CHECK(_executionContext);
// We have to do a simple scan anyway so might as well do it now
if (numVariables_ < 3) {
return getIndex().getResultSizeOfScan(getScanSpecification(), permutation_);
} else {
// Only for test cases. The handling of the objects is to make the
// strange query planner tests pass.
auto strLen = [](const auto& el) {
return (el.isString() ? el.getString() : el.toString()).size();
};
return 1000 + strLen(subject_) + strLen(object_) + strLen(predicate_);
// The triple consists of three variables.
// TODO<joka921> As soon as all implementations of a full index scan
// (Including the "dummy joins" in Join.cpp) consistently exclude the
// internal triples, this estimate should be changed to only return
// the number of triples in the actual knowledge graph (excluding the
// internal triples).
AD_CORRECTNESS_CHECK(numVariables_ == 3);
return getIndex().numTriples().normalAndInternal_();
}
}

Expand All @@ -219,29 +184,20 @@ size_t IndexScan::getCostEstimate() {

// _____________________________________________________________________________
void IndexScan::determineMultiplicities() {
multiplicity_.clear();
if (_executionContext) {
multiplicity_ = [this]() -> std::vector<float> {
const auto& idx = getIndex();
if (numVariables_ == 1) {
if (numVariables_ == 0) {
return {};
} else if (numVariables_ == 1) {
// There are no duplicate triples in RDF and two elements are fixed.
multiplicity_.emplace_back(1);
return {1.0f};
} else if (numVariables_ == 2) {
const auto permutedTriple = getPermutedTriple();
multiplicity_ = idx.getMultiplicities(*permutedTriple[0], permutation_);
return idx.getMultiplicities(*getPermutedTriple()[0], permutation_);
} else {
AD_CORRECTNESS_CHECK(numVariables_ == 3);
multiplicity_ = idx.getMultiplicities(permutation_);
}
} else {
// This branch is only used in certain unit tests.
multiplicity_.emplace_back(1);
if (numVariables_ == 2) {
multiplicity_.emplace_back(1);
}
if (numVariables_ == 3) {
multiplicity_.emplace_back(1);
return idx.getMultiplicities(permutation_);
}
}
}();
for ([[maybe_unused]] size_t i :
std::views::iota(multiplicity_.size(), getResultWidth())) {
multiplicity_.emplace_back(1);
Expand Down Expand Up @@ -296,6 +252,12 @@ std::array<const TripleComponent* const, 3> IndexScan::getPermutedTriple()
triple[permutation[2]]};
}

// ___________________________________________________________________________
ScanSpecificationAsTripleComponent IndexScan::getScanSpecification() const {
auto permutedTriple = getPermutedTriple();
return {*permutedTriple[0], *permutedTriple[1], *permutedTriple[2]};
}

// ___________________________________________________________________________
Permutation::IdTableGenerator IndexScan::getLazyScan(
const IndexScan& s, std::vector<CompressedBlockMetadata> blocks) {
Expand All @@ -309,6 +271,10 @@ Permutation::IdTableGenerator IndexScan::getLazyScan(
col1Id = s.getPermutedTriple()[1]->toValueId(index.getVocab()).value();
}

// This function is currently only called by the `getLazyScanForJoin...`
// functions. In these cases we always have at least one variable in each of
// the scans, because otherwise there would be no join column.
AD_CORRECTNESS_CHECK(s.numVariables_ >= 1);
// If there is a LIMIT or OFFSET clause that constrains the scan
// (which can happen with an explicit subquery), we cannot use the prefiltered
// blocks, as we currently have no mechanism to include limits and offsets
Expand All @@ -325,28 +291,20 @@ Permutation::IdTableGenerator IndexScan::getLazyScan(
// ________________________________________________________________
std::optional<Permutation::MetadataAndBlocks> IndexScan::getMetadataForScan(
const IndexScan& s) {
auto permutedTriple = s.getPermutedTriple();
const IndexImpl& index = s.getIndex().getImpl();
auto numVars = s.numVariables_;
std::optional<Id> col0Id =
numVars == 3 ? std::nullopt
: permutedTriple[0]->toValueId(index.getVocab());
std::optional<Id> col1Id =
numVars >= 2 ? std::nullopt
: permutedTriple[1]->toValueId(index.getVocab());
if ((!col0Id.has_value() && numVars < 3) ||
(!col1Id.has_value() && numVars < 2)) {
const auto& index = s.getExecutionContext()->getIndex().getImpl();
auto scanSpec = s.getScanSpecification().toScanSpecification(index);
if (!scanSpec.has_value()) {
return std::nullopt;
}

return index.getPermutation(s.permutation())
.getMetadataAndBlocks({col0Id, col1Id, std::nullopt});
.getMetadataAndBlocks(scanSpec.value());
};

// ________________________________________________________________
std::array<Permutation::IdTableGenerator, 2>
IndexScan::lazyScanForJoinOfTwoScans(const IndexScan& s1, const IndexScan& s2) {
AD_CONTRACT_CHECK(s1.numVariables_ <= 3 && s2.numVariables_ <= 3);
AD_CONTRACT_CHECK(s1.numVariables_ >= 1 && s2.numVariables_ >= 1);

// This function only works for single column joins. This means that the first
// variable of both scans must be equal, but all other variables of the scans
Expand Down Expand Up @@ -395,7 +353,7 @@ IndexScan::lazyScanForJoinOfTwoScans(const IndexScan& s1, const IndexScan& s2) {
Permutation::IdTableGenerator IndexScan::lazyScanForJoinOfColumnWithScan(
std::span<const Id> joinColumn, const IndexScan& s) {
AD_EXPENSIVE_CHECK(std::ranges::is_sorted(joinColumn));
AD_CORRECTNESS_CHECK(s.numVariables_ <= 3);
AD_CORRECTNESS_CHECK(s.numVariables_ <= 3 && s.numVariables_ > 0);

auto metaBlocks1 = getMetadataForScan(s);

Expand Down
1 change: 1 addition & 0 deletions src/engine/IndexScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class IndexScan final : public Operation {
// `permutation_`. For example if `permutation_ == PSO` then the result is
// {&predicate_, &subject_, &object_}
std::array<const TripleComponent* const, 3> getPermutedTriple() const;
ScanSpecificationAsTripleComponent getScanSpecification() const;

private:
ProtoResult computeResult(bool requestLaziness) override;
Expand Down
21 changes: 1 addition & 20 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,25 +199,6 @@ std::shared_ptr<const Result> Operation::getResult(
const bool pinResult =
_executionContext->_pinSubtrees || pinFinalResultButNotSubtrees;

// When we pin the final result but no subtrees, we need to remember the sizes
// of all involved index scans that have only one free variable. Note that
// these index scans are executed already during query planning because they
// have to be executed anyway, for any query plan. If we don't remember these
// sizes here, future queries that take the result from the cache would redo
// these index scans. Note that we do not need to remember the multiplicity
// (and distinctness) because the multiplicity for an index scan with a single
// free variable is always 1.
if (pinFinalResultButNotSubtrees) {
auto lock =
getExecutionContext()->getQueryTreeCache().pinnedSizes().wlock();
forAllDescendants([&lock](QueryExecutionTree* child) {
if (child->getRootOperation()->isIndexScanWithNumVariables(1)) {
(*lock)[child->getRootOperation()->getCacheKey()] =
child->getSizeEstimate();
}
});
}

try {
// In case of an exception, create the correct runtime info, no matter which
// exception handler is called.
Expand Down Expand Up @@ -355,7 +336,7 @@ void Operation::updateRuntimeInformationOnSuccess(

// ____________________________________________________________________________________________________________________
void Operation::updateRuntimeInformationOnSuccess(
const ConcurrentLruCache::ResultAndCacheStatus& resultAndCacheStatus,
const QueryResultCache::ResultAndCacheStatus& resultAndCacheStatus,
Milliseconds duration) {
const auto& result = resultAndCacheStatus._resultPointer->resultTable();
updateRuntimeInformationOnSuccess(
Expand Down
2 changes: 1 addition & 1 deletion src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ class Operation {
// Create and store the complete runtime information for this operation after
// it has either been successfully computed or read from the cache.
virtual void updateRuntimeInformationOnSuccess(
const ConcurrentLruCache::ResultAndCacheStatus& resultAndCacheStatus,
const QueryResultCache::ResultAndCacheStatus& resultAndCacheStatus,
Milliseconds duration) final;

// Similar to the function above, but the components are specified manually.
Expand Down
33 changes: 2 additions & 31 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,37 +74,8 @@ class CacheValue {
// Threadsafe LRU cache for (partial) query results, that
// checks on insertion, if the result is currently being computed
// by another query.
using ConcurrentLruCache = ad_utility::ConcurrentCache<
ad_utility::LRUCache<std::string, CacheValue, CacheValue::SizeGetter>>;
using PinnedSizes =
ad_utility::Synchronized<ad_utility::HashMap<std::string, size_t>,
std::shared_mutex>;
class QueryResultCache : public ConcurrentLruCache {
private:
PinnedSizes _pinnedSizes;

public:
virtual ~QueryResultCache() = default;
void clearAll() override {
// The _pinnedSizes are not part of the (otherwise threadsafe) _cache
// and thus have to be manually locked.
auto lock = _pinnedSizes.wlock();
ConcurrentLruCache::clearAll();
lock->clear();
}
// Inherit the constructor.
using ConcurrentLruCache::ConcurrentLruCache;
const PinnedSizes& pinnedSizes() const { return _pinnedSizes; }
PinnedSizes& pinnedSizes() { return _pinnedSizes; }
std::optional<size_t> getPinnedSize(const std::string& key) {
auto rlock = _pinnedSizes.rlock();
if (rlock->contains(key)) {
return rlock->at(key);
} else {
return std::nullopt;
}
}
};
using QueryResultCache = ad_utility::ConcurrentCache<
ad_utility::LRUCache<string, CacheValue, CacheValue::SizeGetter>>;

// Execution context for queries.
// Holds references to index and engine, implements caching.
Expand Down
Loading

0 comments on commit 934015c

Please sign in to comment.