Skip to content

Commit

Permalink
wip kmerger
Browse files Browse the repository at this point in the history
  • Loading branch information
ate47 committed Mar 21, 2024
1 parent 1662af5 commit 942bd4a
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 19 deletions.
158 changes: 142 additions & 16 deletions src/cli/tools/lib/actslibtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ namespace {

actslib::rdf::RDFParser& parser;
size_t tripleId{ 1 };
std::string buffers[rdf::RDF_TRIPLE_COUNT]{};
size_t bufferOffsets[rdf::RDF_TRIPLE_COUNT]{};
std::vector<size_t> strings[rdf::RDF_TRIPLE_COUNT]{};
std::string buffers[rdf::RDF_TRIPLE_COUNT]{ {}, {}, {} };
size_t bufferOffsets[rdf::RDF_TRIPLE_COUNT]{ 0, 0, 0 };
std::vector<size_t> strings[rdf::RDF_TRIPLE_COUNT]{ {}, {}, {} };
public:

KMergerTest(actslib::rdf::RDFParser& parser, size_t bufferSize) : parser(parser) {
Expand All @@ -25,16 +25,26 @@ namespace {
}
}

void CreateDefaultChunk(const std::filesystem::path& chunkLocation) override {
std::ofstream out{ chunkLocation, std::ios::binary };
// Write an empty component file for `type` inside the chunk directory.
//
// The file is named after the component (rdf::GetRDFComponentTypeName) and
// only receives the writer's end marker, so it contains no node.
//
// chunkLocation: directory that receives the component file.
// type: component whose file is created.
// Throws std::runtime_error when the file can't be created.
inline void WriteEmptyChunk(const std::filesystem::path& chunkLocation, rdf::RDFComponentType type) {
    const std::filesystem::path s = chunkLocation / rdf::GetRDFComponentTypeName(type);

    std::ofstream out{ s, std::ios::binary };

    // single check: report which component failed instead of a generic message
    if (!out) throw std::runtime_error(actslib::va("Can't create default chunk %s", rdf::GetRDFComponentTypeName(type)));

    // closes the stream on every exit path
    ToClose<std::ofstream> outClose{ out };

    actslib::rdf::raio::CompressComponentWriter w{ out };

    w.WriteEnd();
}

out.close();
void CreateDefaultChunk(const std::filesystem::path& chunkLocation) override {
std::filesystem::create_directories(chunkLocation);

WriteEmptyChunk(chunkLocation, rdf::RDF_SUBJECT);
WriteEmptyChunk(chunkLocation, rdf::RDF_PREDICATE);
WriteEmptyChunk(chunkLocation, rdf::RDF_OBJECT);
}

inline bool WriteChunkData(const rdf::Triple& triple, bool force, size_t index) {
Expand All @@ -52,19 +62,19 @@ namespace {
if (!force) {
return false;
}
bufferss.resize((size_t)((triple.subject->length + sizeof(size_t) + 1) * 1.5));
bufferss.resize(bufferOffsets[rdf::RDF_SUBJECT] + (size_t)((triple.subject->length + sizeof(size_t) + 1) * 1.5));
}
if (bufferOffsets[rdf::RDF_PREDICATE] + triple.predicate->length + sizeof(size_t) + 1 > bufferss.size()) {
if (bufferOffsets[rdf::RDF_PREDICATE] + triple.predicate->length + sizeof(size_t) + 1 > buffersp.size()) {
if (!force) {
return false;
}
buffersp.resize((size_t)((triple.predicate->length + sizeof(size_t) + 1) * 1.5));
buffersp.resize(bufferOffsets[rdf::RDF_PREDICATE] + (size_t)((triple.predicate->length + sizeof(size_t) + 1) * 1.5));
}
if (bufferOffsets[rdf::RDF_OBJECT] + triple.object->length + sizeof(size_t) + 1 > bufferso.size()) {
if (!force) {
return false;
}
bufferso.resize((size_t)((triple.object->length + sizeof(size_t) + 1) * 1.5));
bufferso.resize(bufferOffsets[rdf::RDF_OBJECT] + (size_t)((triple.object->length + sizeof(size_t) + 1) * 1.5));
}

size_t offs = bufferOffsets[rdf::RDF_SUBJECT];
Expand All @@ -83,14 +93,17 @@ namespace {
*reinterpret_cast<size_t*>(buffersp.data() + offp) = index;
*reinterpret_cast<size_t*>(bufferso.data() + offo) = index;

assert((bufferss.size() > offs + sizeof(size_t) && bufferss.size() - (offs + sizeof(size_t)) >= triple.subject->length) && "bad buffer s");
memcpy(bufferss.data() + offs + sizeof(size_t), triple.subject->buffer + triple.subject->offset, triple.subject->length);
assert((buffersp.size() > offs + sizeof(size_t) && buffersp.size() - (offs + sizeof(size_t)) >= triple.predicate->length) && "bad buffer p");
memcpy(buffersp.data() + offp + sizeof(size_t), triple.predicate->buffer + triple.predicate->offset, triple.predicate->length);
assert((bufferso.size() > offs + sizeof(size_t) && bufferso.size() - (offs + sizeof(size_t)) >= triple.object->length) && "bad buffer o");
memcpy(bufferso.data() + offo + sizeof(size_t), triple.object->buffer + triple.object->offset, triple.object->length);

// end str
bufferss.data()[bufferOffsets[rdf::RDF_SUBJECT]] = 0;
buffersp.data()[bufferOffsets[rdf::RDF_PREDICATE]] = 0;
bufferso.data()[bufferOffsets[rdf::RDF_OBJECT]] = 0;
bufferss.data()[bufferOffsets[rdf::RDF_SUBJECT] - 1] = 0;
buffersp.data()[bufferOffsets[rdf::RDF_PREDICATE] - 1] = 0;
bufferso.data()[bufferOffsets[rdf::RDF_OBJECT] - 1] = 0;

return true;
}
Expand All @@ -104,20 +117,51 @@ namespace {
const char* s2 = &buff[off2 + sizeof(size_t)];
return strcmp(s1, s2) < 0;
});

}

// Write the buffered strings of one component into its chunk file.
//
// Each entry of strings[type] is an offset into buffers[type]; the record
// at that offset is a size_t id immediately followed by a nul-terminated
// string. Records are written in the order of strings[type], then the
// writer's end marker is emitted.
//
// chunkLocation: chunk directory, the file is created inside it and named
//                after the component.
// type: component to write.
// Throws std::runtime_error when the file can't be created.
inline void WriteChunkFile(const std::filesystem::path& chunkLocation, rdf::RDFComponentType type) {
    const std::filesystem::path su = chunkLocation / rdf::GetRDFComponentTypeName(type);

    // open the component file, not the chunk directory itself
    std::ofstream out{ su, std::ios::binary };

    if (!out) throw std::runtime_error(actslib::va("Can't create chunk file of %s", rdf::GetRDFComponentTypeName(type)));

    // closes the stream on every exit path
    ToClose<std::ofstream> outClose{ out };

    actslib::rdf::raio::CompressComponentWriter w{ out };

    const auto& str = strings[type];
    const auto& buff = buffers[type];

    rdf::Component cmp{};
    for (size_t offset : str) {
        // records are packed back to back, so the id isn't guaranteed to be
        // aligned for a direct size_t load
        size_t id = 0;
        memcpy(&id, buff.data() + offset, sizeof(id));

        cmp.buffer = &buff[offset + sizeof(size_t)];
        cmp.length = strlen(cmp.buffer);
        cmp.offset = 0;

        w.WriteNode(id, cmp);
    }

    w.WriteEnd();
}

bool CreateChunk(const std::filesystem::path& chunkLocation) override {
if (!parser) {
return false;
}
// clear buffers
for (size_t i = 0; i < rdf::RDF_TRIPLE_COUNT; i++) {
bufferOffsets[i] = 0;
strings[i].clear();
}

WriteChunkData(*parser, true, tripleId);

++parser;
tripleId++;

while (parser) {
if (!WriteChunkData(*parser, true, tripleId)) {
if (!WriteChunkData(*parser, false, tripleId)) {
break;
}
++parser;
Expand All @@ -130,12 +174,63 @@ namespace {
SortBuffer(rdf::RDF_OBJECT);

// write strings
std::filesystem::create_directories(chunkLocation);

WriteChunkFile(chunkLocation, rdf::RDF_SUBJECT);
WriteChunkFile(chunkLocation, rdf::RDF_PREDICATE);
WriteChunkFile(chunkLocation, rdf::RDF_OBJECT);


return true;
}

// Merge several chunks into a single chunk.
//
// For each of the subject/predicate/object components, the matching file of
// every input chunk is opened, the readers are merged by component order
// and the result is written to the component file of the output chunk.
//
// chunks: chunks to merge, each one is read from chunk.file / <component name>.
// chunkLocation: output chunk directory.
// Throws std::runtime_error when an output file can't be created.
void MergeChunks(const std::vector<actslib::data::KMergerChunk>& chunks, const std::filesystem::path& chunkLocation) override {
    // orders the merged elements by their component, the id is ignored
    class IdCompComparator {
    public:
        static inline bool Compare(const rdf::raio::IdComponent& a, const rdf::raio::IdComponent& b) {
            return a.comp < b.comp;
        }
    };

    // same as CreateChunk/CreateDefaultChunk: the component files live inside this directory
    std::filesystem::create_directories(chunkLocation);

    std::vector<std::shared_ptr<rdf::raio::CompressComponentReaderFile>> readers{};

    readers.reserve(chunks.size());
    for (size_t i = 0; i < rdf::RDF_TRIPLE_COUNT; i++) {
        rdf::RDFComponentType type = (rdf::RDFComponentType)i;

        // declared before the merger so the readers are closed after it is destroyed
        ToCloseFunc tcf{ [&readers] {
            for (auto& r : readers) {
                r->close();
            }
            readers.clear();
        } };
        for (const auto& chunk : chunks) {
            readers.emplace_back(std::make_shared<rdf::raio::CompressComponentReaderFile>(chunk.file / rdf::GetRDFComponentTypeName(type)));
        }

        actslib::data::iterator::AllocatedMergeAIterator<const rdf::raio::IdComponent, std::shared_ptr<rdf::raio::CompressComponentReaderFile>, IdCompComparator> merger{
            readers, [](auto& e) { return e; }
        };

        const std::filesystem::path su = chunkLocation / rdf::GetRDFComponentTypeName(type);

        // open the component file, not the chunk directory itself
        std::ofstream out{ su, std::ios::binary };

        if (!out) throw std::runtime_error(actslib::va("Can't create merge chunk file of %s", rdf::GetRDFComponentTypeName(type)));

        ToClose<std::ofstream> outClose{ out };

        actslib::rdf::raio::CompressComponentWriter w{ out };

        while (merger) {
            auto& c = *merger;
            w.WriteNode(c.id, c.comp);
            // NOTE(review): step to the next element, same protocol as the
            // `++parser` loop of CreateChunk; without it this loop never ends
            ++merger;
        }

        w.WriteEnd();
    }
}
};
Expand Down Expand Up @@ -311,6 +406,37 @@ namespace {
in.close();

}
else if (!_strcmpi(type, "km")) {
if (argc < 4) {
LOG_ERROR("{} {} km file", argv[0], argv[1]);
return tool::BAD_USAGE;
}

std::filesystem::path path{ argv[3] };

auto format = actslib::rdf::GuessFormat(path);

LOG_INFO("Format: {}", actslib::rdf::FormatName(format));

std::ifstream is{ path };

if (!is) {
LOG_ERROR("Can't open {}", path.string());
return tool::BASIC_ERROR;
}

std::unique_ptr<actslib::rdf::RDFParser> parserPtr = actslib::rdf::CreateParser(format, is, "http://atesab.fr/#");

KMergerTest mergerCfg{ *parserPtr, 1024 * 1024 };

data::KMerger kmerger{ "wmergertest", 20, 4, mergerCfg };

kmerger.Init();

auto end = kmerger.PushAndJoin();

LOG_INFO("End file: {}", end.string());
}
return tool::OK;
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib/actslib/actslib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace actslib {
va_start(va, fmt);

auto& buff = buf.buffer[buf.NextId()];
sprintf_s(buff, fmt);
vsprintf_s(buff, fmt, va);

va_end(va);

Expand Down
22 changes: 22 additions & 0 deletions src/lib/actslib/actslib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,26 @@ namespace actslib {
return std::ranges::copy(std::move(out).str(), ctx.out()).out;
}
};

// Scope guard calling close() on the referenced object when the guard is
// destroyed.
//
// The guard only keeps a reference: the guarded object must outlive it.
// Type: any type with a close() member function.
template<typename Type>
class ToClose {
    Type& v;

public:
    ToClose(Type& v) : v(v) {}

    // a copy of the guard would close the same object a second time
    ToClose(const ToClose&) = delete;
    ToClose& operator=(const ToClose&) = delete;

    ~ToClose() {
        v.close();
    }
};
// Scope guard running a callback when the guard is destroyed.
//
// f: callback to run at scope exit, an empty std::function is allowed and
//    makes the guard a no-op.
class ToCloseFunc {
    std::function<void()> f;

public:
    ToCloseFunc(std::function<void()> f) : f(f) {}

    // a copy of the guard would run the callback a second time
    ToCloseFunc(const ToCloseFunc&) = delete;
    ToCloseFunc& operator=(const ToCloseFunc&) = delete;

    ~ToCloseFunc() {
        // calling an empty std::function throws std::bad_function_call,
        // which would terminate the program from inside a destructor
        if (f) {
            f();
        }
    }
};


}
2 changes: 1 addition & 1 deletion src/lib/actslib/data/iterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ namespace actslib::data::iterator {
}
};

template<typename Type, typename InputType = Type, typename Comparator = MergeIteratorBasicComp<Type>>
template<typename Type, typename InputType, typename Comparator = MergeIteratorBasicComp<Type>>
class AllocatedMergeAIterator : public AIterator<Type> {
std::vector<std::shared_ptr<AIterator<Type>>> its{};
AIterator<Type>* main;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/actslib/data/kmerger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace actslib::data {
}

// Build the path of a new chunk inside the work directory.
//
// The file name is "c_" followed by the id returned by GetNewIdSync().
inline std::filesystem::path GetNewIdSyncPath() {
    // "%lld", not "$lld": the id has to be formatted into the name,
    // otherwise every chunk would share the same literal path
    return workdir / va("c_%lld", GetNewIdSync());
}

void PushTask(std::function<void()> task) {
Expand Down
16 changes: 16 additions & 0 deletions src/lib/actslib/rdf/raio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ namespace actslib::rdf::raio {
}
};

// CompressComponentReader reading from a file it opens and owns.
class CompressComponentReaderFile : public CompressComponentReader {
    std::ifstream stream;

public:
    // Open `path` for reading.
    //
    // The file is opened in binary mode to match the writers, which create
    // their files with std::ios::binary; a text mode stream may translate
    // bytes of the compressed data on some platforms.
    //
    // The initializers are listed in the order they really run: the base
    // class is built before `stream`.
    // NOTE(review): the base therefore receives a reference to a stream
    // that isn't constructed yet; this is only safe if the
    // CompressComponentReader constructor stores the reference without
    // reading from it - confirm.
    //
    // Throws std::runtime_error when the file can't be opened.
    CompressComponentReaderFile(const std::filesystem::path& path) : CompressComponentReader(stream), stream(path, std::ios::binary) {
        if (!stream) {
            const auto str = path.string();
            throw std::runtime_error(actslib::va("Can't open compress reader file path '%s'", str.c_str()));
        }
    }

    // Close the underlying file.
    void close() {
        stream.close();
    }
};



}
19 changes: 19 additions & 0 deletions src/lib/actslib/rdf/rdf.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include "rdf.hpp"

namespace actslib::rdf {
    namespace {
        // display names, indexed by the RDFComponentType value
        const char* componentNames[]{
            "subject",
            "predicate",
            "object",
            "graph",
        };
    }

    // Get the display name of a component type, "unknown" is returned for
    // a value outside of [0, RDF_QUAD_COUNT).
    const char* GetRDFComponentTypeName(RDFComponentType type) {
        const bool known = type >= 0 && type < RDF_QUAD_COUNT;

        return known ? componentNames[type] : "unknown";
    }
}
23 changes: 23 additions & 0 deletions src/lib/actslib/rdf/rdf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace actslib::rdf {
RDF_GRAPH = 3,
RDF_QUAD_COUNT = 4,
};

// Get the display name of a component type: "subject", "predicate",
// "object" or "graph"; "unknown" is returned for any other value.
const char* GetRDFComponentTypeName(RDFComponentType type);

class StringComponent;

Expand Down Expand Up @@ -38,6 +40,27 @@ namespace actslib::rdf {

return d;
}

// Byte-wise lexicographic order between two components.
//
// The bytes shared by both components are compared first; when they are
// all equal, the shorter component is the smaller one.
bool operator<(const Component& c2) const {
    const auto common = length < c2.length ? length : c2.length;

    const int r = memcmp(buffer + offset, c2.buffer + c2.offset, common);

    if (r != 0) {
        return r < 0;
    }

    // same prefix: a strict prefix sorts first, equal components aren't less
    return length < c2.length;
}
};

class StringComponent : public Component {
Expand Down

0 comments on commit 942bd4a

Please sign in to comment.