Skip to content

Commit

Permalink
string_socket_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
lihuiba committed Oct 17, 2024
1 parent 6b396c7 commit 1dc374c
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 149 deletions.
75 changes: 75 additions & 0 deletions common/memory-stream/memory-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,81 @@ class FaultStream : public IStream

};

using namespace photon::net;
class StringSocketStreamImpl : public StringSocketStream {
public:
virtual ssize_t recv(void *buf, size_t count, int flags = 0) override {
size_t n = rand() % 8 + 1;
if (n > _inv.size()) n = _inv.size();
if (n > count) n = count;
memcpy(buf, _inv.data(), n);
_inv = _inv.substr(n);
return n;
}
virtual ssize_t recv(const struct iovec *iov, int iovcnt, int flags = 0) override {
while(!iov->iov_base || !iov->iov_len)
if (iovcnt) { ++iov, --iovcnt; }
else return -1;
return recv(iov->iov_base, iov->iov_len);
}
virtual ssize_t send(const void *buf, size_t count, int flags = 0) override {
size_t n = rand() % 8 + 1;
if (n > count) n = count;
auto p = (const char*)buf;
_out.append(p, p + n);
return n;
}
virtual ssize_t send(const struct iovec *iov, int iovcnt, int flags = 0) override {
while(!iov->iov_base || !iov->iov_len)
if (iovcnt) { ++iov, --iovcnt; }
else return -1;
return send(iov->iov_base, iov->iov_len);
}
virtual ssize_t sendfile(int in_fd, off_t offset, size_t count) override { return 0; }
virtual int close() override { return 0; }
virtual ssize_t read(void *buf, size_t count) override {
if (count > _inv.size()) count = _inv.size();
memcpy(buf, _inv.data(), count);
return count;
}
virtual ssize_t readv(const struct iovec *iov, int iovcnt) override {
ssize_t s = 0;
for (int i = 0; i < iovcnt; ++i) {
ssize_t ret = read(iov[i].iov_base, iov[i].iov_len);
if (ret < 0) return ret;
s += ret;
if (ret < (ssize_t)iov[i].iov_len) break;
}
return s;
}
virtual ssize_t write(const void *buf, size_t count) override {
auto p = (const char*)buf;
_out.append(p, p + count);
return count;
}
virtual ssize_t writev(const struct iovec *iov, int iovcnt) override {
ssize_t s = 0;
for (int i = 0; i < iovcnt; ++i) {
ssize_t ret = write(iov[i].iov_base, iov[i].iov_len);
if (ret < 0) return ret;
s += ret;
if (ret < (ssize_t)iov[i].iov_len) break;
}
return s;
}
virtual Object* get_underlay_object(uint64_t recursion = 0) override { return 0; }
virtual int setsockopt(int level, int option_name, const void* option_value, socklen_t option_len) override { return 0; }
virtual int getsockopt(int level, int option_name, void* option_value, socklen_t* option_len) override { return 0; }
virtual int getsockname(EndPoint& addr) override { return 0; }
virtual int getpeername(EndPoint& addr) override { return 0; }
virtual int getsockname(char* path, size_t count) override { return 0; }
virtual int getpeername(char* path, size_t count) override { return 0; }
};

IStream* new_fault_stream(IStream* stream, int flag, bool ownership) {
return new FaultStream(stream, flag, ownership);
}

StringSocketStream* new_string_socket_stream() {
return new StringSocketStreamImpl;
}
27 changes: 27 additions & 0 deletions common/memory-stream/memory-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ limitations under the License.

#pragma once
#include <cinttypes>
#include <string>
#include <photon/common/stream.h>
#include <photon/net/socket.h>
#include <photon/common/string_view.h>


class DuplexMemoryStream;
Expand All @@ -33,6 +36,9 @@ extern "C" DuplexMemoryStream* new_duplex_memory_stream(uint32_t capacity);
// default is 11(3 in 10-based integer), both read and write operation may fail
extern "C" IStream* new_fault_stream(IStream* stream, int flag=3, bool ownership=false);

class StringSocketStream;
StringSocketStream* new_string_socket_stream();

