diff --git a/.github/workflows/ci.linux.arm.yml b/.github/workflows/ci.linux.arm.yml index 4808159c..f2b1e324 100644 --- a/.github/workflows/ci.linux.arm.yml +++ b/.github/workflows/ci.linux.arm.yml @@ -38,7 +38,9 @@ jobs: - name: Test run: | cd build + nohup redis-server ctest -E test-lockfree --timeout 3600 -V + pkill redis-server gcc921-build-debug: runs-on: [self-hosted, Linux, ARM64] diff --git a/.github/workflows/ci.linux.x86_64.yml b/.github/workflows/ci.linux.x86_64.yml index eed2eded..f3cdbd0e 100644 --- a/.github/workflows/ci.linux.x86_64.yml +++ b/.github/workflows/ci.linux.x86_64.yml @@ -31,7 +31,9 @@ jobs: cmake --build build -j $(nproc) -- VERBOSE=1 - name: Test epoll run: | + nohup redis-server cd build && ctest -E test-lockfree --timeout 3600 -V + pkill redis-server - name: Test io_uring run: | export PHOTON_CI_EV_ENGINE=io_uring @@ -66,7 +68,9 @@ jobs: cmake --build build -j $(nproc) -- VERBOSE=1 - name: Test epoll run: | + nohup redis-server cd build && ctest -E test-lockfree --timeout 3600 -V + pkill redis-server - name: Test io_uring run: | export PHOTON_CI_EV_ENGINE=io_uring @@ -101,7 +105,9 @@ jobs: cmake --build build -j $(nproc) -- VERBOSE=1 - name: Test epoll run: | + nohup redis-server cd build && ctest -E test-lockfree --timeout 3600 -V + pkill redis-server - name: Test io_uring run: | export PHOTON_CI_EV_ENGINE=io_uring @@ -136,7 +142,9 @@ jobs: cmake --build build -j $(nproc) -- VERBOSE=1 - name: Test epoll run: | + nohup redis-server cd build && ctest -E test-lockfree --timeout 3600 -V + pkill redis-server - name: Test io_uring run: | export PHOTON_CI_EV_ENGINE=io_uring @@ -171,7 +179,9 @@ jobs: cmake --build build -j $(nproc) -- VERBOSE=1 - name: Test epoll run: | + nohup redis-server cd build && ctest -E test-lockfree --timeout 3600 -V + pkill redis-server - name: Test io_uring run: | export PHOTON_CI_EV_ENGINE=io_uring diff --git a/CMakeLists.txt b/CMakeLists.txt index 054e5050..b2f6d252 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,7 +43,6 @@ set(PHOTON_GOOGLETEST_SOURCE "https://github.com/google/googletest/archive/refs/ set(PHOTON_RAPIDJSON_GIT "https://github.com/Tencent/rapidjson.git" CACHE STRING "") set(PHOTON_RAPIDXML_SOURCE "https://sourceforge.net/projects/rapidxml/files/rapidxml/rapidxml%201.13/rapidxml-1.13.zip/download" CACHE STRING "") set(PHOTON_RAPIDYAML_SOURCE "https://github.com/biojppm/rapidyaml/releases/download/v0.5.0/rapidyaml-0.5.0.hpp" CACHE STRING "") -set(PHOTON_CPP_REDIS_SOURCE "https://github.com/cpp-redis/cpp_redis/archive/refs/tags/4.3.1.tar.gz" CACHE STRING "") # Get CPU arch execute_process(COMMAND uname -m OUTPUT_VARIABLE ARCH OUTPUT_STRIP_TRAILING_WHITESPACE) diff --git a/ecosystem/CMakeLists.txt b/ecosystem/CMakeLists.txt index 6ab41bfb..ae03f11d 100644 --- a/ecosystem/CMakeLists.txt +++ b/ecosystem/CMakeLists.txt @@ -3,7 +3,7 @@ include(FetchContent) -if(CMAKE_VERSION VERSION_GREATER_EQUAL "3.24.0") +if(CMAKE_VERSION VERSION_GREATER_EQUAL "3.24.0") # set DOWNLOAD_EXTRACT_TIMESTAMP ON cmake 3.24 or above cmake_policy(SET CMP0135 NEW) endif() @@ -50,26 +50,11 @@ if (CMAKE_VERSION VERSION_LESS "3.18.0") endif() message(STATUS "Rapidyaml source dir: ${rapidyaml_SOURCE_DIR}") -# cpp-redis -FetchContent_Declare( - cpp-redis - URL ${PHOTON_CPP_REDIS_SOURCE} - URL_HASH - SHA256=3859289d8254685fc775bda73de03dad27df923423b8ceb375b02d036c03b02f - UPDATE_DISCONNECTED 1) -# uses only a simple header, so do not add sub directory to avoid unnecessary build -# do not use FetchContent_MakeAvailable, just populate it. -FetchContent_GetProperties(cpp-redis) -if(NOT cpp-redis_POPULATED) - FetchContent_Populate(cpp-redis) -endif() -message(STATUS "cpp-redis source dir: ${cpp-redis_SOURCE_DIR}") - add_library(ecosystem_deps INTERFACE) target_include_directories( ecosystem_deps INTERFACE ${rapidjson_SOURCE_DIR}/include ${rapidxml_SOURCE_DIR} ${rapidyaml_SOURCE_DIR} - ${cpp-redis_SOURCE_DIR}/includes) + ) get_property( incs TARGET ecosystem_deps diff --git a/ecosystem/redis.cpp b/ecosystem/redis.cpp index f3c4a584..3902f9ba 100644 --- a/ecosystem/redis.cpp +++ b/ecosystem/redis.cpp @@ -25,7 +25,7 @@ namespace photon { using namespace net; namespace redis { -any BufferedStream::parse_item() { +any BufferedStream::parse_response_item() { switch (auto mark = this->get_char()) { case simple_string::mark(): return get_simple_string(); @@ -35,9 +35,11 @@ any BufferedStream::parse_item() { return get_integer(); case bulk_string::mark(): return get_bulk_string(); - case array<>::mark(): - return {array<>(), get_integer()}; + case array_header::mark(): { + auto x = get_integer(); + return array_header{x};} default: + LOG_ERROR("uncognized mark: ", mark); return {}; } } diff --git a/ecosystem/redis.h b/ecosystem/redis.h index c9798907..851968aa 100644 --- a/ecosystem/redis.h +++ b/ecosystem/redis.h @@ -24,6 +24,12 @@ limitations under the License. namespace photon { namespace redis { +#pragma GCC diagnostic push +#if __GNUC__ >= 10 +#pragma GCC diagnostic ignored "-stringop-overflow" +#endif + + class BufferedStream; class refstring : public std::string_view { BufferedStream* _bs = nullptr; @@ -31,6 +37,7 @@ class refstring : public std::string_view { void del_ref(); public: using std::string_view::string_view; + refstring(std::string_view sv) : std::string_view(sv) { } refstring(BufferedStream* bs, std::string_view sv) : std::string_view(sv), _bs(bs) { add_ref(); } refstring(const refstring& rhs) : @@ -55,6 +62,7 @@ class simple_string : public refstring { public: simple_string() = default; simple_string(refstring rs) : refstring(rs) { } + // simple_string(std::string_view sv) : refstring(sv) { } using refstring::operator=; constexpr static char mark() { return '+'; } }; @@ -70,7 +78,11 @@ class error_message : public refstring { class bulk_string : public refstring { public: bulk_string() = default; + using refstring::refstring; bulk_string(refstring rs) : refstring(rs) { } + bulk_string(std::string_view sv) : refstring(nullptr, sv) { } + // template + // bulk_string(const char(&s)[N]) : refstring(nullptr, {s, N-1}) { } using refstring::operator=; constexpr static char mark() { return '$'; } }; @@ -79,6 +91,7 @@ class integer : public refstring { public: int64_t val; integer() = default; + integer(const integer&) = default; integer(int64_t v) : val(v) { }; integer(const refstring& rs) : val(((estring_view&)rs).to_int64()) { } @@ -86,6 +99,15 @@ class integer : public refstring { constexpr static char mark() { return ':'; } }; +class array_header : public integer { +public: + using integer::integer; + array_header() = default; + array_header(const array_header&) = default; + array_header(integer x) : integer(x) { } + constexpr static char mark() { return '*'; } +}; + class null { }; template @@ -112,9 +134,7 @@ struct any { std::is_base_of::value>::type> bool is_type() { return mark == T::mark(); } - template<> - bool is_type() { return mark == integer::mark() || - mark == array<>::mark(); } + bool is_failed() { return is_type(); } template::value>::type> @@ -123,6 +143,10 @@ struct any { return *(T*)value; } + std::string_view get_error_message() { + return get(); + } + template::value>::type> void set(const T& x) { @@ -133,7 +157,6 @@ struct any { any() = default; template any(const T& x) { set(x); } - any(array<>, integer n) { set(n); mark = array<>::mark(); } template any& operator = (const T& x) { return set(x), *this; } @@ -141,16 +164,10 @@ struct any { using net::ISocketStream; -// template::value>::type> -// inline constexpr size_t __MAX_SIZE(T) { return INT32_MAX; } -inline constexpr size_t __MAX_SIZE(std::string_view x) { return x.size(); } -inline constexpr size_t __MAX_SIZE(int64_t x) { return 32; } -inline constexpr size_t __MAX_SIZE(uint64_t x) { return 32; } -inline constexpr size_t __MAX_SIZE(char x) { return 1; } -constexpr static char CRLF[] = "\r\n"; +#define CRLF "\r\n" +#define BSMARK "$" class BufferedStream { protected: @@ -170,6 +187,15 @@ class BufferedStream { std::string_view __getstring(size_t length); std::string_view __getline(); + struct _strint { int64_t _x; }; + struct _char { char _x; }; + struct _array_header { size_t n; }; + size_t __MAX_SIZE(std::string_view x) { return x.size(); } + size_t __MAX_SIZE(int64_t x) { return 32; } + size_t __MAX_SIZE(_strint x) { return 32; } + size_t __MAX_SIZE(_array_header x) { return 32; } + size_t __MAX_SIZE(_char x) { return 1; } + explicit BufferedStream(ISocketStream* s, uint32_t bufsize) : _s(s), _bufsize(bufsize) { } @@ -185,21 +211,33 @@ class BufferedStream { _o += x.size(); return *this; } - BufferedStream& put(int64_t x) { + template + std::string_view _snprintf(const char* fmt, const Ts&...args) { + auto buf = obuf() + _o; + auto size = _bufsize - _o; + int ret = snprintf(buf, size - _o, fmt, args...); + assert(0 <= ret && (uint32_t)ret <= size); + _o += ret; + return {buf, (size_t)ret}; + } + BufferedStream& put(_strint x) { assert(_o + __MAX_SIZE(x) < _bufsize); static_assert(sizeof(x) == sizeof(long long), "..."); - _o += snprintf(_o + obuf(), _bufsize - _o, "%ld", (long)x); + auto s = _snprintf("$00\r\n%ld\r\n", (long)x._x); + assert(7 < s.size() && s.size() < 99); + auto n = s.size() - 7; + (char&)s[1] = '0' + n / 10; + (char&)s[2] = '0' + n % 10; return *this; } - BufferedStream& put(uint64_t x) { + BufferedStream& put(int64_t x) { assert(_o + __MAX_SIZE(x) < _bufsize); - static_assert(sizeof(x) == sizeof(long long), "..."); - _o += snprintf(_o + obuf(), _bufsize - _o, "%lu", (unsigned long)x); + _snprintf("%ld", (long)x); return *this; } - BufferedStream& put(char x) { + BufferedStream& put(_char x) { assert(_o + __MAX_SIZE(x) < _bufsize); - obuf()[_o++] = x; + obuf()[_o++] = x._x; return *this; } BufferedStream& put() { return *this; } @@ -207,52 +245,34 @@ class BufferedStream { BufferedStream& put(const Ta& xa, const Tb& xb, const Ts&...xs) { return put(xa), put(xb, xs...); } - // BufferedStream& operator << (std::string_view x) { - // auto f = flush_if_low_space(__MAX_SIZE(x), x.data(), x.size()); - // return f ? *this : put(x); - // } - // BufferedStream& operator << (int64_t x) { - // flush_if_low_space(__MAX_SIZE(x)); - // return put(x); - // } + BufferedStream& operator << (int64_t x) { + // flush_if_low_space(__MAX_SIZE(x)); + return put(x); + } + BufferedStream& operator << (_char x) { + // flush_if_low_space(__MAX_SIZE(x)); + return put(x); + } + BufferedStream& operator << (const _strint& x) { + // flush_if_low_space(__MAX_SIZE(x)); + return put(x); + } template BufferedStream& write_item(const Ts&...xs) { auto size = _sum(__MAX_SIZE(xs)...); flush_if_low_space(size); return put(xs...); } - BufferedStream& operator << (simple_string x) { - return write_item(x.mark(), x, CRLF); - } - BufferedStream& operator << (error_message x) { - return write_item(x.mark(), x, CRLF); - } - BufferedStream& operator << (integer x) { - return write_item(x.mark(), x.val, CRLF); - } - BufferedStream& operator << (bulk_string x) { - return x.empty() ? write_item(x.mark(), "-1\r\n") : - write_item(x.mark(), (int64_t)x.size(), CRLF, x, CRLF); - } - BufferedStream& operator << (null) { - return *this << bulk_string{}; - } - void write_items() { } - template - void write_items(const T& x, const Ts&...xs) { - *this << x; - write_items(xs...); + BufferedStream& operator << (std::string_view x) { + return x.empty() ? write_item(BSMARK "-1" CRLF) : + write_item(_char{BSMARK[0]}, (int64_t)x.size(), CRLF, x, CRLF); } - template - BufferedStream& operator << (const array& x) { - write_item(x.mark(), (int64_t)sizeof...(Ts), CRLF); - auto f = [this](const Ts&...xs) { write_items(xs...); }; - tuple_assistance::apply(f, (const std::tuple&)x); - return *this; + BufferedStream& operator << (_array_header x) { + return write_item(_char{array<>::mark()}, x.n, CRLF); } char get_char() { - ensure_input_data(__MAX_SIZE('c')); + ensure_input_data(__MAX_SIZE(_char{'c'})); return ibuf()[_i++]; } refstring getline() { @@ -277,7 +297,403 @@ class BufferedStream { if (available < min_available) __refill(min_available - available); } - any parse_item(); + + void write_items() { } + template + void write_items(const T& x, const Ts&...xs) { + *this << x; + write_items(xs...); + } + + _strint filter(int64_t x) { return {x}; } + std::string_view filter(std::string_view x) { return x; } + + template + void send_cmd_no_flush(std::string_view cmd, const Args&...args) { + write_items(_array_header{1 + sizeof...(args)}, cmd, filter(args)...); + } + + any parse_response_item(); + + template + any execute(bulk_string cmd, const Args&...args) { + send_cmd_no_flush(cmd, args...); + flush(); + return parse_response_item(); + } + +#define DEFINE_COMMAND(cmd) \ + any cmd() { return execute(#cmd); } +#define DEFINE_COMMAND1(cmd, arg) \ + any cmd(bulk_string arg) { \ + return execute(#cmd, arg); \ + } +#define DEFINE_COMMAND1s(cmd, arg) \ + template \ + any cmd(bulk_string arg, const arg##s_type&...arg##s) { \ + return execute(#cmd, arg, arg##s...); \ + } +#define DEFINE_COMMAND1m(cmd, arg, more_args) \ + template \ + any cmd(bulk_string arg, const ARGS&...more_args) { \ + return execute(#cmd, arg, more_args...); \ + } +#define DEFINE_COMMAND1sn(cmd, arg1) \ + template \ + any cmd(bulk_string arg1,const arg1##s_type&...arg1##s){\ + auto n = (1 + sizeof...(arg1##s)); \ + return execute(#cmd, n, arg1, arg1##s...); \ + } +#define DEFINE_COMMAND2(cmd, arg1, arg2) \ + any cmd(bulk_string arg1, bulk_string arg2) { \ + return execute(#cmd, arg1, arg2); \ + } +#define DEFINE_COMMAND2m(cmd, arg1, arg2, more_args) \ + template \ + any cmd(bulk_string arg1, bulk_string arg2, \ + const ARGS&...more_args) { \ + return execute(#cmd, arg1, arg2, more_args...); \ + } +#define DEFINE_COMMAND2s(cmd, arg1, arg2) \ + template \ + any cmd(bulk_string arg1, bulk_string arg2, \ + const arg2##s_type&...arg2##s) { \ + return execute(#cmd, arg1, arg2, arg2##s...); \ + } +#define DEFINE_COMMAND2sn(cmd, arg1, arg2) \ + template \ + any cmd(bulk_string arg1, bulk_string arg2, \ + const arg2##s_type&...arg2##s) { \ + auto n = (1 + sizeof...(arg2##s)); \ + return execute(#cmd, arg1, n, arg2, arg2##s...); \ + } +#define DEFINE_COMMAND2fs(cmd, arg1) \ + template \ + any cmd(bulk_string arg1, bulk_string field1, \ + Fields...fields) { \ + auto n = (1 + sizeof...(fields)); \ + return execute(#cmd, arg1, "FIELDS", n, \ + field1, fields...); \ + } +#define DEFINE_COMMAND3(cmd, arg1, arg2, arg3) \ + any cmd(bulk_string arg1, bulk_string arg2, \ + bulk_string arg3) { \ + return execute(#cmd, arg1, arg2, arg3); \ + } +#define DEFINE_COMMAND3s(cmd, arg1, arg2, arg3) \ + template \ + any cmd(bulk_string arg1, bulk_string arg2, bulk_string \ + arg3, const arg3##s_type&...arg3##s) { \ + return execute(#cmd, arg1, arg2, arg3, arg3##s...); \ + } +#define DEFINE_COMMAND4(cmd, arg1, arg2, arg3, arg4) \ + any cmd(bulk_string arg1, bulk_string arg2, \ + bulk_string arg3, bulk_string arg4) { \ + return execute(#cmd, arg1, arg2, arg3, arg4); \ + } +#define DEFINE_COMMAND5(cmd, arg1, arg2, arg3, arg4, arg5) \ + any cmd(bulk_string arg1, bulk_string arg2, bulk_string \ + arg3, bulk_string arg4, bulk_string arg5) { \ + return execute(#cmd, arg1, arg2, arg3, arg4, arg5); \ + } + // Generic + DEFINE_COMMAND2 (COPY, source, destination); + DEFINE_COMMAND1s(DEL, key); + DEFINE_COMMAND1 (DUMP, key); + DEFINE_COMMAND1s(EXISTS, key); + DEFINE_COMMAND2 (EXPIRE, key, seconds); + DEFINE_COMMAND2 (EXPIREAT, key, unix_time_seconds); + DEFINE_COMMAND1 (EXPIRETIME, key); + DEFINE_COMMAND1 (KEYS, pattern); + DEFINE_COMMAND2 (MOVE, key, db); + DEFINE_COMMAND1 (PERSIST, key); + DEFINE_COMMAND2 (PEXPIRE, key, milliseconds); + DEFINE_COMMAND2 (PEXPIREAT, key, unix_time_milliseconds); + DEFINE_COMMAND1 (PEXPIRETIME, key); + + // Set + DEFINE_COMMAND2s(SADD, key, member); + DEFINE_COMMAND1 (SCARD, key); + DEFINE_COMMAND1s(SDIFF, key); + DEFINE_COMMAND2s(SDIFFSTORE, destination, key); + DEFINE_COMMAND1s(SINTER, key); + DEFINE_COMMAND2s(SINTERCARD, numkeys, key); + DEFINE_COMMAND2s(SINTERSTORE, destination, key); + DEFINE_COMMAND2 (SISMEMBER, key, member); + DEFINE_COMMAND2s(SMISMEMBER, key, member); + DEFINE_COMMAND1 (SMEMBERS, key); + DEFINE_COMMAND3 (SMOVE, source, destination, member); + DEFINE_COMMAND2 (SPOP, key, count); + DEFINE_COMMAND1 (SPOP, key); + DEFINE_COMMAND2 (SRANDMEMBER, key, count); + DEFINE_COMMAND1 (SRANDMEMBER, key); + DEFINE_COMMAND2s(SREM, key, member); + DEFINE_COMMAND1s(SUNION, key); + DEFINE_COMMAND2s(SUNIONSTORE, destination, key); + + // Hash + DEFINE_COMMAND2s(HDEL, key, field); + DEFINE_COMMAND2 (HEXISTS, key, field); + DEFINE_COMMAND1 (HGETALL, key); + DEFINE_COMMAND3 (HINCRBY, key, field, increment); + DEFINE_COMMAND3 (HINCRBYFLOAT, key, field, increment); + DEFINE_COMMAND1 (HKEYS, key); + DEFINE_COMMAND1 (HLEN, key); + DEFINE_COMMAND2 (HGET, key, field); + DEFINE_COMMAND2s(HMGET, key, field); + DEFINE_COMMAND3s(HMSET, key, field, value); + DEFINE_COMMAND3s(HSET, key, field, value); + DEFINE_COMMAND2fs(HPERSIST, key); + DEFINE_COMMAND2fs(HEXPIRETIME, key); + DEFINE_COMMAND2fs(HPEXPIRETIME, key); + DEFINE_COMMAND2fs(HPTTL, key); + DEFINE_COMMAND1 (HRANDFIELD, key); + DEFINE_COMMAND2 (HRANDFIELD, key, count); + DEFINE_COMMAND3 (HRANDFIELD, key, count, WITHVALUES); + DEFINE_COMMAND3 (HSETNX, key, field, value); + DEFINE_COMMAND2 (HSTRLEN, key, field); + DEFINE_COMMAND2fs(HTTL, key); + DEFINE_COMMAND1 (HVALS, key); + + // List + DEFINE_COMMAND5 (BLMOVE, source, destination, LEFT_or_RIGHT1, LEFT_or_RIGHT2, timeout); + DEFINE_COMMAND3 (BRPOPLPUSH, source, destination, timeout); + DEFINE_COMMAND2 (LINDEX, key, index); + DEFINE_COMMAND4 (LINSERT, key, BEFORE_or_AFTER, pivot, element); + DEFINE_COMMAND1 (LLEN, key); + DEFINE_COMMAND4 (LMOVE, source, destination, LEFT_or_RIGHT1, LEFT_or_RIGHT2); + DEFINE_COMMAND1 (LPOP, key); + DEFINE_COMMAND2 (LPOP, key, count); + DEFINE_COMMAND2s(LPUSH, key, element); + DEFINE_COMMAND2s(LPUSHX, key, element); + DEFINE_COMMAND3 (LRANGE, key, start, stop); + DEFINE_COMMAND3 (LREM, key, count, element); + DEFINE_COMMAND3 (LSET, key, index, element); + DEFINE_COMMAND3 (LTRIM, key, start, stop); + DEFINE_COMMAND1 (RPOP, key); + DEFINE_COMMAND2 (RPOP, key, count); + DEFINE_COMMAND2 (RPOPLPUSH, source, destination); + DEFINE_COMMAND2s(RPUSH, key, element); + DEFINE_COMMAND2s(RPUSHX, key, element); + + //Sorted Set + DEFINE_COMMAND1 (ZCARD, key); + DEFINE_COMMAND3 (ZCOUNT, key, min, max); + DEFINE_COMMAND2sn(ZDIFFSTORE, destination, key); + DEFINE_COMMAND3 (ZINCRBY, key, increment, member); + DEFINE_COMMAND3 (ZLEXCOUNT, key, min, max); + DEFINE_COMMAND2s(ZMSCORE, key, member); + DEFINE_COMMAND1 (ZPOPMAX, key); + DEFINE_COMMAND2 (ZPOPMAX, key, count); + DEFINE_COMMAND1 (ZPOPMIN, key); + DEFINE_COMMAND2 (ZPOPMIN, key, count); + DEFINE_COMMAND1 (ZRANDMEMBER, key); + DEFINE_COMMAND2 (ZRANDMEMBER, key, count); + DEFINE_COMMAND3 (ZRANDMEMBER, key, count, WITHSCORES); + DEFINE_COMMAND3 (ZRANGEBYLEX, key, min, max); + DEFINE_COMMAND3 (ZRANGEBYSCORE, key, min, max); + DEFINE_COMMAND4 (ZRANGESTORE, dst, src, min, max); + DEFINE_COMMAND2 (ZRANK, key, member); + DEFINE_COMMAND2s(ZREM, key, member); + DEFINE_COMMAND3 (ZREMRANGEBYLEX, key, min, max); + DEFINE_COMMAND3 (ZREMRANGEBYRANK, key, start, stop); + DEFINE_COMMAND3 (ZREMRANGEBYSCORE, key, min, max); + DEFINE_COMMAND3 (ZREVRANGE, key, start, stop); + DEFINE_COMMAND3 (ZREVRANGEBYLEX, key, max, min); + DEFINE_COMMAND3 (ZREVRANGEBYSCORE, key, max, min); + DEFINE_COMMAND2 (ZREVRANK, key, member); + DEFINE_COMMAND2 (ZSCAN, key, cursor); + DEFINE_COMMAND2 (ZSCORE, key, member); + DEFINE_COMMAND1sn(ZUNION, key); + DEFINE_COMMAND2sn(ZUNIONSTORE, destination, key); + + // HyperLogLog + DEFINE_COMMAND1m(PFADD, key, elements); + DEFINE_COMMAND1s(PFCOUNT, key); + DEFINE_COMMAND2 (PFDEBUG, subcommand, key); + DEFINE_COMMAND1m(PFMERGE, destkey, sourcekeys); + DEFINE_COMMAND (PFSELFTEST); + + //Bitmap + DEFINE_COMMAND1 (BITCOUNT, key); + DEFINE_COMMAND1m(BITFIELD, key, args); + DEFINE_COMMAND1m(BITFIELD_RO, key, args); + DEFINE_COMMAND3s(BITOP, AND_OR_XOR_NOT, destkey, key); + DEFINE_COMMAND2m(BITPOS, key, bit, opt_start_end_BYTE_BIT); + DEFINE_COMMAND2 (GETBIT, key, offset); + DEFINE_COMMAND3 (SETBIT, key, offset, value); + + //Strings + DEFINE_COMMAND2 (APPEND, key, value); + DEFINE_COMMAND1 (DECR, key); + DEFINE_COMMAND2 (DECRBY, key, decrement); + DEFINE_COMMAND1 (GET, key); + DEFINE_COMMAND1 (GETDEL, key); + DEFINE_COMMAND1m(GETEX, key, args); + DEFINE_COMMAND3 (GETRANGE, key, start, end); + DEFINE_COMMAND2 (GETSET, key, value); + DEFINE_COMMAND1 (INCR, key); + DEFINE_COMMAND2 (INCRBY, key, increment); + DEFINE_COMMAND2 (INCRBYFLOAT, key, increment); + DEFINE_COMMAND2m(LCS, key1, key2, args); + DEFINE_COMMAND1s(MGET, key); + DEFINE_COMMAND2m(MSET, key, value, more_key_values); + DEFINE_COMMAND2m(MSETNX, key, value, more_key_values); + DEFINE_COMMAND3 (PSETEX, key, milliseconds, value); + DEFINE_COMMAND2m(SET, key, value, more_key_values); + DEFINE_COMMAND3 (SETEX, key, seconds, value); + + +#undef DEFINE_COMMAND1 +#undef DEFINE_COMMAND1s +#undef DEFINE_COMMAND2s + template + any __hexp(bulk_string op, bulk_string key, bulk_string time, + bulk_string condition, const Fields&...fields) { + bool ok = (condition == "NX" || condition == "XX" || + condition == "GT" || condition == "LT" ); + assert(condition.empty() || ok); + auto n = (sizeof...(fields)); + if (condition.empty() || !ok) { + return execute(op, key, time, "FIELDS",n , fields...); + } else { + return execute(op, key, time, condition, "FIELDS", n, fields...); + } + } + template + any HEXPIRE(bulk_string key, bulk_string seconds, bulk_string condition, + bulk_string field, const Fields&...fields) { + return __hexp("HEXPIRE", key, seconds, condition, field, fields...); + } + template + any HEXPIREAT(bulk_string key, bulk_string seconds, bulk_string condition, + bulk_string field, const Fields&...fields) { + return __hexp("HEXPIREAT", key, seconds, condition, field, fields...); + } + template + any HPEXPIRE(bulk_string key, bulk_string milliseconds, bulk_string condition, + bulk_string field, const Fields&...fields) { + return __hexp("HPEXPIRE", key, milliseconds, condition, field, fields...); + } + template + any HPEXPIREAT(bulk_string key, bulk_string unix_time_milliseconds, bulk_string condition, + bulk_string field, const Fields&...fields) { + return __hexp("HPEXPIREAT", key, unix_time_milliseconds, condition, field, fields...); + } + + any SSCAN(bulk_string key, bulk_string cursor, bulk_string pattern, uint64_t count) { + return execute("SSCAN", key, cursor, "PATTERN", pattern, "COUNT", count); + } + any SSCAN(bulk_string key, bulk_string cursor, bulk_string pattern) { + return execute("SSCAN", key, cursor, "PATTERN", pattern); + } + any SSCAN(bulk_string key, bulk_string cursor, uint64_t count) { + return execute("SSCAN", key, cursor, "COUNT", count); + } + any SSCAN(bulk_string key, bulk_string cursor) { + return execute("SSCAN", key, cursor); + } + any ZRANGEBYLEX(bulk_string key, bulk_string min, bulk_string max, + bulk_string limit_offset, bulk_string limit_count) { + return execute("ZRANGEBYLEX", key, min, max, "LIMIT", limit_offset, limit_count); + } + any ZRANGEBYSCORE(bulk_string key, bulk_string min, + bulk_string max, bulk_string WITHSCORES) { + return execute("ZRANGEBYSCORE", key, min, max, "WITHSCORES"); + } + any ZRANGEBYSCORE(bulk_string key, bulk_string min, bulk_string max, bulk_string + WITHSCORES, bulk_string limit_offset, bulk_string limit_count) { + return execute("ZRANGEBYSCORE", key, min, max, "WITHSCORES", + "LIMIT", limit_offset, limit_count); + } + any ZRANK(bulk_string key, bulk_string member, bulk_string WITHSCORE) { + return execute("ZRANK", key, member, "WITHSCORE"); + } + any ZREVRANGE(bulk_string key, bulk_string start, bulk_string stop, bulk_string WITHSCORE) { + return execute("ZREVRANGE", key, start, stop, "WITHSCORE"); + } + any ZREVRANGEBYLEX(bulk_string key, bulk_string max, bulk_string min, + bulk_string limit_offset, bulk_string limit_count) { + return execute("ZREVRANGEBYLEX", key, max, min, + "LIMIT", limit_offset, limit_count); + } + any ZREVRANGEBYSCORE(bulk_string key, bulk_string max, bulk_string min, bulk_string + WITHSCORES, bulk_string limit_offset, bulk_string limit_count) { + return execute("ZREVRANGEBYSCORE", key, max, min, "WITHSCORES", + "LIMIT", limit_offset, limit_count); + } + any ZREVRANK(bulk_string key, bulk_string member, bulk_string WITHSCORE) { + return execute("ZREVRANK", key, member, "WITHSCORE"); + } + any ZSCAN_PATTERN(bulk_string key, bulk_string cursor, bulk_string pattern) { + return execute("ZSCAN", key, cursor, "PATTERN", pattern); + } + any ZSCAN_COUNT(bulk_string key, bulk_string cursor, bulk_string count) { + return execute("ZSCAN", key, cursor, "COUNT", count); + } + any ZSCAN(bulk_string key, bulk_string cursor, bulk_string pattern, bulk_string count) { + return execute("ZSCAN", key, cursor, "PATTERN", pattern, "COUNT", count); + } + any EXPIRE(bulk_string key, bulk_string seconds, bulk_string NX_or_XX_or_GT_or_LT) { + assert(NX_or_XX_or_GT_or_LT == "NX" || NX_or_XX_or_GT_or_LT == "XX" || + NX_or_XX_or_GT_or_LT == "GT" || NX_or_XX_or_GT_or_LT == "LT"); + return execute("EXPIRE", key, seconds, NX_or_XX_or_GT_or_LT); + } + any EXPIREAT(bulk_string key, bulk_string unix_time_seconds, bulk_string NX_or_XX_or_GT_or_LT) { + assert(NX_or_XX_or_GT_or_LT == "NX" || NX_or_XX_or_GT_or_LT == "XX" || + NX_or_XX_or_GT_or_LT == "GT" || NX_or_XX_or_GT_or_LT == "LT"); + return execute("EXPIREAT", key, unix_time_seconds, NX_or_XX_or_GT_or_LT); + } + any PEXPIRE(bulk_string key, bulk_string milliseconds, bulk_string NX_or_XX_or_GT_or_LT) { + assert(NX_or_XX_or_GT_or_LT == "NX" || NX_or_XX_or_GT_or_LT == "XX" || + NX_or_XX_or_GT_or_LT == "GT" || NX_or_XX_or_GT_or_LT == "LT"); + return execute("PEXPIRE", key, milliseconds, NX_or_XX_or_GT_or_LT); + } + any PEXPIREAT(bulk_string key, bulk_string unix_time_milliseconds, bulk_string NX_or_XX_or_GT_or_LT) { + assert(NX_or_XX_or_GT_or_LT == "NX" || NX_or_XX_or_GT_or_LT == "XX" || + NX_or_XX_or_GT_or_LT == "GT" || NX_or_XX_or_GT_or_LT == "LT"); + return execute("PEXPIREAT", key, unix_time_milliseconds, NX_or_XX_or_GT_or_LT); + } + any OBJECT_ENCODING(bulk_string key) { + return execute("OBJECT", "ENCODING", key); + } + any OBJECT_FREQ(bulk_string key) { + return execute("OBJECT", "FREQ", key); + } + any OBJECT_IDLETIME(bulk_string key) { + return execute("OBJECT", "IDLETIME", key); + } + any OBJECT_IREFCOUNT(bulk_string key) { + return execute("OBJECT", "REFCOUNT", key); + } + any BITCOUNT(bulk_string key, bulk_string start, bulk_string end) { + return execute("BITCOUNT", key, start, end); + } + any BITCOUNT(bulk_string key, bulk_string start, bulk_string end, bulk_string BYTE_or_BIT) { + assert(BYTE_or_BIT == "BYTE" || BYTE_or_BIT == "BIT"); + return execute("BITCOUNT", key, start, end, BYTE_or_BIT); + } + + // ZMPOP numkeys key [key ...] [COUNT count] + // ZRANGE key start stop [BYSCORE | BYLEX] [REV] [LIMIT offset count] [WITHSCORES] + // HSCAN key cursor [MATCH pattern] [COUNT count] [NOVALUES] + // BLMPOP timeout numkeys key [key ...] [COUNT count] + // BLPOP key [key ...] timeout + // BRPOP key [key ...] timeout + // LPOS key element [RANK rank] [COUNT num-matches] [MAXLEN len] + // LMPOP numkeys key [key ...] [COUNT count] + // BZMPOP timeout numkeys key [key ...] [COUNT count] + // BZPOPMAX key [key ...] timeout + // BZPOPMIN key [key ...] timeout + // ZADD key [NX | XX] [GT | LT] [CH] [INCR] score member [score member ...] + // ZDIFF numkeys key [key ...] [WITHSCORES] + // ZINTER numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] [WITHSCORES] + // ZINTERCARD numkeys key [key ...] [LIMIT limit] + // ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] + // ZRANGESTORE dst src min max [BYSCORE | BYLEX] [REV] [LIMIT offset count] + // ZUNION numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] [WITHSCORES] + // ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] + }; inline void refstring::add_ref() { if (_bs) _bs->_refcnt++; } @@ -290,6 +706,9 @@ class _BufferedStream : public BufferedStream { _BufferedStream(ISocketStream* s) : BufferedStream(s, BUF_SIZE) { } }; +#pragma GCC diagnostic pop + + } } diff --git a/ecosystem/test/test_redis.cpp b/ecosystem/test/test_redis.cpp index 42235e79..04ec9666 100644 --- a/ecosystem/test/test_redis.cpp +++ b/ecosystem/test/test_redis.cpp @@ -23,6 +23,7 @@ #include "../redis.h" #include #include +#include #include #include using namespace photon; @@ -53,7 +54,8 @@ class FakeStream : public ISocketStream { virtual ssize_t send(const void *buf, size_t count, int flags = 0) override { size_t n = rand() % 8 + 1; if (n > count) n = count; - output += std::string_view{(const char*)buf, n}; + auto p = (const char*)buf; + output.append(p, p + n); return n; } virtual ssize_t send(const struct iovec *iov, int iovcnt, int flags = 0) override { @@ -80,7 +82,8 @@ class FakeStream : public ISocketStream { return s; } virtual ssize_t write(const void *buf, size_t count) override { - output += std::string_view{(char*)buf, count}; + auto p = (const char*)buf; + output.append(p, p + count); return count; } virtual ssize_t writev(const struct iovec *iov, int iovcnt) override { @@ -102,55 +105,162 @@ class FakeStream : public ISocketStream { virtual int getpeername(char* path, size_t count) override { return 0; } }; -const static char RESP[] = "+asldkfjasfkd\r\n:234\r\n$21\r\nthis-is-a-bulk_string\r\n*3\r\n+asdf\r\n:75\r\n$3\r\njkl\r\n:-1234234\r\n"; + +#define SSTR(s) "+" #s CRLF +#define BSTR(n, s) "$" #n CRLF #s CRLF +#define INTEGER(n) ":" #n CRLF +#define ARRAY_HEADER(n) "*" #n CRLF + +struct __BS : public _BufferedStream<> { +public: + using _BufferedStream::_strint; +}; TEST(redis, serialization) { FakeStream s; _BufferedStream<> bs(&s); - bs << simple_string("asldkfjasfkd") - << integer(234) - << bulk_string("this-is-a-bulk_string") - << make_array(simple_string("asdf"), integer(75), bulk_string("jkl")) - << integer(-1234234); + bs << "asldkfjasfkd" + << __BS::_strint{234} + << "this-is-another-string" + << __BS::_strint{-1234234} + ; bs.flush(); puts(s.output.c_str()); + const static char RESP[] = BSTR(12,asldkfjasfkd) BSTR(03,234) + BSTR(22,this-is-another-string) BSTR(08,-1234234); EXPECT_EQ(s.output, RESP); } +void print_resp(const std::string_view s) { + for (char c: s) { + if (c == '\r') printf("\\r\\n"); + else if (c == '\n') { } + else putchar(c); + } + puts(""); +} + TEST(redis, deserialization) { FakeStream s; - s.in = RESP; + s.in = SSTR(asldkfjasfkd) INTEGER(234) BSTR(21,this-is-a-bulk_string) + ARRAY_HEADER(3) SSTR(asdf) INTEGER(75) BSTR(3,jkl) INTEGER(-1234234); + print_resp(s.in); _BufferedStream<> bs(&s); - auto a = bs.parse_item(); + auto a = bs.parse_response_item(); EXPECT_EQ(a.mark, simple_string::mark()); EXPECT_EQ(a.get(), "asldkfjasfkd"); - auto b = bs.parse_item(); + auto b = bs.parse_response_item(); EXPECT_EQ(b.mark, integer::mark()); EXPECT_EQ(b.get().val, 234); - auto c = bs.parse_item(); + auto c = bs.parse_response_item(); EXPECT_EQ(c.mark, bulk_string::mark()); EXPECT_EQ(c.get(), "this-is-a-bulk_string"); - auto d = bs.parse_item(); - EXPECT_EQ(d.mark, redis::array<>::mark()); - EXPECT_EQ(d.get().val, 3); + auto d = bs.parse_response_item(); + EXPECT_EQ(d.mark, array_header::mark()); + EXPECT_EQ(d.get().val, 3); - auto e = bs.parse_item(); + auto e = bs.parse_response_item(); EXPECT_EQ(e.mark, simple_string::mark()); EXPECT_EQ(e.get(), "asdf"); - auto f = bs.parse_item(); + auto f = bs.parse_response_item(); EXPECT_EQ(f.mark, integer::mark()); EXPECT_EQ(f.get().val, 75); - auto g = bs.parse_item(); + auto g = bs.parse_response_item(); EXPECT_EQ(g.mark, bulk_string::mark()); EXPECT_EQ(g.get(), "jkl"); - auto h = bs.parse_item(); + auto h = bs.parse_response_item(); EXPECT_EQ(h.mark, integer::mark()); EXPECT_EQ(h.get().val, -1234234); } + +__attribute__((used)) +void asdfjkl(_BufferedStream<>& bs) { + bs.BLMOVE("src", "dest", "LEFT", "RIGHT", "234"); +} + +TEST(redis, cmd_serialization) { + FakeStream s; + _BufferedStream<> bs(&s); +#define TEST_CMD(cmd, truth) { \ + s.in = "-ERR unknown command 'asdf'\r\n"; \ + auto r = cmd; \ + EXPECT_EQ(s.output, truth); \ + print_resp(s.output); \ + EXPECT_TRUE(r.is_type()); \ + EXPECT_EQ(r.get(), "ERR unknown command 'asdf'"); \ + s.output.clear(); \ +} +#define AKey BSTR(4, akey) +#define Key1 BSTR(4, key1) +#define Key2 BSTR(4, key2) +#define Key3 BSTR(4, key3) +#define Key4 BSTR(4, key4) +#define N(n) "*"#n"\r\n" +#define F1 BSTR(2, f1) +#define F2 BSTR(2, f2) +#define F3 BSTR(2, f3) +#define F4 BSTR(2, f4) +#define F5 BSTR(2, f5) +#define V1 BSTR(2, v1) +#define V2 BSTR(2, v2) +#define V3 BSTR(2, v3) +#define V4 BSTR(2, v4) +#define V5 BSTR(2, v5) +#define nFIELDS(n) BSTR(6,FIELDS) BSTR(01,n) + + TEST_CMD(bs.execute("SDIFF", "asdf", "jkl", "hahaha"), + N(4) BSTR(5,SDIFF) BSTR(4,asdf) BSTR(3,jkl) BSTR(6,hahaha)); + + // DEFINE_COMMAND1 (SCARD, key); + TEST_CMD(bs.SCARD("akey"), N(2) BSTR(5,SCARD) AKey); + + // DEFINE_COMMAND3 (SMOVE, source, destination, member); + TEST_CMD(bs.SMOVE("source", "dest", "member"), + N(4) BSTR(5,SMOVE) BSTR(6,source) BSTR(4,dest) BSTR(6,member)); + + // DEFINE_COMMAND3s(HMSET, key, field, value); + TEST_CMD(bs.HMSET("akey", "f1", "v1", "f2", "v2", "f3", "v3"), + N(8) BSTR(5,HMSET) AKey F1 V1 F2 V2 F3 V3); + + // DEFINE_COMMAND4 (LINSERT, key, BEFORE_or_AFTER, pivot, element); + TEST_CMD(bs.LINSERT("akey", "BEFORE", "pivot", "element"), + N(5) BSTR(7,LINSERT) AKey BSTR(6,BEFORE) BSTR(5,pivot) BSTR(7,element)); + + // DEFINE_COMMAND2fs(HTTL, key); + TEST_CMD(bs.HTTL("akey", "f1", "f2", "f3", "f4"), + N(8) BSTR(4,HTTL) AKey nFIELDS(4) F1 F2 F3 F4); + + // DEFINE_COMMAND2sn(ZUNIONSTORE, destination, key); + TEST_CMD(bs.ZUNIONSTORE("destination", "key1", "key2", "key3"), + N(6) BSTR(11,ZUNIONSTORE) BSTR(11,destination) BSTR(01,3) Key1 Key2 Key3); + + // DEFINE_COMMAND2m(BITPOS, key, bit, opt_start_end_BYTE_BIT); + TEST_CMD(bs.BITPOS("akey", "bit", "start", "end", "BIT"), + N(6) BSTR(6,BITPOS) AKey BSTR(3,bit) BSTR(5,start) BSTR(3,end) BSTR(3,BIT)); +} + +TEST(redis, cmd) { + photon::init(INIT_EVENT_EPOLL | INIT_EVENT_KQUEUE, 0); + DEFER(photon::fini()); + auto client = new_tcp_socket_client(); + DEFER(delete client); + auto s = client->connect({IPAddr::V4Loopback(), 6379}); + DEFER(delete s); + _BufferedStream<> bs(s); + const char key[] = "zvxbhm"; + bs.DEL(key); + DEFER(bs.DEL(key)); + auto r = bs.HSET(key, "hahaha", "qwer", "key2", "value2"); + EXPECT_TRUE(r.is_type()); + EXPECT_EQ(r.get().val, 2); + r = bs.LLEN(key); + EXPECT_TRUE(r.is_failed()); + LOG_DEBUG(r.get_error_message()); +}