Skip to content

Commit

Permalink
karm-rpc: Split off karm-rpc from karm-sys.
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Jan 2, 2025
1 parent 2c497f7 commit 7485ecd
Show file tree
Hide file tree
Showing 29 changed files with 141 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/impls/impl-skift/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#include <hjert-api/api.h>
#include <karm-base/panic.h>
#include <karm-logger/logger.h>
#include <karm-rpc/base.h>
#include <karm-sys/context.h>
#include <karm-sys/rpc.h>

#include "fd.h"

Expand Down
28 changes: 28 additions & 0 deletions src/libs/karm-rpc/base.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "base.h"

namespace Karm::Rpc {

static Endpoint *_globalEndpoint = nullptr;

Endpoint::Endpoint(Sys::IpcConnection con)
: _con(std::move(con)) {
_globalEndpoint = this;
// FIXME: Find a way to do proper cleanup
Async::detach(_receiverTask(*this), [](Res<> res) {
logError("receiver task exited: {}", res);
panic("receiver task exited");
});
}

Endpoint Endpoint::create(Sys::Context &ctx) {
auto &channel = useChannel(ctx);
return {std::move(channel.con)};
}

Endpoint &globalEndpoint() {
if (not _globalEndpoint)
panic("no active endpoint");
return *_globalEndpoint;
}

} // namespace Karm::Rpc
26 changes: 9 additions & 17 deletions src/libs/karm-sys/rpc.h → src/libs/karm-rpc/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ inline ChannelHook &useChannel(Sys::Context &ctx = Sys::globalContext()) {
return ctx.use<ChannelHook>();
}

