Skip to content

Commit

Permalink
Rework code (again) so that generator does not get cached
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Aug 7, 2024
1 parent 934015c commit c7d58a4
Show file tree
Hide file tree
Showing 18 changed files with 247 additions and 1,342 deletions.
104 changes: 34 additions & 70 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,57 +127,33 @@ ProtoResult Operation::runComputation(ad_utility::Timer& timer,
// _____________________________________________________________________________
CacheValue Operation::runComputationAndTransformToCache(
ad_utility::Timer& timer, ComputationMode computationMode,
const std::string& cacheKey) {
const std::string& cacheKey, bool pinned) {
auto& cache = _executionContext->getQueryTreeCache();
CacheableResult result{runComputation(timer, computationMode),
cache.getMaxSizeSingleEntry().getBytes()};
if (!result.isDataEvaluated()) {
result.setOnSizeChanged(
[&cache, cacheKey, runtimeInfo = getRuntimeInfoPointer()](
std::optional<std::chrono::milliseconds> duration) {
cache.recomputeSize(cacheKey);
if (duration.has_value()) {
runtimeInfo->totalTime_ += duration.value();
}
});
}
return CacheValue{std::move(result), runtimeInfo()};
}

// _____________________________________________________________________________
Result Operation::extractFromCache(
std::shared_ptr<const CacheableResult> result, bool freshlyInserted,
bool isRoot, ComputationMode computationMode) {
if (result->isDataEvaluated()) {
auto resultNumRows = result->idTable().size();
auto resultNumCols = result->idTable().numColumns();
LOG(DEBUG) << "Computed result of size " << resultNumRows << " x "
<< resultNumCols << std::endl;
}

if (result->isDataEvaluated()) {
return Result::createResultWithFullyEvaluatedIdTable(std::move(result));
}

if (freshlyInserted) {
return Result::createResultAsMasterConsumer(
std::move(result),
isRoot ? std::function{[this]() { signalQueryUpdate(); }}
: std::function<void()>{});
}
// TODO<RobinTF> timer does not make sense here.
ad_utility::Timer timer{ad_utility::Timer::Started};
return Result::createResultWithFallback(
std::move(result),
[this, timer = std::move(timer), computationMode]() mutable {
return runComputation(timer, computationMode);
auto result = Result::fromProtoResult(
runComputation(timer, computationMode),
[&cache](const IdTable& idTable) {
return cache.getMaxSizeSingleEntry() >= CacheValue::getSize(idTable);
},
[this, isRoot](auto duration) {
[this, &cache, cacheKey, pinned](Result aggregatedResult) {
cache.tryInsertIfNotPresent(
pinned, cacheKey,
CacheValue{std::move(aggregatedResult), runtimeInfo()});
});
/*
TODO<RobinTF> incorporate time calculations and query updates.
runtimeInfo().totalTime_ += duration;
if (isRoot) {
signalQueryUpdate();
}
});
*/
if (result.isDataEvaluated()) {
auto resultNumRows = result.idTable().size();
auto resultNumCols = result.idTable().numColumns();
LOG(DEBUG) << "Computed result of size " << resultNumRows << " x "
<< resultNumCols << std::endl;
}

return CacheValue{std::move(result), runtimeInfo()};
}

// ________________________________________________________________________
Expand Down Expand Up @@ -212,46 +188,34 @@ std::shared_ptr<const Result> Operation::getResult(
updateRuntimeInformationOnFailure(timer.msecs());
}
});
bool actuallyComputed = false;
auto cacheSetup = [this, &timer, computationMode, &actuallyComputed,
&cacheKey]() {
actuallyComputed = true;
return runComputationAndTransformToCache(timer, computationMode,
cacheKey);
auto cacheSetup = [this, &timer, computationMode, &cacheKey, pinResult]() {
return runComputationAndTransformToCache(timer, computationMode, cacheKey,
pinResult);
};

using ad_utility::CachePolicy;
auto suitedForCache = [](const CacheValue& cacheValue) {
return cacheValue.resultTable().isDataEvaluated();
};

CachePolicy cachePolicy = computationMode == ComputationMode::ONLY_IF_CACHED
? CachePolicy::neverCompute
: CachePolicy::computeOnDemand;
bool onlyReadFromCache = computationMode == ComputationMode::ONLY_IF_CACHED;

auto result =
pinResult ? cache.computeOncePinned(cacheKey, cacheSetup, cachePolicy)
: cache.computeOnce(cacheKey, cacheSetup, cachePolicy);
pinResult ? cache.computeOncePinned(cacheKey, cacheSetup,
onlyReadFromCache, suitedForCache)
: cache.computeOnce(cacheKey, cacheSetup, onlyReadFromCache,
suitedForCache);

if (result._resultPointer == nullptr) {
AD_CORRECTNESS_CHECK(cachePolicy == CachePolicy::neverCompute);
AD_CORRECTNESS_CHECK(onlyReadFromCache);
return nullptr;
}

if (!result._resultPointer->resultTable().isDataEvaluated() &&
computationMode == ComputationMode::FULLY_MATERIALIZED) {
AD_CORRECTNESS_CHECK(!actuallyComputed);
result = pinResult ? cache.computeOncePinned(cacheKey, cacheSetup,
CachePolicy::alwaysCompute)
: cache.computeOnce(cacheKey, cacheSetup,
CachePolicy::alwaysCompute);
}

updateRuntimeInformationOnSuccess(
result, result._resultPointer->resultTable().isDataEvaluated()
? timer.msecs()
: result._resultPointer->runtimeInfo().totalTime_);

return std::make_shared<const Result>(
extractFromCache(result._resultPointer->resultTablePtr(),
actuallyComputed, isRoot, computationMode));
return result._resultPointer->resultTablePtr();
} catch (ad_utility::CancellationException& e) {
e.setOperation(getDescriptor());
runtimeInfo().status_ = RuntimeInformation::Status::cancelled;
Expand Down
7 changes: 2 additions & 5 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,8 @@ class Operation {

CacheValue runComputationAndTransformToCache(ad_utility::Timer& timer,
ComputationMode computationMode,
const std::string& cacheKey);

Result extractFromCache(std::shared_ptr<const CacheableResult> result,
bool freshlyInserted, bool isRoot,
ComputationMode computationMode);
const std::string& cacheKey,
bool pinned);

// Create and store the complete runtime information for this operation after
// it has either been successfully computed or read from the cache.
Expand Down
31 changes: 11 additions & 20 deletions src/engine/QueryExecutionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,39 @@

class CacheValue {
private:
std::shared_ptr<CacheableResult> resultTable_;
std::shared_ptr<Result> result_;
RuntimeInformation runtimeInfo_;

public:
explicit CacheValue(CacheableResult resultTable,
RuntimeInformation runtimeInfo)
: resultTable_{std::make_shared<CacheableResult>(std::move(resultTable))},
explicit CacheValue(Result result, RuntimeInformation runtimeInfo)
: result_{std::make_shared<Result>(std::move(result))},
runtimeInfo_{std::move(runtimeInfo)} {}

CacheValue(CacheValue&&) = default;
CacheValue(const CacheValue&) = delete;
CacheValue& operator=(CacheValue&&) = default;
CacheValue& operator=(const CacheValue&) = delete;

const CacheableResult& resultTable() const noexcept { return *resultTable_; }
const Result& resultTable() const noexcept { return *result_; }

std::shared_ptr<const CacheableResult> resultTablePtr() const noexcept {
return resultTable_;
std::shared_ptr<const Result> resultTablePtr() const noexcept {
return result_;
}

// Read-only access to the `RuntimeInformation` that this `CacheValue` stores
// next to its result.
const RuntimeInformation& runtimeInfo() const noexcept { return runtimeInfo_; }

~CacheValue() {
if (resultTable_ && !resultTable_->isDataEvaluated()) {
// Clear listeners
try {
resultTable_->setOnSizeChanged({});
} catch (...) {
// Should never happen. The listeners only throw assertion errors
// if the result is evaluated.
std::exit(1);
}
}
// Compute the size of `idTable` as rows * columns * sizeof(Id) and return it
// as a `MemorySize` in bytes.
static ad_utility::MemorySize getSize(const IdTable& idTable) {
  const auto numCells = idTable.size() * idTable.numColumns();
  return ad_utility::MemorySize::bytes(numCells * sizeof(Id));
}

// Calculates the `MemorySize` taken up by an instance of `CacheValue`.
struct SizeGetter {
ad_utility::MemorySize operator()(const CacheValue& cacheValue) const {
if (const auto& tablePtr = cacheValue.resultTable_; tablePtr) {
return tablePtr->getCurrentSize();
if (const auto& resultPtr = cacheValue.result_; resultPtr) {
return getSize(resultPtr->idTable());
} else {
return 0_B;
}
Expand Down
7 changes: 1 addition & 6 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,7 @@ void QueryExecutionTree::readFromCache() {
auto& cache = qec_->getQueryTreeCache();
auto res = cache.getIfContained(getCacheKey());
if (res.has_value()) {
auto resultTable = res->_resultPointer->resultTablePtr();
if (resultTable->isDataEvaluated()) {
cachedResult_ = std::make_shared<const Result>(
Result::createResultWithFullyEvaluatedIdTable(
std::move(resultTable)));
}
cachedResult_ = res->_resultPointer->resultTablePtr();
}
}

Expand Down
Loading

0 comments on commit c7d58a4

Please sign in to comment.