Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(search): add a value type system to KQIR #2369

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/commands/cmd_search.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static void DumpQueryResult(const std::vector<kqir::ExecutorContext::RowType> &r
output->append(MultiLen(fields.size() * 2));
for (const auto &[info, field] : fields) {
output->append(redis::BulkString(info->name));
output->append(redis::BulkString(field));
output->append(redis::BulkString(field.ToString(info->metadata.get())));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/string_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ std::string StringNext(std::string s);

template <typename T, typename F>
std::string StringJoin(
const T &con, F &&f = [](const auto &v) -> decltype(auto) { return v; }, const std::string &sep = ", ") {
const T &con, F &&f = [](const auto &v) -> decltype(auto) { return v; }, std::string_view sep = ", ") {
std::string res;
bool is_first = true;
for (const auto &v : con) {
Expand Down
2 changes: 1 addition & 1 deletion src/search/common_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct UnescapedChar : peg::utf8::range<0x20, 0x10FFFF> {};
struct Char : peg::if_then_else<peg::one<'\\'>, EscapedChar, UnescapedChar> {};

struct StringContent : peg::until<peg::at<peg::one<'"'>>, Char> {};
struct String : peg::seq<peg::one<'"'>, StringContent, peg::any> {};
struct StringL : peg::seq<peg::one<'"'>, StringContent, peg::any> {};

struct Identifier : peg::identifier {};

Expand Down
10 changes: 5 additions & 5 deletions src/search/executors/filter_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ struct QueryExprEvaluator {

StatusOr<bool> Visit(TagContainExpr *v) const {
auto val = GET_OR_RET(ctx->Retrieve(row, v->field->info));
auto meta = v->field->info->MetadataAs<redis::TagFieldMetadata>();

auto split = util::Split(val, std::string(1, meta->separator));
CHECK(val.Is<kqir::StringArray>());
auto split = val.Get<kqir::StringArray>();
return std::find(split.begin(), split.end(), v->tag->val) != split.end();
}

StatusOr<bool> Visit(NumericCompareExpr *v) const {
auto l_str = GET_OR_RET(ctx->Retrieve(row, v->field->info));
auto l_val = GET_OR_RET(ctx->Retrieve(row, v->field->info));

// TODO: reconsider how to handle failure case here
auto l = GET_OR_RET(ParseFloat(l_str));
CHECK(l_val.Is<kqir::Numeric>());
auto l = l_val.Get<kqir::Numeric>();
auto r = v->num->val;

switch (v->op) {
Expand Down
3 changes: 2 additions & 1 deletion src/search/executors/numeric_field_scan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "encoding.h"
#include "search/plan_executor.h"
#include "search/search_encoding.h"
#include "search/value.h"
#include "storage/redis_db.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"
Expand Down Expand Up @@ -108,7 +109,7 @@ struct NumericFieldScanExecutor : ExecutorNode {
} else {
iter->Prev();
}
return RowType{key_str, {{scan->field->info, std::to_string(curr)}}, scan->field->info->index};
return RowType{key_str, {{scan->field->info, kqir::MakeValue<kqir::Numeric>(curr)}}, scan->field->info->index};
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/search/executors/topn_sort_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ struct TopNSortExecutor : ExecutorNode {
auto &row = std::get<RowType>(v);

auto get_order = [this](RowType &row) -> StatusOr<double> {
auto order_str = GET_OR_RET(ctx->Retrieve(row, sort->order->field->info));
auto order = GET_OR_RET(ParseFloat(order_str));
return order;
auto order_val = GET_OR_RET(ctx->Retrieve(row, sort->order->field->info));
CHECK(order_val.Is<kqir::Numeric>());
return order_val.Get<kqir::Numeric>();
};

if (rows.size() == total) {
Expand Down
96 changes: 70 additions & 26 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "db_util.h"
#include "parse_util.h"
#include "search/search_encoding.h"
#include "search/value.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"
#include "string_util.h"
Expand Down Expand Up @@ -56,25 +57,67 @@ StatusOr<FieldValueRetriever> FieldValueRetriever::Create(IndexOnDataType type,
}
}

rocksdb::Status FieldValueRetriever::Retrieve(std::string_view field, std::string *output) {
StatusOr<kqir::Value> FieldValueRetriever::Retrieve(std::string_view field, const redis::IndexFieldMetadata *type) {
if (std::holds_alternative<HashData>(db)) {
auto &[hash, metadata, key] = std::get<HashData>(db);
std::string ns_key = hash.AppendNamespacePrefix(key);

LatestSnapShot ss(hash.storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string sub_key = InternalKey(ns_key, field, metadata.version, hash.storage_->IsSlotIdEncoded()).Encode();
return hash.storage_->Get(read_options, sub_key, output);
std::string value;
auto s = hash.storage_->Get(read_options, sub_key, &value);
if (s.IsNotFound()) return {Status::NotFound, s.ToString()};
if (!s.ok()) return {Status::NotOK, s.ToString()};

if (auto numeric [[maybe_unused]] = dynamic_cast<const redis::NumericFieldMetadata *>(type)) {
auto num = GET_OR_RET(ParseFloat(value));
return kqir::MakeValue<kqir::Numeric>(num);
} else if (auto tag = dynamic_cast<const redis::TagFieldMetadata *>(type)) {
const char delim[] = {tag->separator, '\0'};
auto vec = util::Split(value, delim);
return kqir::MakeValue<kqir::StringArray>(vec);
} else {
return {Status::NotOK, "unknown field type to retrieve"};
}

} else if (std::holds_alternative<JsonData>(db)) {
auto &value = std::get<JsonData>(db);

auto s = value.Get(field.front() == '$' ? field : fmt::format("$.{}", field));
if (!s.IsOK()) return rocksdb::Status::Corruption(s.Msg());
if (!s.IsOK()) return {Status::NotOK, s.Msg()};
if (s->value.size() != 1)
return rocksdb::Status::NotFound("json value specified by the field (json path) should exist and be unique");
*output = s->value[0].as_string();
return rocksdb::Status::OK();
return {Status::NotFound, "json value specified by the field (json path) should exist and be unique"};
auto val = s->value[0];

if (auto numeric [[maybe_unused]] = dynamic_cast<const redis::NumericFieldMetadata *>(type)) {
if (val.is_string()) return {Status::NotOK, "json value cannot be string for numeric fields"};
return kqir::MakeValue<kqir::Numeric>(val.as_double());
} else if (auto tag = dynamic_cast<const redis::TagFieldMetadata *>(type)) {
if (val.is_string()) {
const char delim[] = {tag->separator, '\0'};
auto vec = util::Split(val.as_string(), delim);
return kqir::MakeValue<kqir::StringArray>(vec);
} else if (val.is_array()) {
std::vector<std::string> strs;
for (size_t i = 0; i < val.size(); ++i) {
if (!val[i].is_string())
return {Status::NotOK, "json value should be string or array of strings for tag fields"};
strs.push_back(val[i].as_string());
}
return kqir::MakeValue<kqir::StringArray>(strs);
} else {
return {Status::NotOK, "json value should be string or array of strings for tag fields"};
}
} else {
return {Status::NotOK, "unknown field type to retrieve"};
}

return Status::OK();

} else {
__builtin_unreachable();
return {Status::NotOK, "unknown redis data type to retrieve"};
}
}

Expand Down Expand Up @@ -102,22 +145,22 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key) c
continue;
}

std::string value;
auto s = retriever.Retrieve(field, &value);
if (s.IsNotFound()) continue;
if (!s.ok()) return {Status::NotOK, s.ToString()};
auto s = retriever.Retrieve(field, i.metadata.get());
if (s.Is<Status::NotFound>()) continue;
if (!s) return s;

values.emplace(field, value);
values.emplace(field, *s);
}

return values;
}

Status IndexUpdater::UpdateTagIndex(std::string_view key, std::string_view original, std::string_view current,
Status IndexUpdater::UpdateTagIndex(std::string_view key, const kqir::Value &original, const kqir::Value &current,
const SearchKey &search_key, const TagFieldMetadata *tag) const {
const char delim[] = {tag->separator, '\0'};
auto original_tags = util::Split(original, delim);
auto current_tags = util::Split(current, delim);
CHECK(original.IsNull() || original.Is<kqir::StringArray>());
CHECK(current.IsNull() || current.Is<kqir::StringArray>());
auto original_tags = original.IsNull() ? std::vector<std::string>() : original.Get<kqir::StringArray>();
auto current_tags = current.IsNull() ? std::vector<std::string>() : current.Get<kqir::StringArray>();

auto to_tag_set = [](const std::vector<std::string> &tags, bool case_sensitive) -> std::set<std::string> {
if (case_sensitive) {
Expand Down Expand Up @@ -167,22 +210,23 @@ Status IndexUpdater::UpdateTagIndex(std::string_view key, std::string_view origi
return Status::OK();
}

Status IndexUpdater::UpdateNumericIndex(std::string_view key, std::string_view original, std::string_view current,
Status IndexUpdater::UpdateNumericIndex(std::string_view key, const kqir::Value &original, const kqir::Value &current,
const SearchKey &search_key, const NumericFieldMetadata *num) const {
CHECK(original.IsNull() || original.Is<kqir::Numeric>());
CHECK(original.IsNull() || original.Is<kqir::Numeric>());

auto *storage = indexer->storage;
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

if (!original.empty()) {
auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end())));
auto index_key = search_key.ConstructNumericFieldData(original_num, key);
if (!original.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(original.Get<kqir::Numeric>(), key);

batch->Delete(cf_handle, index_key);
}

if (!current.empty()) {
auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end())));
auto index_key = search_key.ConstructNumericFieldData(current_num, key);
if (!current.IsNull()) {
auto index_key = search_key.ConstructNumericFieldData(current.Get<kqir::Numeric>(), key);

batch->Put(cf_handle, index_key, Slice());
}
Expand All @@ -192,8 +236,8 @@ Status IndexUpdater::UpdateNumericIndex(std::string_view key, std::string_view o
return Status::OK();
}

Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current) const {
Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, const kqir::Value &original,
const kqir::Value &current) const {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
Expand Down Expand Up @@ -225,7 +269,7 @@ Status IndexUpdater::Update(const FieldValues &original, std::string_view key) c
continue;
}

std::string_view original_val, current_val;
kqir::Value original_val, current_val;

if (auto it = original.find(field); it != original.end()) {
original_val = it->second;
Expand Down
13 changes: 7 additions & 6 deletions src/search/indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "storage/storage.h"
#include "types/redis_hash.h"
#include "types/redis_json.h"
#include "value.h"

namespace redis {

Expand Down Expand Up @@ -63,27 +64,27 @@ struct FieldValueRetriever {

explicit FieldValueRetriever(JsonValue json) : db(std::in_place_type<JsonData>, std::move(json)) {}

rocksdb::Status Retrieve(std::string_view field, std::string *output);
StatusOr<kqir::Value> Retrieve(std::string_view field, const redis::IndexFieldMetadata *type);
};

struct IndexUpdater {
using FieldValues = std::map<std::string, std::string>;
using FieldValues = std::map<std::string, kqir::Value>;

const kqir::IndexInfo *info = nullptr;
GlobalIndexer *indexer = nullptr;

explicit IndexUpdater(const kqir::IndexInfo *info) : info(info) {}

StatusOr<FieldValues> Record(std::string_view key) const;
Status UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current) const;
Status UpdateIndex(const std::string &field, std::string_view key, const kqir::Value &original,
const kqir::Value &current) const;
Status Update(const FieldValues &original, std::string_view key) const;

Status Build() const;

Status UpdateTagIndex(std::string_view key, std::string_view original, std::string_view current,
Status UpdateTagIndex(std::string_view key, const kqir::Value &original, const kqir::Value &current,
const SearchKey &search_key, const TagFieldMetadata *tag) const;
Status UpdateNumericIndex(std::string_view key, std::string_view original, std::string_view current,
Status UpdateNumericIndex(std::string_view key, const kqir::Value &original, const kqir::Value &current,
const SearchKey &search_key, const NumericFieldMetadata *num) const;
};

Expand Down
2 changes: 1 addition & 1 deletion src/search/ir.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
#include "string_util.h"
#include "type_util.h"

// kqir stands for Kvorcks Query Intermediate Representation
// kqir stands for Kvrocks Query Intermediate Representation
namespace kqir {

struct Node {
Expand Down
9 changes: 4 additions & 5 deletions src/search/plan_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,11 @@ auto ExecutorContext::Retrieve(RowType &row, const FieldInfo *field) -> StatusOr
auto retriever = GET_OR_RET(
redis::FieldValueRetriever::Create(field->index->metadata.on_data_type, row.key, storage, field->index->ns));

std::string result;
auto s = retriever.Retrieve(field->name, &result);
if (!s.ok()) return {Status::NotOK, s.ToString()};
auto s = retriever.Retrieve(field->name, field->metadata.get());
if (!s) return s;

row.fields.emplace(field, result);
return result;
row.fields.emplace(field, *s);
return *s;
}

} // namespace kqir
8 changes: 5 additions & 3 deletions src/search/plan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "ir_plan.h"
#include "search/index_info.h"
#include "search/value.h"
#include "storage/storage.h"
#include "string_util.h"

Expand All @@ -33,7 +34,7 @@ struct ExecutorContext;

struct ExecutorNode {
using KeyType = std::string;
using ValueType = std::string;
using ValueType = kqir::Value;
struct RowType {
KeyType key;
std::map<const FieldInfo *, ValueType> fields;
Expand All @@ -52,8 +53,9 @@ struct ExecutorNode {
} else {
os << row.key;
}
return os << " {" << util::StringJoin(row.fields, [](const auto &v) { return v.first->name + ": " + v.second; })
<< "}";
return os << " {" << util::StringJoin(row.fields, [](const auto &v) {
return v.first->name + ": " + v.second.ToString();
}) << "}";
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/search/redis_query_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ using namespace peg;

struct Field : seq<one<'@'>, Identifier> {};

struct Tag : sor<Identifier, String> {};
struct Tag : sor<Identifier, StringL> {};
struct TagList : seq<one<'{'>, WSPad<Tag>, star<seq<one<'|'>, WSPad<Tag>>>, one<'}'>> {};

struct Inf : seq<opt<one<'+', '-'>>, string<'i', 'n', 'f'>> {};
Expand Down
2 changes: 1 addition & 1 deletion src/search/redis_query_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace ir = kqir;

template <typename Rule>
using TreeSelector =
parse_tree::selector<Rule, parse_tree::store_content::on<Number, String, Identifier, Inf>,
parse_tree::selector<Rule, parse_tree::store_content::on<Number, StringL, Identifier, Inf>,
parse_tree::remove_content::on<TagList, NumericRange, ExclusiveNumber, FieldQuery, NotExpr,
AndExpr, OrExpr, Wildcard>>;

Expand Down
2 changes: 1 addition & 1 deletion src/search/sql_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace sql {
using namespace peg;

struct HasTag : string<'h', 'a', 's', 't', 'a', 'g'> {};
struct HasTagExpr : WSPad<seq<Identifier, WSPad<HasTag>, String>> {};
struct HasTagExpr : WSPad<seq<Identifier, WSPad<HasTag>, StringL>> {};

struct NumericAtomExpr : WSPad<sor<Number, Identifier>> {};
struct NumericCompareOp : sor<string<'!', '='>, string<'<', '='>, string<'>', '='>, one<'=', '<', '>'>> {};
Expand Down
4 changes: 2 additions & 2 deletions src/search/sql_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace ir = kqir;
template <typename Rule>
using TreeSelector = parse_tree::selector<
Rule,
parse_tree::store_content::on<Boolean, Number, String, Identifier, NumericCompareOp, AscOrDesc, UnsignedInteger>,
parse_tree::store_content::on<Boolean, Number, StringL, Identifier, NumericCompareOp, AscOrDesc, UnsignedInteger>,
parse_tree::remove_content::on<HasTagExpr, NumericCompareExpr, NotExpr, AndExpr, OrExpr, Wildcard, SelectExpr,
FromExpr, WhereClause, OrderByClause, LimitClause, SearchStmt>>;

Expand All @@ -58,7 +58,7 @@ struct Transformer : ir::TreeTransformer {
return Node::Create<ir::BoolLiteral>(node->string_view() == "true");
} else if (Is<Number>(node)) {
return Node::Create<ir::NumericLiteral>(*ParseFloat(node->string()));
} else if (Is<String>(node)) {
} else if (Is<StringL>(node)) {
return Node::Create<ir::StringLiteral>(GET_OR_RET(UnescapeString(node->string_view())));
} else if (Is<HasTagExpr>(node)) {
CHECK(node->children.size() == 2);
Expand Down
Loading
Loading