diff --git a/src/umap/CMakeLists.txt b/src/umap/CMakeLists.txt index c641938..936a253 100644 --- a/src/umap/CMakeLists.txt +++ b/src/umap/CMakeLists.txt @@ -59,6 +59,17 @@ if (caliper_DIR) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DCALIPER") endif() +if (ZSTD_ROOT) + find_library(LIBZSTD NAMES zstd PATHS ${ZSTD_ROOT}/lib) + message(STATUS "Using ZSTD Compression" ) + target_include_directories(umap PUBLIC ${ZSTD_ROOT}/lib) + target_include_directories(umap-static PUBLIC ${ZSTD_ROOT}/lib) + target_link_libraries(umap ${LIBZSTD}) + target_link_libraries(umap-static ${LIBZSTD}) + target_compile_definitions(umap PUBLIC USE_COMPRESSION) + target_compile_definitions(umap-static PUBLIC USE_COMPRESSION) +endif() + set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}") install(TARGETS umap umap-static diff --git a/src/umap/store/SparseStore.cpp b/src/umap/store/SparseStore.cpp index 19397a7..e5f56ff 100644 --- a/src/umap/store/SparseStore.cpp +++ b/src/umap/store/SparseStore.cpp @@ -25,6 +25,11 @@ #include #include +#ifdef USE_COMPRESSION + #include + #include +#endif + namespace Umap { // Create mode @@ -37,30 +42,62 @@ namespace Umap { numreads = numwrites = 0; read_only = false; file_descriptors = new file_descriptor[num_files]; + file_exists_map = new int8_t[num_files]; for (int i = 0 ; i < num_files ; i++){ file_descriptors[i].id = -1; + file_exists_map[i] = 0; } DIR *directory; struct dirent *ent; std::string metadata_file_path = root_path + "/_metadata"; if ((directory = opendir(root_path.c_str())) != NULL){ - UMAP_ERROR("Directory already exist. Needs to be opened in open mode: store = new SparseStore(root_path,is_read_only); "); + UMAP_LOG(Warning, "Directory already exist... removing and creating a new directory"); // Needs to be opened in open mode: store = new SparseStore(root_path,is_read_only); "); + std::string mkdir_command("rm -r " + root_path); + const int status = std::system(mkdir_command.c_str()); + if (status == -1){ + UMAP_ERROR("Failed to remove directory" + root_path); + } + } + //else{ + int directory_creation_status = mkdir(root_path.c_str(),S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + if (directory_creation_status != 0){ + UMAP_ERROR("ERROR: Failed to create directory" << " - " << strerror(errno)); + } + std::ofstream metadata(metadata_file_path.c_str()); + if (!metadata.is_open()){ + UMAP_ERROR("Failed to open metadata file" << " - " << strerror(errno)); } else{ - int directory_creation_status = mkdir(root_path.c_str(),S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - if (directory_creation_status != 0){ - UMAP_ERROR("ERROR: Failed to create directory" << " - " << strerror(errno)); - } - std::ofstream metadata(metadata_file_path.c_str()); - if (!metadata.is_open()){ - UMAP_ERROR("Failed to open metadata file" << " - " << strerror(errno)); - } - else{ - metadata << file_size << std::endl; - // set current capacity to be the file granularity - metadata << std::max(file_size,rsize); - } + metadata << file_size << std::endl; + // set current capacity to be the file granularity + metadata << std::max(file_size,rsize); + } + + // File exists map + std::string filemap_path = root_path + "/_filemap"; + filemap_fd = open(filemap_path.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (filemap_fd == -1){ + UMAP_ERROR("SparseStore: failed to open filemap metadata - " << strerror(errno)); + } + + size_t written = pwrite(filemap_fd, (void*) file_exists_map, num_files * sizeof(int8_t), 0); + if (written == -1){ + UMAP_ERROR("SparseStore: failed to write filemap metadata - " << strerror(errno)); } + + // Zero page + char *tmp; + if (posix_memalign((void**)&tmp, ::umapcfg_get_umap_page_size(), ::umapcfg_get_umap_page_size())) { + std::cerr << "Virtual Memory Manager: Error posix_memalign - " << strerror(errno) << std::endl; + } + zero_page = (char*) mmap((void *) tmp, ::umapcfg_get_umap_page_size(), PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | MAP_FIXED | MAP_POPULATE, -1, 0); + if (zero_page == MAP_FAILED){ + std::cerr << "Error mmap zero page " << strerror(errno) << std::endl; + exit(-1); + } + + // } } // Open mode @@ -89,8 +126,27 @@ namespace Umap { file_descriptors[i].id = -1; } } + file_exists_map = new int8_t[num_files]; + std::string filemap_path = root_path + "/_filemap"; + filemap_fd = open(filemap_path.c_str(), O_RDWR); + if (filemap_fd == -1){ + UMAP_ERROR("SparseStore: failed to open filemap metadata - " << strerror(errno)); + } + size_t read = pread(filemap_fd, (void*) file_exists_map, num_files *sizeof(int8_t), 0); + if (read == -1){ + UMAP_ERROR("SparseStore: failed to read filemap metadata - " << strerror(errno)); + } + } + char *tmp; + if (posix_memalign((void**)&tmp, ::umapcfg_get_umap_page_size(), ::umapcfg_get_umap_page_size())) { + std::cerr << "Virtual Memory Manager: Error posix_memalign - " << strerror(errno) << std::endl; + } + zero_page = (char*) mmap((void *) tmp, ::umapcfg_get_umap_page_size(), PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | MAP_FIXED | MAP_POPULATE, -1, 0); + if (zero_page == MAP_FAILED){ + std::cerr << "Error mmap zero page " << strerror(errno) << std::endl; + exit(-1); } - closedir(directory); } @@ -100,29 +156,94 @@ namespace Umap { UMAP_LOG(Info,"SparseStore Total Reads: " << numreads); UMAP_LOG(Info,"SparseStore Total Writes: " << numwrites); delete [] file_descriptors; + int status = munmap((void *) zero_page, file_size); + if (status == -1){ + UMAP_ERROR("SparseStore: Error unmapping zero page - " << strerror(errno)); + } + if (close(filemap_fd) == -1){ + UMAP_ERROR("SparseStore: failed to close filemap metadata - " << strerror(errno)); + } + delete [] file_exists_map; } ssize_t SparseStore::read_from_store(char* buf, size_t nb, off_t off) { ssize_t read = 0; off_t file_offset; - int fd = get_fd(off, file_offset); - read = pread(fd,buf,nb,file_offset); - if(read == -1){ - UMAP_ERROR("pread(fd=" << fd << ", buff=" << (void*)buf << ", nb=" << nb << ", off=" << off << ") Failed - " << strerror(errno)); + int fd_index = off / file_size; + if (file_exists_map[fd_index] == 0){ + // return zero page + memcpy(buf, (void*) zero_page, file_size); + return file_size; + } + else{ + int fd = get_fd(off, file_offset, 0); + #ifdef USE_COMPRESSION + // Get compressed size using lseek + size_t compressed_block_size = lseek(fd, 0, SEEK_END); + if (compressed_block_size == -1){ + UMAP_ERROR("SparseStore: Failed to get file size, lseek failed with error - " << strerror(errno)); + } + // Return to start of file + int location = lseek(fd, 0, SEEK_SET); + if (location == -1){ + UMAP_ERROR("SparseStore: Failed reset lseek with error - " << strerror(errno)); + } + + char* read_buffer; + int memaligned_status = posix_memalign((void **)&read_buffer, ::umapcfg_get_umap_page_size(), compressed_block_size); + if (memaligned_status != 0){ + UMAP_ERROR("SparseStore: Allocating temporary decompression buffer failed"); + } + if (pread(fd, (void*)read_buffer, compressed_block_size, 0) == -1){ + UMAP_ERROR("pread(fd=" << fd << ", buff=" << (void*)buf << ", nb=" << nb << ", off=" << off << ") of compressed file Failed - " << strerror(errno)); + } + size_t decompressed_size = Umap::decompress((void*)read_buffer, buf, compressed_block_size); + free(read_buffer); + #else + read = pread(fd,buf,nb,file_offset); + if(read == -1){ + UMAP_ERROR("pread(fd=" << fd << ", buff=" << (void*)buf << ", nb=" << nb << ", off=" << off << ") Failed - " << strerror(errno)); + } + #endif + numreads++; + int close_status = close(fd); + if (close_status == -1){ + UMAP_ERROR("Error Closing file descriptor for block " << (uint64_t) off << " - " << strerror(errno)) + } + file_descriptors[fd_index].id = -1; + return read; } - numreads++; - return read; } ssize_t SparseStore::write_to_store(char* buf, size_t nb, off_t off) { ssize_t written = 0; off_t file_offset; - int fd = get_fd(off, file_offset); + int fd_index = off / file_size; + int fd = -1; + #ifdef USE_COMPRESSION + std::pair compressed_buffer_and_size = Umap::compress(buf, file_size); + void* const write_buffer = compressed_buffer_and_size.first; + size_t compressed_block_size = compressed_buffer_and_size.second; + fd = get_fd(off, file_offset, compressed_block_size); + written = pwrite(fd, write_buffer, compressed_block_size, file_offset); + #else + fd = get_fd(off, file_offset, 0); written = pwrite(fd,buf,nb,file_offset); + #endif if(written == -1){ UMAP_ERROR("pwrite(fd=" << fd << ", buff=" << (void*)buf << ", nb=" << nb << ", off=" << off << ") Failed - " << strerror(errno)); } + + size_t written_filemap = pwrite(filemap_fd, (void*) file_exists_map, num_files * sizeof(int8_t), 0); + if (written_filemap == -1){ + UMAP_ERROR("SparseStore: failed to write filemap metadata - " << strerror(errno)); + } numwrites++; + int close_status = close(fd); + if (close_status == -1){ + UMAP_ERROR("Error Closing file descriptor for block " << (uint64_t) off << " - " << strerror(errno)) + } + file_descriptors[fd_index].id = -1; return written; } @@ -135,7 +256,7 @@ namespace Umap { UMAP_LOG(Warning,"SparseStore: Failed to close file with id: " << i << " - " << strerror(errno)); } return_status = return_status | close_status; - } + } } return return_status; } @@ -143,10 +264,10 @@ namespace Umap { size_t SparseStore::get_current_capacity(){ return current_capacity; } - + /** * To get the size of any persistent region created using SparseStore without the need to instianiate an object - **/ + **/ size_t SparseStore::get_capacity(std::string base_path){ size_t capacity = 0; std::string metadata_path = base_path + "/_metadata"; @@ -161,14 +282,14 @@ namespace Umap { return capacity; } - int SparseStore::get_fd(off_t offset, off_t &file_offset){ + int SparseStore::get_fd(off_t offset, off_t &file_offset, size_t trunc_size){ int fd_index = offset / file_size; file_offset = offset % file_size; std::string filename = root_path + "/" + std::to_string(fd_index); if ( file_descriptors[fd_index].id == -1 ){ creation_mutex.lock(); // Grab mutex (only in case of creating new file, rather than serializing a larger protion of the code) if (file_descriptors[fd_index].id == -1){ // Recheck the value to make sure that another thread did not already create the file - int flags = (read_only ? O_RDONLY : O_RDWR ) | O_CREAT | O_DIRECT | O_LARGEFILE; + int flags = (read_only ? O_RDONLY : O_RDWR ) | O_CREAT | O_LARGEFILE; int fd = open(filename.c_str(), flags, S_IRUSR | S_IWUSR); if (fd == -1){ // Handling FS that do not support O_DIRECT, e.g., TMPFS @@ -181,12 +302,14 @@ namespace Umap { // when successfully open file: if (!read_only){ int fallocate_status; - if( (fallocate_status = posix_fallocate(fd,0,file_size) ) != 0){ + size_t fallocate_size = trunc_size != 0 ? trunc_size : file_size; + if( (fallocate_status = posix_fallocate(fd,0,fallocate_size) ) != 0){ UMAP_ERROR("SparseStore: fallocate() failed for file with id: " << fd_index << " - " << fallocate_status); } } // when fallocate() succeeds or when read_only file_descriptors[fd_index].id = fd; + file_exists_map[fd_index] = 1; } creation_mutex.unlock(); // Release mutex } diff --git a/src/umap/store/SparseStore.h b/src/umap/store/SparseStore.h index 2cecbed..16bd455 100644 --- a/src/umap/store/SparseStore.h +++ b/src/umap/store/SparseStore.h @@ -22,7 +22,7 @@ namespace Umap { static size_t get_capacity(std::string base_path); int close_files(); private: - int fd; + // int fd; size_t file_size; size_t current_capacity; uint64_t num_files; @@ -40,7 +40,10 @@ namespace Umap { }; file_descriptor* file_descriptors; std::mutex creation_mutex; - int get_fd(off_t offset, off_t &file_offset); + char *zero_page{nullptr}; + int8_t *file_exists_map; + int filemap_fd; + int get_fd(off_t offset, off_t &file_offset, size_t trunc_size); // ssize_t get_file_size(const std::string file_path); }; } diff --git a/src/umap/util/Compression.hpp b/src/umap/util/Compression.hpp new file mode 100644 index 0000000..f2c1b16 --- /dev/null +++ b/src/umap/util/Compression.hpp @@ -0,0 +1,39 @@ +#pragma once +#include +#include +#include +#include +#include + +#include // free +#include "zstd.h" + + + +namespace Umap{ + inline std::pair compress(void* input_buffer, size_t input_buffer_size){ + size_t output_buffer_size_bound = ZSTD_compressBound(input_buffer_size); + void* const output_buffer = malloc(output_buffer_size_bound); + size_t output_size = ZSTD_compress(output_buffer, output_buffer_size_bound, input_buffer, input_buffer_size, 1); + if (ZSTD_isError(output_size)){ + std::cerr << "Compression Error: - " << ZSTD_getErrorName(output_size) << std::endl; + exit(-1); + } + return std::pair(output_buffer,output_size); + } + + inline size_t decompress(void* input_buffer, void* output_buffer, size_t compressed_size){ + uint64_t rSize = ZSTD_getFrameContentSize(input_buffer, compressed_size); + if (rSize == ZSTD_CONTENTSIZE_ERROR){ + std::cerr << "Decompression Error: File was not compressed by ZSTD" << std::endl; + return -1; + } + + if(rSize == ZSTD_CONTENTSIZE_UNKNOWN){ + std::cerr << "Decompression Error: File unable to get content size" << std::endl; + return -1; + } + size_t decompressed_size = ZSTD_decompress(output_buffer, rSize, input_buffer, compressed_size); + return decompressed_size; + } +}