Skip to content

Commit

Permalink
string_socket_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
lihuiba committed Oct 16, 2024
1 parent 6b396c7 commit c843f17
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 90 deletions.
75 changes: 75 additions & 0 deletions common/memory-stream/memory-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,81 @@ class FaultStream : public IStream

};

using namespace photon::net;
class StringSocketStreamImpl : public StringSocketStream {
public:
virtual ssize_t recv(void *buf, size_t count, int flags = 0) override {
size_t n = rand() % 8 + 1;
if (n > _inv.size()) n = _inv.size();
if (n > count) n = count;
memcpy(buf, _inv.data(), n);
_inv = _inv.substr(n);
return n;
}
virtual ssize_t recv(const struct iovec *iov, int iovcnt, int flags = 0) override {
while(!iov->iov_base || !iov->iov_len)
if (iovcnt) { ++iov, --iovcnt; }
else return -1;
return recv(iov->iov_base, iov->iov_len);
}
virtual ssize_t send(const void *buf, size_t count, int flags = 0) override {
size_t n = rand() % 8 + 1;
if (n > count) n = count;
auto p = (const char*)buf;
_out.append(p, p + n);
return n;
}
virtual ssize_t send(const struct iovec *iov, int iovcnt, int flags = 0) override {
while(!iov->iov_base || !iov->iov_len)
if (iovcnt) { ++iov, --iovcnt; }
else return -1;
return send(iov->iov_base, iov->iov_len);
}
virtual ssize_t sendfile(int in_fd, off_t offset, size_t count) override { return 0; }
virtual int close() override { return 0; }
virtual ssize_t read(void *buf, size_t count) override {
if (count > _inv.size()) count = _inv.size();
memcpy(buf, _inv.data(), count);
return count;
}
virtual ssize_t readv(const struct iovec *iov, int iovcnt) override {
ssize_t s = 0;
for (int i = 0; i < iovcnt; ++i) {
ssize_t ret = read(iov[i].iov_base, iov[i].iov_len);
if (ret < 0) return ret;
s += ret;
if (ret < (ssize_t)iov[i].iov_len) break;
}
return s;
}
virtual ssize_t write(const void *buf, size_t count) override {
auto p = (const char*)buf;
_out.append(p, p + count);
return count;
}
virtual ssize_t writev(const struct iovec *iov, int iovcnt) override {
ssize_t s = 0;
for (int i = 0; i < iovcnt; ++i) {
ssize_t ret = write(iov[i].iov_base, iov[i].iov_len);
if (ret < 0) return ret;
s += ret;
if (ret < (ssize_t)iov[i].iov_len) break;
}
return s;
}
virtual Object* get_underlay_object(uint64_t recursion = 0) override { return 0; }
virtual int setsockopt(int level, int option_name, const void* option_value, socklen_t option_len) override { return 0; }
virtual int getsockopt(int level, int option_name, void* option_value, socklen_t* option_len) override { return 0; }
virtual int getsockname(EndPoint& addr) override { return 0; }
virtual int getpeername(EndPoint& addr) override { return 0; }
virtual int getsockname(char* path, size_t count) override { return 0; }
virtual int getpeername(char* path, size_t count) override { return 0; }
};

IStream* new_fault_stream(IStream* stream, int flag, bool ownership) {
return new FaultStream(stream, flag, ownership);
}

StringSocketStream* new_string_socket_stream() {
return new StringSocketStreamImpl;
}
26 changes: 26 additions & 0 deletions common/memory-stream/memory-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ limitations under the License.

#pragma once
#include <cinttypes>
#include <string>
#include <photon/common/stream.h>
#include <photon/net/socket.h>
#include <photon/common/string_view.h>


class DuplexMemoryStream;
Expand All @@ -33,6 +36,9 @@ extern "C" DuplexMemoryStream* new_duplex_memory_stream(uint32_t capacity);
// default is 11(3 in 10-based integer), both read and write operation may fail
extern "C" IStream* new_fault_stream(IStream* stream, int flag=3, bool ownership=false);

class StringSocketStream;
StringSocketStream* new_string_socket_stream();

class DuplexMemoryStream
{
public:
Expand All @@ -41,3 +47,23 @@ class DuplexMemoryStream
IStream* endpoint_b; // do NOT delete it!!!
virtual int close() = 0;
};

class StringSocketStream : public photon::net::ISocketStream {
protected:
std::string _in, _out;
std::string_view _inv;
public:
void set_input(std::string_view in) {
_inv = (_in = in);
}
void set_input(std::string_view in, bool copy) {
if (!copy) _inv = in;
else set_input(in);
}
std::string_view input() {
return _inv;
}
std::string& output() {
return _out;
}
};
112 changes: 22 additions & 90 deletions ecosystem/test/test_redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <photon/photon.h>
#include <photon/common/estring.h>
#include <photon/common/alog-stdstring.h>
#include <photon/common/memory-stream/memory-stream.h>
#include <photon/net/socket.h>
#include <gtest/gtest.h>
#include "../../test/ci-tools.h"
Expand All @@ -32,81 +33,6 @@ using namespace photon::net;
using namespace photon::redis;
using namespace std;

