From 7b632c026f7de72dbda468c471e177d054e235e1 Mon Sep 17 00:00:00 2001 From: VAN BOSSUYT Nicolas Date: Mon, 11 Nov 2024 14:25:20 +0100 Subject: [PATCH] wip --- src/libs/karm-base/opt.h | 20 +++--- src/libs/karm-base/res.h | 8 +-- src/libs/karm-sys/ipc.cpp | 13 ---- src/libs/karm-sys/ipc.h | 128 +++++++++++++++++++++++------------ src/srvs/grund-bus/bus.cpp | 42 +++++++----- src/srvs/grund-bus/bus.h | 12 ++-- src/srvs/grund-dns/main.cpp | 4 +- src/srvs/grund-echo/main.cpp | 2 +- 8 files changed, 134 insertions(+), 95 deletions(-) delete mode 100644 src/libs/karm-sys/ipc.cpp diff --git a/src/libs/karm-base/opt.h b/src/libs/karm-base/opt.h index f473c726de..c3e1bfee80 100644 --- a/src/libs/karm-base/opt.h +++ b/src/libs/karm-base/opt.h @@ -141,28 +141,28 @@ struct [[nodiscard]] Opt { return _present; } - always_inline constexpr T *operator->() { + always_inline constexpr T *operator->() lifetimebound { if (not _present) [[unlikely]] panic("unwrapping None"); return &_value; } - always_inline constexpr T &operator*() { + always_inline constexpr T &operator*() lifetimebound { if (not _present) [[unlikely]] panic("unwrapping None"); return _value; } - always_inline constexpr T const *operator->() const { + always_inline constexpr T const *operator->() const lifetimebound { if (not _present) [[unlikely]] panic("unwrapping None"); return &_value; } - always_inline constexpr T const &operator*() const { + always_inline constexpr T const &operator*() const lifetimebound { if (not _present) [[unlikely]] panic("unwrapping None"); @@ -170,7 +170,7 @@ struct [[nodiscard]] Opt { } template - always_inline constexpr T &emplace(Args &&...args) { + always_inline constexpr T &emplace(Args &&...args) lifetimebound { clear(); _present = true; std::construct_at(&_value, std::forward(args)...); @@ -188,31 +188,31 @@ struct [[nodiscard]] Opt { return NONE; } - always_inline constexpr T &unwrap(char const *msg = "unwraping none") { + always_inline constexpr T &unwrap(char const *msg = "unwraping none") lifetimebound { if (not _present) [[unlikely]] panic(msg); return _value; } - always_inline constexpr T const &unwrap(char const *msg = "unwraping none") const { + always_inline constexpr T const &unwrap(char const *msg = "unwraping none") const lifetimebound { if (not _present) [[unlikely]] panic(msg); return _value; } - always_inline constexpr T const &unwrapOr(T const &other) const { + always_inline constexpr T const &unwrapOr(T const &other) const lifetimebound { if (_present) return _value; return other; } - always_inline constexpr T unwrapOrDefault(T other) const { + always_inline constexpr T unwrapOrDefault(T other) const lifetimebound { if (_present) return _value; return other; } - always_inline constexpr T unwrapOrElse(auto f) const { + always_inline constexpr T unwrapOrElse(auto f) const lifetimebound { if (_present) return _value; return f(); diff --git a/src/libs/karm-base/res.h b/src/libs/karm-base/res.h index 1ea6dcdecc..d945c6f390 100644 --- a/src/libs/karm-base/res.h +++ b/src/libs/karm-base/res.h @@ -63,28 +63,28 @@ struct [[nodiscard]] Res { return _inner.template is>(); } - always_inline constexpr E const &none() const { + always_inline constexpr E const &none() const lifetimebound { if (not _inner.template is()) [[unlikely]] panic("none() called on an ok"); return _inner.template unwrap(); } - always_inline constexpr V &unwrap(char const *msg = "unwraping an error") { + always_inline constexpr V &unwrap(char const *msg = "unwraping an error") lifetimebound { if (not _inner.template is>()) [[unlikely]] panic(msg); return _inner.template unwrap>().inner; } - always_inline constexpr V const &unwrap(char const *msg = "unwraping an error") const { + always_inline constexpr V const &unwrap(char const *msg = "unwraping an error") const lifetimebound { if (not _inner.template is>()) [[unlikely]] panic(msg); return _inner.template unwrap>().inner; } - always_inline constexpr V const &unwrapOr(V const &other) const { + always_inline constexpr V const &unwrapOr(V const &other) const lifetimebound { if (_inner.template is>()) return _inner.template unwrap>().inner; return other; diff --git a/src/libs/karm-sys/ipc.cpp b/src/libs/karm-sys/ipc.cpp deleted file mode 100644 index 4225380f8e..0000000000 --- a/src/libs/karm-sys/ipc.cpp +++ /dev/null @@ -1,13 +0,0 @@ -#include "ipc.h" - -namespace Karm::Sys { - -Async::Task ipcRecvAsync(Sys::IpcConnection &con) { - Message msg; - auto [bufLen, hndsLen] = co_trya$(con.recvAsync(msg.bytes, msg.handles)); - msg.len = bufLen; - msg.handlesLen = hndsLen; - co_return msg; -} - -} // namespace Karm::Sys diff --git a/src/libs/karm-sys/ipc.h b/src/libs/karm-sys/ipc.h index 47493545a8..61a29e8292 100644 --- a/src/libs/karm-sys/ipc.h +++ b/src/libs/karm-sys/ipc.h @@ -39,54 +39,98 @@ constexpr Port Port::BUS{Limits::MAX}; struct Header { u64 seq; - Port port; + Port from; + Port to; Meta::Id mid; void repr(Io::Emit &e) const { - e("(header seq: {}, port: {}, mid: {:016x})", seq, port, mid); + e("(header seq: {}, from: {}, to: {}, mid: {:016x})", seq, from, to, mid); } }; +static_assert(Meta::TrivialyCopyable
); + struct Message { - Array bytes; - usize len; - Array handles; - usize handlesLen; + union { + struct { + Header _header; + Array _payload; + }; + + Array _buf; + }; + + usize _payloadLen = 0; + + Array _hnds; + usize _hndsLen = 0; + + Header &header() { + return _header; + } + + Header const &header() const { + return _header; + } + + usize len() const { + return sizeof(Header) + _payloadLen; + } - Res
header() const { - Io::PackScan s{bytes, handles}; - return Io::unpack
(s); + Bytes bytes() { + return sub(_buf, len()); + } + + Slice handles() { + return sub(_hnds, _hndsLen); } template bool is() const { - auto maybeHeader = header(); - if (not maybeHeader) - return false; - return maybeHeader.unwrap().mid == Meta::idOf(); + return _header.mid == Meta::idOf(); } template - static Res pack(Port port, u64 seq, Args &&...args) { - Header header{seq, port, Meta::idOf()}; + static Res packReq(Port to, u64 seq, Args &&...args) { T payload{std::forward(args)...}; Message msg; - Io::BufWriter reqBuf{msg.bytes}; + msg._header = {seq, Port::INVALID, to, Meta::idOf()}; + Io::BufWriter reqBuf{msg._payload}; Io::PackEmit reqPack{reqBuf}; - try$(Io::pack(reqPack, header)); try$(Io::pack(reqPack, payload)); - msg.len = try$(Io::tell(reqBuf)); - msg.handlesLen = 0; + msg._payloadLen = try$(Io::tell(reqBuf)); + + return Ok(std::move(msg)); + } + + template + Res packResp(Args &&...args) { + typename T::Response payload{std::forward(args)...}; + + Message resp; + resp._header = { + header().seq, + header().to, + header().from, + Meta::idOf(), + }; + + Io::BufWriter respBuf{resp._buf}; + Io::PackEmit respPack{respBuf}; - return Ok(msg); + try$(Io::pack(respPack, payload)); + + resp._payloadLen = try$(Io::tell(respBuf)); + + return Ok(std::move(resp)); } template Res unpack() { - Io::PackScan s{bytes, handles}; + Io::PackScan s{_buf, _hnds}; if (not is()) return Error::invalidData("unexpected message"); try$(Io::unpack
(s)); @@ -98,21 +142,20 @@ struct Message { template Res<> ipcSend(Sys::IpcConnection &con, Port port, u64 seq, Args &&...args) { - Header header{seq, port, Meta::idOf()}; - T msg{std::forward(args)...}; - - Io::BufferWriter reqBuf; - Io::PackEmit reqPack{reqBuf}; - - try$(Io::pack(reqPack, header)); - try$(Io::pack(reqPack, msg)); - - try$(con.send(reqBuf.bytes(), reqPack.handles())); - + Message msg = Message::packReq(port, seq, std::forward(args)...).take(); + try$(con.send(msg.bytes(), msg.handles())); return Ok(); } -Async::Task ipcRecvAsync(Sys::IpcConnection &con); +static inline Async::Task ipcRecvAsync(Sys::IpcConnection &con) { + Message msg; + auto [bufLen, hndsLen] = co_trya$(con.recvAsync(msg._buf, msg._hnds)); + if (bufLen < sizeof(Header)) + co_return Error::invalidData("invalid message"); + msg._payloadLen = bufLen - sizeof(Header); + msg._hndsLen = hndsLen; + co_return msg; +} // MARK: Ipc ------------------------------------------------------------------- @@ -144,15 +187,11 @@ struct Ipc { while (true) { Message msg = co_trya$(ipcRecvAsync(_con)); - auto maybeHeader = msg.header(); - if (not maybeHeader) { - logWarn("dropping message: {}", maybeHeader.none().msg()); - continue; - } + auto header = msg._header; - if (_pending.has(maybeHeader.unwrap().seq)) { - auto promise = _pending.take(maybeHeader.unwrap().seq); - promise.resolve(msg); + if (_pending.has(header.seq)) { + auto promise = _pending.take(header.seq); + promise.resolve(std::move(msg)); continue; } @@ -162,11 +201,10 @@ struct Ipc { template Res<> resp(Message &msg, Res message) { - auto header = try$(msg.header()); + auto header = msg._header; if (not message) - return ipcSend(_con, header.port, header.seq, message.none()); - - return ipcSend(_con, header.port, header.seq, message.take()); + return ipcSend(_con, header.from, header.seq, message.none()); + return ipcSend(_con, header.from, header.seq, message.take()); } template diff --git a/src/srvs/grund-bus/bus.cpp b/src/srvs/grund-bus/bus.cpp index b463c7ba4f..187a297378 100644 --- a/src/srvs/grund-bus/bus.cpp +++ b/src/srvs/grund-bus/bus.cpp @@ -11,6 +11,12 @@ namespace Grund::Bus { static constexpr bool DEBUG_TASK = false; static constexpr bool DEBUG_ELF = false; +// MARK: Endpoint -------------------------------------------------------------- + +Res<> Endpoint::dispatch(Sys::Message &msg) { + return _bus->dispatch(port(), msg); +} + // MARK: Service --------------------------------------------------------------- Res> Service::prepare(Sys::Context &, Str id) { @@ -115,19 +121,19 @@ Async::Task<> Service::runAsync() { logDebug("Received message on service '{}'", _id); - auto maybeHeader = msg.header(); - if (not maybeHeader) - continue; auto &header = maybeHeader.unwrap(); - auto res = _bus->dispatch(maybeHeader.unwrap(), msg); + auto res = dispatch(msg); if (not res) co_try$(Sys::ipcSend(_con, header.port, header.seq, res.none())); } } -Res<> Service::dispatch(Sys::Message &msg) { +Res<> Service::send(Sys::Port, Sys::Message &msg) { logDebug("Dispatching message on service '{}'", _id); - return _con.send(sub(msg.bytes, msg.len), sub(msg.handles, msg.handlesLen)); + return _con.send( + sub(msg._buf, msg.len), + sub(msg._hnds, msg._hndsLen) + ); } // MARK: Locator --------------------------------------------------------------- @@ -140,18 +146,21 @@ Str Locator::id() const { return "grund-bus"; } -Res<> Locator::dispatch(Sys::Port from, Sys::Message &msg) { +Res<> Locator::send(Sys::Port from, Sys::Message &msg) { if (msg.is()) { auto locate = try$(msg.unpack()); for (auto &endpoint : _bus->_endpoints) { - if (endpoint->id() == locate.id){ - - return from.dispatch() + if (endpoint->id() == locate.id) { + auto resp = try$(Sys::Message::packResp(from, msg.unwrapHeader().seq, endpoint->port())); + try$(dispatch(resp)); + return Ok(); } } - } else { - return Error::invalidInput("invalid message"); + + return Error::notFound("service not found"); } + + return Error::invalidInput("invalid message"); } // MARK: Bus ------------------------------------------------------------------- @@ -173,11 +182,12 @@ Karm::Res<> Bus::attach(Strong endpoint) { return Ok(); } -Res<> Bus::dispatch(Sys::Header &h, Sys::Message &msg) { - logDebug("dispatching message to {}", h.port); +Res<> Bus::dispatch(Sys::Port from, Sys::Message &msg) { + auto header = try$(msg.unpackHeader()); + logDebug("dispatching message to {p}", header.port); for (auto &endpoint : _endpoints) { - if (endpoint->port() == h.port) - return endpoint->dispatch(msg); + if (endpoint->port() == header.port) + return endpoint->send(from, msg); } return Error::notFound("service not found"); } diff --git a/src/srvs/grund-bus/bus.h b/src/srvs/grund-bus/bus.h index b542d67c13..239ef2c1a0 100644 --- a/src/srvs/grund-bus/bus.h +++ b/src/srvs/grund-bus/bus.h @@ -26,9 +26,11 @@ struct Endpoint : public Meta::Static { void attach(Bus &bus) { _bus = &bus; } + Res<> dispatch(Sys::Message &msg); + virtual Str id() const = 0; - virtual Res<> dispatch(Sys::Message &) { return Ok(); } + virtual Res<> send(Sys::Port, Sys::Message &) { return Ok(); } virtual Res<> activate(Sys::Context &) { return Ok(); } }; @@ -51,15 +53,15 @@ struct Service : public Endpoint { Async::Task<> runAsync(); - Res<> dispatch(Sys::Message &msg) override; + Res<> send(Sys::Port from, Sys::Message &msg) override; }; struct Locator : public Endpoint { Locator(); - Str id() const; + Str id() const override; - virtual Res<> dispatch(Sys::Message &); + Res<> send(Sys::Port from, Sys::Message &msg) override; }; struct Bus : public Meta::Static { @@ -74,7 +76,7 @@ struct Bus : public Meta::Static { Res<> attach(Strong endpoint); - Res<> dispatch(Sys::Header &h, Sys::Message &msg); + Res<> dispatch(Sys::Port from, Sys::Message &msg); Res<> startService(Str id); }; diff --git a/src/srvs/grund-dns/main.cpp b/src/srvs/grund-dns/main.cpp index 342bafdc8f..6e5bc86704 100644 --- a/src/srvs/grund-dns/main.cpp +++ b/src/srvs/grund-dns/main.cpp @@ -10,10 +10,12 @@ Async::Task<> serv(Sys::Context &ctx) { Sys::Ipc ipc = Sys::Ipc::create(ctx); logDebug("sending nonsens to system"); + auto echoPort = co_trya$(ipc.callAsync(Sys::Port::BUS, "grund-echo"s)); + logDebug("located echo service at port: {}", echoPort); auto res = co_trya$(ipc.callAsync(echoPort, "nonsens"s)); - logDebug("received response from system: ", res); + logDebug("received response from system: {}", res); logInfo("service started"); while (true) { diff --git a/src/srvs/grund-echo/main.cpp b/src/srvs/grund-echo/main.cpp index 8abc48b237..8d224db4bb 100644 --- a/src/srvs/grund-echo/main.cpp +++ b/src/srvs/grund-echo/main.cpp @@ -11,7 +11,7 @@ Async::Task<> serv(Sys::Context &ctx) { auto msg = co_trya$(ipc.recvAsync()); if (msg.is()) { auto req = co_try$(msg.unpack()); - co_try$(ipc.resp(msg, Ok(req.msg))); + co_try$(ipc.resp(msg, Ok("Let's fucking go!"s))); } }