Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Nov 11, 2024
1 parent 57d20f4 commit 7b632c0
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 95 deletions.
20 changes: 10 additions & 10 deletions src/libs/karm-base/opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,36 +141,36 @@ struct [[nodiscard]] Opt {
return _present;
}

always_inline constexpr T *operator->() {
always_inline constexpr T *operator->() lifetimebound {
if (not _present) [[unlikely]]
panic("unwrapping None");

return &_value;
}

always_inline constexpr T &operator*() {
always_inline constexpr T &operator*() lifetimebound {
if (not _present) [[unlikely]]
panic("unwrapping None");

return _value;
}

always_inline constexpr T const *operator->() const {
always_inline constexpr T const *operator->() const lifetimebound {
if (not _present) [[unlikely]]
panic("unwrapping None");

return &_value;
}

always_inline constexpr T const &operator*() const {
always_inline constexpr T const &operator*() const lifetimebound {
if (not _present) [[unlikely]]
panic("unwrapping None");

return _value;
}

template <typename... Args>
always_inline constexpr T &emplace(Args &&...args) {
always_inline constexpr T &emplace(Args &&...args) lifetimebound {
clear();
_present = true;
std::construct_at(&_value, std::forward<Args>(args)...);
Expand All @@ -188,31 +188,31 @@ struct [[nodiscard]] Opt {
return NONE;
}

always_inline constexpr T &unwrap(char const *msg = "unwraping none") {
always_inline constexpr T &unwrap(char const *msg = "unwraping none") lifetimebound {
if (not _present) [[unlikely]]
panic(msg);
return _value;
}

always_inline constexpr T const &unwrap(char const *msg = "unwraping none") const {
always_inline constexpr T const &unwrap(char const *msg = "unwraping none") const lifetimebound {
if (not _present) [[unlikely]]
panic(msg);
return _value;
}

always_inline constexpr T const &unwrapOr(T const &other) const {
always_inline constexpr T const &unwrapOr(T const &other) const lifetimebound {
if (_present)
return _value;
return other;
}

always_inline constexpr T unwrapOrDefault(T other) const {
always_inline constexpr T unwrapOrDefault(T other) const lifetimebound {
if (_present)
return _value;
return other;
}

always_inline constexpr T unwrapOrElse(auto f) const {
always_inline constexpr T unwrapOrElse(auto f) const lifetimebound {
if (_present)
return _value;
return f();
Expand Down
8 changes: 4 additions & 4 deletions src/libs/karm-base/res.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,28 @@ struct [[nodiscard]] Res {
return _inner.template is<Ok<V>>();
}

always_inline constexpr E const &none() const {
always_inline constexpr E const &none() const lifetimebound {
if (not _inner.template is<E>()) [[unlikely]]
panic("none() called on an ok");

return _inner.template unwrap<E>();
}

always_inline constexpr V &unwrap(char const *msg = "unwraping an error") {
always_inline constexpr V &unwrap(char const *msg = "unwraping an error") lifetimebound {
if (not _inner.template is<Ok<V>>()) [[unlikely]]
panic(msg);

return _inner.template unwrap<Ok<V>>().inner;
}

always_inline constexpr V const &unwrap(char const *msg = "unwraping an error") const {
always_inline constexpr V const &unwrap(char const *msg = "unwraping an error") const lifetimebound {
if (not _inner.template is<Ok<V>>()) [[unlikely]]
panic(msg);

return _inner.template unwrap<Ok<V>>().inner;
}

always_inline constexpr V const &unwrapOr(V const &other) const {
always_inline constexpr V const &unwrapOr(V const &other) const lifetimebound {
if (_inner.template is<Ok<V>>())
return _inner.template unwrap<Ok<V>>().inner;
return other;
Expand Down
13 changes: 0 additions & 13 deletions src/libs/karm-sys/ipc.cpp

This file was deleted.

128 changes: 83 additions & 45 deletions src/libs/karm-sys/ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,54 +39,98 @@ constexpr Port Port::BUS{Limits<u64>::MAX};

struct Header {
u64 seq;
Port port;
Port from;
Port to;
Meta::Id mid;

void repr(Io::Emit &e) const {
e("(header seq: {}, port: {}, mid: {:016x})", seq, port, mid);
e("(header seq: {}, from: {}, to: {}, mid: {:016x})", seq, from, to, mid);
}
};

static_assert(Meta::TrivialyCopyable<Header>);

struct Message {
Array<u8, 4096> bytes;
usize len;
Array<Sys::Handle, 16> handles;
usize handlesLen;
union {
struct {
Header _header;
Array<u8, 4096 - sizeof(Header)> _payload;
};

Array<u8, 4096> _buf;
};

usize _payloadLen = 0;

Array<Sys::Handle, 16> _hnds;
usize _hndsLen = 0;

Header &header() {
return _header;
}

Header const &header() const {
return _header;
}

usize len() const {
return sizeof(Header) + _payloadLen;
}

Res<Header> header() const {
Io::PackScan s{bytes, handles};
return Io::unpack<Header>(s);
Bytes bytes() {
return sub(_buf, len());
}

Slice<Handle> handles() {
return sub(_hnds, _hndsLen);
}

template <typename T>
bool is() const {
auto maybeHeader = header();
if (not maybeHeader)
return false;
return maybeHeader.unwrap().mid == Meta::idOf<T>();
return _header.mid == Meta::idOf<T>();
}

template <typename T, typename... Args>
static Res<Message> pack(Port port, u64 seq, Args &&...args) {
Header header{seq, port, Meta::idOf<T>()};
static Res<Message> packReq(Port to, u64 seq, Args &&...args) {
T payload{std::forward<Args>(args)...};

Message msg;
Io::BufWriter reqBuf{msg.bytes};
msg._header = {seq, Port::INVALID, to, Meta::idOf<T>()};
Io::BufWriter reqBuf{msg._payload};
Io::PackEmit reqPack{reqBuf};

try$(Io::pack(reqPack, header));
try$(Io::pack(reqPack, payload));

msg.len = try$(Io::tell(reqBuf));
msg.handlesLen = 0;
msg._payloadLen = try$(Io::tell(reqBuf));

return Ok(std::move(msg));
}

template <typename T, typename... Args>
Res<Message> packResp(Args &&...args) {
typename T::Response payload{std::forward<Args>(args)...};

Message resp;
resp._header = {
header().seq,
header().to,
header().from,
Meta::idOf<T>(),
};

Io::BufWriter respBuf{resp._buf};
Io::PackEmit respPack{respBuf};

return Ok(msg);
try$(Io::pack(respPack, payload));

resp._payloadLen = try$(Io::tell(respBuf));

return Ok(std::move(resp));
}

template <typename T>
Res<T> unpack() {
Io::PackScan s{bytes, handles};
Io::PackScan s{_buf, _hnds};
if (not is<T>())
return Error::invalidData("unexpected message");
try$(Io::unpack<Header>(s));
Expand All @@ -98,21 +142,20 @@ struct Message {

template <typename T, typename... Args>
Res<> ipcSend(Sys::IpcConnection &con, Port port, u64 seq, Args &&...args) {
Header header{seq, port, Meta::idOf<T>()};
T msg{std::forward<Args>(args)...};

Io::BufferWriter reqBuf;
Io::PackEmit reqPack{reqBuf};

try$(Io::pack(reqPack, header));
try$(Io::pack(reqPack, msg));

try$(con.send(reqBuf.bytes(), reqPack.handles()));

Message msg = Message::packReq<T>(port, seq, std::forward<Args>(args)...).take();
try$(con.send(msg.bytes(), msg.handles()));
return Ok();
}

Async::Task<Message> ipcRecvAsync(Sys::IpcConnection &con);
static inline Async::Task<Message> ipcRecvAsync(Sys::IpcConnection &con) {
Message msg;
auto [bufLen, hndsLen] = co_trya$(con.recvAsync(msg._buf, msg._hnds));
if (bufLen < sizeof(Header))
co_return Error::invalidData("invalid message");
msg._payloadLen = bufLen - sizeof(Header);
msg._hndsLen = hndsLen;
co_return msg;
}

// MARK: Ipc -------------------------------------------------------------------

Expand Down Expand Up @@ -144,15 +187,11 @@ struct Ipc {
while (true) {
Message msg = co_trya$(ipcRecvAsync(_con));

auto maybeHeader = msg.header();
if (not maybeHeader) {
logWarn("dropping message: {}", maybeHeader.none().msg());
continue;
}
auto header = msg._header;

if (_pending.has(maybeHeader.unwrap().seq)) {
auto promise = _pending.take(maybeHeader.unwrap().seq);
promise.resolve(msg);
if (_pending.has(header.seq)) {
auto promise = _pending.take(header.seq);
promise.resolve(std::move(msg));
continue;
}

Expand All @@ -162,11 +201,10 @@ struct Ipc {

template <typename T>
Res<> resp(Message &msg, Res<typename T::Response> message) {
auto header = try$(msg.header());
auto header = msg._header;
if (not message)
return ipcSend<Error>(_con, header.port, header.seq, message.none());

return ipcSend<typename T::Response>(_con, header.port, header.seq, message.take());
return ipcSend<Error>(_con, header.from, header.seq, message.none());
return ipcSend<typename T::Response>(_con, header.from, header.seq, message.take());
}

template <typename T, typename... Args>
Expand Down
42 changes: 26 additions & 16 deletions src/srvs/grund-bus/bus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ namespace Grund::Bus {
static constexpr bool DEBUG_TASK = false;
static constexpr bool DEBUG_ELF = false;

// MARK: Endpoint --------------------------------------------------------------

Res<> Endpoint::dispatch(Sys::Message &msg) {
return _bus->dispatch(port(), msg);
}

// MARK: Service ---------------------------------------------------------------

Res<Strong<Service>> Service::prepare(Sys::Context &, Str id) {
Expand Down Expand Up @@ -115,19 +121,19 @@ Async::Task<> Service::runAsync() {

logDebug("Received message on service '{}'", _id);

auto maybeHeader = msg.header();
if (not maybeHeader)
continue;
auto &header = maybeHeader.unwrap();
auto res = _bus->dispatch(maybeHeader.unwrap(), msg);
auto res = dispatch(msg);
if (not res)
co_try$(Sys::ipcSend<Error>(_con, header.port, header.seq, res.none()));
}
}

Res<> Service::dispatch(Sys::Message &msg) {
Res<> Service::send(Sys::Port, Sys::Message &msg) {
logDebug("Dispatching message on service '{}'", _id);
return _con.send(sub(msg.bytes, msg.len), sub(msg.handles, msg.handlesLen));
return _con.send(
sub(msg._buf, msg.len),
sub(msg._hnds, msg._hndsLen)
);
}

// MARK: Locator ---------------------------------------------------------------
Expand All @@ -140,18 +146,21 @@ Str Locator::id() const {
return "grund-bus";
}

Res<> Locator::dispatch(Sys::Port from, Sys::Message &msg) {
Res<> Locator::send(Sys::Port from, Sys::Message &msg) {
if (msg.is<Locate>()) {
auto locate = try$(msg.unpack<Locate>());
for (auto &endpoint : _bus->_endpoints) {
if (endpoint->id() == locate.id){

return from.dispatch()
if (endpoint->id() == locate.id) {
auto resp = try$(Sys::Message::packResp<Locate>(from, msg.unwrapHeader().seq, endpoint->port()));
try$(dispatch(resp));
return Ok();
}
}
} else {
return Error::invalidInput("invalid message");

return Error::notFound("service not found");
}

return Error::invalidInput("invalid message");
}

// MARK: Bus -------------------------------------------------------------------
Expand All @@ -173,11 +182,12 @@ Karm::Res<> Bus::attach(Strong<Endpoint> endpoint) {
return Ok();
}

Res<> Bus::dispatch(Sys::Header &h, Sys::Message &msg) {
logDebug("dispatching message to {}", h.port);
Res<> Bus::dispatch(Sys::Port from, Sys::Message &msg) {
auto header = try$(msg.unpackHeader());
logDebug("dispatching message to {p}", header.port);
for (auto &endpoint : _endpoints) {
if (endpoint->port() == h.port)
return endpoint->dispatch(msg);
if (endpoint->port() == header.port)
return endpoint->send(from, msg);
}
return Error::notFound("service not found");
}
Expand Down
Loading

0 comments on commit 7b632c0

Please sign in to comment.