diff --git a/common/memory-stream/memory-stream.cpp b/common/memory-stream/memory-stream.cpp index aee8d395..704d53d8 100644 --- a/common/memory-stream/memory-stream.cpp +++ b/common/memory-stream/memory-stream.cpp @@ -198,6 +198,81 @@ class FaultStream : public IStream }; +using namespace photon::net; +class StringSocketStreamImpl : public StringSocketStream { +public: + virtual ssize_t recv(void *buf, size_t count, int flags = 0) override { + size_t n = rand() % 8 + 1; + if (n > _inv.size()) n = _inv.size(); + if (n > count) n = count; + memcpy(buf, _inv.data(), n); + _inv = _inv.substr(n); + return n; + } + virtual ssize_t recv(const struct iovec *iov, int iovcnt, int flags = 0) override { + while(!iov->iov_base || !iov->iov_len) + if (iovcnt) { ++iov, --iovcnt; } + else return -1; + return recv(iov->iov_base, iov->iov_len); + } + virtual ssize_t send(const void *buf, size_t count, int flags = 0) override { + size_t n = rand() % 8 + 1; + if (n > count) n = count; + auto p = (const char*)buf; + _out.append(p, p + n); + return n; + } + virtual ssize_t send(const struct iovec *iov, int iovcnt, int flags = 0) override { + while(!iov->iov_base || !iov->iov_len) + if (iovcnt) { ++iov, --iovcnt; } + else return -1; + return send(iov->iov_base, iov->iov_len); + } + virtual ssize_t sendfile(int in_fd, off_t offset, size_t count) override { return 0; } + virtual int close() override { return 0; } + virtual ssize_t read(void *buf, size_t count) override { + if (count > _inv.size()) count = _inv.size(); + memcpy(buf, _inv.data(), count); + return count; + } + virtual ssize_t readv(const struct iovec *iov, int iovcnt) override { + ssize_t s = 0; + for (int i = 0; i < iovcnt; ++i) { + ssize_t ret = read(iov[i].iov_base, iov[i].iov_len); + if (ret < 0) return ret; + s += ret; + if (ret < (ssize_t)iov[i].iov_len) break; + } + return s; + } + virtual ssize_t write(const void *buf, size_t count) override { + auto p = (const char*)buf; + _out.append(p, p + count); + return count; + } + virtual ssize_t writev(const struct iovec *iov, int iovcnt) override { + ssize_t s = 0; + for (int i = 0; i < iovcnt; ++i) { + ssize_t ret = write(iov[i].iov_base, iov[i].iov_len); + if (ret < 0) return ret; + s += ret; + if (ret < (ssize_t)iov[i].iov_len) break; + } + return s; + } + virtual Object* get_underlay_object(uint64_t recursion = 0) override { return 0; } + virtual int setsockopt(int level, int option_name, const void* option_value, socklen_t option_len) override { return 0; } + virtual int getsockopt(int level, int option_name, void* option_value, socklen_t* option_len) override { return 0; } + virtual int getsockname(EndPoint& addr) override { return 0; } + virtual int getpeername(EndPoint& addr) override { return 0; } + virtual int getsockname(char* path, size_t count) override { return 0; } + virtual int getpeername(char* path, size_t count) override { return 0; } +}; + IStream* new_fault_stream(IStream* stream, int flag, bool ownership) { return new FaultStream(stream, flag, ownership); } + +StringSocketStream* new_string_socket_stream() { + return new StringSocketStreamImpl; +} diff --git a/common/memory-stream/memory-stream.h b/common/memory-stream/memory-stream.h index 27226420..a9f899ba 100644 --- a/common/memory-stream/memory-stream.h +++ b/common/memory-stream/memory-stream.h @@ -16,7 +16,10 @@ limitations under the License. #pragma once #include +#include #include +#include +#include class DuplexMemoryStream; @@ -33,6 +36,9 @@ extern "C" DuplexMemoryStream* new_duplex_memory_stream(uint32_t capacity); // default is 11(3 in 10-based integer), both read and write operation may fail extern "C" IStream* new_fault_stream(IStream* stream, int flag=3, bool ownership=false); +class StringSocketStream; +StringSocketStream* new_string_socket_stream(); + class DuplexMemoryStream { public: @@ -41,3 +47,24 @@ class DuplexMemoryStream IStream* endpoint_b; // do NOT delete it!!! virtual int close() = 0; }; + +class StringSocketStream : public photon::net::ISocketStream { +protected: + std::string _in, _out; + std::string_view _inv; +public: + void set_input(std::string_view in) { + _in = in; + _inv = {_in}; + } + void set_input(std::string_view in, bool copy) { + if (!copy) _inv = in; + else set_input(in); + } + std::string_view input() { + return _inv; + } + std::string& output() { + return _out; + } +}; diff --git a/ecosystem/redis.cpp b/ecosystem/redis.cpp index 3902f9ba..944174a9 100644 --- a/ecosystem/redis.cpp +++ b/ecosystem/redis.cpp @@ -25,7 +25,7 @@ namespace photon { using namespace net; namespace redis { -any BufferedStream::parse_response_item() { +any _RedisClient::parse_response_item() { switch (auto mark = this->get_char()) { case simple_string::mark(): return get_simple_string(); @@ -44,7 +44,7 @@ any BufferedStream::parse_response_item() { } } -void BufferedStream::__refill(size_t atleast) { +void _RedisClient::__refill(size_t atleast) { size_t room = _bufsize - _j; if (!room || room < atleast) { if (_refcnt > 0) { LOG_ERROR_RETURN(0, , "no enough buffer space"); @@ -60,7 +60,7 @@ void BufferedStream::__refill(size_t atleast) { _j += ret; } -std::string_view BufferedStream::__getline() { +std::string_view _RedisClient::__getline() { size_t pos; assert(_j >= _i); estring_view sv(ibuf() + _i, _j - _i); @@ -85,7 +85,7 @@ std::string_view BufferedStream::__getline() { return {begin, size_t(end - begin)}; } -std::string_view BufferedStream::__getstring(size_t length) { +std::string_view _RedisClient::__getstring(size_t length) { assert(_i <= _j); size_t available = _j - _i; if (available < length + 2) @@ -101,7 +101,7 @@ std::string_view BufferedStream::__getstring(size_t length) { return {begin, length}; } -void BufferedStream::flush(const void* extra_buffer, size_t size) { +void _RedisClient::flush(const void* extra_buffer, size_t size) { iovec iov[2]; iov[0] = {obuf(), _o}; int iovcnt = 1; diff --git a/ecosystem/redis.h b/ecosystem/redis.h index a71d42c0..d1e6b29c 100644 --- a/ecosystem/redis.h +++ b/ecosystem/redis.h @@ -30,29 +30,29 @@ namespace redis { #endif -class BufferedStream; +class _RedisClient; class refstring : public std::string_view { - BufferedStream* _bs = nullptr; + _RedisClient* _rc = nullptr; void add_ref(); void del_ref(); public: using std::string_view::string_view; refstring(std::string_view sv) : std::string_view(sv) { } - refstring(BufferedStream* bs, std::string_view sv) : - std::string_view(sv), _bs(bs) { add_ref(); } + refstring(_RedisClient* bs, std::string_view sv) : + std::string_view(sv), _rc(bs) { add_ref(); } refstring(const refstring& rhs) : - std::string_view(rhs), _bs(rhs._bs) { add_ref(); } + std::string_view(rhs), _rc(rhs._rc) { add_ref(); } refstring& operator = (const refstring& rhs) { if (this == &rhs) return *this; *(std::string_view*)this = rhs; del_ref(); - _bs = rhs._bs; + _rc = rhs._rc; add_ref(); return *this; } void release() { del_ref(); - _bs = nullptr; + _rc = nullptr; (std::string_view&) *this = {}; } ~refstring() { del_ref(); } @@ -169,7 +169,7 @@ using net::ISocketStream; #define CRLF "\r\n" #define BSMARK "$" -class BufferedStream { +class _RedisClient { protected: ISocketStream* _s = nullptr; uint32_t _i = 0, _j = 0, _o = 0, _bufsize = 0, _refcnt = 0; @@ -196,7 +196,7 @@ class BufferedStream { size_t __MAX_SIZE(_array_header x) { return 32; } size_t __MAX_SIZE(_char x) { return 1; } - explicit BufferedStream(ISocketStream* s, uint32_t bufsize) : + explicit _RedisClient(ISocketStream* s, uint32_t bufsize) : _s(s), _bufsize(bufsize) { } public: @@ -205,7 +205,7 @@ class BufferedStream { return (_o + threshold < _bufsize) ? false : (flush(ebuf, size), true); } - BufferedStream& put(std::string_view x) { + _RedisClient& put(std::string_view x) { assert(_o + __MAX_SIZE(x) < _bufsize); memcpy(_o + obuf(), x.data(), x.size()); _o += x.size(); @@ -220,7 +220,7 @@ class BufferedStream { _o += ret; return {buf, (size_t)ret}; } - BufferedStream& put(_strint x) { + _RedisClient& put(_strint x) { assert(_o + __MAX_SIZE(x) < _bufsize); static_assert(sizeof(x) == sizeof(long long), "..."); auto s = _snprintf("$00\r\n%ld\r\n", (long)x._x); @@ -230,44 +230,44 @@ class BufferedStream { (char&)s[2] = '0' + n % 10; return *this; } - BufferedStream& put(int64_t x) { + _RedisClient& put(int64_t x) { assert(_o + __MAX_SIZE(x) < _bufsize); _snprintf("%ld", (long)x); return *this; } - BufferedStream& put(_char x) { + _RedisClient& put(_char x) { assert(_o + __MAX_SIZE(x) < _bufsize); obuf()[_o++] = x._x; return *this; } - BufferedStream& put() { return *this; } + _RedisClient& put() { return *this; } template - BufferedStream& put(const Ta& xa, const Tb& xb, const Ts&...xs) { + _RedisClient& put(const Ta& xa, const Tb& xb, const Ts&...xs) { return put(xa), put(xb, xs...); } - BufferedStream& operator << (int64_t x) { + _RedisClient& operator << (int64_t x) { // flush_if_low_space(__MAX_SIZE(x)); return put(x); } - BufferedStream& operator << (_char x) { + _RedisClient& operator << (_char x) { // flush_if_low_space(__MAX_SIZE(x)); return put(x); } - BufferedStream& operator << (const _strint& x) { + _RedisClient& operator << (const _strint& x) { // flush_if_low_space(__MAX_SIZE(x)); return put(x); } template - BufferedStream& write_item(const Ts&...xs) { + _RedisClient& write_item(const Ts&...xs) { auto size = _sum(__MAX_SIZE(xs)...); flush_if_low_space(size); return put(xs...); } - BufferedStream& operator << (std::string_view x) { + _RedisClient& operator << (std::string_view x) { return x.empty() ? write_item(BSMARK "-1" CRLF) : write_item(_char{BSMARK[0]}, (int64_t)x.size(), CRLF, x, CRLF); } - BufferedStream& operator << (_array_header x) { + _RedisClient& operator << (_array_header x) { return write_item(_char{array<>::mark()}, x.n, CRLF); } @@ -696,16 +696,18 @@ class BufferedStream { }; -inline void refstring::add_ref() { if (_bs) _bs->_refcnt++; } -inline void refstring::del_ref() { if (_bs) _bs->_refcnt--; } +inline void refstring::add_ref() { if (_rc) _rc->_refcnt++; } +inline void refstring::del_ref() { if (_rc) _rc->_refcnt--; } template -class _BufferedStream : public BufferedStream { +class __RedisClient : public _RedisClient { char _buf[BUF_SIZE * 2]; public: - _BufferedStream(ISocketStream* s) : BufferedStream(s, BUF_SIZE) { } + __RedisClient(ISocketStream* s) : _RedisClient(s, BUF_SIZE) { } }; +using RedisClient = __RedisClient<16*1024UL>; + #pragma GCC diagnostic pop diff --git a/ecosystem/test/test_redis.cpp b/ecosystem/test/test_redis.cpp index d0f9c27f..c16d3f25 100644 --- a/ecosystem/test/test_redis.cpp +++ b/ecosystem/test/test_redis.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include "../../test/ci-tools.h" @@ -32,105 +33,30 @@ using namespace photon::net; using namespace photon::redis; using namespace std; -class FakeStream : public ISocketStream { -public: - estring_view in; - // estring_view in = "48293\r\n"; - // estring_view out_truth = "*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n"; - std::string output; - virtual ssize_t recv(void *buf, size_t count, int flags = 0) override { - size_t n = rand() % 8 + 1; - if (n > in.size()) n = in.size(); - if (n > count) n = count; - memcpy(buf, in.data(), n); - in = in.substr(n); - return n; - } - virtual ssize_t recv(const struct iovec *iov, int iovcnt, int flags = 0) override { - while(!iov->iov_base || !iov->iov_len) - if (iovcnt) { ++iov, --iovcnt; } - else return -1; - return recv(iov->iov_base, iov->iov_len); - } - virtual ssize_t send(const void *buf, size_t count, int flags = 0) override { - size_t n = rand() % 8 + 1; - if (n > count) n = count; - auto p = (const char*)buf; - output.append(p, p + n); - return n; - } - virtual ssize_t send(const struct iovec *iov, int iovcnt, int flags = 0) override { - while(!iov->iov_base || !iov->iov_len) - if (iovcnt) { ++iov, --iovcnt; } - else return -1; - return send(iov->iov_base, iov->iov_len); - } - virtual ssize_t sendfile(int in_fd, off_t offset, size_t count) override { return 0; } - virtual int close() override { return 0; } - virtual ssize_t read(void *buf, size_t count) override { - if (count > in.size()) count = in.size(); - memcpy(buf, in.data(), count); - return count; - } - virtual ssize_t readv(const struct iovec *iov, int iovcnt) override { - ssize_t s = 0; - for (int i = 0; i < iovcnt; ++i) { - ssize_t ret = read(iov[i].iov_base, iov[i].iov_len); - if (ret < 0) return ret; - s += ret; - if (ret < (ssize_t)iov[i].iov_len) break; - } - return s; - } - virtual ssize_t write(const void *buf, size_t count) override { - auto p = (const char*)buf; - output.append(p, p + count); - return count; - } - virtual ssize_t writev(const struct iovec *iov, int iovcnt) override { - ssize_t s = 0; - for (int i = 0; i < iovcnt; ++i) { - ssize_t ret = write(iov[i].iov_base, iov[i].iov_len); - if (ret < 0) return ret; - s += ret; - if (ret < (ssize_t)iov[i].iov_len) break; - } - return s; - } - virtual Object* get_underlay_object(uint64_t recursion = 0) override { return 0; } - virtual int setsockopt(int level, int option_name, const void* option_value, socklen_t option_len) override { return 0; } - virtual int getsockopt(int level, int option_name, void* option_value, socklen_t* option_len) override { return 0; } - virtual int getsockname(EndPoint& addr) override { return 0; } - virtual int getpeername(EndPoint& addr) override { return 0; } - virtual int getsockname(char* path, size_t count) override { return 0; } - virtual int getpeername(char* path, size_t count) override { return 0; } -}; - - #define SSTR(s) "+" #s CRLF #define BSTR(n, s) "$" #n CRLF #s CRLF #define INTEGER(n) ":" #n CRLF #define ARRAY_HEADER(n) "*" #n CRLF -struct __BS : public _BufferedStream<> { +struct __RC : public RedisClient { public: - using _BufferedStream::_strint; + using RedisClient::_strint; }; TEST(redis, serialization) { - FakeStream s; - _BufferedStream<> bs(&s); - bs << "asldkfjasfkd" - << __BS::_strint{234} + auto s = new_string_socket_stream(); + DEFER(delete s); + RedisClient rc(s); + rc << "asldkfjasfkd" + << __RC::_strint{234} << "this-is-another-string" - << __BS::_strint{-1234234} + << __RC::_strint{-1234234} ; - - bs.flush(); - puts(s.output.c_str()); + rc.flush(); + puts(s->output().c_str()); const static char RESP[] = BSTR(12,asldkfjasfkd) BSTR(03,234) BSTR(22,this-is-another-string) BSTR(08,-1234234); - EXPECT_EQ(s.output, RESP); + EXPECT_EQ(s->output(), RESP); } void print_resp(const std::string_view s) { @@ -143,60 +69,65 @@ void print_resp(const std::string_view s) { } TEST(redis, deserialization) { - FakeStream s; - s.in = SSTR(asldkfjasfkd) INTEGER(234) BSTR(21,this-is-a-bulk_string) + auto s = new_string_socket_stream(); + DEFER(delete s); + auto RESP = SSTR(asldkfjasfkd) INTEGER(234) BSTR(21,this-is-a-bulk_string) ARRAY_HEADER(3) SSTR(asdf) INTEGER(75) BSTR(3,jkl) INTEGER(-1234234); - print_resp(s.in); - _BufferedStream<> bs(&s); - auto a = bs.parse_response_item(); + s->set_input(RESP, false); + print_resp(s->input()); + RedisClient rc(s); + auto a = rc.parse_response_item(); EXPECT_EQ(a.mark, simple_string::mark()); EXPECT_EQ(a.get(), "asldkfjasfkd"); - auto b = bs.parse_response_item(); + auto b = rc.parse_response_item(); EXPECT_EQ(b.mark, integer::mark()); EXPECT_EQ(b.get().val, 234); - auto c = bs.parse_response_item(); + auto c = rc.parse_response_item(); EXPECT_EQ(c.mark, bulk_string::mark()); EXPECT_EQ(c.get(), "this-is-a-bulk_string"); - auto d = bs.parse_response_item(); + auto d = rc.parse_response_item(); EXPECT_EQ(d.mark, array_header::mark()); EXPECT_EQ(d.get().val, 3); - auto e = bs.parse_response_item(); + auto e = rc.parse_response_item(); EXPECT_EQ(e.mark, simple_string::mark()); EXPECT_EQ(e.get(), "asdf"); - auto f = bs.parse_response_item(); + auto f = rc.parse_response_item(); EXPECT_EQ(f.mark, integer::mark()); EXPECT_EQ(f.get().val, 75); - auto g = bs.parse_response_item(); + auto g = rc.parse_response_item(); EXPECT_EQ(g.mark, bulk_string::mark()); EXPECT_EQ(g.get(), "jkl"); - auto h = bs.parse_response_item(); + auto h = rc.parse_response_item(); EXPECT_EQ(h.mark, integer::mark()); EXPECT_EQ(h.get().val, -1234234); } __attribute__((used)) -void asdfjkl(_BufferedStream<>& bs) { +void asdfjkl(RedisClient& bs) { bs.BLMOVE("src", "dest", "LEFT", "RIGHT", "234"); } + TEST(redis, cmd_serialization) { - FakeStream s; - _BufferedStream<> bs(&s); + auto s = new_string_socket_stream(); + DEFER(delete s); + RedisClient rc(s); +#define ERRMSG "ERR unknown command 'asdf'" #define TEST_CMD(cmd, truth) { \ - s.in = "-ERR unknown command 'asdf'\r\n"; \ + s->set_input("-" ERRMSG CRLF, false); \ auto r = cmd; \ - EXPECT_EQ(s.output, truth); \ - print_resp(s.output); \ + EXPECT_EQ(s->output(), truth); \ + print_resp(s->output()); \ EXPECT_TRUE(r.is_type()); \ - EXPECT_EQ(r.get(), "ERR unknown command 'asdf'"); \ - s.output.clear(); \ + EXPECT_EQ(r.get(), ERRMSG); \ + s->output().clear(); \ } #define AKey BSTR(4, akey) #define Key1 BSTR(4, key1) @@ -216,34 +147,34 @@ TEST(redis, cmd_serialization) { #define V5 BSTR(2, v5) #define nFIELDS(n) BSTR(6,FIELDS) BSTR(01,n) - TEST_CMD(bs.execute("SDIFF", "asdf", "jkl", "hahaha"), + TEST_CMD(rc.execute("SDIFF", "asdf", "jkl", "hahaha"), N(4) BSTR(5,SDIFF) BSTR(4,asdf) BSTR(3,jkl) BSTR(6,hahaha)); // DEFINE_COMMAND1 (SCARD, key); - TEST_CMD(bs.SCARD("akey"), N(2) BSTR(5,SCARD) AKey); + TEST_CMD(rc.SCARD("akey"), N(2) BSTR(5,SCARD) AKey); // DEFINE_COMMAND3 (SMOVE, source, destination, member); - TEST_CMD(bs.SMOVE("source", "dest", "member"), + TEST_CMD(rc.SMOVE("source", "dest", "member"), N(4) BSTR(5,SMOVE) BSTR(6,source) BSTR(4,dest) BSTR(6,member)); // DEFINE_COMMAND3s(HMSET, key, field, value); - TEST_CMD(bs.HMSET("akey", "f1", "v1", "f2", "v2", "f3", "v3"), + TEST_CMD(rc.HMSET("akey", "f1", "v1", "f2", "v2", "f3", "v3"), N(8) BSTR(5,HMSET) AKey F1 V1 F2 V2 F3 V3); // DEFINE_COMMAND4 (LINSERT, key, BEFORE_or_AFTER, pivot, element); - TEST_CMD(bs.LINSERT("akey", "BEFORE", "pivot", "element"), + TEST_CMD(rc.LINSERT("akey", "BEFORE", "pivot", "element"), N(5) BSTR(7,LINSERT) AKey BSTR(6,BEFORE) BSTR(5,pivot) BSTR(7,element)); // DEFINE_COMMAND2fs(HTTL, key); - TEST_CMD(bs.HTTL("akey", "f1", "f2", "f3", "f4"), + TEST_CMD(rc.HTTL("akey", "f1", "f2", "f3", "f4"), N(8) BSTR(4,HTTL) AKey nFIELDS(4) F1 F2 F3 F4); // DEFINE_COMMAND2sn(ZUNIONSTORE, destination, key); - TEST_CMD(bs.ZUNIONSTORE("destination", "key1", "key2", "key3"), + TEST_CMD(rc.ZUNIONSTORE("destination", "key1", "key2", "key3"), N(6) BSTR(11,ZUNIONSTORE) BSTR(11,destination) BSTR(01,3) Key1 Key2 Key3); // DEFINE_COMMAND2m(BITPOS, key, bit, opt_start_end_BYTE_BIT); - TEST_CMD(bs.BITPOS("akey", "bit", "start", "end", "BIT"), + TEST_CMD(rc.BITPOS("akey", "bit", "start", "end", "BIT"), N(6) BSTR(6,BITPOS) AKey BSTR(3,bit) BSTR(5,start) BSTR(3,end) BSTR(3,BIT)); } @@ -260,14 +191,14 @@ TEST(redis, cmd) { #endif } DEFER(delete s); - _BufferedStream<> bs(s); + RedisClient rc(s); const char key[] = "zvxbhm"; - bs.DEL(key); - DEFER(bs.DEL(key)); - auto r = bs.HSET(key, "hahaha", "qwer", "key2", "value2"); + rc.DEL(key); + DEFER(rc.DEL(key)); + auto r = rc.HSET(key, "hahaha", "qwer", "key2", "value2"); EXPECT_TRUE(r.is_type()); EXPECT_EQ(r.get().val, 2); - r = bs.LLEN(key); + r = rc.LLEN(key); EXPECT_TRUE(r.is_failed()); LOG_DEBUG(r.get_error_message()); }