Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 135 additions & 32 deletions src/core/framework/index_mapping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ static inline int UnpackMappingSize(ailego::File &file, size_t *len) {
}

int IndexMapping::open(const std::string &path, bool cow, bool full_mode) {
path_ = path;
full_mode_ = full_mode;
copy_on_write_ = cow;
huge_page_ = Ishugetlbfs(path);
Expand All @@ -108,25 +109,47 @@ int IndexMapping::open(const std::string &path, bool cow, bool full_mode) {
return this->init_index_mapping(mapping_size);
}

// Creates the index file at `path` and initializes its first meta section.
//
// NOTE(review): this span shows two signatures back to back and, further
// down, pairs of statements that appear to be the pre-change and post-change
// versions of the same step (`segs_size` vs `seg_meta_capacity`). It looks
// like a diff rendered without +/- markers -- confirm which lines are live
// before relying on the comments below.
//
// Returns IndexError_CreateFile when the file cannot be created; otherwise
// returns whatever the delegated meta-section initializer returns.
int IndexMapping::create(const std::string &path, size_t segs_size) {
int IndexMapping::create(const std::string &path, size_t seg_meta_capacity) {
// Record the path and meta capacity in members and reset the header start
// offset to 0 (the first meta section sits at the start of the file).
path_ = path;
seg_meta_capacity_ = seg_meta_capacity;
current_header_start_offset_ = 0;

// write() & copying to mmap() will auto extend the file size
if (!file_.create(path.c_str(), 0)) {
LOG_ERROR("Failed to create file %s, errno %d, %s", path.c_str(), errno,
std::strerror(errno));
return IndexError_CreateFile;
}
// Choose the initializer based on what Ishugetlbfs() reports for the path.
huge_page_ = Ishugetlbfs(path);
if (huge_page_) {
// NOTE(review): two consecutive returns; the second is unreachable if both
// lines are really present -- presumably old line followed by new line.
return create_hugepage(segs_size);
return init_hugepage_meta_section();
}
// NOTE(review): `len` is computed from `segs_size` but not used before the
// return below -- presumably a leftover removed line; confirm.
size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) +
sizeof(IndexFormat::MetaFooter),
false);
return init_meta_section();
}

int IndexMapping::init_meta_section() {
if (current_header_start_offset_ % ailego::MemoryHelper::PageSize() != 0) {
LOG_ERROR("File offset %zu is not a multiple of the page size: %zu",
current_header_start_offset_, ailego::MemoryHelper::PageSize());
return IndexError_InvalidValue;
}

auto &path = path_;
size_t len =
CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) +
sizeof(IndexFormat::MetaFooter),
false);

IndexFormat::MetaHeader meta_header;
IndexFormat::MetaFooter meta_footer;

