Skip to content

Commit

Permalink
Merge remote-tracking branch 'ad-freiburg/master' into refactor-resul…
Browse files Browse the repository at this point in the history
…t-table
  • Loading branch information
RobinTF committed Jun 5, 2024
2 parents 999baee + bb6e2f9 commit c291ff7
Show file tree
Hide file tree
Showing 17 changed files with 956 additions and 253 deletions.
36 changes: 36 additions & 0 deletions src/engine/QueryPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1642,6 +1642,13 @@ std::vector<QueryPlanner::SubtreePlan> QueryPlanner::createJoinCandidates(
return {makeSubtreePlan<OptionalJoin>(_qec, a._qet, b._qet)};
}

// Check if one of the two Operations is a SERVICE. If so, we can try
// to simplify the Service Query using the result of the other operation.
if (auto opt = createJoinWithService(a, b, jcs)) {
candidates.push_back(std::move(opt.value()));
return candidates;
}

if (jcs.size() >= 2) {
// If there are two or more join columns and we are not using the
// TwoColumnJoin (the if part before this comment), use a multiColumnJoin.
Expand Down Expand Up @@ -1770,6 +1777,35 @@ auto QueryPlanner::createJoinWithHasPredicateScan(
return plan;
}

// _____________________________________________________________________
// Create a join between `a` and `b` for the case that exactly one of them has
// a `Service` as its root operation. The tree of the other plan is registered
// at that `Service` via `setSiblingTree`. Returns `std::nullopt` if neither or
// both of the root operations are a `Service`.
// NOTE(review): `setSiblingTree` modifies the `Service` object that the plan
// points to, not a copy. Presumably other candidate plans share that object;
// confirm that overwriting its sibling here is intended.
auto QueryPlanner::createJoinWithService(
    SubtreePlan a, SubtreePlan b,
    const std::vector<std::array<ColumnIndex, 2>>& jcs)
    -> std::optional<SubtreePlan> {
  // The casts yield `nullptr` for every root operation that is not a
  // `Service`.
  auto serviceOfA =
      std::dynamic_pointer_cast<Service>(a._qet->getRootOperation());
  auto serviceOfB =
      std::dynamic_pointer_cast<Service>(b._qet->getRootOperation());

  // Bail out unless exactly one side is a `Service`.
  const bool aIsService = serviceOfA != nullptr;
  const bool bIsService = serviceOfB != nullptr;
  if (aIsService == bIsService) {
    return std::nullopt;
  }

  // Tell the `Service` which tree its result is going to be joined with.
  const auto& service = aIsService ? serviceOfA : serviceOfB;
  const auto& siblingTree = aIsService ? b._qet : a._qet;
  service->setSiblingTree(siblingTree);

  const auto& qec = service->getExecutionContext();

  // Exactly one join column is handled by a `Join`, every other number of
  // join columns by a `MultiColumnJoin`.
  const bool hasSingleJoinColumn = jcs.size() == 1;
  SubtreePlan plan =
      hasSingleJoinColumn
          ? makeSubtreePlan<Join>(qec, a._qet, b._qet, jcs[0][0], jcs[0][1])
          : makeSubtreePlan<MultiColumnJoin>(qec, a._qet, b._qet);
  mergeSubtreePlanIds(plan, a, b);

  return plan;
}

// _____________________________________________________________________
void QueryPlanner::QueryGraph::setupGraph(
const std::vector<SubtreePlan>& leafOperations) {
Expand Down
4 changes: 4 additions & 0 deletions src/engine/QueryPlanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ class QueryPlanner {
SubtreePlan a, SubtreePlan b,
const std::vector<std::array<ColumnIndex, 2>>& jcs);

// Called by `createJoinCandidates`. If exactly one of `a` and `b` has a
// `Service` as its root operation, register the tree of the other plan as the
// sibling tree of that `Service` and return a plan that joins `a` and `b` on
// the join columns `jcs`. Return `std::nullopt` if neither or both of the
// root operations are a `Service`.
[[nodiscard]] static std::optional<SubtreePlan> createJoinWithService(
SubtreePlan a, SubtreePlan b,
const std::vector<std::array<ColumnIndex, 2>>& jcs);

[[nodiscard]] vector<SubtreePlan> getOrderByRow(
const ParsedQuery& pq,
const std::vector<std::vector<SubtreePlan>>& dpTab) const;
Expand Down
83 changes: 80 additions & 3 deletions src/engine/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
#include "engine/Service.h"

#include <absl/strings/str_cat.h>
#include <absl/strings/str_join.h>
#include <absl/strings/str_split.h>

#include "engine/CallFixedSize.h"
#include "engine/ExportQueryExecutionTrees.h"
#include "engine/Values.h"
#include "engine/VariableToColumnMap.h"
#include "global/RuntimeParameters.h"
#include "parser/TokenizerCtre.h"
#include "parser/TurtleParser.h"
#include "util/Exception.h"
Expand All @@ -21,18 +24,24 @@
// ____________________________________________________________________________
// Construct a `Service` operation. The parsed clause, the TSV function and the
// sibling tree are taken by value and moved into the corresponding members.
// `siblingTree` may be `nullptr`; the member is compared against `nullptr`
// before it is used.
Service::Service(QueryExecutionContext* qec,
parsedQuery::Service parsedServiceClause,
GetTsvFunction getTsvFunction)
GetTsvFunction getTsvFunction,
std::shared_ptr<QueryExecutionTree> siblingTree)
: Operation{qec},
parsedServiceClause_{std::move(parsedServiceClause)},
getTsvFunction_{std::move(getTsvFunction)} {}
getTsvFunction_{std::move(getTsvFunction)},
siblingTree_{std::move(siblingTree)} {}

// ____________________________________________________________________________
// Build the cache key from the service IRI, the prologue, the graph pattern
// and, if a sibling tree is set, the cache key of the sibling's root
// operation.
std::string Service::getCacheKeyImpl() const {
std::ostringstream os;
// TODO: This duplicates code in GraphPatternOperation.cpp .
os << "SERVICE " << parsedServiceClause_.serviceIri_.toSparql() << " {\n"
<< parsedServiceClause_.prologue_ << "\n"
<< parsedServiceClause_.graphPatternAsString_ << "\n}\n";
<< parsedServiceClause_.graphPatternAsString_ << "\n";
// The sibling's result can change the query that `computeResult` sends to the
// remote endpoint (a VALUES clause is prepended), so the sibling has to be
// part of the key.
if (siblingTree_ != nullptr) {
os << siblingTree_->getRootOperation()->getCacheKey() << "\n";
}
os << "}\n";
return std::move(os).str();
}

Expand Down Expand Up @@ -92,6 +101,14 @@ Result Service::computeResult([[maybe_unused]] bool requestLaziness) {
serviceIriString.remove_suffix(1);
ad_utility::httpUtils::Url serviceUrl{serviceIriString};

// Try to simplify the Service Query using its sibling Operation.
if (auto valuesClause = getSiblingValuesClause(); valuesClause.has_value()) {
auto openBracketPos = parsedServiceClause_.graphPatternAsString_.find('{');
parsedServiceClause_.graphPatternAsString_ =
"{\n" + valuesClause.value() + '\n' +
parsedServiceClause_.graphPatternAsString_.substr(openBracketPos + 1);
}

// Construct the query to be sent to the SPARQL endpoint.
std::string variablesForSelectClause = absl::StrJoin(
parsedServiceClause_.visibleVariables_, " ", Variable::AbslFormatter);
Expand Down Expand Up @@ -159,6 +176,66 @@ Result Service::computeResult([[maybe_unused]] bool requestLaziness) {
return {std::move(idTable), resultSortedOn(), std::move(localVocab)};
}

// ____________________________________________________________________________
std::optional<std::string> Service::getSiblingValuesClause() const {
if (siblingTree_ == nullptr) {
return std::nullopt;
}

const auto& siblingResult = siblingTree_->getResult();
if (siblingResult->idTable().size() >
RuntimeParameters().get<"service-max-value-rows">()) {
return std::nullopt;
}

checkCancellation();

std::vector<ColumnIndex> commonColumnIndices;
const auto& siblingVars = siblingTree_->getVariableColumns();
std::string vars = "(";
for (const auto& localVar : parsedServiceClause_.visibleVariables_) {
auto it = siblingVars.find(localVar);
if (it == siblingVars.end()) {
continue;
}
absl::StrAppend(&vars, it->first.name(), " ");
commonColumnIndices.push_back(it->second.columnIndex_);
}
vars.back() = ')';

checkCancellation();

ad_utility::HashSet<std::string> rowSet;

std::string values = " { ";
for (size_t rowIndex = 0; rowIndex < siblingResult->idTable().size();
++rowIndex) {
std::string row = "(";
for (const auto& columnIdx : commonColumnIndices) {
const auto& optionalString = ExportQueryExecutionTrees::idToStringAndType(
siblingTree_->getRootOperation()->getIndex(),
siblingResult->idTable()(rowIndex, columnIdx),
siblingResult->localVocab());

if (optionalString.has_value()) {
absl::StrAppend(&row, optionalString.value().first, " ");
}
}
row.back() = ')';

if (rowSet.contains(row)) {
continue;
}

rowSet.insert(row);
absl::StrAppend(&values, row, " ");

checkCancellation();
}

return "VALUES " + vars + values + "} . ";
}

// ____________________________________________________________________________
template <size_t I>
void Service::writeTsvResult(cppcoro::generator<std::string_view> tsvResult,
Expand Down
21 changes: 20 additions & 1 deletion src/engine/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class Service : public Operation {
// The function used to obtain the result from the remote endpoint.
GetTsvFunction getTsvFunction_;

// The siblingTree, used for SERVICE clause optimization.
std::shared_ptr<QueryExecutionTree> siblingTree_;

public:
// Construct from parsed Service clause.
//
Expand All @@ -54,14 +57,27 @@ class Service : public Operation {
// but in our tests (`ServiceTest`) we use a mock function that does not
// require a running `HttpServer`.
Service(QueryExecutionContext* qec, parsedQuery::Service parsedServiceClause,
GetTsvFunction getTsvFunction = sendHttpOrHttpsRequest);
GetTsvFunction getTsvFunction = sendHttpOrHttpsRequest,
std::shared_ptr<QueryExecutionTree> siblingTree = nullptr);

// Set the sibling tree, i.e. the subtree whose result will later be joined
// with the result of this `Service` operation. It is used to reduce the
// complexity of the service query. Passing `nullptr` removes a previously set
// sibling tree.
void setSiblingTree(std::shared_ptr<QueryExecutionTree> siblingTree) {
  // The parameter is already a copy owned by this function, so move it into
  // the member instead of copying the `shared_ptr` a second time.
  siblingTree_ = std::move(siblingTree);
}

// Methods inherited from base class `Operation`.
std::string getDescriptor() const override;
size_t getResultWidth() const override;
std::vector<ColumnIndex> resultSortedOn() const override { return {}; }
float getMultiplicity(size_t col) override;

// Getters for testing.
// Return a const reference to the sibling tree; it is `nullptr` if no sibling
// tree has been set.
const auto& getSiblingTree() const { return siblingTree_; }
// Return a const reference to the graph pattern string of the parsed SERVICE
// clause. Note that `computeResult` overwrites this string when it prepends a
// VALUES clause, so the value may differ before and after the computation.
const auto& getGraphPatternAsString() const {
return parsedServiceClause_.graphPatternAsString_;
}

private:
uint64_t getSizeEstimateBeforeLimit() override;

Expand All @@ -82,6 +98,9 @@ class Service : public Operation {
// Compute the result using `getTsvFunction_`.
Result computeResult([[maybe_unused]] bool requestLaziness) override;

// Get a VALUES clause that contains the values of the siblingTree's result.
std::optional<std::string> getSiblingValuesClause() const;

// Write the given TSV result to the given result object. The `I` is the width
// of the result table.
//
Expand Down
6 changes: 4 additions & 2 deletions src/engine/Values.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,10 @@ void Values::writeValues(IdTable* idTablePtr, LocalVocab* localVocab) {
std::vector<size_t> numLocalVocabPerColumn(idTable.numColumns());
for (auto& row : parsedValues_._values) {
for (size_t colIdx = 0; colIdx < idTable.numColumns(); colIdx++) {
TripleComponent& tc = row[colIdx];
Id id = std::move(tc).toValueId(getIndex().getVocab(), *localVocab);
const TripleComponent& tc = row[colIdx];
// TODO<joka921> We don't want to move, but also don't want to
// unconditionally copy.
Id id = TripleComponent{tc}.toValueId(getIndex().getVocab(), *localVocab);
idTable(rowIdx, colIdx) = id;
if (id.getDatatype() == Datatype::LocalVocabIndex) {
++numLocalVocabPerColumn[colIdx];
Expand Down
3 changes: 2 additions & 1 deletion src/global/RuntimeParameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ inline auto& RuntimeParameters() {
30s}),
SizeT<"lazy-index-scan-max-size-materialization">{1'000'000},
Bool<"use-binsearch-transitive-path">{true},
Bool<"group-by-hash-map-enabled">{false}};
Bool<"group-by-hash-map-enabled">{false},
SizeT<"service-max-value-rows">{100}};
}();
return params;
}
Expand Down
85 changes: 17 additions & 68 deletions src/parser/ParsedQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "parser/PropertyPath.h"
#include "parser/SelectClause.h"
#include "parser/TripleComponent.h"
#include "parser/UpdateClause.h"
#include "parser/data/GroupKey.h"
#include "parser/data/LimitOffsetClause.h"
#include "parser/data/OrderKey.h"
Expand Down Expand Up @@ -49,75 +50,8 @@ class SparqlPrefix {
bool operator==(const SparqlPrefix&) const = default;
};

// Return true iff `elem` begins with a question mark.
inline bool isVariable(const string& elem) {
  return elem.starts_with('?');
}
// Overload for `TripleComponent`: forwards to `TripleComponent::isVariable`.
inline bool isVariable(const TripleComponent& elem) {
return elem.isVariable();
}

// Overload for `PropertyPath`: true iff the path consists of a single `IRI`
// operation whose `_iri` is itself a variable according to `isVariable`.
inline bool isVariable(const PropertyPath& elem) {
  if (elem._operation != PropertyPath::Operation::IRI) {
    return false;
  }
  return isVariable(elem._iri);
}

std::ostream& operator<<(std::ostream& out, const PropertyPath& p);

// Data container for parsed triples from the where clause.
// It is templated on the predicate type, see the instantiations below.
template <typename Predicate>
class SparqlTripleBase {
public:
// Pairs of a column index and the variable that is attached to that column.
using AdditionalScanColumns = std::vector<std::pair<ColumnIndex, Variable>>;
// Construct from subject, predicate and object. All arguments are taken by
// value and moved into the members; `additionalScanColumns` defaults to
// empty.
SparqlTripleBase(TripleComponent s, Predicate p, TripleComponent o,
AdditionalScanColumns additionalScanColumns = {})
: s_(std::move(s)),
p_(std::move(p)),
o_(std::move(o)),
additionalScanColumns_(std::move(additionalScanColumns)) {}

// Defaulted comparison: two triples are equal iff all four data members
// compare equal.
bool operator==(const SparqlTripleBase& other) const = default;
// Subject, predicate and object of the triple.
TripleComponent s_;
Predicate p_;
TripleComponent o_;
// The additional columns (e.g. patterns) that are to be attached when
// performing an index scan using this triple.
// TODO<joka921> On this level we should not store `ColumnIndex`, but the
// special predicate IRIs that are to be attached here.
// Note: this spells out the same type as the alias `AdditionalScanColumns`.
std::vector<std::pair<ColumnIndex, Variable>> additionalScanColumns_;
};

// A triple where the predicate is a `TripleComponent`, so a fixed entity or a
// variable, but not a property path.
class SparqlTripleSimple : public SparqlTripleBase<TripleComponent> {
// The alias `Base` is private here because no access specifier precedes it.
// The inherited constructors are nevertheless public: an inherited
// constructor keeps the access it has in the base class.
using Base = SparqlTripleBase<TripleComponent>;
using Base::Base;
};

// A triple whose predicate is stored as a `PropertyPath`. In the current
// implementation such a path might technically still be just a variable or a
// fixed entity.
class SparqlTriple : public SparqlTripleBase<PropertyPath> {
 public:
  using Base = SparqlTripleBase<PropertyPath>;
  using Base::Base;

  // Construct from a predicate that is given as a string. The string is
  // turned into a `PropertyPath` via `PropertyPath::fromIri`.
  SparqlTriple(TripleComponent s, const std::string& p_iri, TripleComponent o)
      : Base{std::move(s), PropertyPath::fromIri(p_iri), std::move(o)} {}

  // ___________________________________________________________________________
  [[nodiscard]] string asString() const;

  // Convert to a simple triple. Fails with an exception if the predicate
  // actually is a property path. The predicate of the result is a `Variable`
  // if the stored string is a variable according to `isVariable`, and an IRI
  // created via `TripleComponent::Iri::fromIriref` otherwise.
  SparqlTripleSimple getSimple() const {
    AD_CONTRACT_CHECK(p_.isIri());
    const auto& predicate = p_._iri;
    if (isVariable(predicate)) {
      return {s_, TripleComponent{Variable{predicate}}, o_,
              additionalScanColumns_};
    }
    return {s_, TripleComponent(TripleComponent::Iri::fromIriref(predicate)),
            o_, additionalScanColumns_};
  }
};

// Forward declaration
namespace parsedQuery {
struct GraphPatternOperation;
Expand All @@ -132,6 +66,8 @@ class ParsedQuery {

using ConstructClause = parsedQuery::ConstructClause;

using UpdateClause = parsedQuery::UpdateClause;

ParsedQuery() = default;

GraphPattern _rootGraphPattern;
Expand All @@ -147,7 +83,8 @@ class ParsedQuery {

// explicit default initialisation because the constructor
// of SelectClause is private
std::variant<SelectClause, ConstructClause> _clause{SelectClause{}};
std::variant<SelectClause, ConstructClause, UpdateClause> _clause{
SelectClause{}};

[[nodiscard]] bool hasSelectClause() const {
return std::holds_alternative<SelectClause>(_clause);
Expand All @@ -157,6 +94,10 @@ class ParsedQuery {
return std::holds_alternative<ConstructClause>(_clause);
}

// Return true iff `_clause` currently holds an `UpdateClause`.
[[nodiscard]] bool hasUpdateClause() const {
return std::holds_alternative<UpdateClause>(_clause);
}

// Return a const reference to the `SelectClause` stored in `_clause`.
// `std::get` throws `std::bad_variant_access` if `_clause` holds a different
// alternative.
[[nodiscard]] decltype(auto) selectClause() const {
return std::get<SelectClause>(_clause);
}
Expand All @@ -165,6 +106,10 @@ class ParsedQuery {
return std::get<ConstructClause>(_clause);
}

// Return a const reference to the `UpdateClause` stored in `_clause`.
// `std::get` throws `std::bad_variant_access` if `_clause` holds a different
// alternative.
[[nodiscard]] decltype(auto) updateClause() const {
return std::get<UpdateClause>(_clause);
}

// Return a mutable reference to the `SelectClause` stored in `_clause`.
// `std::get` throws `std::bad_variant_access` if `_clause` holds a different
// alternative.
[[nodiscard]] decltype(auto) selectClause() {
return std::get<SelectClause>(_clause);
}
Expand All @@ -173,6 +118,10 @@ class ParsedQuery {
return std::get<ConstructClause>(_clause);
}

// Return a mutable reference to the `UpdateClause` stored in `_clause`.
// `std::get` throws `std::bad_variant_access` if `_clause` holds a different
// alternative.
[[nodiscard]] decltype(auto) updateClause() {
return std::get<UpdateClause>(_clause);
}

// Add a variable, that was found in the query body.
void registerVariableVisibleInQueryBody(const Variable& variable);

Expand Down
Loading

0 comments on commit c291ff7

Please sign in to comment.