Skip to content

Commit

Permalink
grund-bus: Initial implementation of message dispatch.
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Nov 10, 2024
1 parent e6844df commit 1727300
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 86 deletions.
13 changes: 13 additions & 0 deletions src/libs/karm-sys/ipc.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include "ipc.h"

namespace Karm::Sys {

Async::Task<Message> ipcRecvAsync(Sys::IpcConnection &con) {
Message msg;
auto [bufLen, hndsLen] = co_trya$(con.recvAsync(msg.bytes, msg.handles));
msg.len = bufLen;
msg.handlesLen = hndsLen;
co_return msg;
}

} // namespace Karm::Sys
53 changes: 29 additions & 24 deletions src/libs/karm-sys/ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ inline ChannelHook &useChannel(Sys::Context &ctx = Sys::globalContext()) {

namespace Karm::Sys {

// MARK: Primitive Types -------------------------------------------------------

struct Port : public Distinct<u64, struct _PortTag> {
static Port const INVALID;
static Port const BUS;
Expand Down Expand Up @@ -74,6 +76,28 @@ struct Message {
}
};

// MARK: Primitive Operations --------------------------------------------------

template <typename T, typename... Args>
Res<> ipcSend(Sys::IpcConnection &con, Port port, u64 seq, Args &&...args) {
Header header{seq, port, Meta::idOf<T>()};
T msg{std::forward<Args>(args)...};

Io::BufferWriter reqBuf;
Io::PackEmit reqPack{reqBuf};

try$(Io::pack(reqPack, header));
try$(Io::pack(reqPack, msg));

try$(con.send(reqBuf.bytes(), reqPack.handles()));

return Ok();
}

Async::Task<Message> ipcRecvAsync(Sys::IpcConnection &con);

// MARK: Ipc -------------------------------------------------------------------

struct Ipc {
Sys::IpcConnection _con;
bool _receiving = false;
Expand All @@ -85,25 +109,9 @@ struct Ipc {
return Ipc{std::move(channel.con)};
}

template <typename T, typename... Args>
Res<> _send(Port port, u64 seq, Args &&...args) {
Header header{seq, port, Meta::idOf<T>()};
T msg{std::forward<Args>(args)...};

Io::BufferWriter reqBuf;
Io::PackEmit reqPack{reqBuf};

try$(Io::pack(reqPack, header));
try$(Io::pack(reqPack, msg));

try$(_con.send(reqBuf.bytes(), reqPack.handles()));

return Ok();
}

template <typename T, typename... Args>
Res<> send(Port port, Args &&...args) {
return _send<T>(port, _seq++, std::forward<Args>(args)...);
return ipcSend<T>(_con, port, _seq++, std::forward<Args>(args)...);
}

Async::Task<Message> recvAsync() {
Expand All @@ -116,10 +124,7 @@ struct Ipc {
}};

while (true) {
Message msg;
auto [bufLen, hndsLen] = co_trya$(_con.recvAsync(msg.bytes, msg.handles));
msg.len = bufLen;
msg.handlesLen = hndsLen;
Message msg = co_trya$(ipcRecvAsync(_con));

auto maybeHeader = msg.header();
if (not maybeHeader) {
Expand All @@ -141,9 +146,9 @@ struct Ipc {
Res<> resp(Message &msg, Res<typename T::Response> message) {
auto header = try$(msg.header());
if (not message)
return _send<Error>(header.port, header.seq, message.none());
return ipcSend<Error>(_con, header.port, header.seq, message.none());

return _send<typename T::Response>(header.port, header.seq, message.take());
return ipcSend<typename T::Response>(_con, header.port, header.seq, message.take());
}

template <typename T, typename... Args>
Expand All @@ -154,7 +159,7 @@ struct Ipc {

_pending.put(seq, std::move(promise));

co_try$(_send<T>(port, seq, std::forward<Args>(args)...));
co_try$(ipcSend<T>(_con, port, seq, std::forward<Args>(args)...));

Message msg = co_await future;

Expand Down
66 changes: 60 additions & 6 deletions src/srvs/grund-bus/bus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ namespace Grund::Bus {
static constexpr bool DEBUG_TASK = false;
static constexpr bool DEBUG_ELF = false;

// MARK: Service ---------------------------------------------------------------

Res<Strong<Service>> Service::prepare(Sys::Context &, Str id) {
logInfo("prepare service '{}'", id);

Expand All @@ -19,11 +21,12 @@ Res<Strong<Service>> Service::prepare(Sys::Context &, Str id) {
auto out = try$(Hj::Channel::create(Hj::Domain::self(), 512, 16));
try$(out.label(Io::format("{}-out", id).unwrap()));

return Ok(makeStrong<Service>(
id,
auto ipc = makeStrong<Skift::IpcFd>(
std::move(in),
std::move(out)
));
);

return Ok(makeStrong<Service>(id, ipc));
}

Res<> Service::activate(Sys::Context &ctx) {
Expand Down Expand Up @@ -83,17 +86,19 @@ Res<> Service::activate(Sys::Context &ctx) {
auto handoverVrange = try$(elfSpace.map(0, handoverVmo, 0, 0, Hj::MapFlags::READ));

logInfoIf(DEBUG_TASK, "attaching channels...");
auto inCap = try$(domain.attach(_in));
auto outCap = try$(domain.attach(_out));
auto inCap = try$(domain.attach(_ipc->_in));
auto outCap = try$(domain.attach(_ipc->_out));

logInfoIf(DEBUG_TASK, "starting the task...");
try$(task.start(
image.header().entry,
stackRange.end(),
{
handoverVrange.start,
inCap.slot(),

// NOTE: In and out are intentionally swapped
outCap.slot(),
inCap.slot(),
}
));

Expand All @@ -102,4 +107,53 @@ Res<> Service::activate(Sys::Context &ctx) {
return Ok();
}

Async::Task<> Service::runAsync() {
logDebug("Listening for messages on service '{}'...", _id);
while (true) {
auto msg = co_trya$(Sys::ipcRecvAsync(_con));

logDebug("Received message on service '{}'", _id);

auto maybeHeader = msg.header();
if (not maybeHeader)
continue;
auto &header = maybeHeader.unwrap();
auto res = _bus->dispatch(maybeHeader.unwrap(), msg);
if (not res)
co_try$(Sys::ipcSend<Error>(_con, header.port, header.seq, res.none()));
}
}

Res<> Service::dispatch(Sys::Message &msg) {
logDebug("Dispatching message on service '{}'", _id);
return _con.send(msg.bytes, msg.handles);
}

// MARK: Bus -------------------------------------------------------------------

Karm::Res<Strong<Bus>> Bus::create(Sys::Context &ctx) {
return Ok(makeStrong<Bus>(ctx));
}

Karm::Res<> Bus::startService(Str id) {
auto service = try$(Service::prepare(_context, id));
try$(attach(service));
Async::detach(service->runAsync());
return Ok();
}

Karm::Res<> Bus::attach(Strong<Endpoint> endpoint) {
endpoint->attach(*this);
_endpoints.pushBack(endpoint);
return Ok();
}

Res<> Bus::dispatch(Sys::Header &h, Sys::Message &msg) {
for (auto &endpoint : _endpoints) {
if (endpoint->_port == h.port)
return endpoint->dispatch(msg);
}
return Error::notFound("service not found");
}

} // namespace Grund::Bus
80 changes: 44 additions & 36 deletions src/srvs/grund-bus/bus.h
Original file line number Diff line number Diff line change
@@ -1,62 +1,70 @@
#pragma once

#include <hjert-api/api.h>
#include <impl-skift/fd.h>
#include <karm-logger/logger.h>
#include <karm-mime/url.h>
#include <karm-sys/context.h>
#include <karm-sys/ipc.h>

namespace Grund::Bus {

struct Service {
struct Bus;

struct Endpoint : public Meta::Static {
static Sys::Port nextPort() {
static usize port = 2;
return Sys::Port{port++};
}

Sys::Port _port;
Bus *_bus;

virtual ~Endpoint() = default;

Sys::Port port() const { return _port; }

void attach(Bus &bus) { _bus = &bus; }

virtual Res<> dispatch(Sys::Message &) { return Ok(); }

virtual Res<> activate(Sys::Context &) { return Ok(); }
};

struct Service : public Endpoint {
String _id;
Hj::Channel _in;
Hj::Channel _out;
Strong<Skift::IpcFd> _ipc;
Sys::IpcConnection _con;
Opt<Hj::Task> _task = NONE;

static Res<Strong<Service>> prepare(Sys::Context &ctx, Str id);

Res<> activate(Sys::Context &ctx);
Service(Str id, Strong<Skift::IpcFd> ipc)
: _id{id}, _ipc{ipc}, _con{ipc, ""_url} {
}

Res<> activate(Sys::Context &ctx) override;

Async::Task<> runAsync();

Res<> dispatch(Sys::Message &msg) override;
};

struct Bus {
struct Bus : public Meta::Static {
Sys::Context &_context;
Hj::Listener _listener;
Hj::Domain _domain;

Vec<Strong<Service>> _services{};
Vec<Strong<Endpoint>> _endpoints{};

static Res<Bus> create(Sys::Context &ctx) {
auto domain = try$(Hj::Domain::create(Hj::ROOT));
auto listener = try$(Hj::Listener::create(Hj::ROOT));
return Ok(Bus{ctx, std::move(listener), std::move(domain)});
}
Bus(Sys::Context &ctx)
: _context(ctx) {}

Res<> prepare(Str id) {
auto service = try$(Service::prepare(_context, id));
try$(_attach(service));
return Ok();
}

Res<> _attach(Strong<Service> service) {
try$(_listener.listen(service->_out, Hj::Sigs::READABLE, Hj::Sigs::NONE));
_services.pushBack(std::move(service));
return Ok();
}
static Res<Strong<Bus>> create(Sys::Context &ctx);

Res<> run() {
for (auto &service : _services)
try$(service->activate(_context));
Res<> attach(Strong<Endpoint> endpoint);

logDebug("running system event loop");
while (true) {
try$(_listener.poll(TimeStamp::endOfTime()));
while (auto ev = _listener.next()) {
logInfo("handling system event");
Res<> dispatch(Sys::Header &h, Sys::Message &msg);

try$(Hj::_signal(ev->cap, Hj::Sigs::NONE, Hj::Sigs::READABLE));
}
}
}
Res<> startService(Str id);
};

} // namespace Grund::Bus
37 changes: 17 additions & 20 deletions src/srvs/grund-bus/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,28 @@

using namespace Grund::Bus;

Res<> entryPoint(Sys::Context &ctx) {
try$(Hj::Task::self().label("grund-bus"));
Async::Task<> entryPointAsync(Sys::Context &ctx) {
co_try$(Hj::Task::self().label("grund-bus"));

logInfo("skiftOS " stringify$(__ck_version_value));

auto system = try$(Bus::create(ctx));
auto system = co_try$(Bus::create(ctx));

try$(system.prepare("grund-av"s));
try$(system.prepare("grund-conf"s));
try$(system.prepare("grund-device"s));
try$(system.prepare("grund-dhcp"s));
try$(system.prepare("grund-dns"s));
try$(system.prepare("grund-echo"s));
try$(system.prepare("grund-fs"s));
try$(system.prepare("grund-net"s));
try$(system.prepare("grund-seat"s));
try$(system.prepare("grund-shell"s));
co_try$(system->startService("grund-av"s));
co_try$(system->startService("grund-conf"s));
co_try$(system->startService("grund-device"s));
co_try$(system->startService("grund-dhcp"s));
co_try$(system->startService("grund-dns"s));
co_try$(system->startService("grund-echo"s));
co_try$(system->startService("grund-fs"s));
co_try$(system->startService("grund-net"s));
co_try$(system->startService("grund-seat"s));
co_try$(system->startService("grund-shell"s));

return system.run();
}
for (auto &endpoint : system->_endpoints)
co_try$(endpoint->activate(ctx));

// NOTE: We can't use the normal entryPointAsync because
// the kernel invoke us with a different signature.
Async::Task<> entryPointAsync(Sys::Context &) {
unreachable();
co_return Sys::globalSched().wait(TimeStamp::endOfTime());
}

void __panicHandler(Karm::PanicKind kind, char const *msg);
Expand All @@ -46,7 +43,7 @@ extern "C" void __entryPoint(usize rawHandover) {
ctx.add<Sys::ArgsHook>(1, argv);
ctx.add<HandoverHook>((Handover::Payload *)rawHandover);

auto res = entryPoint(ctx);
auto res = Sys::run(entryPointAsync(ctx));

auto self = Hj::Task::self();

Expand Down

0 comments on commit 1727300

Please sign in to comment.