Skip to content

Commit cf28ab1

Browse files
authored
feat(ts): Add label-based indexing support and TS.MGET command (#3164)
Part of #3048 Add a new CF for label-based indexing. The key format is as following: ``` +-----------+--------------+----------------+------------+------------------+--------------+---------+ | ns | index_type | label_key_size | label_key | label_value_size | label_value | key | => null | (1+Xbytes)| (1byte) | (4byte) | (Ybyte) | (4byte) | (Zbyte) | (Kbyte) | +-----------+--------------+----------------+------------+------------------+--------------+---------+ ``` | enum | index_type | | ---- | ---------- | | 0 | `TS_LABEL` | Now we can do: ``` 127.0.0.1:6666> TS.CREATE temp:TLV LABELS type temp location TLV 127.0.0.1:6666> TS.CREATE temp:JLM LABELS type temp location JLM 127.0.0.1:6666> TS.MADD temp:TLV 1000 30 temp:TLV 1010 35 temp:TLV 1020 9999 temp:TLV 1030 40 127.0.0.1:6666> TS.MADD temp:JLM 1005 30 temp:JLM 1015 35 temp:JLM 1025 9999 temp:JLM 1035 40 127.0.0.1:6666> TS.MGET WITHLABELS FILTER type=temp 1) 1) "temp:JLM" 2) 1) 1) "location" 2) "JLM" 2) 1) "type" 2) "temp" 3) 1) 1) (integer) 1035 2) (double) 40 2) 1) "temp:TLV" 2) 1) 1) "location" 2) "TLV" 2) 1) "type" 2) "temp" 3) 1) 1) (integer) 1030 2) (double) 40 127.0.0.1:6666> TS.MGET SELECTED_LABELS location FILTER type=temp 1) 1) "temp:JLM" 2) 1) 1) "location" 2) "JLM" 3) 1) 1) (integer) 1035 2) (double) 40 2) 1) "temp:TLV" 2) 1) 1) "location" 2) "TLV" 3) 1) 1) (integer) 1030 2) (double) 40 ```
1 parent df3735e commit cf28ab1

File tree

9 files changed

+1088
-60
lines changed

9 files changed

+1088
-60
lines changed

src/commands/cmd_timeseries.cc

Lines changed: 108 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,17 @@ std::string FormatCreateRuleResAsRedisReply(TSCreateRuleResult res) {
126126
return "";
127127
}
128128

129+
std::string FormatTSLabelListAsRedisReply(const redis::LabelKVList &labels) {
130+
std::vector<std::string> labels_str;
131+
labels_str.reserve(labels.size());
132+
for (const auto &label : labels) {
133+
auto str = redis::Array(
134+
{redis::BulkString(label.k), label.v.size() ? redis::BulkString(label.v) : redis::NilString(redis::RESP::v3)});
135+
labels_str.push_back(str);
136+
}
137+
return redis::Array(labels_str);
138+
}
139+
129140
} // namespace
130141

