Skip to content

Commit

Permalink
Compiling and at least the preexisting tests run through...
Browse files Browse the repository at this point in the history
TODO<joka921>
Test the merging of the located triples.

Signed-off-by: Johannes Kalmbach <[email protected]>
  • Loading branch information
joka921 committed Oct 31, 2024
1 parent 88991cd commit 1435b1c
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 67 deletions.
168 changes: 110 additions & 58 deletions src/index/CompressedRelation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ static void pruneBlock(auto& block, LimitOffsetClause& limitOffset) {
CompressedRelationReader::IdTableGenerator
CompressedRelationReader::asyncParallelBlockGenerator(
auto beginBlock, auto endBlock, const ScanImplConfig& scanConfig,
CancellationHandle cancellationHandle, LimitOffsetClause& limitOffset
) const {

CancellationHandle cancellationHandle,
LimitOffsetClause& limitOffset) const {
const auto& columnIndices = scanConfig.scanColumns_;
const auto& blockGraphFilter = scanConfig.graphFilter_;
LazyScanMetadata& details = co_await cppcoro::getDetails;
Expand Down Expand Up @@ -97,11 +96,11 @@ CompressedRelationReader::asyncParallelBlockGenerator(
CompressedBlock compressedBlock =
readCompressedBlockFromFile(blockMetadata, columnIndices);
lock.unlock();
auto decompressedBlock =
decompressAndPostprocessBlock(compressedBlock, blockMetadata.numRows_,
scanConfig, blockMetadata);
// TODO<joka921> We need to pass the information whether the block was postprocessed
// for the statistics from the `decompressAndPostprocessBlock` function.
auto decompressedBlock = decompressAndPostprocessBlock(
compressedBlock, blockMetadata.numRows_, scanConfig, blockMetadata);
// TODO<joka921> We need to pass the information whether the block was
// postprocessed for the statistics from the `decompressAndPostprocessBlock`
// function.
return std::pair{myIndex, std::optional{std::move(decompressedBlock)}};
};
const size_t numThreads =
Expand Down Expand Up @@ -247,7 +246,8 @@ CompressedRelationReader::IdTableGenerator CompressedRelationReader::lazyScan(
co_return;
}

auto config = getScanConfig(scanSpec, additionalColumns, locatedTriplesPerBlock);
auto config =
getScanConfig(scanSpec, additionalColumns, locatedTriplesPerBlock);
/*
auto columnIndices = prepareColumnIndices(scanSpec, additionalColumns);
// If we need to filter by the graph ID of the triples, then we need to add
Expand Down Expand Up @@ -284,16 +284,16 @@ CompressedRelationReader::IdTableGenerator CompressedRelationReader::lazyScan(
*/

auto getIncompleteBlock = [&](auto it) {
auto result =
readPossiblyIncompleteBlock(scanSpec, *it, std::ref(details),
config.scanColumns_, locatedTriplesPerBlock);
auto result = readPossiblyIncompleteBlock(
scanSpec, config, *it, std::ref(details), locatedTriplesPerBlock);
cancellationHandle->throwIfCancelled();
return result;
};

if (beginBlockMetadata < endBlockMetadata) {
auto block = getIncompleteBlock(beginBlockMetadata);
// TODO<joka921> we need to add the information whether the block was postprocessed for statistics.
// TODO<joka921> we need to add the information whether the block was
// postprocessed for statistics.
// TODO<joka921> Can we merge this `pruneBlock` with the above
// `postprocess` method? And can/should we include the skipped blocks
// into the `limitOffset` calculation?
Expand Down Expand Up @@ -535,34 +535,58 @@ IdTable CompressedRelationReader::scan(

// ____________________________________________________________________________
DecompressedBlock CompressedRelationReader::readPossiblyIncompleteBlock(
const ScanSpecification& scanSpec,
const ScanSpecification& scanSpec, const ScanImplConfig& scanConfig,
const CompressedBlockMetadata& blockMetadata,
std::optional<std::reference_wrapper<LazyScanMetadata>> scanMetadata,
ColumnIndicesRef columnIndices,
const LocatedTriplesPerBlock& locatedTriples) const {
std::vector<ColumnIndex> additionalColumns;
AD_CORRECTNESS_CHECK(ADDITIONAL_COLUMN_GRAPH_ID < blockMetadata.offsetsAndCompressedSize_.size());
const auto& columnIndices = scanConfig.scanColumns_;
AD_CORRECTNESS_CHECK(ADDITIONAL_COLUMN_GRAPH_ID <
blockMetadata.offsetsAndCompressedSize_.size());
std::ranges::copy(
std::views::iota(ADDITIONAL_COLUMN_GRAPH_ID + 1, blockMetadata.offsetsAndCompressedSize_.size()),
std::views::iota(ADDITIONAL_COLUMN_GRAPH_ID,
blockMetadata.offsetsAndCompressedSize_.size()),
std::back_inserter(additionalColumns));
ScanSpecification specForAllColumns{std::nullopt, std::nullopt, std::nullopt, {}, scanSpec.graphsToFilter()};
auto config = getScanConfig(specForAllColumns, std::move(additionalColumns), locatedTriples);
ScanSpecification specForAllColumns{std::nullopt,
std::nullopt,
std::nullopt,
{},
scanConfig.graphFilter_.desiredGraphs_};
auto config = getScanConfig(specForAllColumns, std::move(additionalColumns),
locatedTriples);
bool manuallyDeleteGraphColumn = scanConfig.graphFilter_.deleteGraphColumn_;
// A block is uniquely identified by its start position in the file.
auto cacheKey = blockMetadata.offsetsAndCompressedSize_.at(0).offsetInFile_;
auto sharedResultFromCache = blockCache_
.computeOnce(
cacheKey,
[&]() {
auto optBlock = readAndDecompressBlock(
blockMetadata,config);
if (optBlock.has_value()) {
return std::move(optBock.value());
}
// TODO<joka921> create an empty ID table with the correct number of columns.
},
false, [](const auto&) { return true; })
._resultPointer;
// auto cacheKey =
// blockMetadata.offsetsAndCompressedSize_.at(0).offsetInFile_;

const DecompressedBlock& block = [&]() {
auto optBlock = readAndDecompressBlock(blockMetadata, config);
if (optBlock.has_value()) {
return std::move(optBlock.value());
} else {
return DecompressedBlock{config.scanColumns_.size(), allocator_};
}
}();
/*
auto sharedResultFromCache =
blockCache_
.computeOnce(
cacheKey,
[&]() {
auto optBlock = readAndDecompressBlock(blockMetadata, config);
if (optBlock.has_value()) {
return std::move(optBlock.value());
} else {
return DecompressedBlock{config.scanColumns_.size(),
allocator_};
}
},
false, [](const auto&) { return true; })
._resultPointer;
// For debugging for now...
blockCache_.clearAll();
const DecompressedBlock& block = *sharedResultFromCache;
*/

// Find the range in the blockMetadata, that belongs to the same relation
// `col0Id`
Expand All @@ -589,12 +613,19 @@ DecompressedBlock CompressedRelationReader::readPossiblyIncompleteBlock(
filterColumn(scanSpec.col1Id(), 1);
filterColumn(scanSpec.col2Id(), 2);

DecompressedBlock result{columnIndices.size(), allocator_};
DecompressedBlock result{scanConfig.scanColumns_.size() -
static_cast<size_t>(manuallyDeleteGraphColumn),
allocator_};
result.resize(endIdx - beginIdx);
for (auto i : ad_utility::integerRange(columnIndices.size())) {
const auto& inputCol = block.getColumn(columnIndices[i]);
size_t i = 0;
for (const auto& inputColIdx :
columnIndices | std::views::filter([&](const auto& idx) {
return !manuallyDeleteGraphColumn || idx != ADDITIONAL_COLUMN_GRAPH_ID;
})) {
const auto& inputCol = block.getColumn(inputColIdx);
std::ranges::copy(inputCol.begin() + beginIdx, inputCol.begin() + endIdx,
result.getColumn(i).begin());
++i;
}
if (scanMetadata.has_value()) {
auto& details = scanMetadata.value().get();
Expand Down Expand Up @@ -651,7 +682,8 @@ std::pair<size_t, size_t> CompressedRelationReader::getResultSizeImpl(
// if necessary etc.
auto allColumns = prepareColumnIndices(scanSpec, {});
allColumns.push_back(ADDITIONAL_COLUMN_GRAPH_ID);
auto locatedTriples = prepareLocatedTriples(allColumns, locatedTriplesPerBlock);
auto locatedTriples = prepareLocatedTriples(allColumns,
locatedTriplesPerBlock);
// Determine the total size of the result.
// First accumulate the complete blocks in the "middle"
Expand All @@ -669,7 +701,8 @@ std::pair<size_t, size_t> CompressedRelationReader::getResultSizeImpl(
deleted += del;
fromIndex += block.numRows_;
} else {
fromIndex += readAndDecompressBlock(block, allColumns, locatedTriples).numRows();
fromIndex += readAndDecompressBlock(block, allColumns,
locatedTriples).numRows();
}
});
return {fromIndex - std::max(deleted, fromIndex), fromIndex + inserted};
Expand All @@ -686,15 +719,15 @@ size_t CompressedRelationReader::getResultSizeOfScan(
// col1Id
auto relevantBlocks = getRelevantBlocks(scanSpec, blocks);
auto [beginBlock, endBlock] = getBeginAndEnd(relevantBlocks);
std::array<ColumnIndex, 1> columnIndices{0u};

// The first and the last block might be incomplete (that is, only
// a part of these blocks is actually part of the result). Set up a
// lambda which allows us to read these blocks and returns the size
// of the result.
auto config = getScanConfig(scanSpec, {}, locatedTriplesPerBlock);
auto readSizeOfPossiblyIncompleteBlock = [&](const auto& block) {
return readPossiblyIncompleteBlock(scanSpec, block, std::nullopt,
columnIndices, locatedTriplesPerBlock)
return readPossiblyIncompleteBlock(scanSpec, config, block, std::nullopt,
locatedTriplesPerBlock)
.numRows();
};

Expand Down Expand Up @@ -785,9 +818,9 @@ IdTable CompressedRelationReader::getDistinctColIdsAndCountsImpl(
} else {
// Multiple `colId`s -> we have to read the block.
const auto& optionalBlock =
i == 0 ? readPossiblyIncompleteBlock(
scanSpec, blockMetadata, std::nullopt,
scanConfig.scanColumns_, locatedTriplesPerBlock)
i == 0 ? readPossiblyIncompleteBlock(scanSpec, scanConfig,
blockMetadata, std::nullopt,
locatedTriplesPerBlock)
: readAndDecompressBlock(blockMetadata, scanConfig);
cancellationHandle->throwIfCancelled();
if (!optionalBlock.has_value()) {
Expand Down Expand Up @@ -883,14 +916,14 @@ CompressedBlock CompressedRelationReader::readCompressedBlockFromFile(

// ____________________________________________________________________________
// Decompress all columns of `compressedBlock` and return them as a single
// `DecompressedBlock`. The result has one column per entry of
// `compressedBlock` and exactly `numRowsToRead` rows.
// NOTE(review): the parameter list appears twice below; this looks like the
// old and the new formatting of the same line from a rendered diff — confirm
// before treating this text as compilable code.
DecompressedBlock CompressedRelationReader::decompressBlock(
const CompressedBlock& compressedBlock, size_t numRowsToRead
) const {
const CompressedBlock& compressedBlock, size_t numRowsToRead) const {
// Create the result with the final number of columns and rows up front, so
// that each column's storage can be handed to `decompressColumn`.
DecompressedBlock decompressedBlock{compressedBlock.size(), allocator_};
decompressedBlock.resize(numRowsToRead);
for (size_t i = 0; i < compressedBlock.size(); ++i) {
// `decompressColumn` receives the `data()` pointer of the i-th result
// column; presumably it writes the `numRowsToRead` decompressed values
// there — verify against its definition.
auto col = decompressedBlock.getColumn(i);
decompressColumn(compressedBlock[i], numRowsToRead, col.data());
}
return decompressedBlock;
}

// ____________________________________________________________________________
Expand All @@ -906,6 +939,7 @@ DecompressedBlock CompressedRelationReader::decompressAndPostprocessBlock(
lt.includeGraphColumn_);
}
scanConfig.graphFilter_.postprocessBlock(decompressedBlock, metadata);
return decompressedBlock;
}

// ____________________________________________________________________________
Expand All @@ -921,7 +955,8 @@ void CompressedRelationReader::decompressColumn(
}

// ____________________________________________________________________________
std::optional<DecompressedBlock> CompressedRelationReader::readAndDecompressBlock(
std::optional<DecompressedBlock>
CompressedRelationReader::readAndDecompressBlock(
const CompressedBlockMetadata& blockMetaData,
const ScanImplConfig& scanConfig) const {
if (scanConfig.graphFilter_.canBlockBeSkipped(blockMetaData)) {
Expand All @@ -933,7 +968,7 @@ std::optional<DecompressedBlock> CompressedRelationReader::readAndDecompressBloc
readCompressedBlockFromFile(blockMetaData, scanConfig.scanColumns_);
const auto numRowsToRead = blockMetaData.numRows_;
return decompressAndPostprocessBlock(compressedColumns, numRowsToRead,
scanConfig, blockMetaData);
scanConfig, blockMetaData);
}

// ____________________________________________________________________________
Expand Down Expand Up @@ -1075,11 +1110,15 @@ auto CompressedRelationReader::getFirstAndLastTriple(
}
const auto& scanSpec = metadataAndBlocks.scanSpec_;

ScanSpecification scanSpecForAllColumns{
std::nullopt, std::nullopt, std::nullopt, {}, std::nullopt};
auto config = getScanConfig(scanSpecForAllColumns,
std::array{ADDITIONAL_COLUMN_GRAPH_ID},
locatedTriplesPerBlock);
auto scanBlock = [&](const CompressedBlockMetadata& block) {
// Note: the following call only returns the part of the block that
// actually matches the col0 and col1.
return readPossiblyIncompleteBlock(scanSpec, block, std::nullopt,
{{0, 1, 2, ADDITIONAL_COLUMN_GRAPH_ID}},
return readPossiblyIncompleteBlock(scanSpec, config, block, std::nullopt,
locatedTriplesPerBlock);
};

Expand Down Expand Up @@ -1129,8 +1168,14 @@ std::vector<ColumnIndex> CompressedRelationReader::prepareColumnIndices(
CompressedRelationReader::LocatedTriplesConfiguration
CompressedRelationReader::prepareLocatedTriples(
ColumnIndicesRef columns, const LocatedTriplesPerBlock& locatedTriples) {
AD_CORRECTNESS_CHECK(!columns.empty());
size_t numScanColumns = 3 - columns[0];
AD_CORRECTNESS_CHECK(std::ranges::is_sorted(columns));
size_t numScanColumns = [&]() -> size_t {
if (columns.empty() || columns[0] > 3) {
return 0;
} else {
return 3 - columns[0];
}
}();
auto it = std::ranges::find(columns, ADDITIONAL_COLUMN_GRAPH_ID);
bool containsGraphId = it != columns.end();
if (containsGraphId) {
Expand Down Expand Up @@ -1521,13 +1566,14 @@ CompressedRelationReader::getMetadataForSmallRelation(
metadata.offsetInBlock_ = 0;
ScanSpecification scanSpec{col0Id, std::nullopt, std::nullopt};
auto blocks = getRelevantBlocks(scanSpec, allBlocksMetadata);
auto config = getScanConfig(scanSpec, {}, locatedTriplesPerBlock);
AD_CONTRACT_CHECK(blocks.size() <= 1,
"Should only be called for small relations");
if (blocks.empty()) {
return std::nullopt;
}
auto block = readPossiblyIncompleteBlock(
scanSpec, blocks.front(), std::nullopt, {{1, 2}}, locatedTriplesPerBlock);
scanSpec, config, blocks.front(), std::nullopt, locatedTriplesPerBlock);
if (block.empty()) {
return std::nullopt;
}
Expand All @@ -1549,9 +1595,13 @@ CompressedRelationReader::getMetadataForSmallRelation(
return metadata;
}

auto CompressedRelationReader::getScanConfig(const ScanSpecification& scanSpec, CompressedRelationReader::ColumnIndicesRef additionalColumns, const LocatedTriplesPerBlock& locatedTriples) -> ScanImplConfig {
auto CompressedRelationReader::getScanConfig(
const ScanSpecification& scanSpec,
CompressedRelationReader::ColumnIndicesRef additionalColumns,
const LocatedTriplesPerBlock& locatedTriples) -> ScanImplConfig {
auto columnIndices = prepareColumnIndices(scanSpec, additionalColumns);
auto locatedTriplesConfig = prepareLocatedTriples(columnIndices, locatedTriples);
auto locatedTriplesConfig =
prepareLocatedTriples(columnIndices, locatedTriples);
// If we need to filter by the graph ID of the triples, then we need to add
// the graph column to the scan. If the graph column is not needed as an
// output anyway, then we have to delete it after the filtering. The
Expand All @@ -1562,7 +1612,7 @@ auto CompressedRelationReader::getScanConfig(const ScanSpecification& scanSpec,
// before any additional payload columns, else the `prepareLocatedTriples`
// logic will throw an assertion.
auto [graphColumnIndex,
deleteGraphColumn] = [&]() -> std::pair<ColumnIndex, bool> {
deleteGraphColumn] = [&]() -> std::pair<ColumnIndex, bool> {
if (!scanSpec.graphsToFilter().has_value()) {
// No filtering required, these are dummy values that are ignored by the
// filtering logic.
Expand All @@ -1580,6 +1630,8 @@ auto CompressedRelationReader::getScanConfig(const ScanSpecification& scanSpec,
}
return {idx, deleteColumn};
}();
FilterDuplicatesAndGraphs graphFilter{scanSpec.graphsToFilter(), graphColumnIndex, deleteGraphColumn};
return {std::move(columnIndices), locatedTriplesConfig, std::move(graphFilter)};
FilterDuplicatesAndGraphs graphFilter{scanSpec.graphsToFilter(),
graphColumnIndex, deleteGraphColumn};
return {std::move(columnIndices), locatedTriplesConfig,
std::move(graphFilter)};
}
7 changes: 3 additions & 4 deletions src/index/CompressedRelation.h
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,7 @@ class CompressedRelationReader {
// are returned.
std::optional<DecompressedBlock> readAndDecompressBlock(
const CompressedBlockMetadata& blockMetaData,
const ScanImplConfig& scanConfig
) const;
const ScanImplConfig& scanConfig) const;

// TODO comment.
DecompressedBlock decompressAndPostprocessBlock(
Expand All @@ -671,10 +670,10 @@ class CompressedRelationReader {
// an unnecessary copy of the block. Therefore, if you know that you need the
// whole block, use `readAndDecompressBlock` instead.
DecompressedBlock readPossiblyIncompleteBlock(
const ScanSpecification& scanSpec,
const ScanSpecification& scanSpec, const ScanImplConfig& scanConfig,
const CompressedBlockMetadata& blockMetadata,
std::optional<std::reference_wrapper<LazyScanMetadata>> scanMetadata,
ColumnIndicesRef columnIndices, const LocatedTriplesPerBlock&) const;
const LocatedTriplesPerBlock&) const;

// Yield all the blocks in the range `[beginBlock, endBlock)`. If the
// `columnIndices` are set, only the specified columns from the blocks
Expand Down
9 changes: 4 additions & 5 deletions test/CompressedRelationsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -688,8 +688,7 @@ TEST(CompressedRelationReader, filterDuplicatesAndGraphs) {
graphs.emplace();
graphs->insert(ValueId::makeFromVocabIndex(VocabIndex::make(1)));
graphs->insert(ValueId::makeFromVocabIndex(VocabIndex::make(2)));
f.graphColumn_ = 1;
f.deleteGraphColumn_ = false;
f = Filter{graphs, 1, false};
EXPECT_TRUE(f.postprocessBlock(table, metadata));
EXPECT_THAT(table, matchesIdTableFromVector({{3, 1}, {3, 2}}));

Expand All @@ -710,9 +709,9 @@ TEST(CompressedRelationReader, makeCanBeSkippedForBlock) {
{{}, 0, {V(16), V(0), V(0), g}, {V(38), V(4), V(12), g}, {}, false}, 0};

using Graphs = ScanSpecification::Graphs;
Graphs graphs = std::nullopt;
auto filter =
CompressedRelationReader::FilterDuplicatesAndGraphs{graphs, 0, false};
auto filter = CompressedRelationReader::FilterDuplicatesAndGraphs{
std::nullopt, 0, false};
auto& graphs = filter.desiredGraphs_;
// No information about the contained blocks, and no graph filter specified,
// so we cannot skip.
EXPECT_FALSE(filter.canBlockBeSkipped(metadata));
Expand Down
Loading

0 comments on commit 1435b1c

Please sign in to comment.