diff --git a/src/libs/karm-sys/ipc.cpp b/src/libs/karm-sys/ipc.cpp new file mode 100644 index 0000000000..4225380f8e --- /dev/null +++ b/src/libs/karm-sys/ipc.cpp @@ -0,0 +1,13 @@ +#include "ipc.h" + +namespace Karm::Sys { + +Async::Task ipcRecvAsync(Sys::IpcConnection &con) { + Message msg; + auto [bufLen, hndsLen] = co_trya$(con.recvAsync(msg.bytes, msg.handles)); + msg.len = bufLen; + msg.handlesLen = hndsLen; + co_return msg; +} + +} // namespace Karm::Sys diff --git a/src/libs/karm-sys/ipc.h b/src/libs/karm-sys/ipc.h index 9ea0c887b8..aa08c4788e 100644 --- a/src/libs/karm-sys/ipc.h +++ b/src/libs/karm-sys/ipc.h @@ -21,6 +21,8 @@ inline ChannelHook &useChannel(Sys::Context &ctx = Sys::globalContext()) { namespace Karm::Sys { +// MARK: Primitive Types ------------------------------------------------------- + struct Port : public Distinct { static Port const INVALID; static Port const BUS; @@ -74,6 +76,28 @@ struct Message { } }; +// MARK: Primitive Operations -------------------------------------------------- + +template +Res<> ipcSend(Sys::IpcConnection &con, Port port, u64 seq, Args &&...args) { + Header header{seq, port, Meta::idOf()}; + T msg{std::forward(args)...}; + + Io::BufferWriter reqBuf; + Io::PackEmit reqPack{reqBuf}; + + try$(Io::pack(reqPack, header)); + try$(Io::pack(reqPack, msg)); + + try$(con.send(reqBuf.bytes(), reqPack.handles())); + + return Ok(); +} + +Async::Task ipcRecvAsync(Sys::IpcConnection &con); + +// MARK: Ipc ------------------------------------------------------------------- + struct Ipc { Sys::IpcConnection _con; bool _receiving = false; @@ -85,25 +109,9 @@ struct Ipc { return Ipc{std::move(channel.con)}; } - template - Res<> _send(Port port, u64 seq, Args &&...args) { - Header header{seq, port, Meta::idOf()}; - T msg{std::forward(args)...}; - - Io::BufferWriter reqBuf; - Io::PackEmit reqPack{reqBuf}; - - try$(Io::pack(reqPack, header)); - try$(Io::pack(reqPack, msg)); - - try$(_con.send(reqBuf.bytes(), reqPack.handles())); - - return Ok(); - } - template Res<> send(Port port, Args &&...args) { - return _send(port, _seq++, std::forward(args)...); + return ipcSend(_con, port, _seq++, std::forward(args)...); } Async::Task recvAsync() { @@ -116,10 +124,7 @@ struct Ipc { }}; while (true) { - Message msg; - auto [bufLen, hndsLen] = co_trya$(_con.recvAsync(msg.bytes, msg.handles)); - msg.len = bufLen; - msg.handlesLen = hndsLen; + Message msg = co_trya$(ipcRecvAsync(_con)); auto maybeHeader = msg.header(); if (not maybeHeader) { @@ -141,9 +146,9 @@ struct Ipc { Res<> resp(Message &msg, Res message) { auto header = try$(msg.header()); if (not message) - return _send(header.port, header.seq, message.none()); + return ipcSend(_con, header.port, header.seq, message.none()); - return _send(header.port, header.seq, message.take()); + return ipcSend(_con, header.port, header.seq, message.take()); } template @@ -154,7 +159,7 @@ struct Ipc { _pending.put(seq, std::move(promise)); - co_try$(_send(port, seq, std::forward(args)...)); + co_try$(ipcSend(_con, port, seq, std::forward(args)...)); Message msg = co_await future; diff --git a/src/srvs/grund-bus/bus.cpp b/src/srvs/grund-bus/bus.cpp index ad3fdb13cb..7587644055 100644 --- a/src/srvs/grund-bus/bus.cpp +++ b/src/srvs/grund-bus/bus.cpp @@ -10,6 +10,8 @@ namespace Grund::Bus { static constexpr bool DEBUG_TASK = false; static constexpr bool DEBUG_ELF = false; +// MARK: Service --------------------------------------------------------------- + Res> Service::prepare(Sys::Context &, Str id) { logInfo("prepare service '{}'", id); @@ -19,11 +21,12 @@ Res> Service::prepare(Sys::Context &, Str id) { auto out = try$(Hj::Channel::create(Hj::Domain::self(), 512, 16)); try$(out.label(Io::format("{}-out", id).unwrap())); - return Ok(makeStrong( - id, + auto ipc = makeStrong( std::move(in), std::move(out) - )); + ); + + return Ok(makeStrong(id, ipc)); } Res<> Service::activate(Sys::Context &ctx) { @@ -83,8 +86,8 @@ Res<> Service::activate(Sys::Context &ctx) { auto handoverVrange = try$(elfSpace.map(0, handoverVmo, 0, 0, Hj::MapFlags::READ)); logInfoIf(DEBUG_TASK, "attaching channels..."); - auto inCap = try$(domain.attach(_in)); - auto outCap = try$(domain.attach(_out)); + auto inCap = try$(domain.attach(_ipc->_in)); + auto outCap = try$(domain.attach(_ipc->_out)); logInfoIf(DEBUG_TASK, "starting the task..."); try$(task.start( @@ -92,8 +95,10 @@ Res<> Service::activate(Sys::Context &ctx) { stackRange.end(), { handoverVrange.start, - inCap.slot(), + + // NOTE: In and out are intentionally swapped outCap.slot(), + inCap.slot(), } )); @@ -102,4 +107,53 @@ Res<> Service::activate(Sys::Context &ctx) { return Ok(); } +Async::Task<> Service::runAsync() { + logDebug("Listening for messages on service '{}'...", _id); + while (true) { + auto msg = co_trya$(Sys::ipcRecvAsync(_con)); + + logDebug("Received message on service '{}'", _id); + + auto maybeHeader = msg.header(); + if (not maybeHeader) + continue; + auto &header = maybeHeader.unwrap(); + auto res = _bus->dispatch(maybeHeader.unwrap(), msg); + if (not res) + co_try$(Sys::ipcSend(_con, header.port, header.seq, res.none())); + } +} + +Res<> Service::dispatch(Sys::Message &msg) { + logDebug("Dispatching message on service '{}'", _id); + return _con.send(msg.bytes, msg.handles); +} + +// MARK: Bus ------------------------------------------------------------------- + +Karm::Res> Bus::create(Sys::Context &ctx) { + return Ok(makeStrong(ctx)); +} + +Karm::Res<> Bus::startService(Str id) { + auto service = try$(Service::prepare(_context, id)); + try$(attach(service)); + Async::detach(service->runAsync()); + return Ok(); +} + +Karm::Res<> Bus::attach(Strong endpoint) { + endpoint->attach(*this); + _endpoints.pushBack(endpoint); + return Ok(); +} + +Res<> Bus::dispatch(Sys::Header &h, Sys::Message &msg) { + for (auto &endpoint : _endpoints) { + if (endpoint->_port == h.port) + return endpoint->dispatch(msg); + } + return Error::notFound("service not found"); +} + } // namespace Grund::Bus diff --git a/src/srvs/grund-bus/bus.h b/src/srvs/grund-bus/bus.h index 7bf6f4f740..bb205a7c6c 100644 --- a/src/srvs/grund-bus/bus.h +++ b/src/srvs/grund-bus/bus.h @@ -1,62 +1,70 @@ #pragma once #include +#include #include #include #include +#include namespace Grund::Bus { -struct Service { +struct Bus; + +struct Endpoint : public Meta::Static { + static Sys::Port nextPort() { + static usize port = 2; + return Sys::Port{port++}; + } + + Sys::Port _port; + Bus *_bus; + + virtual ~Endpoint() = default; + + Sys::Port port() const { return _port; } + + void attach(Bus &bus) { _bus = &bus; } + + virtual Res<> dispatch(Sys::Message &) { return Ok(); } + + virtual Res<> activate(Sys::Context &) { return Ok(); } +}; + +struct Service : public Endpoint { String _id; - Hj::Channel _in; - Hj::Channel _out; + Strong _ipc; + Sys::IpcConnection _con; Opt _task = NONE; static Res> prepare(Sys::Context &ctx, Str id); - Res<> activate(Sys::Context &ctx); + Service(Str id, Strong ipc) + : _id{id}, _ipc{ipc}, _con{ipc, ""_url} { + } + + Res<> activate(Sys::Context &ctx) override; + + Async::Task<> runAsync(); + + Res<> dispatch(Sys::Message &msg) override; }; -struct Bus { +struct Bus : public Meta::Static { Sys::Context &_context; - Hj::Listener _listener; - Hj::Domain _domain; - Vec> _services{}; + Vec> _endpoints{}; - static Res create(Sys::Context &ctx) { - auto domain = try$(Hj::Domain::create(Hj::ROOT)); - auto listener = try$(Hj::Listener::create(Hj::ROOT)); - return Ok(Bus{ctx, std::move(listener), std::move(domain)}); - } + Bus(Sys::Context &ctx) + : _context(ctx) {} - Res<> prepare(Str id) { - auto service = try$(Service::prepare(_context, id)); - try$(_attach(service)); - return Ok(); - } - - Res<> _attach(Strong service) { - try$(_listener.listen(service->_out, Hj::Sigs::READABLE, Hj::Sigs::NONE)); - _services.pushBack(std::move(service)); - return Ok(); - } + static Res> create(Sys::Context &ctx); - Res<> run() { - for (auto &service : _services) - try$(service->activate(_context)); + Res<> attach(Strong endpoint); - logDebug("running system event loop"); - while (true) { - try$(_listener.poll(TimeStamp::endOfTime())); - while (auto ev = _listener.next()) { - logInfo("handling system event"); + Res<> dispatch(Sys::Header &h, Sys::Message &msg); - try$(Hj::_signal(ev->cap, Hj::Sigs::NONE, Hj::Sigs::READABLE)); - } - } - } + Res<> startService(Str id); }; } // namespace Grund::Bus diff --git a/src/srvs/grund-bus/main.cpp b/src/srvs/grund-bus/main.cpp index a2d7529796..63f3778def 100644 --- a/src/srvs/grund-bus/main.cpp +++ b/src/srvs/grund-bus/main.cpp @@ -8,31 +8,28 @@ using namespace Grund::Bus; -Res<> entryPoint(Sys::Context &ctx) { - try$(Hj::Task::self().label("grund-bus")); +Async::Task<> entryPointAsync(Sys::Context &ctx) { + co_try$(Hj::Task::self().label("grund-bus")); logInfo("skiftOS " stringify$(__ck_version_value)); - auto system = try$(Bus::create(ctx)); + auto system = co_try$(Bus::create(ctx)); - try$(system.prepare("grund-av"s)); - try$(system.prepare("grund-conf"s)); - try$(system.prepare("grund-device"s)); - try$(system.prepare("grund-dhcp"s)); - try$(system.prepare("grund-dns"s)); - try$(system.prepare("grund-echo"s)); - try$(system.prepare("grund-fs"s)); - try$(system.prepare("grund-net"s)); - try$(system.prepare("grund-seat"s)); - try$(system.prepare("grund-shell"s)); + co_try$(system->startService("grund-av"s)); + co_try$(system->startService("grund-conf"s)); + co_try$(system->startService("grund-device"s)); + co_try$(system->startService("grund-dhcp"s)); + co_try$(system->startService("grund-dns"s)); + co_try$(system->startService("grund-echo"s)); + co_try$(system->startService("grund-fs"s)); + co_try$(system->startService("grund-net"s)); + co_try$(system->startService("grund-seat"s)); + co_try$(system->startService("grund-shell"s)); - return system.run(); -} + for (auto &endpoint : system->_endpoints) + co_try$(endpoint->activate(ctx)); -// NOTE: We can't use the normal entryPointAsync because -// the kernel invoke us with a different signature. -Async::Task<> entryPointAsync(Sys::Context &) { - unreachable(); + co_return Sys::globalSched().wait(TimeStamp::endOfTime()); } void __panicHandler(Karm::PanicKind kind, char const *msg); @@ -46,7 +43,7 @@ extern "C" void __entryPoint(usize rawHandover) { ctx.add(1, argv); ctx.add((Handover::Payload *)rawHandover); - auto res = entryPoint(ctx); + auto res = Sys::run(entryPointAsync(ctx)); auto self = Hj::Task::self();