Skip to content

Commit

Permalink
[fix][feat] Use Parallax net lib
Browse files Browse the repository at this point in the history
Also, this commit uses a hash map for keeping Parallax open DBs.
  • Loading branch information
gesalous committed Aug 28, 2024
1 parent baa146f commit 49109cc
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 53 deletions.
18 changes: 18 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
cmake_minimum_required( VERSION 3.12 FATAL_ERROR )
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

find_package( ecbuild 3.7 REQUIRED HINTS ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../ecbuild)

Expand All @@ -7,6 +8,12 @@ find_library(PARALLAX_LIBRARY
NAMES parallax
HINTS ${CMAKE_INSTALL_PREFIX}/lib ${CMAKE_INSTALL_PREFIX}/lib64)


# Search for the library in /home/gesalous/local/lib
find_library(PARALLAX_NET_LIBRARY
NAMES parallax_net_lib
HINTS ${CMAKE_INSTALL_PREFIX}/lib ${CMAKE_INSTALL_PREFIX}/lib64)

# Search for the header in /home/gesalous/local/include
find_path(PARALLAX_INCLUDE_DIR
NAMES parallax.h
Expand All @@ -23,6 +30,16 @@ else()
message(FATAL_ERROR "Parallax not found!")
endif()


if(PARALLAX_NET_LIBRARY)
message(INFO "--> Parallax NETWORK LIBRARY found!")
message(INFO " --> Library: ${PARALLAX_NET_LIBRARY}")
message(INFO " --> Include directory: ${PARALLAX_INCLUDE_DIR}")
else()
message(WARN "Parallax NETWORK library not found!")
endif()


project( fdb5 LANGUAGES C CXX )

set(CMAKE_CXX_STANDARD 17)
Expand Down Expand Up @@ -52,6 +69,7 @@ ecbuild_add_option( FEATURE GRIB
CONDITION eccodes_FOUND
DESCRIPTION "Support for GRIB via eccodes")


ecbuild_find_package( NAME metkit VERSION 1.5 REQUIRED )

### FDB backend in persistent memory, i.e. pmem (NVRAM)
Expand Down
151 changes: 99 additions & 52 deletions src/dummy_daos/parallax.cc
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@

/*
* @file parallax.cc
* @author Toutoudakis Michalis
* @date July 2024
*/

#include <cstring>
#include <string>
#include <iomanip>
#include <unistd.h>
#include <limits.h>
#include <set>
#include <unordered_map>

#include "eckit/runtime/Main.h"
#include "eckit/filesystem/PathName.h"
Expand Down Expand Up @@ -81,10 +75,77 @@ typedef struct daos_handle_internal_t {
PathName path;
} daos_handle_internal_t;

std::map<std::string, par_handle> created_dbs;
// std::set<std::string> created_dbs;
std::unordered_map<std::string, par_handle> parallax_db_map;

//----------------------------------------------------------------------------------------------------------------------

par_handle par_get_db(const std::string& db_name) {
std::cout << "File: " << __FILE__ << ", Line: " << __LINE__ << ", Function: " << __func__ << std::endl;

// Check if the database is already opened
auto it = parallax_db_map.find(db_name);
if (it != parallax_db_map.end()) {
return it->second; // Return the existing handle
}

// Database is not opened yet, proceed to open it
const char* volume_name = getenv(PARALLAX_VOLUME_ENV_VAR);

par_db_options db_options = {.volume_name = (char*)volume_name,
.db_name = db_name.c_str(),
.create_flag = PAR_CREATE_DB,
.options = par_get_default_options()};
db_options.options[LEVEL0_SIZE].value = PARALLAX_L0_SIZE;
db_options.options[GROWTH_FACTOR].value = PARALLAX_GROWTH_FACTOR;
db_options.options[PRIMARY_MODE].value = 1;
db_options.options[ENABLE_BLOOM_FILTERS].value = 1;

const char* error_message = NULL;
par_handle handle = par_open(&db_options, &error_message);

if (error_message) {
LSM_DEBUG("Parallax says: %s", error_message);
}

if (handle == NULL && error_message) {
LSM_FATAL("Error upon opening the DB, error %s", error_message);
}

// Insert the new handle into the hash table
parallax_db_map[db_name] = handle;

return handle;
}


// static bool lsm_open(par_handle* handle, std::string db_name){
// std::cout << "File: " << __FILE__ << ", Line: " << __LINE__ << ", Function: " << __func__ << std::endl;
// const char* volume_name = getenv(PARALLAX_VOLUME_ENV_VAR);

// par_db_options db_options = {.volume_name = (char*)volume_name,
// .db_name = db_name.c_str(),
// .create_flag = PAR_CREATE_DB,
// .options = par_get_default_options()};
// db_options.options[LEVEL0_SIZE].value = PARALLAX_L0_SIZE;
// db_options.options[GROWTH_FACTOR].value = PARALLAX_GROWTH_FACTOR;
// db_options.options[PRIMARY_MODE].value = 1;
// db_options.options[ENABLE_BLOOM_FILTERS].value = 1;

// const char* error_message = NULL;

// *handle = par_open(&db_options, &error_message);
// if (error_message)
// LSM_DEBUG("Parallax says: %s", error_message);

// if (*handle == NULL && error_message)
// LSM_FATAL("Error uppon opening the DB, error %s", error_message);

// created_dbs.insert(db_name);

// return true;
// }

int daos_init() {
//std::cout << "File: " << __FILE__ << ", Line: " << __LINE__ << ", Function: " << __func__ << std::endl;
const char* argv[2] = {"dummy-parallax-api", 0};
Expand All @@ -98,32 +159,6 @@ int daos_fini() {
return 0;
}

bool lsm_open(std::string db_name){
//std::cout << "File: " << __FILE__ << ", Line: " << __LINE__ << ", Function: " << __func__ << std::endl;
const char* volume_name = getenv(PARALLAX_VOLUME_ENV_VAR);

par_db_options db_options = {.volume_name = (char*)volume_name,
.db_name = db_name.c_str(),
.create_flag = PAR_CREATE_DB,
.options = par_get_default_options()};
db_options.options[LEVEL0_SIZE].value = PARALLAX_L0_SIZE;
db_options.options[GROWTH_FACTOR].value = PARALLAX_GROWTH_FACTOR;
db_options.options[PRIMARY_MODE].value = 1;
db_options.options[ENABLE_BLOOM_FILTERS].value = 1;

const char* error_message = NULL;
par_handle handle;
handle = par_open(&db_options, &error_message);
if (error_message)
LSM_DEBUG("Parallax says: %s", error_message);

if (handle == NULL && error_message)
LSM_FATAL("Error uppon opening the DB, error %s", error_message);

created_dbs[db_name] = handle;

return true;
}

bool lsm_put(par_handle handle, const char* key, const char* value, daos_size_t size) {
//std::cout << "File: " << __FILE__ << ", Line: " << __LINE__ << ", Function: " << __func__ << std::endl;
Expand All @@ -133,7 +168,9 @@ bool lsm_put(par_handle handle, const char* key, const char* value, daos_size_t


KV.v.val_size = size;
KV.v.val_buffer = (char*) value;
KV.v.val_buffer = (char *)value;
// memset(KV.v.val_buffer, '\0', size);
// std::memcpy(KV.v.val_buffer, value, size);
const char* error_msg_put = NULL;
par_put(handle, &KV, &error_msg_put);
if (error_msg_put) {
Expand Down Expand Up @@ -192,16 +229,16 @@ int daos_pool_connect(const char *pool, const char *sys, unsigned int flags,
if (ev != NULL) NOTIMP;

eckit::PathName path = dummy_daos_root() / pool;

std::cout << "Path is " << path.asString() << std::endl;
if (!path.exists()) return -1;

std::unique_ptr<daos_handle_internal_t> impl(new daos_handle_internal_t);
impl->path = path;
poh->impl = impl.release();

// Initialize parallax databases
lsm_open("metadata");
lsm_open("tenants");
par_get_db("metadata");
par_get_db("tenants");
return 0;

}
Expand All @@ -212,10 +249,15 @@ int daos_pool_disconnect(daos_handle_t poh, daos_event_t *ev) {
delete poh.impl;

if (ev != NULL) NOTIMP;

for (const auto& db_name : created_dbs) {
par_close(db_name.second);

for (auto it = parallax_db_map.begin(); it != parallax_db_map.end(); ++it) {
par_sync(it->second);
const char* error = par_close(it->second);
if (error) {
LSM_FATAL("Failed to close DB");
}
}
parallax_db_map.clear();
return 0;

}
Expand Down Expand Up @@ -288,11 +330,15 @@ int daos_cont_create_with_label(daos_handle_t poh, const char *label,

eckit::PathName label_symlink_path = poh.impl->path / label;

par_handle handle = created_dbs["metadata"];
struct par_key search_key;
search_key.size = strlen(label) + 1;
search_key.data = label;
if (par_exists(handle, &search_key) == 0) return 0;
// open parallax
par_handle handle = par_get_db("metadata");

// check if label exists
std::string buffer;
par_value value = lsm_get(handle, label);
if(value.val_size == 0)
return 0;


// Create uuid for the "container"
uuid_t new_uuid = {0};
Expand Down Expand Up @@ -335,7 +381,9 @@ int daos_cont_open(daos_handle_t poh, const char *cont, unsigned int flags, daos
} else {
path /= cont;
}
par_handle handle = created_dbs["metadata"];
// open parallax
par_handle handle = par_get_db("metadata");

// check if key exists
struct par_key search_key;
search_key.size = strlen(cont) + 1;
Expand Down Expand Up @@ -439,8 +487,7 @@ int daos_kv_put(daos_handle_t oh, daos_handle_t th, uint64_t flags, const char *

//std::cout << "File: " << __FILE__ << ", Line: " << __LINE__ << ", Function: " << __func__ << std::endl;

std::string region_name = "tenants";
par_handle handle = created_dbs[region_name];
par_handle handle = par_get_db("tenants");

std::string lsm_key = get_path_after_default(oh.impl->path) + "/" + key;
lsm_put(handle, lsm_key.c_str(), (const char*)buf, size);
Expand All @@ -463,7 +510,7 @@ int daos_kv_get(daos_handle_t oh, daos_handle_t th, uint64_t flags, const char *
std::string lsm_key = get_path_after_default(oh.impl->path) + "/" + key;

// check if "container" exits
par_handle handle = created_dbs[region_name];
par_handle handle = par_get_db(region_name);

par_value value = lsm_get(handle, lsm_key.c_str());

Expand Down Expand Up @@ -513,13 +560,13 @@ int daos_kv_list(daos_handle_t oh, daos_handle_t th, uint32_t *nr,
if (sgl->sg_iovs == NULL) return -1;
if (anchor == NULL) return -1;

std::string region_name = "tenants";
par_handle handle = created_dbs[region_name];

const char* error_message = NULL;
struct par_key it_key = {0};
std::string start_key = get_path_after_default(oh.impl->path);
it_key.size = get_path_after_default(oh.impl->path).length();
it_key.data = start_key.c_str();
par_handle handle = par_get_db("tenants");
par_scanner scanner = par_init_scanner(handle, &it_key, PAR_GREATER_OR_EQUAL, &error_message);

size_t sgl_pos = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ ecbuild_add_library(
"${DAOS_LIBRARIES}"
"${DAOS_TESTS_LIBRARIES}"
)
target_link_libraries(fdb5 PRIVATE ${PARALLAX_LIBRARY})
target_link_libraries(fdb5 PRIVATE ${PARALLAX_NET_LIBRARY})
if(HAVE_FDB_BUILD_TOOLS)

list( APPEND fdb5_scripts fdb fdb-which )
Expand Down

0 comments on commit 49109cc

Please sign in to comment.