Skip to content

Commit

Permalink
kevent retry
Browse files Browse the repository at this point in the history
  • Loading branch information
lihuiba committed Sep 29, 2024
1 parent 426be58 commit 31de9b5
Showing 1 changed file with 27 additions and 18 deletions.
45 changes: 27 additions & 18 deletions io/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ namespace photon {
constexpr static EventsMap<EVUnderlay<EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT>>
evmap;

constexpr static struct timespec tm00 = {0, 0};

class KQueue : public MasterEventEngine, public CascadingEventEngine, public ResetHandle {
public:
struct InFlightEvent {
Expand Down Expand Up @@ -72,17 +74,26 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
close(_kq);
}

int do_kevent(const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout, int ntry = 5) {
int ret;
for (int i = 1; i <= ntry; ++i) {
ret = kevent(_kq, changelist, nchanges, eventlist, nevents, timeout);
if (likely(ret >= 0)) return ret;
LOG_ERROR("failed to call kevent(`, `)", VALUE(nchanges), VALUE(nevents), ERRNO());
usleep(100 * 1000);
}
return ret;
}

int enqueue(int fd, short event, uint16_t action, uint32_t event_flags, void* udata, bool immediate = false) {
assert(_n < LEN(_events));
auto entry = &_events[_n++];
EV_SET(entry, fd, event, action, event_flags, 0, udata);
if (immediate || _n == LEN(_events)) {
struct timespec tm{0, 0};
int ret = kevent(_kq, _events, _n, nullptr, 0, &tm);
if (ret < 0) {
LOG_ERRNO_RETURN(0, -1, "failed to submit events with kevent()");
}
_n = 0;
DEFER(_n = 0);
return do_kevent(_events, _n, nullptr, 0, &tm00);
}
return 0;
}
Expand All @@ -92,13 +103,13 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
errno = ENOSYS;
return -1;
}
short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE;
auto current = CURRENT;
short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE;
int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current);
if (ret < 0) return ret;
ret = thread_usleep(timeout);
ERRNO err;
if (ret == -1 && err.no == EOK) {
if (likely(ret == -1 && err.no == EOK)) {
return 0; // event arrived
}

Expand All @@ -114,8 +125,8 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
tm.tv_nsec = (timeout % (1000 * 1000)) * 1000;

again:
int ret = kevent(_kq, _events, _n, _events, LEN(_events), &tm);
if (ret < 0) LOG_ERRNO_RETURN(0, -1, "failed to call kevent()");
int ret = do_kevent(_events, _n, _events, LEN(_events), &tm);
if (ret < 0) return ret;

_n = 0;
nev += ret;
Expand All @@ -137,8 +148,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
// as same as `enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true)`
struct kevent entry;
EV_SET(&entry, _kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, 0, nullptr);
struct timespec tm{0, 0};
return kevent(_kq, &entry, 1, nullptr, 0, &tm);
return do_kevent(&entry, 1, nullptr, 0, &tm00);
}

// This vector is used to filter invalid add/rm_interest requests which may affect kevent's
Expand Down Expand Up @@ -189,8 +199,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
if (ret < 0) return errno == ETIMEDOUT ? 0 : -1;
if (count > LEN(_events))
count = LEN(_events);
static const struct timespec _tm = {0, 0};
ret = kevent(_kq, _events, _n, _events, count, &_tm);
ret = do_kevent(_events, _n, _events, count, &tm00);
if (ret < 0)
LOG_ERRNO_RETURN(0, -1, "failed to call kevent()");

Expand All @@ -204,17 +213,17 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res
};

__attribute__((noinline))
static KQueue* new_kqueue_engine() {
LOG_INFO("Init event engine: kqueue");
static KQueue* new_kqueue_engine(bool is_master) {
LOG_INFO("Init event engine: kqueue ", VALUE(is_master));
return NewObj<KQueue>()->init();
}

MasterEventEngine* new_kqueue_master_engine() {
return new_kqueue_engine();
return new_kqueue_engine(true);
}

CascadingEventEngine* new_kqueue_cascading_engine() {
return new_kqueue_engine();
return new_kqueue_engine(false);
}


Expand Down

0 comments on commit 31de9b5

Please sign in to comment.