class FakeStream : public ISocketStream {
public:
estring_view in;
// estring_view in = "48293\r\n";
// estring_view out_truth = "*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n";
std::string output;
virtual ssize_t recv(void *buf, size_t count, int flags = 0) override {
size_t n = rand() % 8 + 1;
if (n > in.size()) n = in.size();
if (n > count) n = count;
memcpy(buf, in.data(), n);
in = in.substr(n);
return n;
}
virtual ssize_t recv(const struct iovec *iov, int iovcnt, int flags = 0) override {
while(!iov->iov_base || !iov->iov_len)
if (iovcnt) { ++iov, --iovcnt; }
else return -1;
return recv(iov->iov_base, iov->iov_len);
}
virtual ssize_t send(const void *buf, size_t count, int flags = 0) override {
size_t n = rand() % 8 + 1;
if (n > count) n = count;
auto p = (const char*)buf;
output.append(p, p + n);
return n;
}
virtual ssize_t send(const struct iovec *iov, int iovcnt, int flags = 0) override {
while(!iov->iov_base || !iov->iov_len)
if (iovcnt) { ++iov, --iovcnt; }
else return -1;
return send(iov->iov_base, iov->iov_len);
}
virtual ssize_t sendfile(int in_fd, off_t offset, size_t count) override { return 0; }
virtual int close() override { return 0; }
virtual ssize_t read(void *buf, size_t count) override {
if (count > in.size()) count = in.size();
memcpy(buf, in.data(), count);
return count;
}
virtual ssize_t readv(const struct iovec *iov, int iovcnt) override {
ssize_t s = 0;
for (int i = 0; i < iovcnt; ++i) {
ssize_t ret = read(iov[i].iov_base, iov[i].iov_len);
if (ret < 0) return ret;
s += ret;
if (ret < (ssize_t)iov[i].iov_len) break;
}
return s;
}
virtual ssize_t write(const void *buf, size_t count) override {
auto p = (const char*)buf;
output.append(p, p + count);
return count;
}
virtual ssize_t writev(const struct iovec *iov, int iovcnt) override {
ssize_t s = 0;
for (int i = 0; i < iovcnt; ++i) {
ssize_t ret = write(iov[i].iov_base, iov[i].iov_len);
if (ret < 0) return ret;
s += ret;
if (ret < (ssize_t)iov[i].iov_len) break;
}
return s;
}
virtual Object* get_underlay_object(uint64_t recursion = 0) override { return 0; }
virtual int setsockopt(int level, int option_name, const void* option_value, socklen_t option_len) override { return 0; }
virtual int getsockopt(int level, int option_name, void* option_value, socklen_t* option_len) override { return 0; }
virtual int getsockname(EndPoint& addr) override { return 0; }
virtual int getpeername(EndPoint& addr) override { return 0; }
virtual int getsockname(char* path, size_t count) override { return 0; }
virtual int getpeername(char* path, size_t count) override { return 0; }
};


#define SSTR(s) "+" #s CRLF
#define BSTR(n, s) "$" #n CRLF #s CRLF
#define INTEGER(n) ":" #n CRLF
Expand All @@ -118,19 +44,20 @@ struct __BS : public _BufferedStream<> {
};

TEST(redis, serialization) {
FakeStream s;
_BufferedStream<> bs(&s);
auto s = new_string_socket_stream();
DEFER(delete s);
_BufferedStream<> bs(s);
bs << "asldkfjasfkd"
<< __BS::_strint{234}
<< "this-is-another-string"
<< __BS::_strint{-1234234}
;

bs.flush();
puts(s.output.c_str());
puts(s->output().c_str());
const static char RESP[] = BSTR(12,asldkfjasfkd) BSTR(03,234)
BSTR(22,this-is-another-string) BSTR(08,-1234234);
EXPECT_EQ(s.output, RESP);
EXPECT_EQ(s->output(), RESP);
}

void print_resp(const std::string_view s) {
Expand All @@ -143,11 +70,13 @@ void print_resp(const std::string_view s) {
}

TEST(redis, deserialization) {
FakeStream s;
s.in = SSTR(asldkfjasfkd) INTEGER(234) BSTR(21,this-is-a-bulk_string)
auto s = new_string_socket_stream();
DEFER(delete s);
auto RESP = SSTR(asldkfjasfkd) INTEGER(234) BSTR(21,this-is-a-bulk_string)
ARRAY_HEADER(3) SSTR(asdf) INTEGER(75) BSTR(3,jkl) INTEGER(-1234234);
print_resp(s.in);
_BufferedStream<> bs(&s);
s->set_input(RESP, false);
print_resp(s->input());
_BufferedStream<> bs(s);
auto a = bs.parse_response_item();
EXPECT_EQ(a.mark, simple_string::mark());
EXPECT_EQ(a.get<simple_string>(), "asldkfjasfkd");
Expand Down Expand Up @@ -186,17 +115,20 @@ void asdfjkl(_BufferedStream<>& bs) {
bs.BLMOVE("src", "dest", "LEFT", "RIGHT", "234");
}


TEST(redis, cmd_serialization) {
FakeStream s;
_BufferedStream<> bs(&s);
auto s = new_string_socket_stream();
DEFER(delete s);
_BufferedStream<> bs(s);
#define ERRMSG "ERR unknown command 'asdf'"
#define TEST_CMD(cmd, truth) { \
s.in = "-ERR unknown command 'asdf'\r\n"; \
s->set_input("-" ERRMSG CRLF, false); \
auto r = cmd; \
EXPECT_EQ(s.output, truth); \
print_resp(s.output); \
EXPECT_EQ(s->output(), truth); \
print_resp(s->output()); \
EXPECT_TRUE(r.is_type<error_message>()); \
EXPECT_EQ(r.get<error_message>(), "ERR unknown command 'asdf'"); \
s.output.clear(); \
EXPECT_EQ(r.get<error_message>(), ERRMSG); \
s->output().clear(); \
}
#define AKey BSTR(4, akey)
#define Key1 BSTR(4, key1)
Expand Down

0 comments on commit c843f17

Please sign in to comment.