Skip to content

Commit 62dfd77

Browse files
committed
support MIGRATE command
1 parent a83090a commit 62dfd77

File tree

6 files changed

+590
-158
lines changed

6 files changed

+590
-158
lines changed

src/cluster/slot_migrate.cc

Lines changed: 4 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "event_util.h"
2828
#include "fmt/format.h"
2929
#include "io_util.h"
30+
#include "redis_protocol.h"
3031
#include "storage/batch_extractor.h"
3132
#include "storage/iterator.h"
3233
#include "storage/redis_metadata.h"
@@ -283,7 +284,7 @@ Status SlotMigrator::startMigration() {
283284
// Auth first
284285
std::string pass = srv_->GetConfig()->requirepass;
285286
if (!pass.empty()) {
286-
auto s = authOnDstNode(*dst_fd_, pass);
287+
auto s = util::AuthOnDstNode(*dst_fd_, pass);
287288
if (!s.IsOK()) {
288289
return s.Prefixed("failed to authenticate on destination node");
289290
}
@@ -485,21 +486,6 @@ void SlotMigrator::clean() {
485486
SetStopMigrationFlag(false);
486487
}
487488

488-
Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
489-
std::string cmd = redis::ArrayOfBulkStrings({"auth", password});
490-
auto s = util::SockSend(sock_fd, cmd);
491-
if (!s.IsOK()) {
492-
return s.Prefixed("failed to send AUTH command");
493-
}
494-
495-
s = checkSingleResponse(sock_fd);
496-
if (!s.IsOK()) {
497-
return s.Prefixed("failed to check the response of AUTH command");
498-
}
499-
500-
return Status::OK();
501-
}
502-
503489
Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
504490
if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};
505491

@@ -510,7 +496,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
510496
return s.Prefixed("failed to send command to the destination node");
511497
}
512498

513-
s = checkSingleResponse(sock_fd);
499+
s = util::CheckSingleResponse(sock_fd);
514500
if (!s.IsOK()) {
515501
return s.Prefixed("failed to check the response from the destination node");
516502
}
@@ -545,138 +531,6 @@ StatusOr<bool> SlotMigrator::supportedApplyBatchCommandOnDstNode(int sock_fd) {
545531
return false;
546532
}
547533

