Skip to content
This repository was archived by the owner on Sep 11, 2024. It is now read-only.

Commit c4a8977

Browse files
committed
River: Beginnings of shared memory-based IPC implementation
1 parent 3635f35 commit c4a8977

File tree

3 files changed

+225
-1
lines changed

3 files changed

+225
-1
lines changed

libraries/libriver/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
SET(SOURCES BusConnection.cpp BusServer.cpp Endpoint.cpp packet.cpp)
1+
SET(SOURCES BusConnection.cpp BusServer.cpp Endpoint.cpp IPCBuffer.cpp packet.cpp)
22
MAKE_LIBRARY(libriver)
33
TARGET_LINK_LIBRARIES(libriver libduck)

libraries/libriver/IPCBuffer.cpp

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/* SPDX-License-Identifier: GPL-3.0-or-later */
2+
/* Copyright © 2016-2024 Byteduck */
3+
4+
#include "IPCBuffer.h"
5+
#include <sys/futex.h>
6+
7+
using namespace Duck;
8+
using namespace River;
9+
10+
IPCBuffer::IPCBuffer(Duck::Ptr<Duck::SharedBuffer> buffer):
11+
m_buffer(std::move(buffer)),
12+
m_header(m_buffer->ptr<Header>()),
13+
m_data(m_buffer->ptr<uint8_t>() + sizeof(Header)),
14+
m_data_size(m_buffer->size() - sizeof(Header))
15+
{
16+
assert(m_header->magic == IPC_MAGIC);
17+
}
18+
19+
Duck::ResultRet<Duck::Ptr<IPCBuffer>> IPCBuffer::alloc(std::string name, size_t buffer_size) {
20+
if (buffer_size <= sizeof(Header))
21+
return Result {"Invalid buffer size"};
22+
23+
auto buf = TRY(Duck::SharedBuffer::alloc(buffer_size, std::move(name)));
24+
auto* hdr = buf->ptr<Header>();
25+
hdr->magic = IPC_MAGIC;
26+
hdr->read_futex = 0;
27+
hdr->write_futex = 0;
28+
hdr->read = 0;
29+
hdr->write = 0;
30+
return Duck::Ptr<IPCBuffer>(new IPCBuffer(buf));
31+
}
32+
33+
Duck::ResultRet<Duck::Ptr<IPCBufferReceiver>> IPCBufferReceiver::attach(Duck::Ptr<Duck::SharedBuffer> buffer) {
34+
if (buffer->size() <= sizeof(Header))
35+
return Result {"Invalid buffer size"};
36+
if (buffer->ptr<Header>()->magic != IPC_MAGIC)
37+
return Result {"Invalid magic"};
38+
return Duck::Ptr<IPCBufferReceiver>(new IPCBufferReceiver(std::move(buffer)));
39+
}
40+
41+
Result IPCBufferReceiver::recv(const ReadCallback& callback, bool blocking) {
42+
size_t msg_size;
43+
auto read = m_header->read.load(); // Since this is in shared memory, we save it to the stack to prevent malicious screwery
44+
45+
// Acquire read futex
46+
if(blocking)
47+
futex_wait(&m_header->read_futex);
48+
else if(!futex_trywait(&m_header->read_futex))
49+
return {NO_MESSAGE};
50+
51+
while(true) {
52+
// Ensure read head is valid
53+
if (read > m_data_size) {
54+
return {INVALID_BUFFER_STATE};
55+
} else if (read + sizeof(Message) > m_data_size) {
56+
// If reading a message header would go past the end of the buffer, that means we need to wrap around.
57+
m_header->read = 0;
58+
read = 0;
59+
}
60+
61+
// Read into output buffer
62+
msg_size = *((size_t*) (m_data + read));
63+
64+
// Size of -1 means that we ran out of space when writing the last packet and wrapped around to the beginning
65+
if (msg_size == -1) {
66+
m_header->read = 0;
67+
read = 0;
68+
} else {
69+
break;
70+
}
71+
}
72+
73+
// Call callback with message. Check buffer size twice to account for overflows
74+
if(read > m_data_size || (read + msg_size + sizeof(Message)) > m_data_size)
75+
return {INVALID_BUFFER_STATE};
76+
77+
callback(m_data + read + sizeof(Message), msg_size);
78+
79+
// Move read head
80+
m_header->read = (read + msg_size + sizeof(Message)) % m_data_size;
81+
m_header->unread.fetch_sub(msg_size + sizeof(Message), std::memory_order::memory_order_release);
82+
futex_signal(&m_header->write_futex);
83+
84+
return Result::SUCCESS;
85+
}
86+
87+
Duck::ResultRet<Duck::Ptr<IPCBufferSender>> IPCBufferSender::attach(Ptr<SharedBuffer> buffer) {
88+
if (buffer->size() <= sizeof(Header))
89+
return Result {"Invalid buffer size"};
90+
if (buffer->ptr<Header>()->magic != IPC_MAGIC)
91+
return Result {"Invalid magic"};
92+
return Duck::Ptr<IPCBufferSender>(new IPCBufferSender(std::move(buffer)));
93+
}
94+
95+
Duck::Result IPCBufferSender::send(size_t size, const WriteCallback& callback, bool blocking) {
96+
if (size > m_data_size - sizeof(Message))
97+
return {MESSAGE_TOO_LARGE};
98+
99+
// Set the write futex to 0 just in case we need it
100+
__atomic_store_n(&m_header->write_futex, 0, __ATOMIC_SEQ_CST);
101+
102+
auto write = m_header->write.load();
103+
104+
auto can_write = [&write, size, this] () -> bool {
105+
const size_t read = m_header->read.load();
106+
if (read == m_header->write)
107+
return m_header->unread.load() == 0;
108+
else if (read > write)
109+
return write + size + sizeof(Message) < read;
110+
else
111+
return (write + size + sizeof(Message) <= m_data_size) || (read >= size + sizeof(Message));
112+
};
113+
114+
while (!can_write()) {
115+
if (!blocking)
116+
return {NO_BUFFER_SPACE};
117+
futex_wait(&m_header->write_futex);
118+
}
119+
120+
if (write + sizeof(Message) > m_data_size) {
121+
// If writing a message header would overflow, just wrap around to 0. Reader will know to follow.
122+
write = 0;
123+
} else if (write + size + sizeof(Message) > m_data_size) {
124+
// If writing the header is possible but writing the whole message is not, we need to write a -1 to signify this
125+
((Message*) (m_data + write))->size = -1;
126+
write = 0;
127+
}
128+
129+
// Write the message header and message
130+
auto* message = (Message*) (m_data + write);
131+
message->size = size;
132+
callback(message->data);
133+
134+
// Update write head
135+
write = write + size + sizeof(Message);
136+
assert(write <= m_data_size);
137+
if (write == m_data_size)
138+
write = 0;
139+
m_header->write = write;
140+
141+
// Signal reader futex
142+
m_header->unread.fetch_add(size + sizeof(Message), std::memory_order::memory_order_release);
143+
futex_signal(&m_header->read_futex);
144+
145+
return Result::SUCCESS;
146+
}
147+
148+
Duck::Result IPCBufferSender::send(size_t size, const void* val, bool blocking) {
149+
return send(size, [val, size] (uint8_t* buf) {
150+
memcpy(buf, val, size);
151+
}, blocking);
152+
}