131142
namespace redis {
@@ -137,23 +148,14 @@ class KeywordCommandBase : public Commander {
137148
Status Parse(const std::vector<std::string> &args) override {
138149
TSOptionsParser parser(std::next(args.begin(), static_cast<std::ptrdiff_t>(skip_num_)),
139150
std::prev(args.end(), static_cast<std::ptrdiff_t>(tail_skip_num_)));
140-
141151
while (parser.Good()) {
142-
bool handled = false;
143-
for (const auto &handler : handlers_) {
144-
if (parser.EatEqICase(handler.first)) {
145-
Status s = handler.second(parser);
146-
if (!s.IsOK()) return s;
147-
handled = true;
148-
break;
149-
}
150-
}
151-
152-
if (!handled) {
153-
parser.Skip(1);
152+
auto &value = parser.RawTake();
153+
auto value_upper = util::ToUpper(value);
154+
if (containsKeyword(value_upper, true)) {
155+
Status s = handlers_[value_upper](parser);
156+
if (!s.IsOK()) return s;
154157
}
155158
}
156-
157159
return Commander::Parse(args);
158160
}
159161

@@ -162,20 +164,24 @@ class KeywordCommandBase : public Commander {
162164

163165
template <typename Handler>
164166
void registerHandler(const std::string &keyword, Handler &&handler) {
165-
handlers_.emplace_back(keyword, std::forward<Handler>(handler));
167+
handlers_.emplace(util::ToUpper(keyword), std::forward<Handler>(handler));
166168
}
167-
168169
virtual void registerDefaultHandlers() = 0;
169170

170171
void setSkipNum(size_t num) { skip_num_ = num; }
171-
172172
void setTailSkipNum(size_t num) { tail_skip_num_ = num; }
173+
bool containsKeyword(const std::string &keyword, bool is_upper = false) const {
174+
if (is_upper) {
175+
return handlers_.count(keyword);
176+
} else {
177+
return handlers_.count(util::ToUpper(keyword));
178+
}
179+
}
173180

174181
private:
175182
size_t skip_num_ = 0;
176183
size_t tail_skip_num_ = 0;
177-
178-
std::vector<std::pair<std::string, std::function<Status(TSOptionsParser &)>>> handlers_;
184+
std::unordered_map<std::string, std::function<Status(TSOptionsParser &)>> handlers_;
179185
};
180186

181187
class CommandTSCreateBase : public KeywordCommandBase {
@@ -313,13 +319,7 @@ class CommandTSInfo : public Commander {
313319
*output += redis::SimpleString("duplicatePolicy");
314320
*output += redis::SimpleString(FormatDuplicatePolicyAsRedisReply(info.metadata.duplicate_policy));
315321
*output += redis::SimpleString("labels");
316-
std::vector<std::string> labels_str;
317-
labels_str.reserve(info.labels.size());
318-
for (const auto &label : info.labels) {
319-
auto str = redis::Array({redis::BulkString(label.k), redis::BulkString(label.v)});
320-
labels_str.push_back(str);
321-
}
322-
*output += redis::Array(labels_str);
322+
*output += FormatTSLabelListAsRedisReply(info.labels);
323323
*output += redis::SimpleString("sourceKey");
324324
*output += info.metadata.source_key.empty() ? redis::NilString(redis::RESP::v3)
325325
: redis::BulkString(info.metadata.source_key);
@@ -784,12 +784,93 @@ class CommandTSGet : public CommandTSAggregatorBase {
784784
std::string user_key_;
785785
};
786786

787+
class CommandTSMGetBase : public CommandTSAggregatorBase {
788+
public:
789+
CommandTSMGetBase(size_t skip_num, size_t tail_skip_num) : CommandTSAggregatorBase(skip_num, tail_skip_num) {}
790+
791+
protected:
792+
static Status handleWithLabels([[maybe_unused]] TSOptionsParser &parser, bool &with_labels) {
793+
with_labels = true;
794+
return Status::OK();
795+
}
796+
Status handleSelectedLabels(TSOptionsParser &parser, std::set<std::string> &selected_labels) {
797+
while (parser.Good()) {
798+
auto &value = parser.RawPeek();
799+
if (containsKeyword(value)) {
800+
break;
801+
}
802+
selected_labels.emplace(parser.TakeStr().GetValue());
803+
}
804+
return Status::OK();
805+
}
806+
Status handleFilterExpr(TSOptionsParser &parser, TSMGetOption::FilterOption &filter_option) {
807+
auto filter_parser = TSMQueryFilterParser(filter_option);
808+
while (parser.Good()) {
809+
auto &value = parser.RawPeek();
810+
if (containsKeyword(value)) {
811+
break;
812+
}
813+
auto s = filter_parser.Parse(parser.TakeStr().GetValue());
814+
if (!s.IsOK()) return s;
815+
}
816+
return filter_parser.Check();
817+
}
818+
};
819+
820+
class CommandTSMGet : public CommandTSMGetBase {
821+
public:
822+
CommandTSMGet() : CommandTSMGetBase(0, 0) { registerDefaultHandlers(); }
823+
Status Parse(const std::vector<std::string> &args) override {
824+
if (args.size() < 3) {
825+
return {Status::RedisParseErr, "wrong number of arguments for 'ts.mget' command"};
826+
}
827+
return CommandTSMGetBase::Parse(args);
828+
}
829+
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
830+
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
831+
std::vector<TSMGetResult> results;
832+
auto s = timeseries_db.MGet(ctx, option_, is_return_latest_, &results);
833+
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
834+
std::vector<std::string> reply;
835+
reply.reserve(results.size());
836+
for (auto &result : results) {
837+
std::vector<std::string> entry(3);
838+
entry[0] = redis::BulkString(result.name);
839+
entry[1] = FormatTSLabelListAsRedisReply(result.labels);
840+
std::vector<std::string> temp;
841+
for (auto &sample : result.samples) {
842+
temp.push_back(FormatTSSampleAsRedisReply(sample));
843+
}
844+
entry[2] = redis::Array(temp);
845+
reply.push_back(redis::Array(entry));
846+
}
847+
*output = redis::Array(reply);
848+
return Status::OK();
849+
}
850+
851+
protected:
852+
void registerDefaultHandlers() override {
853+
CommandTSAggregatorBase::registerDefaultHandlers();
854+
registerHandler("LATEST", [this](TSOptionsParser &parser) { return handleLatest(parser, is_return_latest_); });
855+
registerHandler("WITHLABELS",
856+
[this](TSOptionsParser &parser) { return handleWithLabels(parser, option_.with_labels); });
857+
registerHandler("SELECTED_LABELS",
858+
[this](TSOptionsParser &parser) { return handleSelectedLabels(parser, option_.selected_labels); });
859+
registerHandler("FILTER", [this](TSOptionsParser &parser) { return handleFilterExpr(parser, option_.filter); });
860+
}
861+
862+
private:
863+
TSMGetOption option_;
864+
bool is_return_latest_ = false;
865+
};
866+
787867
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", -2, "write", 1, 1, 1),
788868
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 1),
789869
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, -3, 1),
790870
MakeCmdAttr<CommandTSRange>("ts.range", -4, "read-only", 1, 1, 1),
791871
MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only", 1, 1, 1),
792872
MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only", 1, 1, 1),
793-
MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6, "write", 1, 2, 1), );
873+
MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6, "write", 1, 2, 1),
874+
MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only", NO_KEY), );
794875

