Skip to content

Commit c862ea1

Browse files
committed
support MIGRATE command
1 parent a83090a commit c862ea1

File tree

6 files changed

+368
-142
lines changed

6 files changed

+368
-142
lines changed

src/cluster/slot_migrate.cc

Lines changed: 4 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <memory>
2424
#include <utility>
2525

26+
#include "check_response.h"
2627
#include "db_util.h"
2728
#include "event_util.h"
2829
#include "fmt/format.h"
@@ -492,7 +493,7 @@ Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
492493
return s.Prefixed("failed to send AUTH command");
493494
}
494495

495-
s = checkSingleResponse(sock_fd);
496+
s = util::CheckSingleResponse(sock_fd);
496497
if (!s.IsOK()) {
497498
return s.Prefixed("failed to check the response of AUTH command");
498499
}
@@ -510,7 +511,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
510511
return s.Prefixed("failed to send command to the destination node");
511512
}
512513

513-
s = checkSingleResponse(sock_fd);
514+
s = util::CheckSingleResponse(sock_fd);
514515
if (!s.IsOK()) {
515516
return s.Prefixed("failed to check the response from the destination node");
516517
}
@@ -545,138 +546,6 @@ StatusOr<bool> SlotMigrator::supportedApplyBatchCommandOnDstNode(int sock_fd) {
545546
return false;
546547
}
547548

