Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 4 additions & 150 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "event_util.h"
#include "fmt/format.h"
#include "io_util.h"
#include "redis_protocol.h"
#include "storage/batch_extractor.h"
#include "storage/iterator.h"
#include "storage/redis_metadata.h"
Expand Down Expand Up @@ -283,7 +284,7 @@ Status SlotMigrator::startMigration() {
// Auth first
std::string pass = srv_->GetConfig()->requirepass;
if (!pass.empty()) {
auto s = authOnDstNode(*dst_fd_, pass);
auto s = util::AuthOnDstNode(*dst_fd_, pass);
if (!s.IsOK()) {
return s.Prefixed("failed to authenticate on destination node");
}
Expand Down Expand Up @@ -485,21 +486,6 @@ void SlotMigrator::clean() {
SetStopMigrationFlag(false);
}

Status SlotMigrator::authOnDstNode(int sock_fd, const std::string &password) {
std::string cmd = redis::ArrayOfBulkStrings({"auth", password});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send AUTH command");
}

s = checkSingleResponse(sock_fd);
if (!s.IsOK()) {
return s.Prefixed("failed to check the response of AUTH command");
}

return Status::OK();
}

Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"};

Expand All @@ -510,7 +496,7 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
return s.Prefixed("failed to send command to the destination node");
}

s = checkSingleResponse(sock_fd);
s = util::CheckSingleResponse(sock_fd);
if (!s.IsOK()) {
return s.Prefixed("failed to check the response from the destination node");
}
Expand Down Expand Up @@ -545,138 +531,6 @@ StatusOr<bool> SlotMigrator::supportedApplyBatchCommandOnDstNode(int sock_fd) {
return false;
}

Status SlotMigrator::checkSingleResponse(int sock_fd) { return checkMultipleResponses(sock_fd, 1); }

// Commands | Response | Instance
// ++++++++++++++++++++++++++++++++++++++++
// set Redis::Integer :1\r\n
// hset Redis::SimpleString +OK\r\n
// sadd Redis::Integer
// zadd Redis::Integer
// siadd Redis::Integer
// setbit Redis::Integer
// expire Redis::Integer
// lpush Redis::Integer
// rpush Redis::Integer
// ltrim Redis::SimpleString -Err\r\n
// linsert Redis::Integer
// lset Redis::SimpleString
// hdel Redis::Integer
// srem Redis::Integer
// zrem Redis::Integer
// lpop Redis::NilString $-1\r\n
// or Redis::BulkString $1\r\n1\r\n
// rpop Redis::NilString
// or Redis::BulkString
// lrem Redis::Integer
// sirem Redis::Integer
// del Redis::Integer
// xadd Redis::BulkString
// bitfield Redis::Array *1\r\n:0
Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) {
if (sock_fd < 0 || total <= 0) {
return {Status::NotOK, fmt::format("invalid arguments: sock_fd={}, count={}", sock_fd, total)};
}

// Set socket receive timeout first
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

// Start checking response
size_t bulk_or_array_len = 0;
int cnt = 0;
parser_state_ = ParserState::ArrayLen;
UniqueEvbuf evbuf;
while (true) {
// Read response data from socket buffer to the event buffer
if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
return {Status::NotOK, fmt::format("failed to read response: {}", strerror(errno))};
}

// Parse response data in event buffer
bool run = true;
while (run) {
switch (parser_state_) {
// Handle single string response
case ParserState::ArrayLen: {
UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
LOG(INFO) << "[migrate] Event buffer is empty, read socket again";
run = false;
break;
}

if (line[0] == '-') {
return {Status::NotOK, fmt::format("got invalid response of length {}: {}", line.length, line.get())};
} else if (line[0] == '$' || line[0] == '*') {
auto parse_result = ParseInt<uint64_t>(std::string(line.get() + 1, line.length - 1), 10);
if (!parse_result) {
return {Status::NotOK, "protocol error: expected integer value"};
}

bulk_or_array_len = *parse_result;
if (bulk_or_array_len <= 0) {
parser_state_ = ParserState::OneRspEnd;
} else if (line[0] == '$') {
parser_state_ = ParserState::BulkData;
} else {
parser_state_ = ParserState::ArrayData;
}
} else if (line[0] == '+' || line[0] == ':') {
parser_state_ = ParserState::OneRspEnd;
} else {
return {Status::NotOK, fmt::format("got unexpected response of length {}: {}", line.length, line.get())};
}

break;
}
// Handle bulk string response
case ParserState::BulkData: {
if (evbuffer_get_length(evbuf.get()) < bulk_or_array_len + 2) {
LOG(INFO) << "[migrate] Bulk data in event buffer is not complete, read socket again";
run = false;
break;
}
// TODO(chrisZMF): Check tail '\r\n'
evbuffer_drain(evbuf.get(), bulk_or_array_len + 2);
bulk_or_array_len = 0;
parser_state_ = ParserState::OneRspEnd;
break;
}
case ParserState::ArrayData: {
while (run && bulk_or_array_len > 0) {
evbuffer_ptr ptr = evbuffer_search_eol(evbuf.get(), nullptr, nullptr, EVBUFFER_EOL_CRLF_STRICT);
if (ptr.pos < 0) {
LOG(INFO) << "[migrate] Array data in event buffer is not complete, read socket again";
run = false;
break;
}
evbuffer_drain(evbuf.get(), ptr.pos + 2);
--bulk_or_array_len;
}
if (run) {
parser_state_ = ParserState::OneRspEnd;
}
break;
}
case ParserState::OneRspEnd: {
cnt++;
if (cnt >= total) {
return Status::OK();
}

parser_state_ = ParserState::ArrayLen;
break;
}
default:
break;
}
}
}
}

StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey(const rocksdb::Slice &key,
const rocksdb::Slice &encoded_metadata,
std::string *restore_cmds) {
Expand Down Expand Up @@ -1029,7 +883,7 @@ Status SlotMigrator::sendCmdsPipelineIfNeed(std::string *commands, bool need) {

last_send_time_ = util::GetTimeStampUS();

s = checkMultipleResponses(*dst_fd_, current_pipeline_size_);
s = util::CheckMultipleResponses(*dst_fd_, current_pipeline_size_);
if (!s.IsOK()) {
return s.Prefixed("wrong response from the destination node");
}
Expand Down
4 changes: 0 additions & 4 deletions src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,11 @@ class SlotMigrator : public redis::Database {
Status finishFailedMigration();
void clean();

Status authOnDstNode(int sock_fd, const std::string &password);
Status setImportStatusOnDstNode(int sock_fd, int status);
static StatusOr<bool> supportedApplyBatchCommandOnDstNode(int sock_fd);

Status sendSnapshotByCmd();
Status syncWALByCmd();
Status checkSingleResponse(int sock_fd);
Status checkMultipleResponses(int sock_fd, int total);

StatusOr<KeyMigrationResult> migrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata,
std::string *restore_cmds);
Expand Down Expand Up @@ -175,7 +172,6 @@ class SlotMigrator : public redis::Database {
std::atomic<size_t> migrate_batch_size_bytes_;

SlotMigrationStage current_stage_ = SlotMigrationStage::kNone;
ParserState parser_state_ = ParserState::ArrayLen;
std::atomic<ThreadState> thread_state_ = ThreadState::Uninitialized;
std::atomic<MigrationState> migration_state_ = MigrationState::kNone;

Expand Down
Loading