forked from SerenityOS/serenity
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsyncStreamBuffer.h
131 lines (108 loc) · 3.26 KB
/
AsyncStreamBuffer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*
* Copyright (c) 2024, Dan Klishch <[email protected]>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#pragma once
#include <AK/Coroutine.h>
#include <AK/Error.h>
namespace AK {
class AsyncStreamBuffer {
AK_MAKE_NONCOPYABLE(AsyncStreamBuffer);
public:
AsyncStreamBuffer()
{
m_capacity = min_capacity;
m_data = reinterpret_cast<u8*>(kmalloc(m_capacity));
}
AsyncStreamBuffer(AsyncStreamBuffer&& other)
: m_read_head(exchange(other.m_read_head, 0))
, m_peek_head(exchange(other.m_peek_head, 0))
, m_capacity(exchange(other.m_capacity, 0))
, m_data(exchange(other.m_data, nullptr))
{
}
AsyncStreamBuffer& operator=(AsyncStreamBuffer&& buffer)
{
if (this != &buffer) {
this->~AsyncStreamBuffer();
new (this) AsyncStreamBuffer(move(buffer));
}
return *this;
}
~AsyncStreamBuffer()
{
if (m_data)
kfree_sized(m_data, m_capacity);
}
bool is_empty() const
{
return m_read_head == m_peek_head;
}
ReadonlyBytes data() const
{
return { m_data + m_read_head, m_peek_head - m_read_head };
}
void dequeue(size_t bytes)
{
m_read_head += bytes;
}
template<typename Func>
Coroutine<ErrorOr<size_t>> enqueue(size_t preferred_capacity_for_writing, Func&& func)
{
allocate_enough_space_for(preferred_capacity_for_writing);
size_t nread = CO_TRY(co_await func(Bytes { m_data + m_peek_head, m_capacity - m_peek_head }));
m_peek_head += nread;
co_return nread;
}
void append(ReadonlyBytes bytes)
{
if (m_peek_head + bytes.size() > m_capacity)
allocate_enough_space_for(bytes.size());
memcpy(m_data + m_peek_head, bytes.data(), bytes.size());
m_peek_head += bytes.size();
}
void append(u8 byte)
{
if (m_peek_head == m_capacity)
allocate_enough_space_for(1);
m_data[m_peek_head++] = byte;
}
Bytes get_bytes_for_writing(size_t length)
{
if (m_peek_head + length > m_capacity)
allocate_enough_space_for(length);
m_peek_head += length;
return { m_data + m_peek_head - length, length };
}
private:
static constexpr size_t min_capacity = 32;
void allocate_enough_space_for(size_t length)
{
if (m_read_head != 0) {
if (m_capacity - (m_peek_head - m_read_head) >= length) {
memmove(m_data, m_data + m_read_head, m_peek_head - m_read_head);
m_peek_head -= m_read_head;
m_read_head = 0;
return;
}
}
VERIFY(m_capacity < NumericLimits<size_t>::max() / 3);
size_t new_capacity = max(m_capacity * 3 / 2, m_capacity + length);
u8* new_data = (u8*)kmalloc(new_capacity);
memcpy(new_data, m_data + m_read_head, m_peek_head - m_read_head);
kfree_sized(m_data, m_capacity);
m_data = new_data;
m_capacity = new_capacity;
m_peek_head -= m_read_head;
m_read_head = 0;
}
size_t m_read_head { 0 };
size_t m_peek_head { 0 };
size_t m_capacity { 0 };
u8* m_data { nullptr };
};
}
// When USING_AK_GLOBALLY is defined, make AsyncStreamBuffer usable without the AK:: prefix.
#ifdef USING_AK_GLOBALLY
using AK::AsyncStreamBuffer;
#endif