Skip to content

Commit

Permalink
redis protocol RESP2 serialization and deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
lihuiba committed Oct 13, 2024
1 parent 35fe23a commit 3e5b1d3
Show file tree
Hide file tree
Showing 4 changed files with 509 additions and 82 deletions.
158 changes: 84 additions & 74 deletions ecosystem/redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,93 +15,103 @@ limitations under the License.
*/

#include <photon/ecosystem/redis.h>
#include <inttypes.h>
#include <memory>
// #include <inttypes.h>
// #include <memory>
#include <photon/net/socket.h>
#include <photon/common/alog.h>
#include <cpp_redis/network/tcp_client_iface.hpp>
// #include <photon/thread/thread11.h>

namespace photon {
using namespace net;
namespace integration {
namespace redis {

static thread_local std::unique_ptr<ISocketClient>
_client { new_tcp_socket_client() };

class tcp_client_redis : public cpp_redis::network::tcp_client_iface {
public:
ISocketStream* _s = nullptr;
~tcp_client_redis() {
delete _s;
}
void connect(const std::string& addr,
uint32_t port, uint32_t timeout_msecs) override {
if (_s)
LOG_ERROR_RETURN(0, , "client is already connected");
EndPoint ep(addr.c_str(), port);
_s = _client->connect(ep);
if (_s == nullptr)
LOG_ERRNO_RETURN(0, , "failed to connect to ", ep);
}
bool is_connected(void) const override {
return _s;
any BufferedStream::parse_item() {
switch (auto mark = this->get_char()) {
case simple_string::mark():
return get_simple_string();
case error_message::mark():
return get_error_message();
case integer::mark():
return get_integer();
case bulk_string::mark():
return get_bulk_string();
case array<>::mark():
return {array<>(), get_integer()};
default:
return {};
}
void disconnect(bool /*wait_for_removal*/ = false) override {
delete _s;
_s = nullptr;
_dch();
}
disconnection_handler_t _dch;
void set_on_disconnection_handler(const disconnection_handler_t& h) override {
_dch = h;
}

void BufferedStream::__refill(size_t atleast) {
size_t room = _bufsize - _j;
if (!room || room < atleast) { if (_refcnt > 0) {
LOG_ERROR_RETURN(0, , "no enough buffer space");
} else {
size_t available = _j - _i;
memmove(ibuf(), ibuf() + _i, available);
_i = 0; _j = available;
room = _bufsize - _j;
} }
ssize_t ret = _s->recv_at_least(ibuf() + _j, room, atleast);
if (ret < (ssize_t)atleast)
LOG_ERRNO_RETURN(0,, "failed to recv at least ` bytes", atleast);
_j += ret;
}

std::string_view BufferedStream::__getline() {
size_t pos;
assert(_j >= _i);
estring_view sv(ibuf() + _i, _j - _i);
while ((pos = sv.find('\n')) == sv.npos) {
size_t j = _j;
__refill(0);
assert(_j > j);
sv = {ibuf() + j, (uint32_t)(_j - j)};
}
void async_read(read_request& request) override {
ssize_t cnt;
read_result r;
if (!_s) {
LOG_ERROR("not connected");
goto fail;
}
if (!request.size) { fail:
r.success = false;
goto out;
}

r.buffer.resize(request.size);
cnt = _s->read(&r.buffer[0], r.buffer.size());
if (cnt <= 0) {
if (cnt == 0) errno = ENETRESET;
LOG_ERROR("failed to read socket: ", ERRNO());
r.buffer.clear();
disconnect();
} else { r.success = true; }
out:
request.async_read_callback(r);
assert(sv.begin() >= ibuf());
auto begin = ibuf() + _i; // not sv.begin()!
auto end = &sv[pos];
assert(*end == '\n');
_i = end - ibuf() + 1;
assert(_i <= _j);
if (likely(end[-1] == '\r')) {
*(uint16_t*)--end = 0;
} else {
*(char*)end = 0;
}
void async_write(write_request& request) override {
ssize_t cnt;
write_result r{false, 0};
if (!_s) {
LOG_ERROR("not connected");
goto out;
}
if (!request.buffer.size()) {
goto out;
}
return {begin, size_t(end - begin)};
}

cnt = _s->write(&request.buffer[0], request.buffer.size());
if (cnt <= 0) {
if (cnt == 0) errno = ENETRESET;
LOG_ERROR("failed to write socket: ", ERRNO());
disconnect();
} else { r = {true, (size_t)cnt}; }
out:
request.async_write_callback(r);
std::string_view BufferedStream::__getstring(size_t length) {
assert(_i <= _j);
size_t available = _j - _i;
if (available < length + 2)
__refill(length + 2 - available);
auto begin = ibuf() + _i;
_i += length;
assert(_i + 2 <= _j);
auto ptr = ibuf() + _i;
if (likely(ptr[0] == '\r')) {
ptr[0] = '\0';
_i += 1 + (ptr[1]=='\n');
}
};
return {begin, length};
}

cpp_redis::network::tcp_client_iface* new_tcp_client_redis() {
return new tcp_client_redis;
void BufferedStream::flush(const void* extra_buffer, size_t size) {
iovec iov[2];
iov[0] = {obuf(), _o};
int iovcnt = 1;
ssize_t sum = _o;
if (extra_buffer && size) {
iov[iovcnt++] = {(void*)extra_buffer, size};
sum += size;
}
ssize_t ret = _s->writev_mutable(iov, iovcnt);
if (ret < sum)
LOG_ERRNO_RETURN(0,, "failed to write to socket stream");
_o = 0;
}

}
Expand Down
Loading

0 comments on commit 3e5b1d3

Please sign in to comment.