diff --git a/ecosystem/redis.cpp b/ecosystem/redis.cpp index e25ddf2f..f3c4a584 100644 --- a/ecosystem/redis.cpp +++ b/ecosystem/redis.cpp @@ -15,93 +15,103 @@ limitations under the License. */ #include -#include -#include +// #include +// #include #include #include -#include +// #include namespace photon { using namespace net; -namespace integration { +namespace redis { -static thread_local std::unique_ptr - _client { new_tcp_socket_client() }; - -class tcp_client_redis : public cpp_redis::network::tcp_client_iface { -public: - ISocketStream* _s = nullptr; - ~tcp_client_redis() { - delete _s; - } - void connect(const std::string& addr, - uint32_t port, uint32_t timeout_msecs) override { - if (_s) - LOG_ERROR_RETURN(0, , "client is already connected"); - EndPoint ep(addr.c_str(), port); - _s = _client->connect(ep); - if (_s == nullptr) - LOG_ERRNO_RETURN(0, , "failed to connect to ", ep); - } - bool is_connected(void) const override { - return _s; +any BufferedStream::parse_item() { + switch (auto mark = this->get_char()) { + case simple_string::mark(): + return get_simple_string(); + case error_message::mark(): + return get_error_message(); + case integer::mark(): + return get_integer(); + case bulk_string::mark(): + return get_bulk_string(); + case array<>::mark(): + return {array<>(), get_integer()}; + default: + return {}; } - void disconnect(bool /*wait_for_removal*/ = false) override { - delete _s; - _s = nullptr; - _dch(); - } - disconnection_handler_t _dch; - void set_on_disconnection_handler(const disconnection_handler_t& h) override { - _dch = h; +} + +void BufferedStream::__refill(size_t atleast) { + size_t room = _bufsize - _j; + if (!room || room < atleast) { if (_refcnt > 0) { + LOG_ERROR_RETURN(0, , "no enough buffer space"); + } else { + size_t available = _j - _i; + memmove(ibuf(), ibuf() + _i, available); + _i = 0; _j = available; + room = _bufsize - _j; + } } + ssize_t ret = _s->recv_at_least(ibuf() + _j, room, atleast); + if (ret < (ssize_t)atleast) + LOG_ERRNO_RETURN(0,, "failed to recv at least ` bytes", atleast); + _j += ret; +} + +std::string_view BufferedStream::__getline() { + size_t pos; + assert(_j >= _i); + estring_view sv(ibuf() + _i, _j - _i); + while ((pos = sv.find('\n')) == sv.npos) { + size_t j = _j; + __refill(0); + assert(_j > j); + sv = {ibuf() + j, (uint32_t)(_j - j)}; } - void async_read(read_request& request) override { - ssize_t cnt; - read_result r; - if (!_s) { - LOG_ERROR("not connected"); - goto fail; - } - if (!request.size) { fail: - r.success = false; - goto out; - } - r.buffer.resize(request.size); - cnt = _s->read(&r.buffer[0], r.buffer.size()); - if (cnt <= 0) { - if (cnt == 0) errno = ENETRESET; - LOG_ERROR("failed to read socket: ", ERRNO()); - r.buffer.clear(); - disconnect(); - } else { r.success = true; } -out: - request.async_read_callback(r); + assert(sv.begin() >= ibuf()); + auto begin = ibuf() + _i; // not sv.begin()! + auto end = &sv[pos]; + assert(*end == '\n'); + _i = end - ibuf() + 1; + assert(_i <= _j); + if (likely(end[-1] == '\r')) { + *(uint16_t*)--end = 0; + } else { + *(char*)end = 0; } - void async_write(write_request& request) override { - ssize_t cnt; - write_result r{false, 0}; - if (!_s) { - LOG_ERROR("not connected"); - goto out; - } - if (!request.buffer.size()) { - goto out; - } + return {begin, size_t(end - begin)}; +} - cnt = _s->write(&request.buffer[0], request.buffer.size()); - if (cnt <= 0) { - if (cnt == 0) errno = ENETRESET; - LOG_ERROR("failed to write socket: ", ERRNO()); - disconnect(); - } else { r = {true, (size_t)cnt}; } -out: - request.async_write_callback(r); +std::string_view BufferedStream::__getstring(size_t length) { + assert(_i <= _j); + size_t available = _j - _i; + if (available < length + 2) + __refill(length + 2 - available); + auto begin = ibuf() + _i; + _i += length; + assert(_i + 2 <= _j); + auto ptr = ibuf() + _i; + if (likely(ptr[0] == '\r')) { + ptr[0] = '\0'; + _i += 1 + (ptr[1]=='\n'); } -}; + return {begin, length}; +} -cpp_redis::network::tcp_client_iface* new_tcp_client_redis() { - return new tcp_client_redis; +void BufferedStream::flush(const void* extra_buffer, size_t size) { + iovec iov[2]; + iov[0] = {obuf(), _o}; + int iovcnt = 1; + ssize_t sum = _o; + if (extra_buffer && size) { + iov[iovcnt++] = {(void*)extra_buffer, size}; + sum += size; + } + ssize_t ret = _s->writev_mutable(iov, iovcnt); + if (ret < sum) + LOG_ERRNO_RETURN(0,, "failed to write to socket stream"); + _o = 0; } } diff --git a/ecosystem/redis.h b/ecosystem/redis.h index fb7748d3..c9798907 100644 --- a/ecosystem/redis.h +++ b/ecosystem/redis.h @@ -15,19 +15,280 @@ limitations under the License. */ #pragma once +#include +#include +#include +#include +#include +#include +namespace photon { +namespace redis { -namespace cpp_redis { -namespace network { +class BufferedStream; +class refstring : public std::string_view { + BufferedStream* _bs = nullptr; + void add_ref(); + void del_ref(); +public: + using std::string_view::string_view; + refstring(BufferedStream* bs, std::string_view sv) : + std::string_view(sv), _bs(bs) { add_ref(); } + refstring(const refstring& rhs) : + std::string_view(rhs), _bs(rhs._bs) { add_ref(); } + refstring& operator = (const refstring& rhs) { + if (this == &rhs) return *this; + *(std::string_view*)this = rhs; + del_ref(); + _bs = rhs._bs; + add_ref(); + return *this; + } + void release() { + del_ref(); + _bs = nullptr; + (std::string_view&) *this = {}; + } + ~refstring() { del_ref(); } +}; -class tcp_client_iface; +class simple_string : public refstring { +public: + simple_string() = default; + simple_string(refstring rs) : refstring(rs) { } + using refstring::operator=; + constexpr static char mark() { return '+'; } +}; -} +class error_message : public refstring { +public: + error_message() = default; + error_message(refstring rs) : refstring(rs) { } + using refstring::operator=; + constexpr static char mark() { return '-'; } +}; + +class bulk_string : public refstring { +public: + bulk_string() = default; + bulk_string(refstring rs) : refstring(rs) { } + using refstring::operator=; + constexpr static char mark() { return '$'; } +}; + +class integer : public refstring { +public: + int64_t val; + integer() = default; + integer(int64_t v) : val(v) { }; + integer(const refstring& rs) : + val(((estring_view&)rs).to_int64()) { } + using refstring::operator=; + constexpr static char mark() { return ':'; } +}; + +class null { }; + +template +class array : public std::tuple { +public: + array() = default; + using std::tuple::tuple; + using std::tuple::operator=; + constexpr static char mark() { return '*'; } +}; + +template +array make_array(const Ts&...xs) { + return {xs...}; } -namespace photon { -namespace net { +struct any { + constexpr static size_t MAX1 = std::max(sizeof(simple_string), sizeof(error_message)); + constexpr static size_t MAX2 = std::max(sizeof(bulk_string), sizeof(integer)); + constexpr static size_t MAX = std::max(MAX1, MAX2); + char value[MAX] = {0}, mark = 0; + + template::value>::type> + bool is_type() { return mark == T::mark(); } + + template<> + bool is_type() { return mark == integer::mark() || + mark == array<>::mark(); } + + template::value>::type> + T& get() { + assert(is_type()); + return *(T*)value; + } + + template::value>::type> + void set(const T& x) { + *(T*)value = x; + mark = T::mark(); + } + + any() = default; + template + any(const T& x) { set(x); } + any(array<>, integer n) { set(n); mark = array<>::mark(); } + + template + any& operator = (const T& x) { return set(x), *this; } +}; + +using net::ISocketStream; + +// template::value>::type> +// inline constexpr size_t __MAX_SIZE(T) { return INT32_MAX; } + +inline constexpr size_t __MAX_SIZE(std::string_view x) { return x.size(); } +inline constexpr size_t __MAX_SIZE(int64_t x) { return 32; } +inline constexpr size_t __MAX_SIZE(uint64_t x) { return 32; } +inline constexpr size_t __MAX_SIZE(char x) { return 1; } + +constexpr static char CRLF[] = "\r\n"; + +class BufferedStream { +protected: + ISocketStream* _s = nullptr; + uint32_t _i = 0, _j = 0, _o = 0, _bufsize = 0, _refcnt = 0; + char _xbuf[0]; + + friend class refstring; + + size_t _sum() const { return 0; } + template + size_t _sum(T x, const Ts&...xs) { + return x + _sum(xs...); + } + char* ibuf() { return _xbuf; } + char* obuf() { return _xbuf + _bufsize; } + std::string_view __getstring(size_t length); + std::string_view __getline(); + + explicit BufferedStream(ISocketStream* s, uint32_t bufsize) : + _s(s), _bufsize(bufsize) { } + +public: + void flush(const void* extra_buffer = 0, size_t size = 0); + bool flush_if_low_space(size_t threshold, const void* ebuf = 0, size_t size = 0) { + return (_o + threshold < _bufsize) ? false : + (flush(ebuf, size), true); + } + BufferedStream& put(std::string_view x) { + assert(_o + __MAX_SIZE(x) < _bufsize); + memcpy(_o + obuf(), x.data(), x.size()); + _o += x.size(); + return *this; + } + BufferedStream& put(int64_t x) { + assert(_o + __MAX_SIZE(x) < _bufsize); + static_assert(sizeof(x) == sizeof(long long), "..."); + _o += snprintf(_o + obuf(), _bufsize - _o, "%ld", (long)x); + return *this; + } + BufferedStream& put(uint64_t x) { + assert(_o + __MAX_SIZE(x) < _bufsize); + static_assert(sizeof(x) == sizeof(long long), "..."); + _o += snprintf(_o + obuf(), _bufsize - _o, "%lu", (unsigned long)x); + return *this; + } + BufferedStream& put(char x) { + assert(_o + __MAX_SIZE(x) < _bufsize); + obuf()[_o++] = x; + return *this; + } + BufferedStream& put() { return *this; } + template + BufferedStream& put(const Ta& xa, const Tb& xb, const Ts&...xs) { + return put(xa), put(xb, xs...); + } + // BufferedStream& operator << (std::string_view x) { + // auto f = flush_if_low_space(__MAX_SIZE(x), x.data(), x.size()); + // return f ? *this : put(x); + // } + // BufferedStream& operator << (int64_t x) { + // flush_if_low_space(__MAX_SIZE(x)); + // return put(x); + // } + template + BufferedStream& write_item(const Ts&...xs) { + auto size = _sum(__MAX_SIZE(xs)...); + flush_if_low_space(size); + return put(xs...); + } + BufferedStream& operator << (simple_string x) { + return write_item(x.mark(), x, CRLF); + } + BufferedStream& operator << (error_message x) { + return write_item(x.mark(), x, CRLF); + } + BufferedStream& operator << (integer x) { + return write_item(x.mark(), x.val, CRLF); + } + BufferedStream& operator << (bulk_string x) { + return x.empty() ? write_item(x.mark(), "-1\r\n") : + write_item(x.mark(), (int64_t)x.size(), CRLF, x, CRLF); + } + BufferedStream& operator << (null) { + return *this << bulk_string{}; + } + void write_items() { } + template + void write_items(const T& x, const Ts&...xs) { + *this << x; + write_items(xs...); + } + template + BufferedStream& operator << (const array& x) { + write_item(x.mark(), (int64_t)sizeof...(Ts), CRLF); + auto f = [this](const Ts&...xs) { write_items(xs...); }; + tuple_assistance::apply(f, (const std::tuple&)x); + return *this; + } + + char get_char() { + ensure_input_data(__MAX_SIZE('c')); + return ibuf()[_i++]; + } + refstring getline() { + return {this, __getline()}; + } + refstring getstring(size_t length) { + return {this, __getstring(length)}; + } + simple_string get_simple_string() { return getline(); } + error_message get_error_message() { return getline(); } + integer get_integer() { return getline(); } + bulk_string get_bulk_string() { + auto length = get_integer().val; + if (length >= 0) + return getstring((size_t)length); + return {}; + } + void __refill(size_t atleast); // refill input buffer + void ensure_input_data(size_t min_available) { + assert(_j >= _i); + size_t available = _j - _i; + if (available < min_available) + __refill(min_available - available); + } + any parse_item(); +}; + +inline void refstring::add_ref() { if (_bs) _bs->_refcnt++; } +inline void refstring::del_ref() { if (_bs) _bs->_refcnt--; } -cpp_redis::network::tcp_client_iface* new_tcp_client(); +template +class _BufferedStream : public BufferedStream { + char _buf[BUF_SIZE * 2]; +public: + _BufferedStream(ISocketStream* s) : BufferedStream(s, BUF_SIZE) { } +}; } } diff --git a/ecosystem/test/CMakeLists.txt b/ecosystem/test/CMakeLists.txt index 5b90fbee..bc1b2229 100644 --- a/ecosystem/test/CMakeLists.txt +++ b/ecosystem/test/CMakeLists.txt @@ -1,3 +1,3 @@ -add_executable(test-ecosystem test.cpp test_simple_dom.cpp) +add_executable(test-ecosystem test.cpp test_simple_dom.cpp test_redis.cpp) target_link_libraries(test-ecosystem PRIVATE photon_shared) add_test(NAME test-ecosystem COMMAND $) diff --git a/ecosystem/test/test_redis.cpp b/ecosystem/test/test_redis.cpp new file mode 100644 index 00000000..42235e79 --- /dev/null +++ b/ecosystem/test/test_redis.cpp @@ -0,0 +1,156 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2017 Simon Ninon +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +#include "../redis.h" +#include +#include +#include +#include +using namespace photon; +using namespace photon::net; +using namespace photon::redis; +using namespace std; + +class FakeStream : public ISocketStream { +public: + estring_view in; + // estring_view in = "48293\r\n"; + // estring_view out_truth = "*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n"; + std::string output; + virtual ssize_t recv(void *buf, size_t count, int flags = 0) override { + size_t n = rand() % 8 + 1; + if (n > in.size()) n = in.size(); + if (n > count) n = count; + memcpy(buf, in.data(), n); + in = in.substr(n); + return n; + } + virtual ssize_t recv(const struct iovec *iov, int iovcnt, int flags = 0) override { + while(!iov->iov_base || !iov->iov_len) + if (iovcnt) { ++iov, --iovcnt; } + else return -1; + return recv(iov->iov_base, iov->iov_len); + } + virtual ssize_t send(const void *buf, size_t count, int flags = 0) override { + size_t n = rand() % 8 + 1; + if (n > count) n = count; + output += std::string_view{(const char*)buf, n}; + return n; + } + virtual ssize_t send(const struct iovec *iov, int iovcnt, int flags = 0) override { + while(!iov->iov_base || !iov->iov_len) + if (iovcnt) { ++iov, --iovcnt; } + else return -1; + return send(iov->iov_base, iov->iov_len); + } + virtual ssize_t sendfile(int in_fd, off_t offset, size_t count) override { return 0; } + virtual int close() override { return 0; } + virtual ssize_t read(void *buf, size_t count) override { + if (count > in.size()) count = in.size(); + memcpy(buf, in.data(), count); + return count; + } + virtual ssize_t readv(const struct iovec *iov, int iovcnt) override { + ssize_t s = 0; + for (int i = 0; i < iovcnt; ++i) { + ssize_t ret = read(iov[i].iov_base, iov[i].iov_len); + if (ret < 0) return ret; + s += ret; + if (ret < (ssize_t)iov[i].iov_len) break; + } + return s; + } + virtual ssize_t write(const void *buf, size_t count) override { + output += std::string_view{(char*)buf, count}; + return count; + } + virtual ssize_t writev(const struct iovec *iov, int iovcnt) override { + ssize_t s = 0; + for (int i = 0; i < iovcnt; ++i) { + ssize_t ret = write(iov[i].iov_base, iov[i].iov_len); + if (ret < 0) return ret; + s += ret; + if (ret < (ssize_t)iov[i].iov_len) break; + } + return s; + } + virtual Object* get_underlay_object(uint64_t recursion = 0) override { return 0; } + virtual int setsockopt(int level, int option_name, const void* option_value, socklen_t option_len) override { return 0; } + virtual int getsockopt(int level, int option_name, void* option_value, socklen_t* option_len) override { return 0; } + virtual int getsockname(EndPoint& addr) override { return 0; } + virtual int getpeername(EndPoint& addr) override { return 0; } + virtual int getsockname(char* path, size_t count) override { return 0; } + virtual int getpeername(char* path, size_t count) override { return 0; } +}; + +const static char RESP[] = "+asldkfjasfkd\r\n:234\r\n$21\r\nthis-is-a-bulk_string\r\n*3\r\n+asdf\r\n:75\r\n$3\r\njkl\r\n:-1234234\r\n"; + +TEST(redis, serialization) { + FakeStream s; + _BufferedStream<> bs(&s); + bs << simple_string("asldkfjasfkd") + << integer(234) + << bulk_string("this-is-a-bulk_string") + << make_array(simple_string("asdf"), integer(75), bulk_string("jkl")) + << integer(-1234234); + + bs.flush(); + puts(s.output.c_str()); + EXPECT_EQ(s.output, RESP); +} + +TEST(redis, deserialization) { + FakeStream s; + s.in = RESP; + _BufferedStream<> bs(&s); + auto a = bs.parse_item(); + EXPECT_EQ(a.mark, simple_string::mark()); + EXPECT_EQ(a.get(), "asldkfjasfkd"); + + auto b = bs.parse_item(); + EXPECT_EQ(b.mark, integer::mark()); + EXPECT_EQ(b.get().val, 234); + + auto c = bs.parse_item(); + EXPECT_EQ(c.mark, bulk_string::mark()); + EXPECT_EQ(c.get(), "this-is-a-bulk_string"); + + auto d = bs.parse_item(); + EXPECT_EQ(d.mark, redis::array<>::mark()); + EXPECT_EQ(d.get().val, 3); + + auto e = bs.parse_item(); + EXPECT_EQ(e.mark, simple_string::mark()); + EXPECT_EQ(e.get(), "asdf"); + + auto f = bs.parse_item(); + EXPECT_EQ(f.mark, integer::mark()); + EXPECT_EQ(f.get().val, 75); + + auto g = bs.parse_item(); + EXPECT_EQ(g.mark, bulk_string::mark()); + EXPECT_EQ(g.get(), "jkl"); + + auto h = bs.parse_item(); + EXPECT_EQ(h.mark, integer::mark()); + EXPECT_EQ(h.get().val, -1234234); +}