Skip to content

Commit

Permalink
karm-sys: Fixup message dispatch.
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Dec 26, 2024
1 parent ff50e32 commit 239f46c
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 41 deletions.
6 changes: 1 addition & 5 deletions src/impls/impl-skift/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ struct HjertSched : public Sys::Sched {

co_trya$(waitFor(chan.cap(), Hj::Sigs::WRITABLE, Hj::Sigs::NONE));
static_assert(sizeof(Handle) == sizeof(Hj::Cap) and alignof(Handle) == alignof(Hj::Cap));
logDebug("sending len: {}, hndsLen: {}", buf.len(), hnds.len());
co_try$(chan.send(buf, hnds.cast<Hj::Cap>()));

co_return Ok<_Sent>({buf.len(), hnds.len()});
Expand All @@ -63,14 +62,11 @@ struct HjertSched : public Sys::Sched {
if (auto ipc = fd.is<Skift::IpcFd>()) {
auto &chan = ipc->_in;

logDebug("Buffer size: {}", buf.len());

co_trya$(waitFor(chan.cap(), Hj::Sigs::READABLE, Hj::Sigs::NONE));
static_assert(sizeof(Handle) == sizeof(Hj::Cap) and alignof(Handle) == alignof(Hj::Cap));
auto [len, hndsLen] = co_try$(chan.recv(buf, hnds.cast<Hj::Cap>()));
logDebug("got len: {}, hndsLen: {}", len, hndsLen);

co_return Ok<_Received>{buf.len(), hnds.len(), Sys::Ip4::unspecified(0)};
co_return Ok<_Received>{len, hndsLen, Sys::Ip4::unspecified(0)};
}

co_return Error::notImplemented("unsupported fd type");
Expand Down
2 changes: 1 addition & 1 deletion src/kernel/hjert-core/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Res<Hj::SentRecv> Channel::recv(Domain &dom, MutBytes bytes, MutSlice<Hj::Cap> c
if (caps.len() < expectedCaps)
return Error::invalidInput("not enough space for caps");

if (caps.len() > dom._availableUnlocked())
if (dom._availableUnlocked() < expectedCaps)
return Error::invalidInput("not enough space in domain");

// Everything is ready, let's receive the message
Expand Down
3 changes: 2 additions & 1 deletion src/libs/karm-async/promise.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ struct _Future {

template <Receiver<T> R>
struct _Operation :
public State<T>::Listener {
public State<T>::Listener,
Meta::Pinned {

Strong<State<T>> _state;
R _r;
Expand Down
55 changes: 27 additions & 28 deletions src/libs/karm-sys/rpc.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#pragma once

#include <karm-async/promise.h>
#include <karm-async/queue.h>
#include <karm-base/cons.h>
#include <karm-base/map.h>
#include <karm-io/pack.h>
#include <karm-logger/logger.h>
#include <karm-sys/async.h>
#include <karm-sys/context.h>
#include <karm-sys/socket.h>

Expand Down Expand Up @@ -122,7 +123,7 @@ struct Message {
header().seq,
header().to,
header().from,
Meta::idOf<T>(),
Meta::idOf<typename T::Response>(),
};

Io::BufWriter respBuf{resp._payload};
Expand All @@ -137,7 +138,7 @@ struct Message {

template <typename T>
Res<T> unpack() {
Io::PackScan s{_buf, _hnds};
Io::PackScan s{bytes(), handles()};
if (not is<T>())
return Error::invalidData("unexpected message");
try$(Io::unpack<Header>(s));
Expand All @@ -151,7 +152,6 @@ template <typename T, typename... Args>
Res<> rpcSend(Sys::IpcConnection &con, Port to, u64 seq, Args &&...args) {
Message msg = Message::packReq<T>(to, seq, std::forward<Args>(args)...).take();

logDebug("rpcSend : {}, len: {}", msg.header(), msg.len());
try$(con.send(msg.bytes(), msg.handles()));
return Ok();
}
Expand All @@ -164,55 +164,54 @@ static inline Async::Task<Message> rpcRecvAsync(Sys::IpcConnection &con) {
msg._len = bufLen;
msg._hndsLen = hndsLen;

logDebug("rpcRecv: {}, len: {}", msg.header(), msg.len());
co_return msg;
}

// MARK: Rpc -------------------------------------------------------------------

struct Rpc : Meta::Pinned {
Sys::IpcConnection _con;
bool _receiving = false;
Map<u64, Async::_Promise<Message>> _pending{};
Async::Queue<Message> _incoming{};
u64 _seq = 1;

Rpc(Sys::IpcConnection con)
: _con(std::move(con)) {}
: _con(std::move(con)) {
// FIXME: Fid a way to do proper cleanup
Async::detach(_receiverTask(*this), [](Res<> res) {
logError("receiver task exited: {}", res);
panic("receiver task exited");
});
}

static Rpc create(Sys::Context &ctx) {
auto &channel = useChannel(ctx);
return Rpc{std::move(channel.con)};
}

template <typename T, typename... Args>
Res<> send(Port port, Args &&...args) {
return rpcSend<T>(_con, port, _seq++, std::forward<Args>(args)...);
}

Async::Task<Message> recvAsync() {
if (_receiving)
co_return Error::other("already receiving");

_receiving = true;
Defer defer{[this] {
_receiving = false;
}};

static Async::Task<> _receiverTask(Rpc &self) {
while (true) {
Message msg = co_trya$(rpcRecvAsync(_con));

Message msg = co_trya$(rpcRecvAsync(self._con));
auto header = msg._header;

if (_pending.has(header.seq)) {
auto promise = _pending.take(header.seq);
if (self._pending.has(header.seq)) {
auto promise = self._pending.take(header.seq);
promise.resolve(std::move(msg));
continue;
} else {
self._incoming.enqueue(std::move(msg));
}

co_return msg;
}
}

template <typename T, typename... Args>
Res<> send(Port port, Args &&...args) {
return rpcSend<T>(_con, port, _seq++, std::forward<Args>(args)...);
}

Async::Task<Message> recvAsync() {
co_return Ok(co_await _incoming.dequeueAsync());
}

template <typename T>
Res<> resp(Message &msg, Res<typename T::Response> message) {
auto header = msg._header;
Expand Down
7 changes: 1 addition & 6 deletions src/srvs/grund-bus/bus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Res<> Endpoint::dispatch(Sys::Message &msg) {
// MARK: Service ---------------------------------------------------------------

Res<Strong<Service>> Service::prepare(Sys::Context &, Str id) {
auto in = try$(Hj::Channel::create(Hj::Domain::self(), 4096, 16));
auto in = try$(Hj::Channel::create(Hj::Domain::self(), 512, 16));
try$(in.label(Io::format("{}-in", id).unwrap()));

auto out = try$(Hj::Channel::create(Hj::Domain::self(), 512, 16));
Expand Down Expand Up @@ -117,16 +117,13 @@ Async::Task<> Service::runAsync() {
while (true) {
auto msg = co_trya$(Sys::rpcRecvAsync(_con));

logDebug("Received message on service '{}' {}", _id, msg.header());

auto res = dispatch(msg);
if (not res)
co_try$(Sys::rpcSend<Error>(_con, port(), msg.header().seq, res.none()));
}
}

Res<> Service::send(Sys::Message &msg) {
logDebug("Dispatching message on service '{}'", _id);
return _con.send(
msg.bytes(),
msg.handles()
Expand All @@ -146,7 +143,6 @@ Str Locator::id() const {
Res<> Locator::send(Sys::Message &msg) {
if (msg.is<Locate>()) {
auto locate = try$(msg.unpack<Locate>());
logDebug("looking for {#}", locate.id);
for (auto &endpoint : _bus->_endpoints) {
if (endpoint->id() == locate.id) {
auto resp = try$(msg.packResp<Locate>(endpoint->port()));
Expand Down Expand Up @@ -181,7 +177,6 @@ Karm::Res<> Bus::attach(Strong<Endpoint> endpoint) {
}

Res<> Bus::dispatch(Sys::Message &msg) {
logDebug("dispatching message from {p} to {p}", msg.header().from, msg.header().to);
for (auto &endpoint : _endpoints) {
if (endpoint->port() == msg.header().to)
return endpoint->send(msg);
Expand Down

0 comments on commit 239f46c

Please sign in to comment.