diff --git a/src/impls/impl-skift/async.cpp b/src/impls/impl-skift/async.cpp index 5bb92f8583..8c530b58d5 100644 --- a/src/impls/impl-skift/async.cpp +++ b/src/impls/impl-skift/async.cpp @@ -50,7 +50,6 @@ struct HjertSched : public Sys::Sched { co_trya$(waitFor(chan.cap(), Hj::Sigs::WRITABLE, Hj::Sigs::NONE)); static_assert(sizeof(Handle) == sizeof(Hj::Cap) and alignof(Handle) == alignof(Hj::Cap)); - logDebug("sending len: {}, hndsLen: {}", buf.len(), hnds.len()); co_try$(chan.send(buf, hnds.cast())); co_return Ok<_Sent>({buf.len(), hnds.len()}); @@ -63,14 +62,11 @@ struct HjertSched : public Sys::Sched { if (auto ipc = fd.is()) { auto &chan = ipc->_in; - logDebug("Buffer size: {}", buf.len()); - co_trya$(waitFor(chan.cap(), Hj::Sigs::READABLE, Hj::Sigs::NONE)); static_assert(sizeof(Handle) == sizeof(Hj::Cap) and alignof(Handle) == alignof(Hj::Cap)); auto [len, hndsLen] = co_try$(chan.recv(buf, hnds.cast())); - logDebug("got len: {}, hndsLen: {}", len, hndsLen); - co_return Ok<_Received>{buf.len(), hnds.len(), Sys::Ip4::unspecified(0)}; + co_return Ok<_Received>{len, hndsLen, Sys::Ip4::unspecified(0)}; } co_return Error::notImplemented("unsupported fd type"); diff --git a/src/kernel/hjert-core/channel.cpp b/src/kernel/hjert-core/channel.cpp index 7d46d175e3..81bba5f79b 100644 --- a/src/kernel/hjert-core/channel.cpp +++ b/src/kernel/hjert-core/channel.cpp @@ -85,7 +85,7 @@ Res Channel::recv(Domain &dom, MutBytes bytes, MutSlice c if (caps.len() < expectedCaps) return Error::invalidInput("not enough space for caps"); - if (caps.len() > dom._availableUnlocked()) + if (dom._availableUnlocked() < expectedCaps) return Error::invalidInput("not enough space in domain"); // Everything is ready, let's receive the message diff --git a/src/libs/karm-async/promise.h b/src/libs/karm-async/promise.h index c80d3f7384..86cfea3f98 100644 --- a/src/libs/karm-async/promise.h +++ b/src/libs/karm-async/promise.h @@ -60,7 +60,8 @@ struct _Future { template R> struct _Operation : - public State::Listener { + public State::Listener, + Meta::Pinned { Strong> _state; R _r; diff --git a/src/libs/karm-sys/rpc.h b/src/libs/karm-sys/rpc.h index c293c54fff..16cfb03cf6 100644 --- a/src/libs/karm-sys/rpc.h +++ b/src/libs/karm-sys/rpc.h @@ -1,10 +1,11 @@ #pragma once +#include +#include #include #include #include #include -#include #include #include @@ -122,7 +123,7 @@ struct Message { header().seq, header().to, header().from, - Meta::idOf(), + Meta::idOf(), }; Io::BufWriter respBuf{resp._payload}; @@ -137,7 +138,7 @@ struct Message { template Res unpack() { - Io::PackScan s{_buf, _hnds}; + Io::PackScan s{bytes(), handles()}; if (not is()) return Error::invalidData("unexpected message"); try$(Io::unpack
(s)); @@ -151,7 +152,6 @@ template Res<> rpcSend(Sys::IpcConnection &con, Port to, u64 seq, Args &&...args) { Message msg = Message::packReq(to, seq, std::forward(args)...).take(); - logDebug("rpcSend : {}, len: {}", msg.header(), msg.len()); try$(con.send(msg.bytes(), msg.handles())); return Ok(); } @@ -164,7 +164,6 @@ static inline Async::Task rpcRecvAsync(Sys::IpcConnection &con) { msg._len = bufLen; msg._hndsLen = hndsLen; - logDebug("rpcRecv: {}, len: {}", msg.header(), msg.len()); co_return msg; } @@ -172,47 +171,47 @@ static inline Async::Task rpcRecvAsync(Sys::IpcConnection &con) { struct Rpc : Meta::Pinned { Sys::IpcConnection _con; - bool _receiving = false; Map> _pending{}; + Async::Queue _incoming{}; u64 _seq = 1; Rpc(Sys::IpcConnection con) - : _con(std::move(con)) {} + : _con(std::move(con)) { + // FIXME: Fid a way to do proper cleanup + Async::detach(_receiverTask(*this), [](Res<> res) { + logError("receiver task exited: {}", res); + panic("receiver task exited"); + }); + } static Rpc create(Sys::Context &ctx) { auto &channel = useChannel(ctx); return Rpc{std::move(channel.con)}; } - template - Res<> send(Port port, Args &&...args) { - return rpcSend(_con, port, _seq++, std::forward(args)...); - } - - Async::Task recvAsync() { - if (_receiving) - co_return Error::other("already receiving"); - - _receiving = true; - Defer defer{[this] { - _receiving = false; - }}; - + static Async::Task<> _receiverTask(Rpc &self) { while (true) { - Message msg = co_trya$(rpcRecvAsync(_con)); - + Message msg = co_trya$(rpcRecvAsync(self._con)); auto header = msg._header; - if (_pending.has(header.seq)) { - auto promise = _pending.take(header.seq); + if (self._pending.has(header.seq)) { + auto promise = self._pending.take(header.seq); promise.resolve(std::move(msg)); - continue; + } else { + self._incoming.enqueue(std::move(msg)); } - - co_return msg; } } + template + Res<> send(Port port, Args &&...args) { + return rpcSend(_con, port, _seq++, std::forward(args)...); + } + + Async::Task recvAsync() { + co_return Ok(co_await _incoming.dequeueAsync()); + } + template Res<> resp(Message &msg, Res message) { auto header = msg._header; diff --git a/src/srvs/grund-bus/bus.cpp b/src/srvs/grund-bus/bus.cpp index 89659290a4..575dfb47d1 100644 --- a/src/srvs/grund-bus/bus.cpp +++ b/src/srvs/grund-bus/bus.cpp @@ -21,7 +21,7 @@ Res<> Endpoint::dispatch(Sys::Message &msg) { // MARK: Service --------------------------------------------------------------- Res> Service::prepare(Sys::Context &, Str id) { - auto in = try$(Hj::Channel::create(Hj::Domain::self(), 4096, 16)); + auto in = try$(Hj::Channel::create(Hj::Domain::self(), 512, 16)); try$(in.label(Io::format("{}-in", id).unwrap())); auto out = try$(Hj::Channel::create(Hj::Domain::self(), 512, 16)); @@ -117,8 +117,6 @@ Async::Task<> Service::runAsync() { while (true) { auto msg = co_trya$(Sys::rpcRecvAsync(_con)); - logDebug("Received message on service '{}' {}", _id, msg.header()); - auto res = dispatch(msg); if (not res) co_try$(Sys::rpcSend(_con, port(), msg.header().seq, res.none())); @@ -126,7 +124,6 @@ Async::Task<> Service::runAsync() { } Res<> Service::send(Sys::Message &msg) { - logDebug("Dispatching message on service '{}'", _id); return _con.send( msg.bytes(), msg.handles() @@ -146,7 +143,6 @@ Str Locator::id() const { Res<> Locator::send(Sys::Message &msg) { if (msg.is()) { auto locate = try$(msg.unpack()); - logDebug("looking for {#}", locate.id); for (auto &endpoint : _bus->_endpoints) { if (endpoint->id() == locate.id) { auto resp = try$(msg.packResp(endpoint->port())); @@ -181,7 +177,6 @@ Karm::Res<> Bus::attach(Strong endpoint) { } Res<> Bus::dispatch(Sys::Message &msg) { - logDebug("dispatching message from {p} to {p}", msg.header().from, msg.header().to); for (auto &endpoint : _endpoints) { if (endpoint->port() == msg.header().to) return endpoint->send(msg);