diff --git a/common/memory-stream/memory-stream.cpp b/common/memory-stream/memory-stream.cpp index aee8d395..aa92caf3 100644 --- a/common/memory-stream/memory-stream.cpp +++ b/common/memory-stream/memory-stream.cpp @@ -54,6 +54,11 @@ class SimplexMemoryStream final : public IStream if (closed) return -1; return m_ringbuf.writev(iov, iovcnt); } + virtual uint64_t timeout() const override + { + return -1; + } + virtual void timeout(uint64_t) override {} }; class DuplexMemoryStreamImpl : public DuplexMemoryStream @@ -91,6 +96,11 @@ class DuplexMemoryStreamImpl : public DuplexMemoryStream if (closed) return -1; return s2->writev(iov, iovcnt); } + virtual uint64_t timeout() const override + { + return -1; + } + virtual void timeout(uint64_t) override {} }; EndPoint epa, epb; @@ -196,6 +206,11 @@ class FaultStream : public IStream return m_stream->writev(iov, iovcnt); } + virtual uint64_t timeout() const override + { + return -1; + } + virtual void timeout(uint64_t) override {} }; IStream* new_fault_stream(IStream* stream, int flag, bool ownership) { diff --git a/common/stream.h b/common/stream.h index cc21cb87..156859a3 100644 --- a/common/stream.h +++ b/common/stream.h @@ -46,6 +46,10 @@ class IStream : public Object return writev(iov, iovcnt); } + // get/set default timeout, in us, (default +∞) + virtual uint64_t timeout() const = 0; + virtual void timeout(uint64_t tm) = 0; + struct ReadAll { struct FreeDeleter { void operator()(void* ptr) { diff --git a/examples/rpc/client.h b/examples/rpc/client.h index 2b3791ac..1a5acb9d 100644 --- a/examples/rpc/client.h +++ b/examples/rpc/client.h @@ -29,8 +29,7 @@ struct ExampleClient { // TCP connection will failed in 1 second(1UL*1000*1000) if not accepted // and connection send/recv will take 5 socneds(5UL*1000*1000) as timedout ExampleClient() - : pool(photon::rpc::new_stub_pool(10UL * 1000 * 1000, 1UL * 1000 * 1000, - 5UL * 1000 * 1000)) {} + : pool(photon::rpc::new_stub_pool(10UL * 1000 * 1000, 1UL * 1000 * 1000)) {} int64_t RPCHeartbeat(photon::net::EndPoint ep); diff --git a/fs/filesystem.h b/fs/filesystem.h index 71c6ea81..115b65aa 100644 --- a/fs/filesystem.h +++ b/fs/filesystem.h @@ -98,6 +98,10 @@ namespace fs UNIMPLEMENTED(int sync_file_range(off_t offset, off_t nbytes, unsigned int flags)); UNIMPLEMENTED(int fallocate(int mode, off_t offset, off_t len)); UNIMPLEMENTED(int fiemap(struct fiemap* map)); // query the extent map for + UNIMPLEMENTED(uint64_t timeout() const override); + void timeout(uint64_t tm) override { + // unimplemented by default + } // append write to the file // if `offset` is specified, it must be equal to current size of the diff --git a/net/datagram_socket.h b/net/datagram_socket.h index 344b44a6..4ed021bb 100644 --- a/net/datagram_socket.h +++ b/net/datagram_socket.h @@ -52,6 +52,9 @@ class IDatagramSocket : public IMessage, } using IMessage::recv; using IMessage::send; + + virtual uint64_t timeout() const = 0; + virtual void timeout(uint64_t) = 0; }; class UDPSocket : public IDatagramSocket { diff --git a/net/http/body.cpp b/net/http/body.cpp index 60562ae8..f67dd693 100644 --- a/net/http/body.cpp +++ b/net/http/body.cpp @@ -103,6 +103,13 @@ class BodyReadStream : public ROStream { return ret; } + virtual uint64_t timeout() const override { + return m_stream ? m_stream->timeout() : -1UL; + } + virtual void timeout(uint64_t timeout) override { + if (m_stream) m_stream->timeout(timeout); + } + protected: net::ISocketStream *m_stream; char* m_partial_body_buf; @@ -255,6 +262,13 @@ class BodyWriteStream: public WOStream { return wc; } + virtual uint64_t timeout() const override { + return m_stream ? m_stream->timeout() : -1UL; + } + virtual void timeout(uint64_t timeout) override { + if (m_stream) m_stream->timeout(timeout); + } + protected: net::ISocketStream *m_stream; size_t m_size = 0; @@ -297,6 +311,13 @@ class ChunkedBodyWriteStream: public WOStream { return count; } + virtual uint64_t timeout() const override { + return m_stream ? m_stream->timeout() : -1UL; + } + virtual void timeout(uint64_t timeout) override { + if (m_stream) m_stream->timeout(timeout); + } + protected: net::ISocketStream *m_stream; bool m_finish = false; diff --git a/net/http/message.h b/net/http/message.h index 9b85b9b3..71e2408a 100644 --- a/net/http/message.h +++ b/net/http/message.h @@ -98,6 +98,9 @@ class Message : public IStream { ssize_t write_stream(IStream *stream, size_t size_limit = -1); int close() override { return 0; } + uint64_t timeout() const override { return -1UL; } + void timeout(uint64_t timeout) override {} + // size of body size_t body_size() const; // size of origin resource diff --git a/net/socket.h b/net/socket.h index 1751f358..8adb4cb3 100644 --- a/net/socket.h +++ b/net/socket.h @@ -204,10 +204,6 @@ namespace net { if (ret >= 0) *value = v; return ret; } - - // get/set default timeout, in us, (default +∞) - virtual uint64_t timeout() const = 0; - virtual void timeout(uint64_t tm) = 0; }; class ISocketName { @@ -254,6 +250,9 @@ namespace net { virtual ISocketStream* connect(const EndPoint& remote, const EndPoint* local = nullptr) = 0; // Connect to a Unix Domain Socket. virtual ISocketStream* connect(const char* path, size_t count = 0) = 0; + + virtual uint64_t timeout() const = 0; + virtual void timeout(uint64_t) = 0; }; class ISocketServer : public ISocketBase, public ISocketName, public Object { @@ -276,6 +275,9 @@ namespace net { virtual int start_loop(bool block = false) = 0; // Close the listening fd. It's the user's responsibility to close the active connections. virtual void terminate() = 0; + + virtual uint64_t timeout() const = 0; + virtual void timeout(uint64_t) = 0; }; extern "C" ISocketClient* new_tcp_socket_client(); diff --git a/net/test/zerocopy-client.cpp b/net/test/zerocopy-client.cpp index 0a28d7a9..03419c10 100644 --- a/net/test/zerocopy-client.cpp +++ b/net/test/zerocopy-client.cpp @@ -154,7 +154,7 @@ int main(int argc, char** argv) { DEFER(delete[] g_read_buffers); prepare_read_buffers(); - auto pool = rpc::new_stub_pool(60 * 1000 * 1000, 10 * 1000 * 1000, -1); + auto pool = rpc::new_stub_pool(60 * 1000 * 1000, 10 * 1000 * 1000); DEFER(delete pool); photon::thread_create11(show_performance_statis); diff --git a/rpc/rpc.cpp b/rpc/rpc.cpp index 85d1ba45..5bbdc594 100644 --- a/rpc/rpc.cpp +++ b/rpc/rpc.cpp @@ -80,6 +80,8 @@ namespace rpc { auto iov = args->request; auto ret = iov->push_front({&header, sizeof(header)}); if (ret != sizeof(header)) return -1; + m_stream->timeout(args->timeout.timeout()); + DEFER(m_stream->timeout(-1)); ret = args->RET = m_stream->writev(iov->iovec(), iov->iovcnt()); if (ret != header.size + sizeof(header)) { ERRNO err; @@ -96,6 +98,8 @@ namespace rpc { // m_stream->shutdown(ShutdownHow::ReadWrite); LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timeout before read header "); } + m_stream->timeout(args->timeout.timeout()); + DEFER(m_stream->timeout(-1)); auto ret = args->RET = m_stream->read(&m_header, sizeof(m_header)); args->tag = m_header.tag; if (ret != sizeof(m_header)) { @@ -119,6 +123,8 @@ namespace rpc { if (iov->iovcnt() == 0) { iov->malloc(m_header.size); } + m_stream->timeout(args->timeout.timeout()); + DEFER(m_stream->timeout(-1)); auto ret = m_stream->readv((const iovec*)iov->iovec(), iov->iovcnt()); // return 0 means it has been disconnected // should take as fault @@ -419,12 +425,11 @@ namespace rpc { class StubPoolImpl : public StubPool { public: - explicit StubPoolImpl(uint64_t expiration, uint64_t connect_timeout, uint64_t transfer_timeout) { + explicit StubPoolImpl(uint64_t expiration, uint64_t connect_timeout) { tls_ctx = net::new_tls_context(nullptr, nullptr, nullptr); tcpclient = net::new_tcp_socket_client(); tcpclient->timeout(connect_timeout); m_pool = new ObjectCache(expiration); - m_transfer_timeout = transfer_timeout; } ~StubPoolImpl() { @@ -454,7 +459,7 @@ namespace rpc { } uint64_t get_timeout() const override { - return m_transfer_timeout; + return tcpclient->timeout(); } protected: @@ -463,7 +468,7 @@ namespace rpc { if (!sock) LOG_ERRNO_RETURN(0, nullptr, "failed to connect to ", ep); LOG_DEBUG("connected to ", ep); - sock->timeout(m_transfer_timeout); + sock->timeout(-1UL); if (tls) { sock = net::new_tls_stream(tls_ctx, sock, net::SecurityRole::Client, true); } @@ -473,7 +478,6 @@ namespace rpc { ObjectCache* m_pool; net::ISocketClient *tcpclient; net::TLSContext* tls_ctx = nullptr; - uint64_t m_transfer_timeout; }; // dummy pool, for unix domain socket connection to only one point only @@ -481,8 +485,8 @@ namespace rpc { class UDSStubPoolImpl : public StubPoolImpl { public: explicit UDSStubPoolImpl(const char* path, uint64_t expiration, - uint64_t connect_timeout, uint64_t transfer_timeout) - : StubPoolImpl(expiration, connect_timeout, transfer_timeout), + uint64_t connect_timeout) + : StubPoolImpl(expiration, connect_timeout), m_path(path), m_client(net::new_uds_client()) { m_client->timeout(connect_timeout); } @@ -498,7 +502,8 @@ namespace rpc { LOG_ERRNO_RETURN(0, nullptr, "Connect to unix domain socket failed"); } - sock->timeout(m_transfer_timeout); + // stub socket always set timeout for single action + sock->timeout(-1UL); return new_rpc_stub(sock, true); }); } @@ -508,16 +513,13 @@ namespace rpc { net::ISocketClient * m_client; }; - StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout, - uint64_t transfer_timeout) { - return new StubPoolImpl(expiration, connect_timeout, transfer_timeout); + StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout) { + return new StubPoolImpl(expiration, connect_timeout); } StubPool* new_uds_stub_pool(const char* path, uint64_t expiration, - uint64_t connect_timeout, - uint64_t transfer_timeout) { - return new UDSStubPoolImpl(path, expiration, connect_timeout, - transfer_timeout); + uint64_t connect_timeout) { + return new UDSStubPoolImpl(path, expiration, connect_timeout); } } // namespace rpc } diff --git a/rpc/rpc.h b/rpc/rpc.h index e4419a95..f863368a 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -251,19 +251,14 @@ namespace rpc About timeout: 1. When a socket/stub not used by any caller for `expiration` microsecs, it will be dropped. 2. When socket connecting, it will fail by be timed out after `connect_timeout` microsecs. - 3. When socket needs to read/write (like sending request and waiting for any response), but got - nothing in `transfer_timeout` microsecs, it will be timed out. It's just a timeout for a single - read/write operation, not a RPC timeout, which is controlled by the caller when invoking `Stub::call`. 4. `Stub::call` measures the time from invoking `call` before sending request to received response head. Receiving response body is not considered. **/ extern "C" StubPool* new_stub_pool(uint64_t expiration, - uint64_t connect_timeout, - uint64_t transfer_timeout); + uint64_t connect_timeout); extern "C" StubPool* new_uds_stub_pool(const char* path, uint64_t expiration, - uint64_t connect_timeout, - uint64_t transfer_timeout); + uint64_t connect_timeout); extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128); __attribute__((deprecated)) diff --git a/rpc/test/test-rpc-message.cpp b/rpc/test/test-rpc-message.cpp index 1a632d04..4285602f 100644 --- a/rpc/test/test-rpc-message.cpp +++ b/rpc/test/test-rpc-message.cpp @@ -174,7 +174,7 @@ TEST(rpc, message) { TestRPCServer server; ASSERT_EQ(0, server.run()); - auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + auto pool = photon::rpc::new_stub_pool(-1, -1); DEFER(delete pool); photon::net::EndPoint ep; diff --git a/rpc/test/test.cpp b/rpc/test/test.cpp index ef92ba1c..6e237408 100644 --- a/rpc/test/test.cpp +++ b/rpc/test/test.cpp @@ -376,7 +376,7 @@ TEST_F(RpcTest, shutdown) { RpcServer rpc_server(sk, socket_server); GTEST_ASSERT_EQ(0, rpc_server.run()); - auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + auto pool = photon::rpc::new_stub_pool(-1, -1); DEFER(delete pool); auto& ep = rpc_server.m_endpoint; @@ -419,7 +419,7 @@ TEST_F(RpcTest, passive_shutdown) { auto& ep = rpc_server.m_endpoint; photon::thread_create11([&]{ // Should always succeed in 3 seconds - auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + auto pool = photon::rpc::new_stub_pool(-1, -1); DEFER(delete pool); auto stub = pool->get_stub(ep, false); if (!stub) abort(); @@ -437,7 +437,7 @@ TEST_F(RpcTest, passive_shutdown) { photon::thread_create11([&]{ photon::thread_sleep(2); // Should get connection refused after 2 seconds. Because socket closed listen fd at 1 second. - auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + auto pool = photon::rpc::new_stub_pool(-1, -1); DEFER(delete pool); auto stub = pool->get_stub(ep, false); if (stub) { @@ -549,7 +549,7 @@ TEST_F(RpcTest, timeout_with_hb) { GTEST_ASSERT_EQ(0, rpc_server.run()); auto& ep = rpc_server.m_endpoint; - auto pool = photon::rpc::new_stub_pool(-1, -1, 1'000'000); + auto pool = photon::rpc::new_stub_pool(-1, -1); DEFER(delete pool); auto th1 = photon::thread_enable_join(photon::thread_create11([&]{ // Should always succeed in 5 seconds