libraries/libriver/IPCBuffer.h

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/* SPDX-License-Identifier: GPL-3.0-or-later */
2+
/* Copyright © 2016-2024 Byteduck */
3+
4+
#pragma once
5+
6+
#include <libduck/SharedBuffer.h>
7+
#include <functional>
8+
#include <atomic>
9+
10+
namespace River {
11+
class IPCBuffer {
12+
public:
13+
struct Message {
14+
size_t size;
15+
uint8_t data[];
16+
};
17+
18+
enum ResultCode {
19+
NO_MESSAGE,
20+
INVALID_BUFFER_STATE,
21+
NO_BUFFER_SPACE,
22+
MESSAGE_TOO_LARGE
23+
};
24+
25+
static constexpr size_t default_buffer_size = PAGE_SIZE * 8;
26+
27+
static Duck::ResultRet<Duck::Ptr<IPCBuffer>> alloc(std::string name, size_t buffer_size = default_buffer_size);
28+
29+
Duck::Ptr<Duck::SharedBuffer> buffer() const { return m_buffer; }
30+
31+
protected:
32+
IPCBuffer(Duck::Ptr<Duck::SharedBuffer> buffer);
33+
34+
static constexpr int IPC_MAGIC = 0x42069;
35+
struct Header {
36+
int magic;
37+
int read_futex;
38+
int write_futex;
39+
std::atomic<size_t> unread, read, write;
40+
};
41+
42+
Duck::Ptr<Duck::SharedBuffer> m_buffer;
43+
uint8_t* m_data;
44+
size_t m_data_size;
45+
Header* m_header;
46+
};
47+
48+
class IPCBufferReceiver: public IPCBuffer {
49+
public:
50+
static Duck::ResultRet<Duck::Ptr<IPCBufferReceiver>> attach(Duck::Ptr<Duck::SharedBuffer> buffer);
51+
52+
using ReadCallback = std::function<void(const uint8_t*, size_t)>;
53+
Duck::Result recv(const ReadCallback& callback, bool blocking = true);
54+
55+
private:
56+
IPCBufferReceiver(Duck::Ptr<Duck::SharedBuffer> buffer): IPCBuffer(std::move(buffer)) {};
57+
};
58+
59+
class IPCBufferSender: public IPCBuffer {
60+
public:
61+
62+
static Duck::ResultRet<Duck::Ptr<IPCBufferSender>> attach(Duck::Ptr<Duck::SharedBuffer> buffer);
63+
64+
using WriteCallback = std::function<void(uint8_t*)>;
65+
Duck::Result send(size_t size, const WriteCallback& callback, bool blocking = true);
66+
Duck::Result send(size_t size, const void* val, bool blocking = true);
67+
68+
private:
69+
70+
IPCBufferSender(Duck::Ptr<Duck::SharedBuffer> buffer): IPCBuffer(std::move(buffer)) {};
71+
};
72+
}

0 commit comments

Comments
 (0)