Skip to content

Commit

Permalink
Add timeout to IStream interface so RPC able to control per-call timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Coldwings committed Sep 9, 2024
1 parent 67fb4c5 commit e434c6d
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 34 deletions.
15 changes: 15 additions & 0 deletions common/memory-stream/memory-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class SimplexMemoryStream final : public IStream
if (closed) return -1;
return m_ringbuf.writev(iov, iovcnt);
}
virtual uint64_t timeout() const override
{
return -1;
}
virtual void timeout(uint64_t) override {}
};

class DuplexMemoryStreamImpl : public DuplexMemoryStream
Expand Down Expand Up @@ -91,6 +96,11 @@ class DuplexMemoryStreamImpl : public DuplexMemoryStream
if (closed) return -1;
return s2->writev(iov, iovcnt);
}
virtual uint64_t timeout() const override
{
return -1;
}
virtual void timeout(uint64_t) override {}
};

EndPoint epa, epb;
Expand Down Expand Up @@ -196,6 +206,11 @@ class FaultStream : public IStream
return m_stream->writev(iov, iovcnt);
}

virtual uint64_t timeout() const override
{
return -1;
}
virtual void timeout(uint64_t) override {}
};

IStream* new_fault_stream(IStream* stream, int flag, bool ownership) {
Expand Down
4 changes: 4 additions & 0 deletions common/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class IStream : public Object
return writev(iov, iovcnt);
}

// get/set default timeout, in us, (default +∞)
virtual uint64_t timeout() const = 0;
virtual void timeout(uint64_t tm) = 0;

