Skip to content

Commit

Permalink
Add new event source type async and use it in event dispatcher
Browse files Browse the repository at this point in the history
On Windows, the pipe API is very different from unix that uv_poll
will not work easily without some big change. Instead, we introduce an
Async event that can be easily mapped to uv_async, while keep self
pipe trick on linux for other event loop implementation.

We will try to avoid pipe usage in the actual code for simplicity.
  • Loading branch information
wengxt committed Feb 3, 2025
1 parent 4394baf commit 9311232
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 53 deletions.
9 changes: 9 additions & 0 deletions src/lib/fcitx-utils/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,13 @@ std::unique_ptr<EventSource> EventLoop::addPostEvent(EventCallback callback) {
return d->impl_->addPostEvent(std::move(callback));
}

std::unique_ptr<EventSourceAsync>
EventLoop::addAsyncEvent(EventCallback callback) {
FCITX_D();
if (auto *v2 = dynamic_cast<EventLoopInterfaceV2 *>(d->impl_.get())) {
return v2->addAsyncEvent(std::move(callback));
}
return nullptr;
}

} // namespace fcitx
23 changes: 23 additions & 0 deletions src/lib/fcitx-utils/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,29 @@ class FCITXUTILS_EXPORT EventLoop {
FCITX_NODISCARD std::unique_ptr<EventSource>
addPostEvent(EventCallback callback);

/**
* Add an async event that is safe to be triggered from another thread.
*
* To ensure safe usage of this event:
* 1. Do not change event to disable, if there may be pending call to send()
* 2. Due to (1), if it is oneshot, ensure you only call send() once.
* 3. Join all the possible pending thread that may call send(), before
* destructing the event.
* 4. Like other event, the event should be only created/destructed on the
* event loop thread.
*
* EventDispatcher uses this event internally and provides an easier and
* safer interface to use.
*
* @see EventDispatcher
*
* @param callback callback function
* @return async event source
* @since 5.1.13
*/
FCITX_NODISCARD std::unique_ptr<EventSourceAsync>
addAsyncEvent(EventCallback callback);

/**
* Set an external event loop implementation.
*
Expand Down
48 changes: 48 additions & 0 deletions src/lib/fcitx-utils/event_libuv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,32 @@ void IOEventCallback(uv_poll_t *handle, int status, int events);
void TimeEventCallback(uv_timer_t *handle);
void PostEventCallback(uv_prepare_t *handle);

void AsyncEventCallback(uv_async_t *handle) {
auto *source = static_cast<LibUVSourceAsync *>(
static_cast<LibUVSourceBase *>(handle->data));

if (!source->isEnabled()) {
return;
}

try {
auto sourceRef = source->watch();
if (source->isOneShot()) {
source->setEnabled(false);
}
auto callback = source->callback_;
auto ret = (*callback)(source);
if (sourceRef.isValid()) {
if (!ret) {
source->setEnabled(false);
}
}
} catch (const std::exception &e) {
// some abnormal things threw{
FCITX_FATAL() << e.what();
}
}

UVLoop::~UVLoop() {
// Close and detach all handle.
uv_walk(
Expand Down Expand Up @@ -98,6 +124,7 @@ bool LibUVSourceTime::setup(uv_loop_t *loop, uv_timer_t *timer) {
}
return true;
}

bool LibUVSourcePost::setup(uv_loop_t *loop, uv_prepare_t *prepare) {
if (int err = uv_prepare_init(loop, prepare); err < 0) {
FCITX_LIBUV_DEBUG() << "Failed to init prepare with error: " << err;
Expand All @@ -109,6 +136,19 @@ bool LibUVSourcePost::setup(uv_loop_t *loop, uv_prepare_t *prepare) {
}
return true;
}

bool LibUVSourceAsync::setup(uv_loop_t *loop, uv_async_t *async) {
if (int err = uv_async_init(loop, async, &AsyncEventCallback); err < 0) {
FCITX_LIBUV_DEBUG() << "Failed to init async with error: " << err;
return false;
}
return true;
}

void LibUVSourceAsync::send() {
uv_async_send(reinterpret_cast<uv_async_t *>(handle_));
}

bool LibUVSourceIO::setup(uv_loop_t *loop, uv_poll_t *poll) {
if (int err = uv_poll_init(loop, poll, fd_); err < 0) {
FCITX_LIBUV_DEBUG()
Expand Down Expand Up @@ -266,4 +306,12 @@ EventLoopLibUV::addPostEvent(EventCallback callback) {
auto source = std::make_unique<LibUVSourcePost>(std::move(callback), loop_);
return source;
}

std::unique_ptr<EventSourceAsync>
EventLoopLibUV::addAsyncEvent(EventCallback callback) {
auto source =
std::make_unique<LibUVSourceAsync>(std::move(callback), loop_);
return source;
}

} // namespace fcitx
20 changes: 19 additions & 1 deletion src/lib/fcitx-utils/event_libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,23 @@ struct LibUVSourceExit final : public EventSource,
EventCallback callback_;
};

class EventLoopLibUV : public EventLoopInterface {
struct LibUVSourceAsync final
: public LibUVSource<EventSourceAsync, uv_async_t>,
public TrackableObject<LibUVSourceAsync> {
LibUVSourceAsync(EventCallback callback, std::shared_ptr<UVLoop> loop)
: LibUVSource(std::move(loop)),
callback_(std::make_shared<EventCallback>(std::move(callback))) {
setEnabled(true);
}

bool setup(uv_loop_t *loop, uv_async_t *async) override;

void send() override;

std::shared_ptr<EventCallback> callback_;
};

class EventLoopLibUV : public EventLoopInterfaceV2 {
public:
EventLoopLibUV();
bool exec() override;
Expand All @@ -238,6 +254,8 @@ class EventLoopLibUV : public EventLoopInterface {
addDeferEvent(EventCallback callback) override;
FCITX_NODISCARD std::unique_ptr<EventSource>
addPostEvent(EventCallback callback) override;
FCITX_NODISCARD std::unique_ptr<EventSourceAsync>
addAsyncEvent(EventCallback callback) override;

private:
std::shared_ptr<UVLoop> loop_;
Expand Down
47 changes: 46 additions & 1 deletion src/lib/fcitx-utils/event_sdevent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "log.h"
#include "macros.h"
#include "stringutils.h"
#include "unixfd.h"

#if defined(__COVERITY__) && !defined(__INCLUDE_LEVEL__)
#define __INCLUDE_LEVEL__ 2
Expand Down Expand Up @@ -57,7 +58,7 @@ IOEventFlags EpollFlagsToIOEventFlags(uint32_t flags) {

} // namespace

class EventLoopSDEvent : public EventLoopInterface {
class EventLoopSDEvent : public EventLoopInterfaceV2 {
public:
EventLoopSDEvent();
~EventLoopSDEvent();
Expand All @@ -77,6 +78,8 @@ class EventLoopSDEvent : public EventLoopInterface {
addDeferEvent(EventCallback callback) override;
FCITX_NODISCARD std::unique_ptr<EventSource>
addPostEvent(EventCallback callback) override;
FCITX_NODISCARD std::unique_ptr<EventSourceAsync>
addAsyncEvent(EventCallback callback) override;

private:
std::mutex mutex_;
Expand Down Expand Up @@ -374,4 +377,46 @@ EventLoopSDEvent::addPostEvent(EventCallback callback) {
source->setEventSource(sdEventSource);
return source;
}

struct SDEventSourceAsync : public EventSourceAsync {
public:
SDEventSourceAsync(EventLoopInterfaceV2 *event, EventCallback callback) {
int selfpipe[2];
if (safePipe(selfpipe)) {
throw EventLoopException(-EPIPE);
}
fd_[0].give(selfpipe[0]);
fd_[1].give(selfpipe[1]);
ioEvent_ = event->addIOEvent(
fd_[0].fd(), IOEventFlag::In,
[this, callback = std::move(callback)](EventSource *, int fd,
IOEventFlags) {
uint8_t dummy;
while (fs::safeRead(fd, &dummy, sizeof(dummy)) > 0) {
}
callback_(this);
return true;
});
}

bool isEnabled() const override { return ioEvent_->isEnabled(); }
void setEnabled(bool enabled) override { ioEvent_->setEnabled(enabled); }
bool isOneShot() const override { return ioEvent_->isOneShot(); }
void setOneShot() override { ioEvent_->setOneShot(); }

void send() override {
uint8_t dummy = 0;
fs::safeWrite(fd_[1].fd(), &dummy, 1);
}

protected:
UnixFD fd_[2];
std::unique_ptr<EventSourceIO> ioEvent_;
};

std::unique_ptr<EventSourceAsync>
EventLoopSDEvent::addAsyncEvent(EventCallback callback) {
return std::make_unique<SDEventSourceAsync>(this, std::move(callback));
}

} // namespace fcitx
47 changes: 16 additions & 31 deletions src/lib/fcitx-utils/eventdispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,17 @@
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <utility>
#include "event.h"
#include "eventloopinterface.h"
#include "fs.h"
#include "macros.h"
#include "misc_p.h"
#include "unixfd.h"

namespace fcitx {
class EventDispatcherPrivate {
public:
void dispatchEvent() {
uint8_t dummy;
while (fs::safeRead(fd_[0].fd(), &dummy, sizeof(dummy)) > 0) {
}
std::queue<std::function<void()>> eventList;
{
std::lock_guard<std::mutex> lock(mutex_);
Expand All @@ -39,56 +34,46 @@ class EventDispatcherPrivate {
}
}

// Mutex to be used to protect eventList_.
// Mutex to be used to protect fields below.
mutable std::mutex mutex_;
std::queue<std::function<void()>> eventList_;
std::unique_ptr<EventSourceIO> ioEvent_;
std::unique_ptr<EventSourceAsync> asyncEvent_;
EventLoop *loop_ = nullptr;
UnixFD fd_[2];
};

EventDispatcher::EventDispatcher()
: d_ptr(std::make_unique<EventDispatcherPrivate>()) {
FCITX_D();
int selfpipe[2];
if (safePipe(selfpipe)) {
throw std::runtime_error("Failed to create pipe");
}
d->fd_[0].give(selfpipe[0]);
d->fd_[1].give(selfpipe[1]);
}
: d_ptr(std::make_unique<EventDispatcherPrivate>()) {}

EventDispatcher::~EventDispatcher() = default;

void EventDispatcher::attach(EventLoop *event) {
FCITX_D();
std::lock_guard<std::mutex> lock(d->mutex_);
d->ioEvent_ = event->addIOEvent(d->fd_[0].fd(), IOEventFlag::In,
[d](EventSource *, int, IOEventFlags) {
d->dispatchEvent();
return true;
});
d->asyncEvent_ = event->addAsyncEvent([d](EventSource *) {
d->dispatchEvent();
return true;
});
d->loop_ = event;
}

void EventDispatcher::detach() {
FCITX_D();
std::lock_guard<std::mutex> lock(d->mutex_);
d->ioEvent_.reset();
d->asyncEvent_.reset();
d->loop_ = nullptr;
}

void EventDispatcher::schedule(std::function<void()> functor) {
FCITX_D();
if (functor) {
std::lock_guard<std::mutex> lock(d->mutex_);
if (!d->ioEvent_) {
return;
}
d->eventList_.push(std::move(functor));
if (!functor) {
return;
}
std::lock_guard<std::mutex> lock(d->mutex_);
if (!d->asyncEvent_) {
return;
}
uint8_t dummy = 0;
fs::safeWrite(d->fd_[1].fd(), &dummy, 1);
d->eventList_.push(std::move(functor));
d->asyncEvent_->send();
}

EventLoop *EventDispatcher::eventLoop() const {
Expand Down
22 changes: 22 additions & 0 deletions src/lib/fcitx-utils/eventloopinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ struct FCITXUTILS_EXPORT EventSourceTime : public EventSource {
FCITX_NODISCARD virtual clockid_t clock() const = 0;
};

/**
* A thread-safe event source can be triggered from other threads.
*
* @since 5.1.13
*/
struct FCITXUTILS_EXPORT EventSourceAsync : public EventSource {
/**
* Trigger the event from other thread.
*
* The callback is guranteed to be called send() if it is enabled.
* Multiple call to send() may only trigger the callback once.
*/
virtual void send() = 0;
};

using IOCallback =
std::function<bool(EventSourceIO *, int fd, IOEventFlags flags)>;
using TimeCallback = std::function<bool(EventSourceTime *, uint64_t usec)>;
Expand Down Expand Up @@ -131,6 +146,13 @@ class FCITXUTILS_EXPORT EventLoopInterface {
FCITX_NODISCARD virtual std::unique_ptr<EventSource>
addPostEvent(EventCallback callback) = 0;
};

class FCITXUTILS_EXPORT EventLoopInterfaceV2 : public EventLoopInterface {
public:
FCITX_NODISCARD virtual std::unique_ptr<EventSourceAsync>
addAsyncEvent(EventCallback callback) = 0;
};

} // namespace fcitx

#endif // _FCITX_UTILS_EVENTLOOPINTERFACE_H_
3 changes: 3 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ set(FCITX_UTILS_DBUS_TEST
set(testdbus_LIBS Pthread::Pthread)
set(testeventdispatcher_LIBS Pthread::Pthread)
set(testevent_LIBS Pthread::Pthread eventlooptests)

if (NOT WIN32)
set(testcustomeventloop_LIBS Pthread::Pthread eventlooptests)
endif()

find_program(XVFB_BIN Xvfb)

Expand Down
Loading

0 comments on commit 9311232

Please sign in to comment.