namespace Karm::Sys {
namespace Karm::Rpc {

// MARK: Primitive Types -------------------------------------------------------

Expand Down Expand Up @@ -93,7 +93,7 @@ struct Message {
return sub(_buf, 0, len());
}

Slice<Handle> handles() {
Slice<Sys::Handle> handles() {
return sub(_hnds, 0, _hndsLen);
}

Expand Down Expand Up @@ -178,27 +178,17 @@ static inline Async::Task<Message> rpcRecvAsync(Sys::IpcConnection &con) {

// MARK: Rpc -------------------------------------------------------------------

struct Rpc : Meta::Pinned {
struct Endpoint : Meta::Pinned {
Sys::IpcConnection _con;
Map<u64, Async::_Promise<Message>> _pending{};
Async::Queue<Message> _incoming{};
u64 _seq = 1;

Rpc(Sys::IpcConnection con)
: _con(std::move(con)) {
// FIXME: Find a way to do proper cleanup
Async::detach(_receiverTask(*this), [](Res<> res) {
logError("receiver task exited: {}", res);
panic("receiver task exited");
});
}
Endpoint(Sys::IpcConnection con);

static Rpc create(Sys::Context &ctx) {
auto &channel = useChannel(ctx);
return Rpc{std::move(channel.con)};
}
static Endpoint create(Sys::Context &ctx);

static Async::Task<> _receiverTask(Rpc &self) {
static Async::Task<> _receiverTask(Endpoint &self) {
while (true) {
Message msg = co_trya$(rpcRecvAsync(self._con));
auto header = msg._header;
Expand Down Expand Up @@ -250,4 +240,6 @@ struct Rpc : Meta::Pinned {
}
};

} // namespace Karm::Sys
Endpoint &globalEndpoint();

} // namespace Karm::Rpc
9 changes: 9 additions & 0 deletions src/libs/karm-rpc/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"$schema": "https://schemas.cute.engineering/stable/cutekit.manifest.component.v1",
"id": "karm-rpc",
"type": "lib",
"description": "A remote procedure call library",
"requires": [
"karm-sys"
]
}
6 changes: 3 additions & 3 deletions src/srvs/grund-av/main.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#include <karm-rpc/base.h>
#include <karm-sys/entry.h>
#include <karm-sys/rpc.h>

namespace Grund::Av {

Async::Task<> serv(Sys::Context &ctx) {
Sys::Rpc rpc = Sys::Rpc::create(ctx);
auto endpoint = Rpc::Endpoint::create(ctx);

logInfo("service started");
while (true) {
co_trya$(rpc.recvAsync());
co_trya$(endpoint.recvAsync());
logDebug("received message from system");
}
}
Expand Down
1 change: 1 addition & 0 deletions src/srvs/grund-av/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
]
},
"requires": [
"karm-rpc",
"karm-sys"
]
}
13 changes: 9 additions & 4 deletions src/srvs/grund-bus/api.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
#pragma once

#include <karm-base/string.h>
#include <karm-sys/rpc.h>
#include <karm-rpc/base.h>

namespace Grund::Bus {
namespace Grund::Bus::Api {

struct Locate {
using Response = Sys::Port;
using Response = Rpc::Port;
String id;
};

struct Listen {
Meta::Id mid;
};

} // namespace Grund::Bus
struct Start {
// FIXME: Handle intent instead of simple id?
String id;
};

} // namespace Grund::Bus::Api
37 changes: 20 additions & 17 deletions src/srvs/grund-bus/bus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ static constexpr bool DEBUG_ELF = false;

// MARK: Endpoint --------------------------------------------------------------

Res<> Endpoint::dispatch(Sys::Message &msg) {
Res<> Endpoint::dispatch(Rpc::Message &msg) {
msg.header().from = _port;
return _bus->dispatch(msg);
}
Expand Down Expand Up @@ -115,54 +115,57 @@ Res<> Service::activate(Sys::Context &ctx) {

Async::Task<> Service::runAsync() {
while (true) {
auto msg = co_trya$(Sys::rpcRecvAsync(_con));
auto msg = co_trya$(Rpc::rpcRecvAsync(_con));

if (msg.is<Listen>()) {
auto listen = co_try$(msg.unpack<Listen>());
if (msg.is<Api::Listen>()) {
auto listen = co_try$(msg.unpack<Api::Listen>());
_listen.pushBack(listen.mid);
} else {
auto res = dispatch(msg);
if (not res) {
logError("{}: dispatch failed: {}", id(), res);
co_try$(Sys::rpcSend<Error>(_con, port(), msg.header().seq, res.none()));
co_try$(Rpc::rpcSend<Error>(_con, port(), msg.header().seq, res.none()));
}
}
}
}

Res<> Service::send(Sys::Message &msg) {
Res<> Service::send(Rpc::Message &msg) {
return _con.send(
msg.bytes(),
msg.handles()
);
}

bool Service::accept(Sys::Message const &msg) {
bool Service::accept(Rpc::Message const &msg) {
return contains(_listen, msg.header().mid);
}

// MARK: Locator ---------------------------------------------------------------

Locator::Locator() {
_port = Sys::Port::BUS;
System::System() {
_port = Rpc::Port::BUS;
}

Str Locator::id() const {
Str System::id() const {
return "grund-bus";
}

Res<> Locator::send(Sys::Message &msg) {
if (msg.is<Locate>()) {
auto locate = try$(msg.unpack<Locate>());
Res<> System::send(Rpc::Message &msg) {
if (msg.is<Api::Locate>()) {
auto locate = try$(msg.unpack<Api::Locate>());
for (auto &endpoint : _bus->_endpoints) {
if (endpoint->id() == locate.id) {
auto resp = try$(msg.packResp<Locate>(endpoint->port()));
auto resp = try$(msg.packResp<Api::Locate>(endpoint->port()));
try$(dispatch(resp));
return Ok();
}
}

return Error::notFound("service not found");
} else if (msg.is<Api::Start>()) {
auto start = try$(msg.unpack<Api::Start>());
return _bus->startService(start.id);
}

return Ok();
Expand All @@ -187,7 +190,7 @@ Karm::Res<> Bus::attach(Strong<Endpoint> endpoint) {
return Ok();
}

void Bus::_broadcast(Sys::Message &msg) {
void Bus::_broadcast(Rpc::Message &msg) {
for (auto &endpoint : _endpoints) {
if (msg.header().from != endpoint->port() and endpoint->accept(msg)) {
auto res = endpoint->send(msg);
Expand All @@ -197,8 +200,8 @@ void Bus::_broadcast(Sys::Message &msg) {
}
}

Res<> Bus::dispatch(Sys::Message &msg) {
if (msg.header().to == Sys::Port::BROADCAST) {
Res<> Bus::dispatch(Rpc::Message &msg) {
if (msg.header().to == Rpc::Port::BROADCAST) {
_broadcast(msg);
return Ok();
}
Expand Down
30 changes: 15 additions & 15 deletions src/srvs/grund-bus/bus.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,35 @@
#include <impl-skift/fd.h>
#include <karm-logger/logger.h>
#include <karm-mime/url.h>
#include <karm-rpc/base.h>
#include <karm-sys/context.h>
#include <karm-sys/rpc.h>

namespace Grund::Bus {

struct Bus;

struct Endpoint : public Meta::Pinned {
static Sys::Port nextPort() {
static Rpc::Port nextPort() {
static usize port = 2;
return Sys::Port{port++};
return Rpc::Port{port++};
}

Sys::Port _port = nextPort();
Rpc::Port _port = nextPort();
Bus *_bus;

virtual ~Endpoint() = default;

Sys::Port port() const { return _port; }
Rpc::Port port() const { return _port; }

void attach(Bus &bus) { _bus = &bus; }

Res<> dispatch(Sys::Message &msg);
Res<> dispatch(Rpc::Message &msg);

virtual Str id() const = 0;

virtual Res<> send(Sys::Message &) { return Ok(); }
virtual Res<> send(Rpc::Message &) { return Ok(); }

virtual bool accept(Sys::Message const &) { return true; }
virtual bool accept(Rpc::Message const &) { return true; }

virtual Res<> activate(Sys::Context &) { return Ok(); }
};
Expand All @@ -56,17 +56,17 @@ struct Service : public Endpoint {

Async::Task<> runAsync();

Res<> send(Sys::Message &msg) override;
Res<> send(Rpc::Message &msg) override;

bool accept(Sys::Message const &msg) override;
bool accept(Rpc::Message const &msg) override;
};

struct Locator : public Endpoint {
Locator();
struct System : public Endpoint {
System();

Str id() const override;

Res<> send(Sys::Message &msg) override;
Res<> send(Rpc::Message &msg) override;
};

struct Bus : public Meta::Pinned {
Expand All @@ -81,9 +81,9 @@ struct Bus : public Meta::Pinned {

Res<> attach(Strong<Endpoint> endpoint);

void _broadcast(Sys::Message &msg);
void _broadcast(Rpc::Message &msg);

Res<> dispatch(Sys::Message &msg);
Res<> dispatch(Rpc::Message &msg);

Res<> startService(Str id);
};
Expand Down
2 changes: 1 addition & 1 deletion src/srvs/grund-bus/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Async::Task<> entryPointAsync(Sys::Context &ctx) {

auto system = co_try$(Bus::create(ctx));

co_try$(system->attach(makeStrong<Locator>()));
co_try$(system->attach(makeStrong<System>()));
co_try$(system->startService("grund-av"s));
co_try$(system->startService("grund-conf"s));
co_try$(system->startService("grund-device"s));
Expand Down
1 change: 1 addition & 0 deletions src/srvs/grund-bus/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
]
},
"requires": [
"karm-rpc",
"karm-sys"
]
}
6 changes: 3 additions & 3 deletions src/srvs/grund-conf/main.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#include <karm-rpc/base.h>
#include <karm-sys/entry.h>
#include <karm-sys/rpc.h>

namespace Grund::Conf {

Async::Task<> serv(Sys::Context &ctx) {
auto rpc = Sys::Rpc::create(ctx);
auto endpoint = Rpc::Endpoint::create(ctx);

logInfo("service started");
while (true) {
co_trya$(rpc.recvAsync());
co_trya$(endpoint.recvAsync());
logDebug("received message from system");
}
}
Expand Down
1 change: 1 addition & 0 deletions src/srvs/grund-conf/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
]
},
"requires": [
"karm-rpc",
"karm-sys"
]
}
Loading

0 comments on commit 7485ecd

Please sign in to comment.