548-
Status SlotMigrator::checkSingleResponse(int sock_fd) { return checkMultipleResponses(sock_fd, 1); }
549-
550-
// Commands | Response | Instance
551-
// ++++++++++++++++++++++++++++++++++++++++
552-
// set Redis::Integer :1\r\n
553-
// hset Redis::SimpleString +OK\r\n
554-
// sadd Redis::Integer
555-
// zadd Redis::Integer
556-
// siadd Redis::Integer
557-
// setbit Redis::Integer
558-
// expire Redis::Integer
559-
// lpush Redis::Integer
560-
// rpush Redis::Integer
561-
// ltrim Redis::SimpleString -Err\r\n
562-
// linsert Redis::Integer
563-
// lset Redis::SimpleString
564-
// hdel Redis::Integer
565-
// srem Redis::Integer
566-
// zrem Redis::Integer
567-
// lpop Redis::NilString $-1\r\n
568-
// or Redis::BulkString $1\r\n1\r\n
569-
// rpop Redis::NilString
570-
// or Redis::BulkString
571-
// lrem Redis::Integer
572-
// sirem Redis::Integer
573-
// del Redis::Integer
574-
// xadd Redis::BulkString
575-
// bitfield Redis::Array *1\r\n:0
576-
Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) {
577-
if (sock_fd < 0 || total <= 0) {
578-
return {Status::NotOK, fmt::format("invalid arguments: sock_fd={}, count={}", sock_fd, total)};
579-
}
580-
581-
// Set socket receive timeout first
582-
struct timeval tv;
583-
tv.tv_sec = 1;
584-
tv.tv_usec = 0;
585-
setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
586-
587-
// Start checking response
588-
size_t bulk_or_array_len = 0;
589-
int cnt = 0;
590-
parser_state_ = ParserState::ArrayLen;
591-
UniqueEvbuf evbuf;
592-
while (true) {
593-
// Read response data from socket buffer to the event buffer
594-
if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
595-
return {Status::NotOK, fmt::format("failed to read response: {}", strerror(errno))};
596-
}
597-
598-
// Parse response data in event buffer
599-
bool run = true;
600-
while (run) {
601-
switch (parser_state_) {
602-
// Handle single string response
603-
case ParserState::ArrayLen: {
604-
UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
605-
if (!line) {
606-
LOG(INFO) << "[migrate] Event buffer is empty, read socket again";
607-
run = false;
608-
break;
609-
}
610-
611-
if (line[0] == '-') {
612-
return {Status::NotOK, fmt::format("got invalid response of length {}: {}", line.length, line.get())};
613-
} else if (line[0] == '$' || line[0] == '*') {
614-
auto parse_result = ParseInt<uint64_t>(std::string(line.get() + 1, line.length - 1), 10);
615-
if (!parse_result) {
616-
return {Status::NotOK, "protocol error: expected integer value"};
617-
}
618-
619-
bulk_or_array_len = *parse_result;
620-
if (bulk_or_array_len <= 0) {
621-
parser_state_ = ParserState::OneRspEnd;
622-
} else if (line[0] == '$') {
623-
parser_state_ = ParserState::BulkData;
624-
} else {
625-
parser_state_ = ParserState::ArrayData;
626-
}
627-
} else if (line[0] == '+' || line[0] == ':') {
628-
parser_state_ = ParserState::OneRspEnd;
629-
} else {
630-
return {Status::NotOK, fmt::format("got unexpected response of length {}: {}", line.length, line.get())};
631-
}
632-
633-
break;
634-
}
635-
// Handle bulk string response
636-
case ParserState::BulkData: {
637-
if (evbuffer_get_length(evbuf.get()) < bulk_or_array_len + 2) {
638-
LOG(INFO) << "[migrate] Bulk data in event buffer is not complete, read socket again";
639-
run = false;
640-
break;
641-
}
642-
// TODO(chrisZMF): Check tail '\r\n'
643-
evbuffer_drain(evbuf.get(), bulk_or_array_len + 2);
644-
bulk_or_array_len = 0;
645-
parser_state_ = ParserState::OneRspEnd;
646-
break;
647-
}
648-
case ParserState::ArrayData: {
649-
while (run && bulk_or_array_len > 0) {
650-
evbuffer_ptr ptr = evbuffer_search_eol(evbuf.get(), nullptr, nullptr, EVBUFFER_EOL_CRLF_STRICT);
651-
if (ptr.pos < 0) {
652-
LOG(INFO) << "[migrate] Array data in event buffer is not complete, read socket again";
653-
run = false;
654-
break;
655-
}
656-
evbuffer_drain(evbuf.get(), ptr.pos + 2);
657-
--bulk_or_array_len;
658-
}
659-
if (run) {
660-
parser_state_ = ParserState::OneRspEnd;
661-
}
662-
break;
663-
}
664-
case ParserState::OneRspEnd: {
665-
cnt++;
666-
if (cnt >= total) {
667-
return Status::OK();
668-
}
669-
670-
parser_state_ = ParserState::ArrayLen;
671-
break;
672-
}
673-
default:
674-
break;
675-
}
676-
}
677-
}
678-
}
679-
680549
StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey(const rocksdb::Slice &key,
681550
const rocksdb::Slice &encoded_metadata,
682551
std::string *restore_cmds) {
@@ -1029,7 +898,7 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string *commands, bool need) {
1029898

1030899
last_send_time_ = util::GetTimeStampUS();
1031900

1032-
s = checkMultipleResponses(*dst_fd_, current_pipeline_size_);
901+
s = util::CheckMultipleResponses(*dst_fd_, current_pipeline_size_);
1033902
if (!s.IsOK()) {
1034903
return s.Prefixed("wrong response from the destination node");
1035904
}

src/cluster/slot_migrate.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,12 @@ class SlotMigrator : public redis::Database {
121121
Status finishFailedMigration();
122122
void clean();
123123

124-
Status authOnDstNode(int sock_fd, const std::string &password);
124+
static Status authOnDstNode(int sock_fd, const std::string &password);
125125
Status setImportStatusOnDstNode(int sock_fd, int status);
126126
static StatusOr<bool> supportedApplyBatchCommandOnDstNode(int sock_fd);
127127

128128
Status sendSnapshotByCmd();
129129
Status syncWALByCmd();
130-
Status checkSingleResponse(int sock_fd);
131-
Status checkMultipleResponses(int sock_fd, int total);
132130

133131
StatusOr<KeyMigrationResult> migrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata,
134132
std::string *restore_cmds);
@@ -175,7 +173,6 @@ class SlotMigrator : public redis::Database {
175173
std::atomic<size_t> migrate_batch_size_bytes_;
176174

177175
SlotMigrationStage current_stage_ = SlotMigrationStage::kNone;
178-
ParserState parser_state_ = ParserState::ArrayLen;
179176
std::atomic<ThreadState> thread_state_ = ThreadState::Uninitialized;
180177
std::atomic<MigrationState> migration_state_ = MigrationState::kNone;
181178

src/commands/cmd_server.cc

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include <ctime>
2424

25+
#include "check_response.h"
2526
#include "command_parser.h"
2627
#include "commander.h"
2728
#include "commands/scan_base.h"
@@ -1257,8 +1258,17 @@ class CommandDump : public Commander {
12571258
return Status::OK();
12581259
}
12591260

1261+
std::string dump_result;
1262+
auto s = DumpKey(ctx, srv, conn, key, dump_result);
1263+
if (!s.IsOK()) return s;
1264+
*output = redis::BulkString(dump_result);
1265+
return Status::OK();
1266+
}
1267+
static Status DumpKey(engine::Context &ctx, Server *srv, Connection *conn, std::string &key,
1268+
std::string &dump_result) {
12601269
RedisType type = kRedisNone;
1261-
db_status = redis.Type(ctx, key, &type);
1270+
redis::Database redis(srv->storage, conn->GetNamespace());
1271+
rocksdb::Status db_status = redis.Type(ctx, key, &type);
12621272
if (!db_status.ok()) return {Status::RedisExecErr, db_status.ToString()};
12631273

12641274
std::string result;
@@ -1267,7 +1277,7 @@ class CommandDump : public Commander {
12671277
auto s = rdb.Dump(key, type);
12681278
if (!s.IsOK()) return s;
12691279
CHECK(dynamic_cast<RdbStringStream *>(rdb.GetStream().get()) != nullptr);
1270-
*output = redis::BulkString(static_cast<RdbStringStream *>(rdb.GetStream().get())->GetInput());
1280+
dump_result = static_cast<RdbStringStream *>(rdb.GetStream().get())->GetInput();
12711281
return Status::OK();
12721282
}
12731283
};
@@ -1370,6 +1380,86 @@ class CommandPollUpdates : public Commander {
13701380
Format format_ = Format::Raw;
13711381
};
13721382

1383+
class CommandMigrate : public Commander {
1384+
public:
1385+
Status Parse(const std::vector<std::string> &args) override {
1386+
if (args.size() != 6) {
1387+
return {Status::RedisExecErr, errWrongNumOfArguments};
1388+
}
1389+
CommandParser parser(args, 1);
1390+
host_ = GET_OR_RET(parser.TakeStr());
1391+
port_ = GET_OR_RET(parser.TakeInt<uint32_t>());
1392+
key_ = GET_OR_RET(parser.TakeStr());
1393+
db_ = GET_OR_RET(parser.TakeInt<int>());
1394+
timeout_ = GET_OR_RET(parser.TakeInt<uint32_t>());
1395+
return Status::OK();
1396+
}
1397+
1398+
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
1399+
redis::Database redis(srv->storage, conn->GetNamespace());
1400+
int count = 0;
1401+
auto db_status = redis.Exists(ctx, {key_}, &count);
1402+
if (!db_status.ok()) {
1403+
return {Status::RedisExecErr, db_status.ToString()};
1404+
}
1405+
if (count == 0) {
1406+
*output = conn->NilString();
1407+
return Status::OK();
1408+
}
1409+
1410+
std::string dump_result;
1411+
auto status = CommandDump::DumpKey(ctx, srv, conn, key_, dump_result);
1412+
if (!status.OK()) return status;
1413+
auto result = util::SockConnect(host_, port_, 0, timeout_);
1414+
if (!result.IsOK()) {
1415+
return {Status::RedisExecErr, "failed to connect to the destination node"};
1416+
}
1417+
UniqueFD dst_fd;
1418+
dst_fd.Reset(*result);
1419+
1420+
status = restoreOnDstNode(redis, ctx, *dst_fd, key_, dump_result);
1421+
if (!status.IsOK()) {
1422+
return {Status::RedisExecErr, status.Msg()};
1423+
}
1424+
auto redis_status = redis.Del(ctx, key_);
1425+
if (!redis_status.ok()) {
1426+
return {Status::RedisExecErr, redis_status.ToString()};
1427+
}
1428+
1429+
*output = redis::SimpleString("OK");
1430+
return Status::OK();
1431+
}
1432+
1433+
private:
1434+
static Status restoreOnDstNode(redis::Database &redis, engine::Context &ctx, int dst_fd, std::string &key,
1435+
std::string &dump_result) {
1436+
std::string restore_cmd;
1437+
uint64_t timestamp = 0;
1438+
auto s = redis.GetExpireTime(ctx, key, &timestamp);
1439+
if (!s.ok() || s.IsExpired()) {
1440+
return {Status::RedisExecErr, "failed to get expire time"};
1441+
}
1442+
restore_cmd += redis::ArrayOfBulkStrings({"restore", key, std::to_string(timestamp), dump_result});
1443+
auto sock_status = util::SockSend(dst_fd, restore_cmd);
1444+
if (!sock_status.IsOK()) {
1445+
return {Status::RedisExecErr, fmt::format("failed to send restore command to destination node")};
1446+
}
1447+
1448+
sock_status = util::CheckSingleResponse(dst_fd);
1449+
if (!sock_status.IsOK()) {
1450+
return {Status::RedisExecErr, sock_status.Msg()};
1451+
}
1452+
1453+
return Status::OK();
1454+
}
1455+
1456+
std::string host_;
1457+
uint32_t port_;
1458+
std::string key_;
1459+
int db_;
1460+
int timeout_;
1461+
};
1462+
13731463
REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading auth", NO_KEY),
13741464
MakeCmdAttr<CommandPing>("ping", -1, "read-only", NO_KEY),
13751465
MakeCmdAttr<CommandSelect>("select", 2, "read-only", NO_KEY),
@@ -1410,5 +1500,6 @@ REDIS_REGISTER_COMMANDS(Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only o
14101500
MakeCmdAttr<CommandReset>("reset", 1, "ok-loading bypass-multi no-script", NO_KEY),
14111501
MakeCmdAttr<CommandApplyBatch>("applybatch", -2, "write no-multi", NO_KEY),
14121502
MakeCmdAttr<CommandDump>("dump", 2, "read-only", 1, 1, 1),
1413-
MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only admin", NO_KEY), )
1503+
MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only admin", NO_KEY),
1504+
MakeCmdAttr<CommandMigrate>("migrate", -6, "write", 1, 1, 1))
14141505
} // namespace redis

0 commit comments

Comments
 (0)