795876
} // namespace redis

src/storage/compact_filter.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,10 @@ bool SearchFilter::Filter([[maybe_unused]] int level, const Slice &key, [[maybe_
216216
return false;
217217
}
218218

219+
bool IndexFilter::Filter([[maybe_unused]] int level, [[maybe_unused]] const Slice &key,
220+
[[maybe_unused]] const Slice &value, [[maybe_unused]] std::string *new_value,
221+
[[maybe_unused]] bool *modified) const {
222+
return false;
223+
}
224+
219225
} // namespace engine

src/storage/compact_filter.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,29 @@ class SearchFilterFactory : public rocksdb::CompactionFilterFactory {
150150
engine::Storage *stor_ = nullptr;
151151
};
152152

153+
class IndexFilter : public rocksdb::CompactionFilter {
154+
public:
155+
explicit IndexFilter(Storage *storage) : stor_(storage) {}
156+
157+
const char *Name() const override { return "IndexFilter"; }
158+
bool Filter([[maybe_unused]] int level, [[maybe_unused]] const Slice &key, [[maybe_unused]] const Slice &value,
159+
[[maybe_unused]] std::string *new_value, [[maybe_unused]] bool *modified) const override;
160+
161+
private:
162+
engine::Storage *stor_ = nullptr;
163+
};
164+
165+
class IndexFilterFactory : public rocksdb::CompactionFilterFactory {
166+
public:
167+
explicit IndexFilterFactory(engine::Storage *storage) : stor_(storage) {}
168+
const char *Name() const override { return "IndexFilterFactory"; }
169+
std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
170+
[[maybe_unused]] const rocksdb::CompactionFilter::Context &context) override {
171+
return std::unique_ptr<rocksdb::CompactionFilter>(new IndexFilter(stor_));
172+
}
173+
174+
private:
175+
engine::Storage *stor_ = nullptr;
176+
};
177+
153178
} // namespace engine