548-
Status SlotMigrator::checkSingleResponse(int sock_fd) { return checkMultipleResponses(sock_fd, 1); }
549-
550-
// Commands | Response | Instance
551-
// ++++++++++++++++++++++++++++++++++++++++
552-
// set Redis::Integer :1\r\n
553-
// hset Redis::SimpleString +OK\r\n
554-
// sadd Redis::Integer
555-
// zadd Redis::Integer
556-
// siadd Redis::Integer
557-
// setbit Redis::Integer
558-
// expire Redis::Integer
559-
// lpush Redis::Integer
560-
// rpush Redis::Integer
561-
// ltrim Redis::SimpleString -Err\r\n
562-
// linsert Redis::Integer
563-
// lset Redis::SimpleString
564-
// hdel Redis::Integer
565-
// srem Redis::Integer
566-
// zrem Redis::Integer
567-
// lpop Redis::NilString $-1\r\n
568-
// or Redis::BulkString $1\r\n1\r\n
569-
// rpop Redis::NilString
570-
// or Redis::BulkString
571-
// lrem Redis::Integer
572-
// sirem Redis::Integer
573-
// del Redis::Integer
574-
// xadd Redis::BulkString
575-
// bitfield Redis::Array *1\r\n:0
576-
Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) {
577-
if (sock_fd < 0 || total <= 0) {
578-
return {Status::NotOK, fmt::format("invalid arguments: sock_fd={}, count={}", sock_fd, total)};
579-
}
580-
581-
// Set socket receive timeout first
582-
struct timeval tv;
583-
tv.tv_sec = 1;
584-
tv.tv_usec = 0;
585-
setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
586-
587-
// Start checking response
588-
size_t bulk_or_array_len = 0;
589-
int cnt = 0;
590-
parser_state_ = ParserState::ArrayLen;
591-
UniqueEvbuf evbuf;
592-
while (true) {
593-
// Read response data from socket buffer to the event buffer
594-
if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
595-
return {Status::NotOK, fmt::format("failed to read response: {}", strerror(errno))};
596-
}
597-
598-
// Parse response data in event buffer
599-
bool run = true;
600-
while (run) {
601-
switch (parser_state_) {
602-
// Handle single string response
603-
case ParserState::ArrayLen: {
604-
UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
605-
if (!line) {
606-
LOG(INFO) << "[migrate] Event buffer is empty, read socket again";
607-
run = false;
608-
break;
609-
}
610-
611-
if (line[0] == '-') {
612-
return {Status::NotOK, fmt::format("got invalid response of length {}: {}", line.length, line.get())};
613-
} else if (line[0] == '$' || line[0] == '*') {
614-
auto parse_result = ParseInt<uint64_t>(std::string(line.get() + 1, line.length - 1), 10);
615-
if (!parse_result) {
616-
return {Status::NotOK, "protocol error: expected integer value"};
617-
}
618-
619-
bulk_or_array_len = *parse_result;
620-
if (bulk_or_array_len <= 0) {
621-
parser_state_ = ParserState::OneRspEnd;
622-
} else if (line[0] == '$') {
623-
parser_state_ = ParserState::BulkData;
624-
} else {
625-
parser_state_ = ParserState::ArrayData;
626-
}
627-
} else if (line[0] == '+' || line[0] == ':') {
628-
parser_state_ = ParserState::OneRspEnd;
629-
} else {
630-
return {Status::NotOK, fmt::format("got unexpected response of length {}: {}", line.length, line.get())};
631-
}
632-
633-
break;
634-
}
635-
// Handle bulk string response
636-
case ParserState::BulkData: {
637-
if (evbuffer_get_length(evbuf.get()) < bulk_or_array_len + 2) {
638-
LOG(INFO) << "[migrate] Bulk data in event buffer is not complete, read socket again";
639-
run = false;
640-
break;
641-
}
642-
// TODO(chrisZMF): Check tail '\r\n'
643-
evbuffer_drain(evbuf.get(), bulk_or_array_len + 2);
644-
bulk_or_array_len = 0;
645-
parser_state_ = ParserState::OneRspEnd;
646-
break;
647-
}
648-
case ParserState::ArrayData: {
649-
while (run && bulk_or_array_len > 0) {
650-
evbuffer_ptr ptr = evbuffer_search_eol(evbuf.get(), nullptr, nullptr, EVBUFFER_EOL_CRLF_STRICT);
651-
if (ptr.pos < 0) {
652-
LOG(INFO) << "[migrate] Array data in event buffer is not complete, read socket again";
653-
run = false;
654-
break;
655-
}
656-
evbuffer_drain(evbuf.get(), ptr.pos + 2);
657-
--bulk_or_array_len;
658-
}
659-
if (run) {
660-
parser_state_ = ParserState::OneRspEnd;
661-
}
662-
break;
663-
}
664-
case ParserState::OneRspEnd: {
665-
cnt++;
666-
if (cnt >= total) {
667-
return Status::OK();
668-
}
669-
670-
parser_state_ = ParserState::ArrayLen;
671-
break;
672-
}
673-
default:
674-
break;
675-
}
676-
}
677-
}
678-
}
679-
680534
StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey(const rocksdb::Slice &key,
681535
const rocksdb::Slice &encoded_metadata,
682536
std::string *restore_cmds) {
@@ -1029,7 +883,7 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string *commands, bool need) {
1029883

1030884
last_send_time_ = util::GetTimeStampUS();
1031885

1032-
s = checkMultipleResponses(*dst_fd_, current_pipeline_size_);
886+
s = util::CheckMultipleResponses(*dst_fd_, current_pipeline_size_);
1033887
if (!s.IsOK()) {
1034888
return s.Prefixed("wrong response from the destination node");
1035889
}

src/cluster/slot_migrate.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,11 @@ class SlotMigrator : public redis::Database {
121121
Status finishFailedMigration();
122122
void clean();
123123

124-
Status authOnDstNode(int sock_fd, const std::string &password);
125124
Status setImportStatusOnDstNode(int sock_fd, int status);
126125
static StatusOr<bool> supportedApplyBatchCommandOnDstNode(int sock_fd);
127126

128127
Status sendSnapshotByCmd();
129128
Status syncWALByCmd();
130-
Status checkSingleResponse(int sock_fd);
131-
Status checkMultipleResponses(int sock_fd, int total);
132129

133130
StatusOr<KeyMigrationResult> migrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata,
134131
std::string *restore_cmds);
@@ -175,7 +172,6 @@ class SlotMigrator : public redis::Database {
175172
std::atomic<size_t> migrate_batch_size_bytes_;
176173

177174
SlotMigrationStage current_stage_ = SlotMigrationStage::kNone;
178-
ParserState parser_state_ = ParserState::ArrayLen;
179175
std::atomic<ThreadState> thread_state_ = ThreadState::Uninitialized;
180176
std::atomic<MigrationState> migration_state_ = MigrationState::kNone;
181177

0 commit comments

Comments
 (0)