Skip to content

Commit

Permalink
karm-async: Added simple queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepy-monax committed Dec 25, 2024
1 parent 3a3a149 commit ff50e32
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 0 deletions.
95 changes: 95 additions & 0 deletions src/libs/karm-async/queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#pragma once

#include <karm-base/list.h>
#include <karm-base/vec.h>

#include "awaiter.h"

namespace Karm::Async {

template <typename T>
struct Queue {
struct _Listener {
LlItem<_Listener> item;
Opt<T> value;
virtual ~_Listener() = default;
virtual void complete() = 0;
};

Vec<T> _buf;
Ll<_Listener> _listeners;

Queue() = default;

template <typename... Ts>
void emplace(Ts &&...arg) {
if (_listeners.empty()) {
_buf.emplaceBack(std::forward<Ts>(arg)...);
return;
}

if (not empty())
panic("queue should be empty");

auto listener = _listeners.detach(_listeners.head());
listener->value.emplace(std::forward<Ts>(arg)...);
listener->complete();
}

void enqueue(T item) {
emplace(std::move(item));
}

template <Receiver<T> R>
struct _GetOperation : private _Listener {
using _Listener::value;

Queue *_q;
R _r;

_GetOperation(Queue *q, R r)
: _q{q}, _r{std::move(r)} {}

bool start() {
if (_q->empty()) {
_q->_listeners.append(this, _q->_listeners.head());
return false;
}

if (not _q->_listeners.empty())
panic("listeners should be empty");

_r.recv(INLINE, _q->_buf.popFront());
return true;
}

void complete() override {
_r.recv(LATER, value.take());
}
};

struct _GetSender {
using Inner = T;
Queue *_q;

auto connect(Receiver<T> auto r) -> _GetOperation<decltype(r)> {
return {_q, std::move(r)};
}
};

auto dequeueAsync() {
return _GetSender{this};
}

bool empty() {
return ::isEmpty(_buf);
}

Opt<T> dequeue() {
if (empty())
return NONE;
return _buf.popFront();
}
};

} // namespace Karm::Async
52 changes: 52 additions & 0 deletions src/libs/karm-async/tests/test-queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include <karm-async/one.h>
#include <karm-async/queue.h>
#include <karm-test/macros.h>

namespace Karm::Async::Tests {

test$("karm-queue-enqueue-dequeue") {
Queue<isize> q;
q.enqueue(42);
q.enqueue(69);

auto res1 = Async::run(q.dequeueAsync());
auto res2 = Async::run(q.dequeueAsync());

expectEq$(res1, 42);
expectEq$(res2, 69);

return Ok();
}

test$("karm-queue-dequeue-enqueue") {
Queue<isize> q;

isize res1 = 0;
isize res2 = 0;
bool orderOk = false;

Async::detach(q.dequeueAsync(), [&](isize v) {
res1 = v;
});

Async::detach(q.dequeueAsync(), [&](isize v) {
if (res1 == 42)
orderOk = true;
res2 = v;
});

q.enqueue(42);
q.enqueue(69);
q.enqueue(96);

expect$(orderOk);
expectEq$(res1, 42);
expectEq$(res2, 69);

expect$(not q.empty());
expectEq$(q.dequeue(), 96);

return Ok();
}

} // namespace Karm::Async::Tests

0 comments on commit ff50e32

Please sign in to comment.