Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Dec 22, 2024
1 parent 5a5b214 commit b8e05f0
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
36 changes: 35 additions & 1 deletion src/libs/karm-base/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,39 @@ struct _Promise : Meta::NoCopy {
template <typename V = None, typename E = Error>
using Promise = _Promise<Res<V, E>>;

// MARK: Queue -----------------------------------------------------------------

template <typename T>
struct Queue {
struct Listener : public Resumable {
LlItem<Listener> item;
};

Vec<T> _buffer;
Ll<Listener> _listeners;

void equeue(T value) {
if (_listeners.empty()) {
_buffer.pushBack(std::move(value));
return;
}

//TODO
}

_Future<T> dequeueAsync();

Opt<T> dequeue() {
if (_buffer.empty())
return NONE;
return _buffer.popFont();
}

bool empty() const {
return _buffer.empty();
}
};

// MARK: Task ------------------------------------------------------------------

enum struct Cfp {
Expand Down Expand Up @@ -428,7 +461,8 @@ struct _Task {

_Task(_Task const &other) = delete;

_Task(_Task &&other) : _coro(std::exchange(other._coro, nullptr)) {}
_Task(_Task &&other)
: _coro(std::exchange(other._coro, nullptr)) {}

_Task &operator=(_Task const &other) = delete;

Expand Down
40 changes: 18 additions & 22 deletions src/libs/karm-sys/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,47 +172,43 @@ static inline Async::Task<Message> rpcRecvAsync(Sys::IpcConnection &con) {

struct Rpc : Meta::Pinned {
Sys::IpcConnection _con;
bool _receiving = false;
Map<u64, Async::_Promise<Message>> _pending{};
Async::Queue<Message> _incoming{};
u64 _seq = 1;
Async::Task<> _receiver;

Rpc(Sys::IpcConnection con)
: _con(std::move(con)) {}
: _con(std::move(con)), _receiver{_receiverTask(*this)} {}

static Rpc create(Sys::Context &ctx) {
auto &channel = useChannel(ctx);
return Rpc{std::move(channel.con)};
}

template <typename T, typename... Args>
Res<> send(Port port, Args &&...args) {
return rpcSend<T>(_con, port, _seq++, std::forward<Args>(args)...);
}

Async::Task<Message> recvAsync() {
if (_receiving)
co_return Error::other("already receiving");

_receiving = true;
Defer defer{[this] {
_receiving = false;
}};

static Async::Task<> _receiverTask(Rpc &rpc) {
while (true) {
Message msg = co_trya$(rpcRecvAsync(_con));
Message msg = co_trya$(rpcRecvAsync(rpc._con));

auto header = msg._header;

if (_pending.has(header.seq)) {
auto promise = _pending.take(header.seq);
if (rpc._pending.has(header.seq)) {
auto promise = rpc._pending.take(header.seq);
promise.resolve(std::move(msg));
continue;
} else {
rpc._incoming.equeue(std::move(msg));
}

co_return msg;
}
}

template <typename T, typename... Args>
Res<> send(Port port, Args &&...args) {
return rpcSend<T>(_con, port, _seq++, std::forward<Args>(args)...);
}

Async::Task<Message> recvAsync() {
co_return _incoming.dequeueAsync();
}

template <typename T>
Res<> resp(Message &msg, Res<typename T::Response> message) {
auto header = msg._header;
Expand Down

0 comments on commit b8e05f0

Please sign in to comment.