Skip to content

Commit

Permalink
Realize true one-shot semantics for epoll with EPOLLONESHOT (#410)
Browse files Browse the repository at this point in the history
* realize ONE_SHOT for epoll

* fix

* fix

* improve branching

* fix

* fix
  • Loading branch information
lihuiba authored Mar 17, 2024
1 parent ecbb2fa commit a7597a9
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 89 deletions.
174 changes: 93 additions & 81 deletions io/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,62 +110,81 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine,
}
return 0;
}

std::vector<InFlightEvent> _inflight_events;
// Registers the interest bits of `e` for `e.fd` with epoll (through ctl())
// and records `e.data` in the per-fd entry of _inflight_events.
//
// NOTE(review): this span is a rendered diff. Two variants of several
// statements are interleaved without +/- markers, so it does not compile as
// shown. The comments below describe what each visible group of lines does.
//
// Error paths go through LOG_ERROR_RETURN, which presumably sets errno to its
// first argument and returns its second (project macro, not visible here).
virtual int add_interest(Event e) override {
if (e.fd < 0)
LOG_ERROR_RETURN(EINVAL, -1, "invalid file descriptor ", e.fd);
// Grow the per-fd table when e.fd is outside it (two variants of the same
// test; one only adds the unlikely() branch hint).
// NOTE(review): with e.fd == 0 and an empty table, resize(0 * 2) keeps the
// table empty — confirm the table is pre-sized elsewhere.
if ((size_t)e.fd >= _inflight_events.size())
if (unlikely((size_t)e.fd >= _inflight_events.size()))
_inflight_events.resize(e.fd * 2);
auto& entry = _inflight_events[e.fd];
// Conflict check, variant 1: nested conditions, one per READ/WRITE/ERROR.
if (e.interests & entry.interests) {
if (((e.interests & entry.interests & EVENT_READ) &&
(entry.reader_data != e.data)) ||
((e.interests & entry.interests & EVENT_WRITE) &&
(entry.writer_data != e.data)) ||
((e.interests & entry.interests & EVENT_ERROR) &&
(entry.error_data != e.data))) {
LOG_ERROR_RETURN(EALREADY, -1, "conflicted interest(s)");
// Conflict check, variant 2 (bitmask form of the same rule):
// `intersection` = requested interests that are already registered,
// `data`         = interests whose stored data pointer differs from e.data.
// Any common bit means an interest is already held with different data.
auto intersection = e.interests & entry.interests & EVENT_RWE;
auto data = (entry.reader_data != e.data) * EVENT_READ |
(entry.writer_data != e.data) * EVENT_WRITE |
(entry.error_data != e.data) * EVENT_ERROR ;
if (intersection & data)
LOG_ERROR_RETURN(EALREADY, -1, "conflicted interest(s)");

// Variant that talks to epoll before touching `entry`:
// `x` is the union of already-registered and requested R/W/E bits.
int ret;
auto eint = entry.interests;
auto x = (eint | e.interests) & EVENT_RWE;
auto events = evmap.translate_bitwisely(x);
if (likely(e.interests & ONE_SHOT)) {
// ONE_SHOT request: ask epoll for EPOLLONESHOT as well.
events |= EPOLLONESHOT;
if (likely(eint & ONE_SHOT)) {
// Entry already carries ONE_SHOT: try to modify the registration, and
// fall back to adding it when epoll reports ENOENT.
ret = ctl(e.fd, EPOLL_CTL_MOD, events);
if (unlikely(ret < 0 && errno == ENOENT))
ret = ctl(e.fd, EPOLL_CTL_ADD, events);
} else {
// Entry has no ONE_SHOT flag: only accepted when nothing is registered.
if (eint != 0) LOG_ERROR_RETURN(EINVAL, -1, "conflicted interest(s) regarding ONE_SHOT");
ret = ctl(e.fd, EPOLL_CTL_ADD, events);
}
} else {
// Non-ONE_SHOT request: modify if any R/W/E bit is registered, else add.
auto op = (eint & EVENT_RWE) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
ret = ctl(e.fd, op, events);
}
if (ret < 0)
LOG_ERROR_RETURN(0, ret, "failed to add_interest()");

// Bookkeeping: merge the new interest bits and remember e.data for each
// requested kind (some of these lines appear twice because of the diff).
if (e.interests & EVENT_READ) entry.reader_data = e.data;
entry.interests |= e.interests;
if (e.interests & EVENT_READ) entry.reader_data = e.data;
if (e.interests & EVENT_WRITE) entry.writer_data = e.data;
if (e.interests & EVENT_ERROR) entry.error_data = e.data;
// Other variant: updates `entry` first, rejects a ONE_SHOT mismatch on an
// already-registered fd, and never passes a one-shot flag to epoll.
auto eint = entry.interests & (EVENT_READ | EVENT_WRITE | EVENT_ERROR);
auto op = eint ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
if (op == EPOLL_CTL_MOD &&
(e.interests & ONE_SHOT) != (entry.interests & ONE_SHOT)) {
LOG_ERROR_RETURN(
EINVAL, -1,
"do not support ONE_SHOT on no-oneshot interested fd");
}
auto x = entry.interests |= e.interests;
x &= (EVENT_READ | EVENT_WRITE | EVENT_ERROR);
// Since epoll's one-shot has a totally different meaning from ONE_SHOT in
// photon, all epoll actions are kept non-oneshot.
auto events = evmap.translate_bitwisely(x);
return ctl(e.fd, op, events);
if (e.interests & EVENT_ERROR) entry.error_data = e.data;
return 0;
}

// Removes the interest bits of `e` from `e.fd`, updating both epoll (through
// ctl()) and the per-fd entry in _inflight_events.
//
// NOTE(review): this span is a rendered diff. Two variants of several
// statements are interleaved without +/- markers, so it does not compile as
// shown. The comments below describe what each visible group of lines does.
//
// Returns 0 immediately when `e.interests` is empty or shares no R/W/E bit
// with what is registered.
virtual int rm_interest(Event e) override {
if (e.fd < 0 || (size_t)e.fd >= _inflight_events.size())
LOG_ERROR_RETURN(EINVAL, -1, "invalid file descriptor ", e.fd);
if (unlikely(!e.interests)) return 0;
auto& entry = _inflight_events[e.fd];
// Bits that are both requested for removal and currently registered
// (two spellings of the same mask; EVENT_RWE is READ | WRITE | ERROR).
auto intersection = e.interests & entry.interests &
(EVENT_READ | EVENT_WRITE | EVENT_ERROR);
auto intersection = e.interests & entry.interests & EVENT_RWE;
if (intersection == 0) return 0;

// Variant A: clears the bits in `entry` first, then issues a single ctl():
// MOD when R/W/E bits remain, DEL (and a full reset of interests) otherwise.
auto x = (entry.interests ^= intersection) &
(EVENT_READ | EVENT_WRITE | EVENT_ERROR);
if (e.interests & EVENT_READ) entry.reader_data = nullptr;
if (e.interests & EVENT_WRITE) entry.writer_data = nullptr;
if (e.interests & EVENT_ERROR) entry.error_data = nullptr;
auto op = x ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
auto events = evmap.translate_bitwisely(x);
if (op == EPOLL_CTL_DEL) {
entry.interests = 0;
// Variant B: computes the remaining bits `x` without modifying `entry`, and
// only commits the bookkeeping after ctl() has succeeded.
int ret, op = 0; // ^ is to flip intersected bits
auto x = (entry.interests ^ intersection) & EVENT_RWE;
if (likely(e.interests & ONE_SHOT)) {
// Caller passed ONE_SHOT: with nothing left, epoll is not touched at all;
// otherwise MOD is issued with the remaining interests.
if (!x) {
ret = 0; // no need to epoll_ctl()
} else {
auto events = evmap.translate_bitwisely(x);
ret = ctl(e.fd, EPOLL_CTL_MOD, events); // re-arm other interests
}
} else {
// No ONE_SHOT in the request: MOD when bits remain, DEL otherwise.
op = x ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
auto events = evmap.translate_bitwisely(x);
ret = ctl(e.fd, op, events);
}
return ctl(e.fd, op, events);
if (ret < 0)
LOG_ERROR_RETURN(0, ret, "failed to rm_interest()");
// ^ is to flip intersected bits
// After DEL all bits (flags included) are cleared; otherwise only the removed
// R/W/E bits are flipped off. Data pointers are cleared for removed kinds.
entry.interests = (op == EPOLL_CTL_DEL) ? 0 : (entry.interests ^ intersection);
if (intersection & EVENT_READ) entry.reader_data = nullptr;
if (intersection & EVENT_WRITE) entry.writer_data = nullptr;
if (intersection & EVENT_ERROR) entry.error_data = nullptr;
return 0;
}

epoll_event _events[16];
uint16_t _events_remain = 0;
int do_epoll_wait(uint64_t timeout) {
Expand Down Expand Up @@ -210,32 +229,23 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine,
assert(e.data.u64 < _inflight_events.size());
if (e.data.u64 >= _inflight_events.size()) continue;
auto& entry = _inflight_events[e.data.u64];
uint32_t events = 0;
if ((e.events & ERRBIT) && (entry.interests & EVENT_ERROR)) {
auto data = entry.error_data;
if (entry.interests & ONE_SHOT) {
rm_interest({.fd = (int)e.data.u64,
.interests = EVENT_ERROR | ONE_SHOT,
.data = nullptr});
}
datacb(data);
events |= EVENT_ERROR;
datacb(entry.error_data);
}
if ((e.events & READBITS) && (entry.interests & EVENT_READ)) {
auto data = entry.reader_data;
if (entry.interests & ONE_SHOT) {
rm_interest({.fd = (int)e.data.u64,
.interests = EVENT_READ | ONE_SHOT,
.data = nullptr});
}
datacb(data);
events |= EVENT_READ;
datacb(entry.reader_data);
}
if ((e.events & WRITEBITS) && (entry.interests & EVENT_WRITE)) {
auto data = entry.writer_data;
if (entry.interests & ONE_SHOT) {
rm_interest({.fd = (int)e.data.u64,
.interests = EVENT_WRITE | ONE_SHOT,
.data = nullptr});
}
datacb(data);
events |= EVENT_WRITE;
datacb(entry.writer_data);
}
if (events && (entry.interests & ONE_SHOT)) {
rm_interest({.fd = (int)e.data.u64,
.interests = events | ONE_SHOT,
.data = nullptr});
}
}
}
Expand All @@ -248,21 +258,19 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine,
}
auto ptr = data;
auto end = data + count;
wait_for_events(
0, [&](void* data) __INLINE__ { *ptr++ = data; },
[&]()
__INLINE__ { // make sure each fd receives all possible events
return (end - ptr) >= 3;
});
wait_for_events(0, // pass timeout as 0 to avoid another wait
[&](void* data) __INLINE__ { *ptr++ = data; },
[&]() __INLINE__ { // make sure each fd receives all possible events
return (end - ptr) >= 3;
});
if (ptr == data) {
return 0;
}
return ptr - data;
}
virtual ssize_t wait_and_fire_events(uint64_t timeout = -1) override {
ssize_t n = 0;
wait_for_events(
timeout,
wait_for_events(timeout,
[&](void* data) __INLINE__ {
assert(data);
thread_interrupt((thread*)data, EOK);
Expand All @@ -273,35 +281,39 @@ class EventEngineEPoll : public MasterEventEngine, public CascadingEventEngine,
}
virtual int cancel_wait() override { return eventfd_write(_evfd, 1); }

// Blocks the current thread until `fd` reports the requested interest or
// `timeout` elapses: registers the interest with ONE_SHOT and CURRENT as the
// data pointer, then sleeps via thread_usleep().
//
// NOTE(review): this span is a rendered diff. Two variants (parameter named
// `interests` vs. `interest`) are interleaved without +/- markers, so it does
// not compile as shown.
//
// Returns 0 when thread_usleep() returns -1 with errno == EOK ("Event
// arrived"). Otherwise the interest is removed and -1 is returned with errno
// set to ETIMEDOUT (thread_usleep() returned 0) or to the errno captured in
// `err`.
// Variant with `interests`: no argument validation, explicit if/else chain.
int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) override {
Event event{fd, interests | ONE_SHOT, CURRENT};
int ret = add_interest(event);
// Variant with `interest`: validates the arguments first.
int wait_for_fd(int fd, uint32_t interest, uint64_t timeout) override {
if (fd < 0)
LOG_ERROR_RETURN(EINVAL, -1, "invalid fd");
// x & (x - 1) is non-zero exactly when more than one bit is set.
if (interest & (interest-1))
LOG_ERROR_RETURN(EINVAL, -1, "can not wait for multiple interests");
// An empty interest is treated as a request to drop all R/W/E interests.
if (unlikely(!interest))
return rm_interest({fd, EVENT_RWE, 0}); // remove fd from epoll
int ret = add_interest({fd, interest | ONE_SHOT, CURRENT});
if (ret < 0) LOG_ERROR_RETURN(0, -1, "failed to add event interest");
ret = thread_usleep(timeout);
// presumably snapshots errno right after thread_usleep() — project type,
// TODO confirm
ERRNO err;
if (ret == -1 && err.no == EOK) {
return 0; // Event arrived
} else if (ret == 0) {
rm_interest(event); // Timeout
errno = ETIMEDOUT;
return -1;
} else {
rm_interest(event); // Interrupted by other thread
errno = err.no;
return -1;
}
// Condensed failure path of the `interest` variant.
rm_interest({fd, interest, 0}); // no ONE_SHOT, to reconfig epoll
errno = (ret == 0) ? ETIMEDOUT : // Timeout
err.no; // Interrupted by other thread
return -1;
}
};

// Creates an EventEngineEPoll with NewObj<>() and returns the result of its
// init() call, after logging an informational message.
//
// NOTE(review): rendered diff — two signatures are interleaved without +/-
// markers (one takes no argument, the other takes a `role` string that is
// appended to the log message), so this does not compile as shown.
__attribute__((noinline)) static EventEngineEPoll* new_epoll_engine() {
LOG_INFO("Init event engine: epoll");
__attribute__((noinline)) static
EventEngineEPoll* new_epoll_engine(ALogStringL role) {
LOG_INFO("Init epoll event engine: ", role);
return NewObj<EventEngineEPoll>()->init();
}

// Factory returning a new epoll engine as a MasterEventEngine*.
// NOTE(review): rendered diff — a one-line variant calling new_epoll_engine()
// and a multi-line variant passing the role "master" are interleaved.
MasterEventEngine* new_epoll_master_engine() { return new_epoll_engine(); }
MasterEventEngine* new_epoll_master_engine() {
return new_epoll_engine("master");
}

// Factory returning a new epoll engine as a CascadingEventEngine*.
// NOTE(review): rendered diff — two return statements are interleaved, one
// without an argument and one passing the role "cascading".
CascadingEventEngine* new_epoll_cascading_engine() {
return new_epoll_engine();
return new_epoll_engine("cascading");
}

} // namespace photon
3 changes: 2 additions & 1 deletion io/fd-events.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace photon {
// Bit flags describing interests on a file descriptor.
// The three event kinds occupy the low bits and can be OR-ed together.
const static uint32_t EVENT_READ = 1;
const static uint32_t EVENT_WRITE = 2;
const static uint32_t EVENT_ERROR = 4;
// Mask covering all three event kinds (READ | WRITE | ERROR == 7).
const static uint32_t EVENT_RWE = EVENT_READ | EVENT_WRITE | EVENT_ERROR;
// Modifier flags; their bits lie outside EVENT_RWE, so masking with EVENT_RWE
// strips them.
const static uint32_t EDGE_TRIGGERED = 0x4000;
const static uint32_t ONE_SHOT = 0x8000;

Expand All @@ -44,7 +45,7 @@ class MasterEventEngine {
* @return 0 for success, which means event arrived in time
* -1 for failure, could be timeout or interrupted by another thread
*/
virtual int wait_for_fd(int fd, uint32_t interests, uint64_t timeout) = 0;
virtual int wait_for_fd(int fd, uint32_t interest, uint64_t timeout) = 0;

// Convenience wrapper: forwards to wait_for_fd() with EVENT_READ as the
// interest and returns its result unchanged.
// `timeout` defaults to -1, which converts to the largest uint64_t value.
int wait_for_fd_readable(int fd, uint64_t timeout = -1) {
return wait_for_fd(fd, EVENT_READ, timeout);
}
14 changes: 8 additions & 6 deletions net/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,18 +695,20 @@ void* start_server(void*) {

// Checks name resolution of "localhost" against the literal 127.0.0.1, using
// both net::gethostbyname() and net::gethostbypeer().
//
// NOTE(review): rendered diff — two variants of the test are interleaved
// without +/- markers (duplicate `for` headers, unbalanced braces), so this
// does not compile as shown and which lines belong to which revision cannot
// be told from here.
TEST(utils, gethostbyname) {
net::IPAddr localhost("127.0.0.1");
// Variant resolving into a single IPAddr and comparing it to 127.0.0.1.
net::IPAddr addr;
net::gethostbyname("localhost", &addr);
EXPECT_EQ(localhost.to_nl(), addr.to_nl());
// Variant resolving into a vector; expects at least one address and the
// first one to equal 127.0.0.1.
std::vector<net::IPAddr> addrs;
net::gethostbyname("localhost", addrs);
EXPECT_GT((int)addrs.size(), 0);
EXPECT_EQ(localhost.to_nl(), addrs[0].to_nl());

net::IPAddr host = net::gethostbypeer("localhost");
EXPECT_EQ(localhost.to_nl(), host.to_nl());
// Two spellings of the loop over the resolved addresses are interleaved.
for (auto &x : addrs) {
bool found_localhost = false, found_host = false;
for (auto& x: addrs) {
LOG_INFO(VALUE(x));
// One variant requires every address to be a loopback address; the other
// only records whether 127.0.0.1 and the gethostbypeer() result were seen.
EXPECT_TRUE(x.is_loopback());
found_localhost |= (x == localhost);
found_host |= (x == host);
}
EXPECT_TRUE(found_localhost);
EXPECT_TRUE(found_host);
}

TEST(utils, resolver) {
Expand Down
2 changes: 1 addition & 1 deletion net/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ inline int gethostbyname(const char* name, IPAddr* buf, int bufsize = 1) {
int i = 0;
auto cb = [&](IPAddr addr) {
if (i < bufsize) buf[i++] = addr;
return 0;
return (i < bufsize) ? 0 : -1;
};
return _gethostbyname(name, cb);
}
Expand Down

0 comments on commit a7597a9

Please sign in to comment.