class DuplexMemoryStream
{
public:
Expand All @@ -41,3 +47,24 @@ class DuplexMemoryStream
IStream* endpoint_b; // do NOT delete it!!!
virtual int close() = 0;
};

class StringSocketStream : public photon::net::ISocketStream {
protected:
std::string _in, _out;
std::string_view _inv;
public:
void set_input(std::string_view in) {
_in = in;
_inv = {_in};
}
void set_input(std::string_view in, bool copy) {
if (!copy) _inv = in;
else set_input(in);
}
std::string_view input() {
return _inv;
}
std::string& output() {
return _out;
}
};
10 changes: 5 additions & 5 deletions ecosystem/redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace photon {
using namespace net;
namespace redis {

any BufferedStream::parse_response_item() {
any _RedisClient::parse_response_item() {
switch (auto mark = this->get_char()) {
case simple_string::mark():
return get_simple_string();
Expand All @@ -44,7 +44,7 @@ any BufferedStream::parse_response_item() {
}
}

void BufferedStream::__refill(size_t atleast) {
void _RedisClient::__refill(size_t atleast) {
size_t room = _bufsize - _j;
if (!room || room < atleast) { if (_refcnt > 0) {
LOG_ERROR_RETURN(0, , "no enough buffer space");
Expand All @@ -60,7 +60,7 @@ void BufferedStream::__refill(size_t atleast) {
_j += ret;
}

std::string_view BufferedStream::__getline() {
std::string_view _RedisClient::__getline() {
size_t pos;
assert(_j >= _i);
estring_view sv(ibuf() + _i, _j - _i);
Expand All @@ -85,7 +85,7 @@ std::string_view BufferedStream::__getline() {
return {begin, size_t(end - begin)};
}

std::string_view BufferedStream::__getstring(size_t length) {
std::string_view _RedisClient::__getstring(size_t length) {
assert(_i <= _j);
size_t available = _j - _i;
if (available < length + 2)
Expand All @@ -101,7 +101,7 @@ std::string_view BufferedStream::__getstring(size_t length) {
return {begin, length};
}

void BufferedStream::flush(const void* extra_buffer, size_t size) {
void _RedisClient::flush(const void* extra_buffer, size_t size) {
iovec iov[2];
iov[0] = {obuf(), _o};
int iovcnt = 1;
Expand Down
52 changes: 27 additions & 25 deletions ecosystem/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,29 @@ namespace redis {
#endif


class BufferedStream;
class _RedisClient;
class refstring : public std::string_view {
BufferedStream* _bs = nullptr;
_RedisClient* _rc = nullptr;
void add_ref();
void del_ref();
public:
using std::string_view::string_view;
refstring(std::string_view sv) : std::string_view(sv) { }
refstring(BufferedStream* bs, std::string_view sv) :
std::string_view(sv), _bs(bs) { add_ref(); }
refstring(_RedisClient* bs, std::string_view sv) :
std::string_view(sv), _rc(bs) { add_ref(); }
refstring(const refstring& rhs) :
std::string_view(rhs), _bs(rhs._bs) { add_ref(); }
std::string_view(rhs), _rc(rhs._rc) { add_ref(); }
refstring& operator = (const refstring& rhs) {
if (this == &rhs) return *this;
*(std::string_view*)this = rhs;
del_ref();
_bs = rhs._bs;
_rc = rhs._rc;
add_ref();
return *this;
}
void release() {
del_ref();
_bs = nullptr;
_rc = nullptr;
(std::string_view&) *this = {};
}
~refstring() { del_ref(); }
Expand Down Expand Up @@ -169,7 +169,7 @@ using net::ISocketStream;
#define CRLF "\r\n"
#define BSMARK "$"

class BufferedStream {
class _RedisClient {
protected:
ISocketStream* _s = nullptr;
uint32_t _i = 0, _j = 0, _o = 0, _bufsize = 0, _refcnt = 0;
Expand All @@ -196,7 +196,7 @@ class BufferedStream {
size_t __MAX_SIZE(_array_header x) { return 32; }
size_t __MAX_SIZE(_char x) { return 1; }

explicit BufferedStream(ISocketStream* s, uint32_t bufsize) :
explicit _RedisClient(ISocketStream* s, uint32_t bufsize) :
_s(s), _bufsize(bufsize) { }

public:
Expand All @@ -205,7 +205,7 @@ class BufferedStream {
return (_o + threshold < _bufsize) ? false :
(flush(ebuf, size), true);
}
BufferedStream& put(std::string_view x) {
_RedisClient& put(std::string_view x) {
assert(_o + __MAX_SIZE(x) < _bufsize);
memcpy(_o + obuf(), x.data(), x.size());
_o += x.size();
Expand All @@ -220,7 +220,7 @@ class BufferedStream {
_o += ret;
return {buf, (size_t)ret};
}
BufferedStream& put(_strint x) {
_RedisClient& put(_strint x) {
assert(_o + __MAX_SIZE(x) < _bufsize);
static_assert(sizeof(x) == sizeof(long long), "...");
auto s = _snprintf("$00\r\n%ld\r\n", (long)x._x);
Expand All @@ -230,44 +230,44 @@ class BufferedStream {
(char&)s[2] = '0' + n % 10;
return *this;
}
BufferedStream& put(int64_t x) {
_RedisClient& put(int64_t x) {
assert(_o + __MAX_SIZE(x) < _bufsize);
_snprintf("%ld", (long)x);
return *this;
}
BufferedStream& put(_char x) {
_RedisClient& put(_char x) {
assert(_o + __MAX_SIZE(x) < _bufsize);
obuf()[_o++] = x._x;
return *this;
}
BufferedStream& put() { return *this; }
_RedisClient& put() { return *this; }
template<typename Ta, typename Tb, typename...Ts>
BufferedStream& put(const Ta& xa, const Tb& xb, const Ts&...xs) {
_RedisClient& put(const Ta& xa, const Tb& xb, const Ts&...xs) {
return put(xa), put(xb, xs...);
}
BufferedStream& operator << (int64_t x) {
_RedisClient& operator << (int64_t x) {
// flush_if_low_space(__MAX_SIZE(x));
return put(x);
}
BufferedStream& operator << (_char x) {
_RedisClient& operator << (_char x) {
// flush_if_low_space(__MAX_SIZE(x));
return put(x);
}
BufferedStream& operator << (const _strint& x) {
_RedisClient& operator << (const _strint& x) {
// flush_if_low_space(__MAX_SIZE(x));
return put(x);
}
template<typename...Ts>
BufferedStream& write_item(const Ts&...xs) {
_RedisClient& write_item(const Ts&...xs) {
auto size = _sum(__MAX_SIZE(xs)...);
flush_if_low_space(size);
return put(xs...);
}
BufferedStream& operator << (std::string_view x) {
_RedisClient& operator << (std::string_view x) {
return x.empty() ? write_item(BSMARK "-1" CRLF) :
write_item(_char{BSMARK[0]}, (int64_t)x.size(), CRLF, x, CRLF);
}
BufferedStream& operator << (_array_header x) {
_RedisClient& operator << (_array_header x) {
return write_item(_char{array<>::mark()}, x.n, CRLF);
}

Expand Down Expand Up @@ -696,16 +696,18 @@ class BufferedStream {

};

inline void refstring::add_ref() { if (_bs) _bs->_refcnt++; }
inline void refstring::del_ref() { if (_bs) _bs->_refcnt--; }
inline void refstring::add_ref() { if (_rc) _rc->_refcnt++; }
inline void refstring::del_ref() { if (_rc) _rc->_refcnt--; }

template<uint32_t BUF_SIZE = 16*1024UL>
class _BufferedStream : public BufferedStream {
class __RedisClient : public _RedisClient {
char _buf[BUF_SIZE * 2];
public:
_BufferedStream(ISocketStream* s) : BufferedStream(s, BUF_SIZE) { }
__RedisClient(ISocketStream* s) : _RedisClient(s, BUF_SIZE) { }
};

using RedisClient = __RedisClient<16*1024UL>;

#pragma GCC diagnostic pop


Expand Down
Loading

0 comments on commit 1dc374c

Please sign in to comment.