Skip to content

Commit

Permalink
Introduce UniqueEvent and CallbackBase for more intuitive event handle (
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice committed May 5, 2023
1 parent f4aa7e2 commit ff715c5
Show file tree
Hide file tree
Showing 16 changed files with 184 additions and 136 deletions.
19 changes: 9 additions & 10 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include <thread>

#include "event_util.h"
#include "fd_util.h"
#include "fmt/format.h"
#include "io_util.h"
#include "rocksdb_crc32c.h"
Expand All @@ -44,6 +43,7 @@
#include "storage/batch_debugger.h"
#include "thread_util.h"
#include "time_util.h"
#include "unique_fd.h"

Status FeedSlaveThread::Start() {
auto s = util::CreateThread("feed-replica", [this] {
Expand Down Expand Up @@ -361,12 +361,12 @@ void ReplicationThread::run() {
}
psync_steps_.Start();

auto timer = event_new(base_, -1, EV_PERSIST, eventTimerCb, this);
auto timer = UniqueEvent(NewEvent(base_, -1, EV_PERSIST));
timeval tmo{0, 100000}; // 100 ms
evtimer_add(timer, &tmo);
evtimer_add(timer.get(), &tmo);

event_base_dispatch(base_);
event_free(timer);
timer.reset();
event_base_free(base_);
}

Expand Down Expand Up @@ -920,14 +920,13 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir, const
}

// Check if stop_flag_ is set, when do, tear down replication
void ReplicationThread::eventTimerCb(int, int16_t, void *ctx) {
void ReplicationThread::TimerCB(int, int16_t) {
// DLOG(INFO) << "[replication] timer";
auto self = static_cast<ReplicationThread *>(ctx);
if (self->stop_flag_) {
if (stop_flag_) {
LOG(INFO) << "[replication] Stop ev loop";
event_base_loopbreak(self->base_);
self->psync_steps_.Stop();
self->fullsync_steps_.Stop();
event_base_loopbreak(base_);
psync_steps_.Stop();
fullsync_steps_.Stop();
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/cluster/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <utility>
#include <vector>

#include "event_util.h"
#include "server/redis_connection.h"
#include "status.h"
#include "storage/storage.h"
Expand Down Expand Up @@ -90,14 +91,16 @@ class FeedSlaveThread {
void checkLivenessIfNeed();
};

class ReplicationThread {
class ReplicationThread : private EventCallbackBase<ReplicationThread> {
public:
explicit ReplicationThread(std::string host, uint32_t port, Server *srv);
Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
void Stop();
ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
time_t LastIOTime() { return last_io_time_.load(std::memory_order_relaxed); }

void TimerCB(int, int16_t);

protected:
event_base *base_ = nullptr;

Expand Down Expand Up @@ -201,8 +204,6 @@ class ReplicationThread {
static bool isWrongPsyncNum(const char *err);
static bool isUnknownOption(const char *err);

static void eventTimerCb(int, int16_t, void *ctx);

Status parseWriteBatch(const std::string &batch_string);
};

Expand Down
2 changes: 1 addition & 1 deletion src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include <utility>
#include <vector>

#include "common/fd_util.h"
#include "config.h"
#include "encoding.h"
#include "parse_util.h"
Expand All @@ -44,6 +43,7 @@
#include "stats/stats.h"
#include "status.h"
#include "storage/redis_db.h"
#include "unique_fd.h"

enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };

Expand Down
62 changes: 26 additions & 36 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "commander.h"
#include "error_constants.h"
#include "event_util.h"
#include "server/server.h"
#include "types/redis_list.h"

Expand Down Expand Up @@ -152,19 +153,16 @@ class CommandRPop : public CommandPop {
CommandRPop() : CommandPop(false) {}
};

class CommandBPop : public Commander {
class CommandBPop : public Commander,
private EvbufCallbackBase<CommandBPop, false>,
private EventCallbackBase<CommandBPop> {
public:
explicit CommandBPop(bool left) : left_(left) {}

CommandBPop(const CommandBPop &) = delete;
CommandBPop &operator=(const CommandBPop &) = delete;

~CommandBPop() override {
if (timer_) {
event_free(timer_);
timer_ = nullptr;
}
}
~CommandBPop() override = default;

Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
Expand Down Expand Up @@ -201,12 +199,12 @@ class CommandBPop : public Commander {
svr_->BlockOnKey(key, conn_);
}

bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this);
SetCB(bev);

if (timeout_) {
timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this);
timer_.reset(NewTimer(bufferevent_get_base(bev)));
timeval tm = {timeout_, 0};
evtimer_add(timer_, &tm);
evtimer_add(timer_.get(), &tm);
}

return {Status::BlockingCmd};
Expand Down Expand Up @@ -241,9 +239,8 @@ class CommandBPop : public Commander {
return s;
}

static void WriteCB(bufferevent *bev, void *ctx) {
auto self = reinterpret_cast<CommandBPop *>(ctx);
auto s = self->TryPopFromList();
void OnWrite(bufferevent *bev) {
auto s = TryPopFromList();
if (s.IsNotFound()) {
// The connection may be waked up but can't pop from list. For example,
// connection A is blocking on list and connection B push a new element
Expand All @@ -253,42 +250,35 @@ class CommandBPop : public Commander {
return;
}

if (self->timer_) {
event_free(self->timer_);
self->timer_ = nullptr;
if (timer_) {
timer_.reset();
}

self->unBlockingAll();
bufferevent_setcb(bev, redis::Connection::OnRead, redis::Connection::OnWrite, redis::Connection::OnEvent,
self->conn_);
unBlockingAll();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
// in connection after the blocking command, so there may have some commands to be processed.
// Related issue: https://github.com/apache/incubator-kvrocks/issues/831
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}

static void EventCB(bufferevent *bev, int16_t events, void *ctx) {
auto self = static_cast<CommandBPop *>(ctx);
void OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if (self->timer_ != nullptr) {
event_free(self->timer_);
self->timer_ = nullptr;
if (timer_ != nullptr) {
timer_.reset();
}
self->unBlockingAll();
unBlockingAll();
}
redis::Connection::OnEvent(bev, events, self->conn_);
conn_->OnEvent(bev, events);
}

static void TimerCB(int, int16_t events, void *ctx) {
auto self = reinterpret_cast<CommandBPop *>(ctx);
self->conn_->Reply(redis::NilString());
event_free(self->timer_);
self->timer_ = nullptr;
self->unBlockingAll();
auto bev = self->conn_->GetBufferEvent();
bufferevent_setcb(bev, redis::Connection::OnRead, redis::Connection::OnWrite, redis::Connection::OnEvent,
self->conn_);
void TimerCB(int, int16_t events) {
conn_->Reply(redis::NilString());
timer_.reset();
unBlockingAll();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}

Expand All @@ -298,7 +288,7 @@ class CommandBPop : public Commander {
std::vector<std::string> keys_;
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
event *timer_ = nullptr;
UniqueEvent timer_;

void unBlockingAll() {
for (const auto &key : keys_) {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/cmd_replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

#include "commander.h"
#include "error_constants.h"
#include "fd_util.h"
#include "io_util.h"
#include "scope_exit.h"
#include "server/server.h"
#include "thread_util.h"
#include "time_util.h"
#include "unique_fd.h"

namespace redis {

Expand Down
Loading

0 comments on commit ff715c5

Please sign in to comment.