src/storage/storage.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,13 @@ Status Storage::Open(DBOpenMode mode) {
355355
search_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions;
356356
SetBlobDB(&search_opts);
357357

358+
rocksdb::BlockBasedTableOptions index_table_opts = InitTableOptions();
359+
rocksdb::ColumnFamilyOptions index_opts(options);
360+
index_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(index_table_opts));
361+
index_opts.compaction_filter_factory = std::make_shared<IndexFilterFactory>(this);
362+
index_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions;
363+
SetBlobDB(&index_opts);
364+
358365
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
359366
// Caution: don't change the order of column family, or the handle will be mismatched
360367
column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, subkey_opts);
@@ -364,6 +371,7 @@ Status Storage::Open(DBOpenMode mode) {
364371
column_families.emplace_back(std::string(kPropagateColumnFamilyName), propagate_opts);
365372
column_families.emplace_back(std::string(kStreamColumnFamilyName), subkey_opts);
366373
column_families.emplace_back(std::string(kSearchColumnFamilyName), search_opts);
374+
column_families.emplace_back(std::string(kIndexColumnFamilyName), index_opts);
367375

368376
auto start = std::chrono::high_resolution_clock::now();
369377
switch (mode) {

src/storage/storage.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ enum class ColumnFamilyID : uint32_t {
7171
Propagate,
7272
Stream,
7373
Search,
74+
Index,
7475
};
7576

76-
constexpr uint32_t kMaxColumnFamilyID = static_cast<uint32_t>(ColumnFamilyID::Search);
77+
constexpr uint32_t kMaxColumnFamilyID = static_cast<uint32_t>(ColumnFamilyID::Index);
7778

7879
namespace engine {
7980

@@ -148,6 +149,7 @@ constexpr const std::string_view kPubSubColumnFamilyName = "pubsub";
148149
constexpr const std::string_view kPropagateColumnFamilyName = "propagate";
149150
constexpr const std::string_view kStreamColumnFamilyName = "stream";
150151
constexpr const std::string_view kSearchColumnFamilyName = "search";
152+
constexpr const std::string_view kIndexColumnFamilyName = "index";
151153

152154
class ColumnFamilyConfigs {
153155
public:
@@ -186,6 +188,10 @@ class ColumnFamilyConfigs {
186188
return {ColumnFamilyID::Search, kSearchColumnFamilyName, /*is_minor=*/true};
187189
}
188190

191+
static ColumnFamilyConfig IndexColumnFamily() {
192+
return {ColumnFamilyID::Index, kIndexColumnFamilyName, /*is_minor=*/true};
193+
}
194+
189195
/// ListAllColumnFamilies returns all column families in kvrocks.
190196
static const std::vector<ColumnFamilyConfig> &ListAllColumnFamilies() { return AllCfs; }
191197

@@ -197,11 +203,11 @@ class ColumnFamilyConfigs {
197203
// Caution: don't change the order of column family, or the handle will be mismatched
198204
inline const static std::vector<ColumnFamilyConfig> AllCfs = {
199205
PrimarySubkeyColumnFamily(), MetadataColumnFamily(), SecondarySubkeyColumnFamily(), PubSubColumnFamily(),
200-
PropagateColumnFamily(), StreamColumnFamily(), SearchColumnFamily(),
206+
PropagateColumnFamily(), StreamColumnFamily(), SearchColumnFamily(), IndexColumnFamily(),
201207
};
202208
inline const static std::vector<ColumnFamilyConfig> AllCfsWithoutDefault = {
203-
MetadataColumnFamily(), SecondarySubkeyColumnFamily(), PubSubColumnFamily(),
204-
PropagateColumnFamily(), StreamColumnFamily(), SearchColumnFamily(),
209+
MetadataColumnFamily(), SecondarySubkeyColumnFamily(), PubSubColumnFamily(), PropagateColumnFamily(),
210+
StreamColumnFamily(), SearchColumnFamily(), IndexColumnFamily(),
205211
};
206212
};
207213

0 commit comments

Comments
 (0)