From 31de9b51be73da28c1d493e36583dd23386a2fdf Mon Sep 17 00:00:00 2001 From: lihuiba Date: Fri, 27 Sep 2024 21:42:52 +0800 Subject: [PATCH] kevent retry --- io/kqueue.cpp | 45 +++++++++++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 19563549..54d543a0 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -28,6 +28,8 @@ namespace photon { constexpr static EventsMap> evmap; +constexpr static struct timespec tm00 = {0, 0}; + class KQueue : public MasterEventEngine, public CascadingEventEngine, public ResetHandle { public: struct InFlightEvent { @@ -72,17 +74,26 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res close(_kq); } + int do_kevent(const struct kevent *changelist, int nchanges, + struct kevent *eventlist, int nevents, + const struct timespec *timeout, int ntry = 5) { + int ret; + for (int i = 1; i <= ntry; ++i) { + ret = kevent(_kq, changelist, nchanges, eventlist, nevents, timeout); + if (likely(ret >= 0)) return ret; + LOG_ERROR("failed to call kevent(`, `)", VALUE(nchanges), VALUE(nevents), ERRNO()); + usleep(100 * 1000); + } + return ret; + } + int enqueue(int fd, short event, uint16_t action, uint32_t event_flags, void* udata, bool immediate = false) { assert(_n < LEN(_events)); auto entry = &_events[_n++]; EV_SET(entry, fd, event, action, event_flags, 0, udata); if (immediate || _n == LEN(_events)) { - struct timespec tm{0, 0}; - int ret = kevent(_kq, _events, _n, nullptr, 0, &tm); - if (ret < 0) { - LOG_ERRNO_RETURN(0, -1, "failed to submit events with kevent()"); - } - _n = 0; + DEFER(_n = 0); + return do_kevent(_events, _n, nullptr, 0, &tm00); } return 0; } @@ -92,13 +103,13 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res errno = ENOSYS; return -1; } - short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; auto current = CURRENT; + short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current); if (ret < 0) return ret; ret = thread_usleep(timeout); ERRNO err; - if (ret == -1 && err.no == EOK) { + if (likely(ret == -1 && err.no == EOK)) { return 0; // event arrived } @@ -114,8 +125,8 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res tm.tv_nsec = (timeout % (1000 * 1000)) * 1000; again: - int ret = kevent(_kq, _events, _n, _events, LEN(_events), &tm); - if (ret < 0) LOG_ERRNO_RETURN(0, -1, "failed to call kevent()"); + int ret = do_kevent(_events, _n, _events, LEN(_events), &tm); + if (ret < 0) return ret; _n = 0; nev += ret; @@ -137,8 +148,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res // as same as `enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true)` struct kevent entry; EV_SET(&entry, _kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, 0, nullptr); - struct timespec tm{0, 0}; - return kevent(_kq, &entry, 1, nullptr, 0, &tm); + return do_kevent(&entry, 1, nullptr, 0, &tm00); } // This vector is used to filter invalid add/rm_interest requests which may affect kevent's @@ -189,8 +199,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res if (ret < 0) return errno == ETIMEDOUT ? 0 : -1; if (count > LEN(_events)) count = LEN(_events); - static const struct timespec _tm = {0, 0}; - ret = kevent(_kq, _events, _n, _events, count, &_tm); + ret = do_kevent(_events, _n, _events, count, &tm00); if (ret < 0) LOG_ERRNO_RETURN(0, -1, "failed to call kevent()"); @@ -204,17 +213,17 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res }; __attribute__((noinline)) -static KQueue* new_kqueue_engine() { - LOG_INFO("Init event engine: kqueue"); +static KQueue* new_kqueue_engine(bool is_master) { + LOG_INFO("Init event engine: kqueue ", VALUE(is_master)); return NewObj()->init(); } MasterEventEngine* new_kqueue_master_engine() { - return new_kqueue_engine(); + return new_kqueue_engine(true); } CascadingEventEngine* new_kqueue_cascading_engine() { - return new_kqueue_engine(); + return new_kqueue_engine(false); }