// Write index header
IndexFormat::SetupMetaHeader(&meta_header, len - sizeof(meta_footer), len);
if (!file_.seek(current_header_start_offset_, ailego::File::Origin::Begin)) {
LOG_ERROR("Failed to seek file %s, errno %d, %s", path.c_str(), errno,
std::strerror(errno));
return IndexError_SeekFile;
}
if (file_.write(&meta_header, sizeof(meta_header)) != sizeof(meta_header)) {
LOG_ERROR("Failed to write file: %s, errno %d, %s", path.c_str(), errno,
std::strerror(errno));
Expand Down Expand Up @@ -155,11 +178,18 @@ int IndexMapping::create(const std::string &path, size_t segs_size) {
return this->init_index_mapping(len);
}

int IndexMapping::create_hugepage(size_t segs_size) {
size_t len = CalcPageAlignedSize(segs_size + sizeof(IndexFormat::MetaHeader) +
sizeof(IndexFormat::MetaFooter),
true);
int file_offset = 0;
int IndexMapping::init_hugepage_meta_section() {
ssize_t file_offset = (ssize_t)current_header_start_offset_;
if (file_offset % ailego::MemoryHelper::HugePageSize() != 0) {
LOG_ERROR("File offset %zu is not a multiple of the page size: %zu",
file_offset, ailego::MemoryHelper::HugePageSize());
return IndexError_InvalidValue;
}

size_t len =
CalcPageAlignedSize(seg_meta_capacity_ + sizeof(IndexFormat::MetaHeader) +
sizeof(IndexFormat::MetaFooter),
true);
int opts = ailego::File::MMAP_SHARED | ailego::File::MMAP_HUGE_PAGE;
void *addr =
ailego::File::MemoryMap(file_.native_handle(), file_offset, len, opts);
Expand Down Expand Up @@ -194,19 +224,24 @@ int IndexMapping::create_hugepage(size_t segs_size) {
IndexFormat::UpdateMetaFooter(&meta_footer, 0);
memcpy((char *)addr + file_offset, &meta_footer, sizeof(meta_footer));
file_offset += sizeof(meta_footer);

return this->init_index_mapping(len);
}

void IndexMapping::close(void) {
// Unmap all memory
this->unmap_all();
if (header_) {
ailego::File::MemoryUnmap(header_, header_->content_offset);
for (auto item : header_addr_map_) {
auto header = item.second;
ailego::File::MemoryUnmap(header, header->content_offset);
}
}
// Reset members
segment_ids_offset_ = 0;
segment_start_ = nullptr;
header_ = nullptr;
header_addr_map_.clear();
footer_ = nullptr;
index_size_ = 0u;
segments_.clear();
Expand All @@ -218,9 +253,19 @@ void IndexMapping::close(void) {
}

// Recomputes the segment-meta CRC and updates the footer with `check_point`
// for the meta sections, then marks the header as dirty.
//
// NOTE(review): the first three statements (operating on `footer_` /
// `segment_start_`) and the loop that follows perform the same update; this
// looks like the old single-header logic and the new multi-header logic shown
// together -- confirm which is intended to remain.
void IndexMapping::refresh(uint64_t check_point) {
footer_->segments_meta_crc =
ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0);
IndexFormat::UpdateMetaFooter(footer_, check_point);
// support add_with_id
// Visit every entry of header_addr_map_ (start offset -> mapped header).
for (auto item : header_addr_map_) {
// NOTE(review): header_start_offset is not referenced again in this loop.
auto header_start_offset = item.first;
auto header = item.second;
// The footer is located meta_footer_offset bytes past the mapped header.
auto footer = reinterpret_cast<IndexFormat::MetaFooter *>(
reinterpret_cast<uint8_t *>(header) + header->meta_footer_offset);
// The segment-meta region is addressed as the segments_meta_size bytes that
// end where the footer begins.
auto segment_start = reinterpret_cast<IndexFormat::SegmentMeta *>(
reinterpret_cast<uint8_t *>(header) +
(header->meta_footer_offset - footer->segments_meta_size));
// Re-hash the segment-meta region (seed 0) and store it in the footer before
// the footer itself is updated with the check point.
footer->segments_meta_crc =
ailego::Crc32c::Hash(segment_start, footer->segments_meta_size, 0);
IndexFormat::UpdateMetaFooter(footer, check_point);
}
// Flag the header as modified; presumably consumed by flush() -- verify.
header_dirty_ = true;
}

Expand All @@ -238,7 +283,21 @@ int IndexMapping::append(const std::string &id, size_t size) {
size_t need_size = sizeof(IndexFormat::SegmentMeta) + id_size;
if (sizeof(IndexFormat::SegmentMeta) * footer_->segment_count + need_size >
segment_ids_offset_) {
return IndexError_NoBuffer;
LOG_DEBUG("segment meta section expanded: %s", path_.c_str());
footer_->next_meta_header_offset = index_size_;
refresh(0);
flush();
// mmap file storage write() will update segment's meta
// ailego::File::MemoryUnmap(header_, header_->content_offset);
header_ = nullptr;
footer_ = nullptr;

current_header_start_offset_ = index_size_;
const int ret =
huge_page_ ? init_hugepage_meta_section() : init_meta_section();
if (ret != 0) {
return ret;
}
}

if (!copy_on_write_ && !file_.truncate(index_size_ + size)) {
Expand All @@ -251,7 +310,8 @@ int IndexMapping::append(const std::string &id, size_t size) {
segment_ids_offset_ -= static_cast<uint32_t>(id_size);
IndexFormat::SegmentMeta *segment = segment_start_ + footer_->segment_count;
segment->segment_id_offset = segment_ids_offset_;
segment->data_index = index_size_ - header_->content_offset;
segment->data_index =
index_size_ - header_->content_offset - current_header_start_offset_;
segment->data_size = 0;
segment->data_crc = 0;
segment->padding_size = size;
Expand All @@ -265,7 +325,8 @@ int IndexMapping::append(const std::string &id, size_t size) {
footer_->content_size += size;
footer_->total_size += size;
IndexFormat::UpdateMetaFooter(footer_, 0);
segments_.emplace(id, segment);
segments_.emplace(
id, SegmentInfo{Segment{segment}, current_header_start_offset_, header_});
header_dirty_ = true;
return 0;
}
Expand All @@ -276,11 +337,14 @@ IndexMapping::Segment *IndexMapping::map(const std::string &id, bool warmup,
if (iter == segments_.end()) {
return nullptr;
}
Segment *item = &iter->second;
SegmentInfo &segment_info = iter->second;
Segment *item = &segment_info.segment;
if (!item->data()) {
auto meta = item->meta();
size_t mapping_size = meta->data_size + meta->padding_size;
size_t offset = meta->data_index + header_->content_offset;
size_t offset = segment_info.segment_header_start_offset +
segment_info.segment_header->content_offset +
meta->data_index;

void *addr = nullptr;
if (!copy_on_write_) {
Expand Down Expand Up @@ -326,7 +390,8 @@ IndexMapping::Segment *IndexMapping::map(const std::string &id, bool warmup,
void IndexMapping::unmap(const std::string &id) {
auto iter = segments_.find(id);
if (iter != segments_.end()) {
Segment *item = &iter->second;
SegmentInfo &segment_info = iter->second;
Segment *item = &segment_info.segment;

if (item->data()) {
ailego::File::MemoryUnmap(
Expand All @@ -338,7 +403,8 @@ void IndexMapping::unmap(const std::string &id) {

void IndexMapping::unmap_all(void) {
for (auto iter = segments_.begin(); iter != segments_.end(); ++iter) {
Segment *item = &iter->second;
SegmentInfo &segment_info = iter->second;
Segment *item = &segment_info.segment;

if (item->data()) {
ailego::File::MemoryUnmap(
Expand All @@ -356,14 +422,17 @@ int IndexMapping::flush(void) {
}

for (auto iter = segments_.begin(); iter != segments_.end(); ++iter) {
const Segment *item = &iter->second;
SegmentInfo &segment_info = iter->second;
Segment *item = &segment_info.segment;
if (!item->data() || !item->dirty()) {
continue;
}

size_t segment_size = item->meta()->data_size + item->meta()->padding_size;
if (full_mode_ && copy_on_write_) {
size_t off = header_->content_offset + item->meta()->data_index;
size_t off = segment_info.segment_header_start_offset +
segment_info.segment_header->content_offset +
item->meta()->data_index;
if (file_.write(off, item->data(), segment_size) != segment_size) {
LOG_ERROR("Failed to write segment, size %zu, errno %d, %s",
segment_size, errno, std::strerror(errno));
Expand All @@ -381,14 +450,21 @@ int IndexMapping::flush(void) {

header_dirty_ = false;
if (full_mode_ && copy_on_write_) {
if (file_.write(0, header_, header_->content_offset) !=
header_->content_offset) {
LOG_ERROR("Failed to write segment, size %u, errno %d, %s",
header_->content_offset, errno, std::strerror(errno));
return IndexError_WriteData;
for (auto item : header_addr_map_) {
auto header_start_offset = item.first;
auto header = item.second;
if (file_.write(header_start_offset, header, header->content_offset) !=
header->content_offset) {
LOG_ERROR("Failed to write segment, size %lu, errno %d, %s",
header->content_offset, errno, std::strerror(errno));
return IndexError_WriteData;
}
}
} else {
ailego::File::MemoryFlush(header_, header_->content_offset);
for (auto item : header_addr_map_) {
auto header = item.second;
ailego::File::MemoryFlush(header, header->content_offset);
}
}
return 0;
}
Expand All @@ -399,15 +475,16 @@ int IndexMapping::init_index_mapping(size_t len) {
if (huge_page_) {
opts |= ailego::File::MMAP_HUGE_PAGE;
}
uint8_t *start = reinterpret_cast<uint8_t *>(
ailego::File::MemoryMap(file_.native_handle(), 0, len, opts));
uint8_t *start = reinterpret_cast<uint8_t *>(ailego::File::MemoryMap(
file_.native_handle(), current_header_start_offset_, len, opts));
if (!start) {
LOG_ERROR("Failed to map file, errno %d, %s", errno, std::strerror(errno));
return IndexError_MMapFile;
}

// Unpack header
header_ = reinterpret_cast<IndexFormat::MetaHeader *>(start);
header_addr_map_.insert({current_header_start_offset_, header_});
if (header_->meta_header_size != sizeof(IndexFormat::MetaHeader)) {
return IndexError_InvalidLength;
}
Expand All @@ -416,6 +493,14 @@ int IndexMapping::init_index_mapping(size_t len) {
return IndexError_InvalidChecksum;
}

switch (header_->version) {
case IndexFormat::FORMAT_VERSION:
break;
default:
LOG_ERROR("Unsupported index version: %u", header_->version);
return IndexError_Unsupported;
}

// Unpack footer
if (header_->meta_footer_size != sizeof(IndexFormat::MetaFooter)) {
return IndexError_InvalidLength;
Expand Down Expand Up @@ -479,12 +564,30 @@ int IndexMapping::init_index_mapping(size_t len) {
segments_.emplace(
std::string(reinterpret_cast<const char *>(segment_start_) +
iter->segment_id_offset),
iter);
SegmentInfo{Segment{iter}, current_header_start_offset_, header_});
}
if (sizeof(IndexFormat::SegmentMeta) * footer_->segment_count >
segment_ids_offset_) {
return IndexError_InvalidLength;
}

// if (header_->version == IndexFormat::COMPATIBLE_FORMAT_VERSION_0X0002) {
// header_->version = IndexFormat::CURRENT_FORMAT_VERSION;
// LOG_INFO("Index file format upgraded");
// IndexFormat::UpdateMetaHeader(header_);
// footer_->segments_meta_crc =
// ailego::Crc32c::Hash(segment_start_, footer_->segments_meta_size, 0);
// IndexFormat::UpdateMetaFooter(footer_, 0);
// header_dirty_ = true;
// }

if (footer_->next_meta_header_offset > 0) {
current_header_start_offset_ = footer_->next_meta_header_offset;
// Meta sections have all the same size, so we can use the same size to map
// the next meta section
return this->init_index_mapping(len);
}

return 0;
}

Expand Down
Loading