Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Nov 12, 2024
1 parent 7b632c0 commit 0523d33
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 34 deletions.
4 changes: 2 additions & 2 deletions src/impls/impl-skift/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ struct HjertSched : public Sys::Sched {

co_trya$(waitFor(chan.cap(), Hj::Sigs::READABLE, Hj::Sigs::NONE));
static_assert(sizeof(Handle) == sizeof(Hj::Cap) and alignof(Handle) == alignof(Hj::Cap));
co_try$(chan.recv(buf, hnds.cast<Hj::Cap>()));
auto [len, hndsLen] = co_try$(chan.recv(buf, hnds.cast<Hj::Cap>()));

co_return Ok<_Received>{buf.len(), hnds.len(), Sys::Ip4::unspecified(0)};
co_return Ok<_Received>{len, hnds.len(), Sys::Ip4::unspecified(0)};
}

co_return Error::notImplemented("unsupported fd type");
Expand Down
33 changes: 22 additions & 11 deletions src/libs/karm-sys/ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@ struct Header {
static_assert(Meta::TrivialyCopyable<Header>);

struct Message {
static constexpr usize CAP = 4096;

union {
struct {
Header _header;
Array<u8, 4096 - sizeof(Header)> _payload;
Array<u8, CAP - sizeof(Header)> _payload;
};

Array<u8, 4096> _buf;
Array<u8, CAP> _buf;
};

usize _payloadLen = 0;
usize _len = 0;

Array<Sys::Handle, 16> _hnds;
usize _hndsLen = 0;
Expand All @@ -74,7 +76,7 @@ struct Message {
}

usize len() const {
return sizeof(Header) + _payloadLen;
return _len;
}

Bytes bytes() {
Expand All @@ -95,13 +97,18 @@ struct Message {
T payload{std::forward<Args>(args)...};

Message msg;
msg._header = {seq, Port::INVALID, to, Meta::idOf<T>()};
msg._header = {
seq,
Port::INVALID,
to,
Meta::idOf<T>(),
};
Io::BufWriter reqBuf{msg._payload};
Io::PackEmit reqPack{reqBuf};

try$(Io::pack(reqPack, payload));

msg._payloadLen = try$(Io::tell(reqBuf));
msg._len = try$(Io::tell(reqBuf)) + sizeof(Header);

return Ok(std::move(msg));
}
Expand All @@ -118,12 +125,12 @@ struct Message {
Meta::idOf<T>(),
};

Io::BufWriter respBuf{resp._buf};
Io::BufWriter respBuf{resp._payload};
Io::PackEmit respPack{respBuf};

try$(Io::pack(respPack, payload));

resp._payloadLen = try$(Io::tell(respBuf));
resp._len = try$(Io::tell(respBuf)) + sizeof(Header);

return Ok(std::move(resp));
}
Expand All @@ -141,8 +148,10 @@ struct Message {
// MARK: Primitive Operations --------------------------------------------------

template <typename T, typename... Args>
Res<> ipcSend(Sys::IpcConnection &con, Port port, u64 seq, Args &&...args) {
Message msg = Message::packReq<T>(port, seq, std::forward<Args>(args)...).take();
Res<> ipcSend(Sys::IpcConnection &con, Port to, u64 seq, Args &&...args) {
Message msg = Message::packReq<T>(to, seq, std::forward<Args>(args)...).take();

logDebug("ipcSend : {}, len: {}", msg.header(), msg.len());
try$(con.send(msg.bytes(), msg.handles()));
return Ok();
}
Expand All @@ -152,8 +161,10 @@ static inline Async::Task<Message> ipcRecvAsync(Sys::IpcConnection &con) {
auto [bufLen, hndsLen] = co_trya$(con.recvAsync(msg._buf, msg._hnds));
if (bufLen < sizeof(Header))
co_return Error::invalidData("invalid message");
msg._payloadLen = bufLen - sizeof(Header);
msg._len = bufLen;
msg._hndsLen = hndsLen;

logDebug("ipcRecv: {}, len: {}", msg.header(), msg.len());
co_return msg;
}

Expand Down
30 changes: 13 additions & 17 deletions src/srvs/grund-bus/bus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ static constexpr bool DEBUG_ELF = false;
// MARK: Endpoint --------------------------------------------------------------

Res<> Endpoint::dispatch(Sys::Message &msg) {
return _bus->dispatch(port(), msg);
msg.header().from = _port;
return _bus->dispatch(msg);
}

// MARK: Service ---------------------------------------------------------------

Res<Strong<Service>> Service::prepare(Sys::Context &, Str id) {
logInfo("prepare service '{}'", id);

auto in = try$(Hj::Channel::create(Hj::Domain::self(), 4096, 16));
try$(in.label(Io::format("{}-in", id).unwrap()));

Expand Down Expand Up @@ -115,24 +114,22 @@ Res<> Service::activate(Sys::Context &ctx) {
}

Async::Task<> Service::runAsync() {
logDebug("Listening for messages on service '{}'...", _id);
while (true) {
auto msg = co_trya$(Sys::ipcRecvAsync(_con));

logDebug("Received message on service '{}'", _id);
logDebug("Received message on service '{}' {}", _id, msg.header());

auto &header = maybeHeader.unwrap();
auto res = dispatch(msg);
if (not res)
co_try$(Sys::ipcSend<Error>(_con, header.port, header.seq, res.none()));
co_try$(Sys::ipcSend<Error>(_con, port(), msg.header().seq, res.none()));
}
}

Res<> Service::send(Sys::Port, Sys::Message &msg) {
Res<> Service::send(Sys::Message &msg) {
logDebug("Dispatching message on service '{}'", _id);
return _con.send(
sub(msg._buf, msg.len),
sub(msg._hnds, msg._hndsLen)
sub(msg.bytes()),
sub(msg.handles())
);
}

Expand All @@ -146,12 +143,12 @@ Str Locator::id() const {
return "grund-bus";
}

Res<> Locator::send(Sys::Port from, Sys::Message &msg) {
Res<> Locator::send(Sys::Message &msg) {
if (msg.is<Locate>()) {
auto locate = try$(msg.unpack<Locate>());
for (auto &endpoint : _bus->_endpoints) {
if (endpoint->id() == locate.id) {
auto resp = try$(Sys::Message::packResp<Locate>(from, msg.unwrapHeader().seq, endpoint->port()));
auto resp = try$(msg.packResp<Locate>(endpoint->port()));
try$(dispatch(resp));
return Ok();
}
Expand Down Expand Up @@ -182,12 +179,11 @@ Karm::Res<> Bus::attach(Strong<Endpoint> endpoint) {
return Ok();
}

Res<> Bus::dispatch(Sys::Port from, Sys::Message &msg) {
auto header = try$(msg.unpackHeader());
logDebug("dispatching message to {p}", header.port);
Res<> Bus::dispatch(Sys::Message &msg) {
logDebug("dispatching message from {p} to {p}", msg.header().from, msg.header().to);
for (auto &endpoint : _endpoints) {
if (endpoint->port() == header.port)
return endpoint->send(from, msg);
if (endpoint->port() == msg.header().to)
return endpoint->send(msg);
}
return Error::notFound("service not found");
}
Expand Down
8 changes: 4 additions & 4 deletions src/srvs/grund-bus/bus.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct Endpoint : public Meta::Static {

virtual Str id() const = 0;

virtual Res<> send(Sys::Port, Sys::Message &) { return Ok(); }
virtual Res<> send(Sys::Message &) { return Ok(); }

virtual Res<> activate(Sys::Context &) { return Ok(); }
};
Expand All @@ -53,15 +53,15 @@ struct Service : public Endpoint {

Async::Task<> runAsync();

Res<> send(Sys::Port from, Sys::Message &msg) override;
Res<> send(Sys::Message &msg) override;
};

struct Locator : public Endpoint {
Locator();

Str id() const override;

Res<> send(Sys::Port from, Sys::Message &msg) override;
Res<> send(Sys::Message &msg) override;
};

struct Bus : public Meta::Static {
Expand All @@ -76,7 +76,7 @@ struct Bus : public Meta::Static {

Res<> attach(Strong<Endpoint> endpoint);

Res<> dispatch(Sys::Port from, Sys::Message &msg);
Res<> dispatch(Sys::Message &msg);

Res<> startService(Str id);
};
Expand Down

0 comments on commit 0523d33

Please sign in to comment.