struct ReadAll {
struct FreeDeleter {
void operator()(void* ptr) {
Expand Down
3 changes: 1 addition & 2 deletions examples/rpc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ struct ExampleClient {
// TCP connection will failed in 1 second(1UL*1000*1000) if not accepted
// and connection send/recv will take 5 socneds(5UL*1000*1000) as timedout
ExampleClient()
: pool(photon::rpc::new_stub_pool(10UL * 1000 * 1000, 1UL * 1000 * 1000,
5UL * 1000 * 1000)) {}
: pool(photon::rpc::new_stub_pool(10UL * 1000 * 1000, 1UL * 1000 * 1000)) {}

int64_t RPCHeartbeat(photon::net::EndPoint ep);

Expand Down
4 changes: 4 additions & 0 deletions fs/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ namespace fs
UNIMPLEMENTED(int sync_file_range(off_t offset, off_t nbytes, unsigned int flags));
UNIMPLEMENTED(int fallocate(int mode, off_t offset, off_t len));
UNIMPLEMENTED(int fiemap(struct fiemap* map)); // query the extent map for
UNIMPLEMENTED(uint64_t timeout() const override);
void timeout(uint64_t tm) override {
// unimplemented by default
}

// append write to the file
// if `offset` is specified, it must be equal to current size of the
Expand Down
3 changes: 3 additions & 0 deletions net/datagram_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class IDatagramSocket : public IMessage,
}
using IMessage::recv;
using IMessage::send;

virtual uint64_t timeout() const = 0;
virtual void timeout(uint64_t) = 0;
};

class UDPSocket : public IDatagramSocket {
Expand Down
21 changes: 21 additions & 0 deletions net/http/body.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ class BodyReadStream : public ROStream {
return ret;
}

virtual uint64_t timeout() const override {
return m_stream ? m_stream->timeout() : -1UL;
}
virtual void timeout(uint64_t timeout) override {
if (m_stream) m_stream->timeout(timeout);
}

protected:
net::ISocketStream *m_stream;
char* m_partial_body_buf;
Expand Down Expand Up @@ -255,6 +262,13 @@ class BodyWriteStream: public WOStream {
return wc;
}

virtual uint64_t timeout() const override {
return m_stream ? m_stream->timeout() : -1UL;
}
virtual void timeout(uint64_t timeout) override {
if (m_stream) m_stream->timeout(timeout);
}

protected:
net::ISocketStream *m_stream;
size_t m_size = 0;
Expand Down Expand Up @@ -297,6 +311,13 @@ class ChunkedBodyWriteStream: public WOStream {
return count;
}

virtual uint64_t timeout() const override {
return m_stream ? m_stream->timeout() : -1UL;
}
virtual void timeout(uint64_t timeout) override {
if (m_stream) m_stream->timeout(timeout);
}

protected:
net::ISocketStream *m_stream;
bool m_finish = false;
Expand Down
3 changes: 3 additions & 0 deletions net/http/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class Message : public IStream {
ssize_t write_stream(IStream *stream, size_t size_limit = -1);
int close() override { return 0; }

uint64_t timeout() const override { return -1UL; }
void timeout(uint64_t timeout) override {}

// size of body
size_t body_size() const;
// size of origin resource
Expand Down
10 changes: 6 additions & 4 deletions net/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ namespace net {
if (ret >= 0) *value = v;
return ret;
}

// get/set default timeout, in us, (default +∞)
virtual uint64_t timeout() const = 0;
virtual void timeout(uint64_t tm) = 0;
};

class ISocketName {
Expand Down Expand Up @@ -254,6 +250,9 @@ namespace net {
virtual ISocketStream* connect(const EndPoint& remote, const EndPoint* local = nullptr) = 0;
// Connect to a Unix Domain Socket.
virtual ISocketStream* connect(const char* path, size_t count = 0) = 0;

virtual uint64_t timeout() const = 0;
virtual void timeout(uint64_t) = 0;
};

class ISocketServer : public ISocketBase, public ISocketName, public Object {
Expand All @@ -276,6 +275,9 @@ namespace net {
virtual int start_loop(bool block = false) = 0;
// Close the listening fd. It's the user's responsibility to close the active connections.
virtual void terminate() = 0;

virtual uint64_t timeout() const = 0;
virtual void timeout(uint64_t) = 0;
};

extern "C" ISocketClient* new_tcp_socket_client();
Expand Down
2 changes: 1 addition & 1 deletion net/test/zerocopy-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ int main(int argc, char** argv) {
DEFER(delete[] g_read_buffers);
prepare_read_buffers();

auto pool = rpc::new_stub_pool(60 * 1000 * 1000, 10 * 1000 * 1000, -1);
auto pool = rpc::new_stub_pool(60 * 1000 * 1000, 10 * 1000 * 1000);
DEFER(delete pool);

photon::thread_create11(show_performance_statis);
Expand Down
32 changes: 17 additions & 15 deletions rpc/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ namespace rpc {
auto iov = args->request;
auto ret = iov->push_front({&header, sizeof(header)});
if (ret != sizeof(header)) return -1;
m_stream->timeout(args->timeout.timeout());
DEFER(m_stream->timeout(-1));
ret = args->RET = m_stream->writev(iov->iovec(), iov->iovcnt());
if (ret != header.size + sizeof(header)) {
ERRNO err;
Expand All @@ -96,6 +98,8 @@ namespace rpc {
// m_stream->shutdown(ShutdownHow::ReadWrite);
LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timeout before read header ");
}
m_stream->timeout(args->timeout.timeout());
DEFER(m_stream->timeout(-1));
auto ret = args->RET = m_stream->read(&m_header, sizeof(m_header));
args->tag = m_header.tag;
if (ret != sizeof(m_header)) {
Expand All @@ -119,6 +123,8 @@ namespace rpc {
if (iov->iovcnt() == 0) {
iov->malloc(m_header.size);
}
m_stream->timeout(args->timeout.timeout());
DEFER(m_stream->timeout(-1));
auto ret = m_stream->readv((const iovec*)iov->iovec(), iov->iovcnt());
// return 0 means it has been disconnected
// should take as fault
Expand Down Expand Up @@ -419,12 +425,11 @@ namespace rpc {

class StubPoolImpl : public StubPool {
public:
explicit StubPoolImpl(uint64_t expiration, uint64_t connect_timeout, uint64_t transfer_timeout) {
explicit StubPoolImpl(uint64_t expiration, uint64_t connect_timeout) {
tls_ctx = net::new_tls_context(nullptr, nullptr, nullptr);
tcpclient = net::new_tcp_socket_client();
tcpclient->timeout(connect_timeout);
m_pool = new ObjectCache<net::EndPoint, rpc::Stub*>(expiration);
m_transfer_timeout = transfer_timeout;
}

~StubPoolImpl() {
Expand Down Expand Up @@ -454,7 +459,7 @@ namespace rpc {
}

uint64_t get_timeout() const override {
return m_transfer_timeout;
return tcpclient->timeout();
}

protected:
Expand All @@ -463,7 +468,7 @@ namespace rpc {
if (!sock)
LOG_ERRNO_RETURN(0, nullptr, "failed to connect to ", ep);
LOG_DEBUG("connected to ", ep);
sock->timeout(m_transfer_timeout);
sock->timeout(-1UL);
if (tls) {
sock = net::new_tls_stream(tls_ctx, sock, net::SecurityRole::Client, true);
}
Expand All @@ -473,16 +478,15 @@ namespace rpc {
ObjectCache<net::EndPoint, rpc::Stub*>* m_pool;
net::ISocketClient *tcpclient;
net::TLSContext* tls_ctx = nullptr;
uint64_t m_transfer_timeout;
};

// dummy pool, for unix domain socket connection to only one point only
// so no-mather what connection in, gets domain socket
class UDSStubPoolImpl : public StubPoolImpl {
public:
explicit UDSStubPoolImpl(const char* path, uint64_t expiration,
uint64_t connect_timeout, uint64_t transfer_timeout)
: StubPoolImpl(expiration, connect_timeout, transfer_timeout),
uint64_t connect_timeout)
: StubPoolImpl(expiration, connect_timeout),
m_path(path), m_client(net::new_uds_client()) {
m_client->timeout(connect_timeout);
}
Expand All @@ -498,7 +502,8 @@ namespace rpc {
LOG_ERRNO_RETURN(0, nullptr,
"Connect to unix domain socket failed");
}
sock->timeout(m_transfer_timeout);
// stub socket always set timeout for single action
sock->timeout(-1UL);
return new_rpc_stub(sock, true);
});
}
Expand All @@ -508,16 +513,13 @@ namespace rpc {
net::ISocketClient * m_client;
};

StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout,
uint64_t transfer_timeout) {
return new StubPoolImpl(expiration, connect_timeout, transfer_timeout);
StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout) {
return new StubPoolImpl(expiration, connect_timeout);
}

StubPool* new_uds_stub_pool(const char* path, uint64_t expiration,
uint64_t connect_timeout,
uint64_t transfer_timeout) {
return new UDSStubPoolImpl(path, expiration, connect_timeout,
transfer_timeout);
uint64_t connect_timeout) {
return new UDSStubPoolImpl(path, expiration, connect_timeout);
}
} // namespace rpc
}
9 changes: 2 additions & 7 deletions rpc/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,14 @@ namespace rpc
About timeout:
1. When a socket/stub not used by any caller for `expiration` microsecs, it will be dropped.
2. When socket connecting, it will fail by be timed out after `connect_timeout` microsecs.
3. When socket needs to read/write (like sending request and waiting for any response), but got
nothing in `transfer_timeout` microsecs, it will be timed out. It's just a timeout for a single
read/write operation, not a RPC timeout, which is controlled by the caller when invoking `Stub::call`.
4. `Stub::call` measures the time from invoking `call` before sending request to received
response head. Receiving response body is not considered.
**/
extern "C" StubPool* new_stub_pool(uint64_t expiration,
uint64_t connect_timeout,
uint64_t transfer_timeout);
uint64_t connect_timeout);
extern "C" StubPool* new_uds_stub_pool(const char* path,
uint64_t expiration,
uint64_t connect_timeout,
uint64_t transfer_timeout);
uint64_t connect_timeout);
extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128);

__attribute__((deprecated))
Expand Down
2 changes: 1 addition & 1 deletion rpc/test/test-rpc-message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ TEST(rpc, message) {
TestRPCServer server;
ASSERT_EQ(0, server.run());

auto pool = photon::rpc::new_stub_pool(-1, -1, -1);
auto pool = photon::rpc::new_stub_pool(-1, -1);
DEFER(delete pool);

photon::net::EndPoint ep;
Expand Down
8 changes: 4 additions & 4 deletions rpc/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ TEST_F(RpcTest, shutdown) {
RpcServer rpc_server(sk, socket_server);
GTEST_ASSERT_EQ(0, rpc_server.run());

auto pool = photon::rpc::new_stub_pool(-1, -1, -1);
auto pool = photon::rpc::new_stub_pool(-1, -1);
DEFER(delete pool);

auto& ep = rpc_server.m_endpoint;
Expand Down Expand Up @@ -419,7 +419,7 @@ TEST_F(RpcTest, passive_shutdown) {
auto& ep = rpc_server.m_endpoint;
photon::thread_create11([&]{
// Should always succeed in 3 seconds
auto pool = photon::rpc::new_stub_pool(-1, -1, -1);
auto pool = photon::rpc::new_stub_pool(-1, -1);
DEFER(delete pool);
auto stub = pool->get_stub(ep, false);
if (!stub) abort();
Expand All @@ -437,7 +437,7 @@ TEST_F(RpcTest, passive_shutdown) {
photon::thread_create11([&]{
photon::thread_sleep(2);
// Should get connection refused after 2 seconds. Because socket closed listen fd at 1 second.
auto pool = photon::rpc::new_stub_pool(-1, -1, -1);
auto pool = photon::rpc::new_stub_pool(-1, -1);
DEFER(delete pool);
auto stub = pool->get_stub(ep, false);
if (stub) {
Expand Down Expand Up @@ -549,7 +549,7 @@ TEST_F(RpcTest, timeout_with_hb) {
GTEST_ASSERT_EQ(0, rpc_server.run());

auto& ep = rpc_server.m_endpoint;
auto pool = photon::rpc::new_stub_pool(-1, -1, 1'000'000);
auto pool = photon::rpc::new_stub_pool(-1, -1);
DEFER(delete pool);
auto th1 = photon::thread_enable_join(photon::thread_create11([&]{
// Should always succeed in 5 seconds
Expand Down

0 comments on commit e434c6d

Please sign in to comment.