Skip to content

Commit

Permalink
Resolved conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Beihao-Zhou committed Jun 24, 2024
2 parents f1a277f + 92df2b8 commit 2b9817b
Show file tree
Hide file tree
Showing 19 changed files with 279 additions and 83 deletions.
4 changes: 2 additions & 2 deletions cmake/cpptrace.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(cpptrace
jeremy-rifkin/cpptrace v0.6.1
MD5=246eb8d730b44373573783f218bd3b01
jeremy-rifkin/cpptrace v0.6.2
MD5=b13786adcc1785cb900746ea96c50bee
)

if (SYMBOLIZE_BACKEND STREQUAL "libbacktrace")
Expand Down
2 changes: 1 addition & 1 deletion src/commands/cmd_search.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static void DumpQueryResult(const std::vector<kqir::ExecutorContext::RowType> &r
output->append(MultiLen(fields.size() * 2));
for (const auto &[info, field] : fields) {
output->append(redis::BulkString(info->name));
output->append(redis::BulkString(field));
output->append(redis::BulkString(field.ToString(info->metadata.get())));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/string_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ std::string StringNext(std::string s);

template <typename T, typename F>
std::string StringJoin(
const T &con, F &&f = [](const auto &v) -> decltype(auto) { return v; }, const std::string &sep = ", ") {
const T &con, F &&f = [](const auto &v) -> decltype(auto) { return v; }, std::string_view sep = ", ") {
std::string res;
bool is_first = true;
for (const auto &v : con) {
Expand Down
2 changes: 1 addition & 1 deletion src/search/common_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct UnescapedChar : peg::utf8::range<0x20, 0x10FFFF> {};
struct Char : peg::if_then_else<peg::one<'\\'>, EscapedChar, UnescapedChar> {};

struct StringContent : peg::until<peg::at<peg::one<'"'>>, Char> {};
struct String : peg::seq<peg::one<'"'>, StringContent, peg::any> {};
struct StringL : peg::seq<peg::one<'"'>, StringContent, peg::any> {};

struct Identifier : peg::identifier {};

Expand Down
10 changes: 5 additions & 5 deletions src/search/executors/filter_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ struct QueryExprEvaluator {

StatusOr<bool> Visit(TagContainExpr *v) const {
auto val = GET_OR_RET(ctx->Retrieve(row, v->field->info));
auto meta = v->field->info->MetadataAs<redis::TagFieldMetadata>();

auto split = util::Split(val, std::string(1, meta->separator));
CHECK(val.Is<kqir::StringArray>());
auto split = val.Get<kqir::StringArray>();
return std::find(split.begin(), split.end(), v->tag->val) != split.end();
}

StatusOr<bool> Visit(NumericCompareExpr *v) const {
auto l_str = GET_OR_RET(ctx->Retrieve(row, v->field->info));
auto l_val = GET_OR_RET(ctx->Retrieve(row, v->field->info));

// TODO: reconsider how to handle failure case here
auto l = GET_OR_RET(ParseFloat(l_str));
CHECK(l_val.Is<kqir::Numeric>());
auto l = l_val.Get<kqir::Numeric>();
auto r = v->num->val;

switch (v->op) {
Expand Down
3 changes: 2 additions & 1 deletion src/search/executors/numeric_field_scan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "encoding.h"
#include "search/plan_executor.h"
#include "search/search_encoding.h"
#include "search/value.h"
#include "storage/redis_db.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"
Expand Down Expand Up @@ -108,7 +109,7 @@ struct NumericFieldScanExecutor : ExecutorNode {
} else {
iter->Prev();
}
return RowType{key_str, {{scan->field->info, std::to_string(curr)}}, scan->field->info->index};
return RowType{key_str, {{scan->field->info, kqir::MakeValue<kqir::Numeric>(curr)}}, scan->field->info->index};
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/search/executors/topn_sort_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ struct TopNSortExecutor : ExecutorNode {
auto &row = std::get<RowType>(v);

auto get_order = [this](RowType &row) -> StatusOr<double> {
auto order_str = GET_OR_RET(ctx->Retrieve(row, sort->order->field->info));
auto order = GET_OR_RET(ParseFloat(order_str));
return order;
auto order_val = GET_OR_RET(ctx->Retrieve(row, sort->order->field->info));
CHECK(order_val.Is<kqir::Numeric>());
return order_val.Get<kqir::Numeric>();
};

if (rows.size() == total) {
Expand Down
129 changes: 103 additions & 26 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "db_util.h"
#include "parse_util.h"
#include "search/search_encoding.h"
#include "search/value.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "string_util.h"
Expand Down Expand Up @@ -58,25 +59,100 @@ StatusOr<FieldValueRetriever> FieldValueRetriever::Create(IndexOnDataType type,
}
}

rocksdb::Status FieldValueRetriever::Retrieve(std::string_view field, std::string *output) {
// placeholders, remove them after vector indexing is implemented
static bool IsVectorType(const redis::IndexFieldMetadata *) { return false; }
static size_t GetVectorDim(const redis::IndexFieldMetadata *) { return 1; }

StatusOr<kqir::Value> FieldValueRetriever::ParseFromJson(const jsoncons::json &val,
const redis::IndexFieldMetadata *type) {
if (auto numeric [[maybe_unused]] = dynamic_cast<const redis::NumericFieldMetadata *>(type)) {
if (!val.is_number() || val.is_string()) return {Status::NotOK, "json value cannot be string for numeric fields"};
return kqir::MakeValue<kqir::Numeric>(val.as_double());
} else if (auto tag = dynamic_cast<const redis::TagFieldMetadata *>(type)) {
if (val.is_string()) {
const char delim[] = {tag->separator, '\0'};
auto vec = util::Split(val.as_string(), delim);
return kqir::MakeValue<kqir::StringArray>(vec);
} else if (val.is_array()) {
std::vector<std::string> strs;
for (size_t i = 0; i < val.size(); ++i) {
if (!val[i].is_string())
return {Status::NotOK, "json value should be string or array of strings for tag fields"};
strs.push_back(val[i].as_string());
}
return kqir::MakeValue<kqir::StringArray>(strs);
} else {
return {Status::NotOK, "json value should be string or array of strings for tag fields"};
}
} else if (IsVectorType(type)) {
size_t dim = GetVectorDim(type);
if (!val.is_array()) return {Status::NotOK, "json value should be array of numbers for vector fields"};
if (dim != val.size()) return {Status::NotOK, "the size of the json array is not equal to the dim of the vector"};
std::vector<double> nums;
for (size_t i = 0; i < dim; ++i) {
if (!val[i].is_number() || val[i].is_string())
return {Status::NotOK, "json value should be array of numbers for vector fields"};
nums.push_back(val[i].as_double());
}
return kqir::MakeValue<kqir::NumericArray>(nums);
} else {
return {Status::NotOK, "unknown field type to retrieve"};
}
}

StatusOr<kqir::Value> FieldValueRetriever::ParseFromHash(const std::string &value,
const redis::IndexFieldMetadata *type) {
if (auto numeric [[maybe_unused]] = dynamic_cast<const redis::NumericFieldMetadata *>(type)) {
auto num = GET_OR_RET(ParseFloat(value));
return kqir::MakeValue<kqir::Numeric>(num);
} else if (auto tag = dynamic_cast<const redis::TagFieldMetadata *>(type)) {
const char delim[] = {tag->separator, '\0'};
auto vec = util::Split(value, delim);
return kqir::MakeValue<kqir::StringArray>(vec);
} else if (IsVectorType(type)) {
const size_t dim = GetVectorDim(type);
if (value.size() != dim * sizeof(double)) {
return {Status::NotOK, "field value is too short or too long to be parsed as a vector"};
}
std::vector<double> vec;
for (size_t i = 0; i < dim; ++i) {
// TODO: care about endian later
// TODO: currently only support 64bit floating point
vec.push_back(*(reinterpret_cast<const double *>(value.data()) + i));
}
return kqir::MakeValue<kqir::NumericArray>(vec);
} else {
return {Status::NotOK, "unknown field type to retrieve"};
}
}

StatusOr<kqir::Value> FieldValueRetriever::Retrieve(std::string_view field, const redis::IndexFieldMetadata *type) {
if (std::holds_alternative<HashData>(db)) {
auto &[hash, metadata, key] = std::get<HashData>(db);
std::string ns_key = hash.AppendNamespacePrefix(key);

LatestSnapShot ss(hash.storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string sub_key = InternalKey(ns_key, field, metadata.version, hash.storage_->IsSlotIdEncoded()).Encode();
return hash.storage_->Get(read_options, sub_key, output);
std::string value;
auto s = hash.storage_->Get(read_options, sub_key, &value);
if (s.IsNotFound()) return {Status::NotFound, s.ToString()};
if (!s.ok()) return {Status::NotOK, s.ToString()};

return ParseFromHash(value, type);
} else if (std::holds_alternative<JsonData>(db)) {
auto &value = std::get<JsonData>(db);

auto s = value.Get(field.front() == '$' ? field : fmt::format("$.{}", field));
if (!s.IsOK()) return rocksdb::Status::Corruption(s.Msg());
if (!s.IsOK()) return {Status::NotOK, s.Msg()};
if (s->value.size() != 1)
return rocksdb::Status::NotFound("json value specified by the field (json path) should exist and be unique");
*output = s->value[0].as_string();
return rocksdb::Status::OK();
return {Status::NotFound, "json value specified by the field (json path) should exist and be unique"};
auto val = s->value[0];

return ParseFromJson(val, type);
} else {
__builtin_unreachable();
return {Status::NotOK, "unknown redis data type to retrieve"};
}
}

Expand Down Expand Up @@ -104,22 +180,22 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key) c
continue;
}

std::string value;
auto s = retriever.Retrieve(field, &value);
if (s.IsNotFound()) continue;
if (!s.ok()) return {Status::NotOK, s.ToString()};
auto s = retriever.Retrieve(field, i.metadata.get());
if (s.Is<Status::NotFound>()) continue;
if (!s) return s;

values.emplace(field, value);
values.emplace(field, *s);
}

return values;
}

Status IndexUpdater::UpdateTagIndex(std::string_view key, std::string_view original, std::string_view current,
Status IndexUpdater::UpdateTagIndex(std::string_view key, const kqir::Value &original, const kqir::Value &current,
const SearchKey &search_key, const TagFieldMetadata *tag) const {
const char delim[] = {tag->separator, '\0'};
auto original_tags = util::Split(original, delim);
auto current_tags = util::Split(current, delim);
CHECK(original.IsNull() || original.Is<kqir::StringArray>());
CHECK(current.IsNull() || current.Is<kqir::StringArray>());
auto original_tags = original.IsNull() ? std::vector<std::string>() : original.Get<kqir::StringArray>();
auto current_tags = current.IsNull() ? std::vector<std::string>() : current.Get<kqir::StringArray>();

auto to_tag_set = [](const std::vector<std::string> &tags, bool case_sensitive) -> std::set<std::string> {
if (case_sensitive) {
Expand Down Expand Up @@ -169,22 +245,23 @@ Status IndexUpdater::UpdateTagIndex(std::string_view key, std::string_view origi
return Status::OK();
}

Status IndexUpdater::UpdateNumericIndex(std::string_view key, std::string_view original, std::string_view current,
Status IndexUpdater::UpdateNumericIndex(std::string_view key, const kqir::Value &original, const kqir::Value &current,
const SearchKey &search_key, const NumericFieldMetadata *num) const {
CHECK(original.IsNull() || original.Is<kqir::Numeric>());
CHECK(original.IsNull() || original.Is<kqir::Numeric>());

auto *storage = indexer->storage;
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

if (!original.empty()) {
auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end())));
auto index_key = search_key.ConstructNumericFieldData(original_num, key);
if (!original.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(original.Get<kqir::Numeric>(), key);

batch->Delete(cf_handle, index_key);
}

if (!current.empty()) {
auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end())));
auto index_key = search_key.ConstructNumericFieldData(current_num, key);
if (!current.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(current.Get<kqir::Numeric>(), key);

batch->Put(cf_handle, index_key, Slice());
}
Expand Down Expand Up @@ -213,8 +290,8 @@ Status IndexUpdater::UpdateHnswVectorIndex(std::string_view key, std::string_vie
return Status::OK();
}

Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current) const {
Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, const kqir::Value &original,
const kqir::Value &current) const {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
Expand Down Expand Up @@ -248,7 +325,7 @@ Status IndexUpdater::Update(const FieldValues &original, std::string_view key) c
continue;
}

std::string_view original_val, current_val;
kqir::Value original_val, current_val;

if (auto it = original.find(field); it != original.end()) {
original_val = it->second;
Expand Down
16 changes: 10 additions & 6 deletions src/search/indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "storage/storage.h"
#include "types/redis_hash.h"
#include "types/redis_json.h"
#include "value.h"

namespace redis {

Expand Down Expand Up @@ -63,27 +64,30 @@ struct FieldValueRetriever {

explicit FieldValueRetriever(JsonValue json) : db(std::in_place_type<JsonData>, std::move(json)) {}

rocksdb::Status Retrieve(std::string_view field, std::string *output);
StatusOr<kqir::Value> Retrieve(std::string_view field, const redis::IndexFieldMetadata *type);

static StatusOr<kqir::Value> ParseFromJson(const jsoncons::json &value, const redis::IndexFieldMetadata *type);
static StatusOr<kqir::Value> ParseFromHash(const std::string &value, const redis::IndexFieldMetadata *type);
};

struct IndexUpdater {
using FieldValues = std::map<std::string, std::string>;
using FieldValues = std::map<std::string, kqir::Value>;

const kqir::IndexInfo *info = nullptr;
GlobalIndexer *indexer = nullptr;

explicit IndexUpdater(const kqir::IndexInfo *info) : info(info) {}

StatusOr<FieldValues> Record(std::string_view key) const;
Status UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current) const;
Status UpdateIndex(const std::string &field, std::string_view key, const kqir::Value &original,
const kqir::Value &current) const;
Status Update(const FieldValues &original, std::string_view key) const;

Status Build() const;

Status UpdateTagIndex(std::string_view key, std::string_view original, std::string_view current,
Status UpdateTagIndex(std::string_view key, const kqir::Value &original, const kqir::Value &current,
const SearchKey &search_key, const TagFieldMetadata *tag) const;
Status UpdateNumericIndex(std::string_view key, std::string_view original, std::string_view current,
Status UpdateNumericIndex(std::string_view key, const kqir::Value &original, const kqir::Value &current,
const SearchKey &search_key, const NumericFieldMetadata *num) const;
Status UpdateHnswVectorIndex(std::string_view key, std::string_view original, std::string_view current,
const SearchKey &search_key, HnswVectorFieldMetadata *vector) const;
Expand Down
2 changes: 1 addition & 1 deletion src/search/ir.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
#include "string_util.h"
#include "type_util.h"

// kqir stands for Kvorcks Query Intermediate Representation
// kqir stands for Kvrocks Query Intermediate Representation
namespace kqir {

struct Node {
Expand Down
9 changes: 4 additions & 5 deletions src/search/plan_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,11 @@ auto ExecutorContext::Retrieve(RowType &row, const FieldInfo *field) -> StatusOr
auto retriever = GET_OR_RET(
redis::FieldValueRetriever::Create(field->index->metadata.on_data_type, row.key, storage, field->index->ns));

std::string result;
auto s = retriever.Retrieve(field->name, &result);
if (!s.ok()) return {Status::NotOK, s.ToString()};
auto s = retriever.Retrieve(field->name, field->metadata.get());
if (!s) return s;

row.fields.emplace(field, result);
return result;
row.fields.emplace(field, *s);
return *s;
}

} // namespace kqir
8 changes: 5 additions & 3 deletions src/search/plan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "ir_plan.h"
#include "search/index_info.h"
#include "search/value.h"
#include "storage/storage.h"
#include "string_util.h"

Expand All @@ -33,7 +34,7 @@ struct ExecutorContext;

struct ExecutorNode {
using KeyType = std::string;
using ValueType = std::string;
using ValueType = kqir::Value;
struct RowType {
KeyType key;
std::map<const FieldInfo *, ValueType> fields;
Expand All @@ -52,8 +53,9 @@ struct ExecutorNode {
} else {
os << row.key;
}
return os << " {" << util::StringJoin(row.fields, [](const auto &v) { return v.first->name + ": " + v.second; })
<< "}";
return os << " {" << util::StringJoin(row.fields, [](const auto &v) {
return v.first->name + ": " + v.second.ToString();
}) << "}";
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/search/redis_query_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ using namespace peg;

struct Field : seq<one<'@'>, Identifier> {};

struct Tag : sor<Identifier, String> {};
struct Tag : sor<Identifier, StringL> {};
struct TagList : seq<one<'{'>, WSPad<Tag>, star<seq<one<'|'>, WSPad<Tag>>>, one<'}'>> {};

struct Inf : seq<opt<one<'+', '-'>>, string<'i', 'n', 'f'>> {};
Expand Down
Loading

0 comments on commit 2b9817b

Please sign in to comment.