From 62aa683ce2eda450ed54abc1aaaa9bfe897b2aea Mon Sep 17 00:00:00 2001 From: Coldwings Date: Mon, 9 Sep 2024 15:57:11 +0800 Subject: [PATCH 1/7] FIX: wait_for_fd in epoll should atleast put into sleep for a tiny moment --- io/epoll.cpp | 4 +++- net/pooled_socket.cpp | 9 +++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/io/epoll.cpp b/io/epoll.cpp index a45ec09f..794fd066 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -301,7 +301,9 @@ ok: entry.interests |= eint; return rm_interest({fd, EVENT_RWE| ONE_SHOT, 0}); // remove fd from epoll int ret = add_interest({fd, interest | ONE_SHOT, CURRENT}); if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest"); - ret = thread_usleep(timeout); + // if timeout is just simple 0, wait for a tiny little moment + // so that events can be collect. + ret = thread_usleep(timeout.timeout() ? timeout : Timeout(10)); ERRNO err; if (ret == -1 && err.no == EOK) { return 0; // Event arrived diff --git a/net/pooled_socket.cpp b/net/pooled_socket.cpp index 541c44d9..668798b7 100644 --- a/net/pooled_socket.cpp +++ b/net/pooled_socket.cpp @@ -121,9 +121,10 @@ class TCPSocketPool : public ForwardSocketClient { photon::Timer timer; // all fd < 0 treated as socket not based on fd - // and always alive. Using such socket needs user + // and always reuseable. Using such socket needs user // to check if connected socket is still usable. - bool stream_alive(int fd) { + // if there still have unread bytes in strema, it should be closed. + bool stream_reusable(int fd) { return (fd < 0) || (wait_for_fd_readable(fd, 0) != 0); } @@ -196,7 +197,7 @@ class TCPSocketPool : public ForwardSocketClient { if (!stream) { stream = m_underlay->connect(remote, local); if (!stream) return nullptr; - } else if (!stream_alive(stream->get_underlay_fd())) { + } else if (!stream_reusable(stream->get_underlay_fd())) { delete stream; goto again; } @@ -231,7 +232,7 @@ class TCPSocketPool : public ForwardSocketClient { bool release(const EndPoint& ep, ISocketStream* stream) { auto fd = stream->get_underlay_fd(); ERRNO err; - if (!stream_alive(fd)) return false; + if (!stream_reusable(fd)) return false; auto node = new StreamListNode(ep, stream, fd, TTL_us); push_into_pool(node); errno = err.no; From dfd865dc6b170e70b4c40ba42abef55e6ed5347f Mon Sep 17 00:00:00 2001 From: Coldwings Date: Mon, 9 Sep 2024 16:49:16 +0800 Subject: [PATCH 2/7] Also fix on kqueue --- io/kqueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 616f1372..d89c6cde 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -99,7 +99,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res return 0; short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT); - int ret = thread_usleep(timeout); + int ret = thread_usleep(timeout.timeout() ? timeout : Timeout(10)); ERRNO err; if (ret == -1 && err.no == EOK) { return 0; // event arrived From b0e32eb3d40d90b24ca78dadd107671d6d18756f Mon Sep 17 00:00:00 2001 From: Coldwings Date: Wed, 11 Sep 2024 11:27:43 +0800 Subject: [PATCH 3/7] wait_for_fd reap events when wait_for_fd in 0 timeout --- io/epoll.cpp | 18 ++++++++++++++++ io/kqueue.cpp | 59 +++++++++++++++++++++++++++++++++++---------------- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/io/epoll.cpp b/io/epoll.cpp index 794fd066..8b8d4a8c 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -303,6 +303,24 @@ ok: entry.interests |= eint; if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest"); // if timeout is just simple 0, wait for a tiny little moment // so that events can be collect. + if (timeout.expired()) { + ret = -1; + wait_for_events( + 0, + [&](void* data) __INLINE__ { + if ((thread*)data == CURRENT) { + ret = 0; + } else { + thread_interrupt((thread*)data, EOK); + } + }, + [&]() __INLINE__ { return true; }); + if (ret < 0) { + rm_interest({fd, interest, 0}); + errno = ETIMEDOUT; + } + return ret; + } ret = thread_usleep(timeout.timeout() ? timeout : Timeout(10)); ERRNO err; if (ret == -1 && err.no == EOK) { diff --git a/io/kqueue.cpp b/io/kqueue.cpp index d89c6cde..8c437065 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -94,22 +94,8 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res return 0; } - int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { - if (unlikely(interests == 0)) - return 0; - short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; - enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT); - int ret = thread_usleep(timeout.timeout() ? timeout : Timeout(10)); - ERRNO err; - if (ret == -1 && err.no == EOK) { - return 0; // event arrived - } - - errno = (ret == 0) ? ETIMEDOUT : err.no; - return -1; - } - - ssize_t wait_and_fire_events(uint64_t timeout) override { + template + ssize_t do_wait_and_fire_events(uint64_t timeout, EVCB&& event_callback) { ssize_t nev = 0; struct timespec tm; tm.tv_sec = timeout / 1000 / 1000; @@ -124,8 +110,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res nev += ret; for (int i = 0; i < ret; ++i) { if (_events[i].filter == EVFILT_USER) continue; - auto th = (thread*) _events[i].udata; - if (th) thread_interrupt(th, EOK); + event_callback(_events[i].udata); } if (ret == (int) LEN(_events)) { // there may be more events tm.tv_sec = tm.tv_nsec = 0; @@ -134,6 +119,44 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res return nev; } + int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { + if (unlikely(interests == 0)) + return 0; + short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; + enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT, timeout.expired()); + if (timeout.expired()) { + int ret = -1; + do_wait_and_fire_events(0, [&](void* data) { + auto th = (thread*)data; + if (th == CURRENT) + ret = 0; + else + thread_interrupt(th); + }); + if (ret <0) { + enqueue(fd, ev, EV_DELETE, 0, CURRENT, true); + errno = ETIMEDOUT; + } + return ret; + } + int ret = thread_usleep(timeout); + ERRNO err; + if (ret == -1 && err.no == EOK) { + return 0; // event arrived + } + + errno = (ret == 0) ? ETIMEDOUT : err.no; + enqueue(fd, ev, EV_DELETE, 0, CURRENT, true); + return -1; + } + + ssize_t wait_and_fire_events(uint64_t timeout) override { + return do_wait_and_fire_events(timeout, [](void* data) { + auto th = (thread*)data; + thread_interrupt(th); + }); + } + int cancel_wait() override { enqueue(_kq, EVFILT_USER, EV_ONESHOT, NOTE_TRIGGER, nullptr, true); return 0; From 992be127e128b1fe7a629bc9ac55c98c1d9269ce Mon Sep 17 00:00:00 2001 From: Coldwings Date: Wed, 11 Sep 2024 12:11:25 +0800 Subject: [PATCH 4/7] fix kevent immeidate enqueue --- io/kqueue.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 8c437065..289b4087 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -84,7 +84,8 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res auto entry = &_events[_n++]; EV_SET(entry, fd, event, action, event_flags, 0, udata); if (immediate || _n == LEN(_events)) { - int ret = kevent(_kq, _events, _n, nullptr, 0, nullptr); + struct timespec tm {0, 0}; + int ret = kevent(_kq, _events, _n, nullptr, 0, &tm); if (ret < 0) { // debug_breakpoint(); LOG_ERRNO_RETURN(0, -1, "failed to submit events with kevent()"); From 7669aec9bfd7ec4e60b4642cbed21fe9cb59155a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B8=A3=E8=88=9F?= Date: Wed, 11 Sep 2024 14:36:22 +0800 Subject: [PATCH 5/7] KQueue event submitt always use immediate --- io/kqueue.cpp | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 289b4087..4a0842dd 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -111,7 +111,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res nev += ret; for (int i = 0; i < ret; ++i) { if (_events[i].filter == EVFILT_USER) continue; - event_callback(_events[i].udata); + event_callback((thread*)_events[i].udata); } if (ret == (int) LEN(_events)) { // there may be more events tm.tv_sec = tm.tv_nsec = 0; @@ -124,18 +124,18 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res if (unlikely(interests == 0)) return 0; short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; - enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, CURRENT, timeout.expired()); + auto current = CURRENT; + enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current, true); if (timeout.expired()) { int ret = -1; - do_wait_and_fire_events(0, [&](void* data) { - auto th = (thread*)data; - if (th == CURRENT) + do_wait_and_fire_events(0, [current, &ret](thread* th) { + if (th == current) ret = 0; else thread_interrupt(th); }); if (ret <0) { - enqueue(fd, ev, EV_DELETE, 0, CURRENT, true); + enqueue(fd, ev, EV_DELETE, 0, current, true); errno = ETIMEDOUT; } return ret; @@ -147,13 +147,12 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } errno = (ret == 0) ? ETIMEDOUT : err.no; - enqueue(fd, ev, EV_DELETE, 0, CURRENT, true); + enqueue(fd, ev, EV_DELETE, 0, current, true); return -1; } ssize_t wait_and_fire_events(uint64_t timeout) override { - return do_wait_and_fire_events(timeout, [](void* data) { - auto th = (thread*)data; + return do_wait_and_fire_events(timeout, [](thread* th) { thread_interrupt(th); }); } From 5b40acf2738c889edb00d4d10a2261b7dcdeed4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B8=A3=E8=88=9F?= Date: Wed, 11 Sep 2024 16:38:38 +0800 Subject: [PATCH 6/7] fix --- io/epoll.cpp | 7 ++++--- io/kqueue.cpp | 15 ++++++--------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/io/epoll.cpp b/io/epoll.cpp index 8b8d4a8c..8a3a3142 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -299,7 +299,8 @@ ok: entry.interests |= eint; LOG_ERROR_RETURN(EINVAL, -1, "can not wait for multiple interests"); if (unlikely(interest == 0)) return rm_interest({fd, EVENT_RWE| ONE_SHOT, 0}); // remove fd from epoll - int ret = add_interest({fd, interest | ONE_SHOT, CURRENT}); + thread* current = CURRENT; + int ret = add_interest({fd, interest | ONE_SHOT, current}); if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest"); // if timeout is just simple 0, wait for a tiny little moment // so that events can be collect. @@ -307,8 +308,8 @@ ok: entry.interests |= eint; ret = -1; wait_for_events( 0, - [&](void* data) __INLINE__ { - if ((thread*)data == CURRENT) { + [current, &ret](void* data) __INLINE__ { + if ((thread*)data == current) { ret = 0; } else { thread_interrupt((thread*)data, EOK); diff --git a/io/kqueue.cpp b/io/kqueue.cpp index 4a0842dd..c80d659e 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -84,7 +84,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res auto entry = &_events[_n++]; EV_SET(entry, fd, event, action, event_flags, 0, udata); if (immediate || _n == LEN(_events)) { - struct timespec tm {0, 0}; + struct timespec tm{0, 0}; int ret = kevent(_kq, _events, _n, nullptr, 0, &tm); if (ret < 0) { // debug_breakpoint(); @@ -111,7 +111,8 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res nev += ret; for (int i = 0; i < ret; ++i) { if (_events[i].filter == EVFILT_USER) continue; - event_callback((thread*)_events[i].udata); + auto th = (thread*) _events[i].udata; + if (th) event_callback(th); } if (ret == (int) LEN(_events)) { // there may be more events tm.tv_sec = tm.tv_nsec = 0; @@ -121,8 +122,6 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { - if (unlikely(interests == 0)) - return 0; short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; auto current = CURRENT; enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current, true); @@ -132,7 +131,7 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res if (th == current) ret = 0; else - thread_interrupt(th); + thread_interrupt(th, EOK); }); if (ret <0) { enqueue(fd, ev, EV_DELETE, 0, current, true); @@ -145,16 +144,14 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res if (ret == -1 && err.no == EOK) { return 0; // event arrived } - + errno = (ret == 0) ? ETIMEDOUT : err.no; enqueue(fd, ev, EV_DELETE, 0, current, true); return -1; } ssize_t wait_and_fire_events(uint64_t timeout) override { - return do_wait_and_fire_events(timeout, [](thread* th) { - thread_interrupt(th); - }); + return do_wait_and_fire_events(timeout, [](thread *th) { thread_interrupt(th, EOK); }); } int cancel_wait() override { From 668e8aa5d5220bf3757aa5fc75db10b86b2b96aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=B8=A3=E8=88=9F?= Date: Thu, 12 Sep 2024 10:39:03 +0800 Subject: [PATCH 7/7] Fix kqueue response for wait_for_fd with interests=0 --- io/epoll.cpp | 2 +- io/kqueue.cpp | 17 +++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/io/epoll.cpp b/io/epoll.cpp index 8a3a3142..ec8a3fa7 100644 --- a/io/epoll.cpp +++ b/io/epoll.cpp @@ -322,7 +322,7 @@ ok: entry.interests |= eint; } return ret; } - ret = thread_usleep(timeout.timeout() ? timeout : Timeout(10)); + ret = thread_usleep(timeout); ERRNO err; if (ret == -1 && err.no == EOK) { return 0; // Event arrived diff --git a/io/kqueue.cpp b/io/kqueue.cpp index c80d659e..e8b54daa 100644 --- a/io/kqueue.cpp +++ b/io/kqueue.cpp @@ -77,9 +77,6 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } int enqueue(int fd, short event, uint16_t action, uint32_t event_flags, void* udata, bool immediate = false) { - // if (fd == _kq) debug_breakpoint(); - // immediate = true; - // LOG_DEBUG(VALUE(_kq), VALUE(fd), VALUE(event), VALUE(action), VALUE(event_flags), VALUE(udata), VALUE(immediate)); assert(_n < LEN(_events)); auto entry = &_events[_n++]; EV_SET(entry, fd, event, action, event_flags, 0, udata); @@ -87,7 +84,6 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res struct timespec tm{0, 0}; int ret = kevent(_kq, _events, _n, nullptr, 0, &tm); if (ret < 0) { - // debug_breakpoint(); LOG_ERRNO_RETURN(0, -1, "failed to submit events with kevent()"); } _n = 0; @@ -122,11 +118,16 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } int wait_for_fd(int fd, uint32_t interests, Timeout timeout) override { + if (unlikely(interests == 0)) { + errno = ENOSYS; + return -1; + } short ev = (interests == EVENT_READ) ? EVFILT_READ : EVFILT_WRITE; auto current = CURRENT; - enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current, true); + int ret = enqueue(fd, ev, EV_ADD | EV_ONESHOT, 0, current); + if (ret < 0) return ret; if (timeout.expired()) { - int ret = -1; + ret = -1; do_wait_and_fire_events(0, [current, &ret](thread* th) { if (th == current) ret = 0; @@ -139,14 +140,14 @@ class KQueue : public MasterEventEngine, public CascadingEventEngine, public Res } return ret; } - int ret = thread_usleep(timeout); + ret = thread_usleep(timeout); ERRNO err; if (ret == -1 && err.no == EOK) { return 0; // event arrived } - errno = (ret == 0) ? ETIMEDOUT : err.no; enqueue(fd, ev, EV_DELETE, 0, current, true); + errno = (ret == 0) ? ETIMEDOUT : err.no; return -1; }