From 44abd5c3c9169ccd021e3368e85a95ca5396e690 Mon Sep 17 00:00:00 2001 From: Coldwings Date: Fri, 13 Sep 2024 09:50:06 +0800 Subject: [PATCH] fix RPC stop by timeout during calling (#552) * fix RPC stop by timeout during calling * fix typo and make stub call timeout parameter using Timeout instead of uint64 * Refactoring OutOfOrderEngine, make it able to share between vCPUs * Remove unused code in test * Add timeout to IStream interface so RPC able to control per-call timeout * Add default timeout methods (Failure with ENOSYS) to IStream * Since already have a default implementation of timeout in IStream, remove unused override --- common/stream.h | 13 ++- examples/rpc/client.h | 3 +- net/datagram_socket.h | 3 + net/http/body.cpp | 14 +++ net/socket.h | 10 +- net/test/zerocopy-client.cpp | 2 +- rpc/out-of-order-execution.cpp | 204 ++++++++++++++++++++++----------- rpc/out-of-order-execution.h | 44 +++++-- rpc/rpc.cpp | 41 +++---- rpc/rpc.h | 26 +++-- rpc/test/test-rpc-message.cpp | 2 +- rpc/test/test.cpp | 129 ++++++++++++++++++--- 12 files changed, 360 insertions(+), 131 deletions(-) diff --git a/common/stream.h b/common/stream.h index cc21cb87..b04c1c08 100644 --- a/common/stream.h +++ b/common/stream.h @@ -15,10 +15,12 @@ limitations under the License. 
*/ #pragma once +#include +#include #include #include + #include -#include struct iovec; @@ -46,6 +48,15 @@ class IStream : public Object return writev(iov, iovcnt); } + // get/set default timeout, in us, (default +∞) + virtual uint64_t timeout() const { + errno = ENOSYS; + return -1; + } + virtual void timeout(uint64_t tm) { + errno = ENOSYS; + } + struct ReadAll { struct FreeDeleter { void operator()(void* ptr) { diff --git a/examples/rpc/client.h b/examples/rpc/client.h index 2b3791ac..1a5acb9d 100644 --- a/examples/rpc/client.h +++ b/examples/rpc/client.h @@ -29,8 +29,7 @@ struct ExampleClient { // TCP connection will failed in 1 second(1UL*1000*1000) if not accepted // and connection send/recv will take 5 socneds(5UL*1000*1000) as timedout ExampleClient() - : pool(photon::rpc::new_stub_pool(10UL * 1000 * 1000, 1UL * 1000 * 1000, - 5UL * 1000 * 1000)) {} + : pool(photon::rpc::new_stub_pool(10UL * 1000 * 1000, 1UL * 1000 * 1000)) {} int64_t RPCHeartbeat(photon::net::EndPoint ep); diff --git a/net/datagram_socket.h b/net/datagram_socket.h index 344b44a6..4ed021bb 100644 --- a/net/datagram_socket.h +++ b/net/datagram_socket.h @@ -52,6 +52,9 @@ class IDatagramSocket : public IMessage, } using IMessage::recv; using IMessage::send; + + virtual uint64_t timeout() const = 0; + virtual void timeout(uint64_t) = 0; }; class UDPSocket : public IDatagramSocket { diff --git a/net/http/body.cpp b/net/http/body.cpp index 60562ae8..eb112184 100644 --- a/net/http/body.cpp +++ b/net/http/body.cpp @@ -255,6 +255,13 @@ class BodyWriteStream: public WOStream { return wc; } + virtual uint64_t timeout() const override { + return m_stream ? m_stream->timeout() : -1UL; + } + virtual void timeout(uint64_t timeout) override { + if (m_stream) m_stream->timeout(timeout); + } + protected: net::ISocketStream *m_stream; size_t m_size = 0; @@ -297,6 +304,13 @@ class ChunkedBodyWriteStream: public WOStream { return count; } + virtual uint64_t timeout() const override { + return m_stream ? 
m_stream->timeout() : -1UL; + } + virtual void timeout(uint64_t timeout) override { + if (m_stream) m_stream->timeout(timeout); + } + protected: net::ISocketStream *m_stream; bool m_finish = false; diff --git a/net/socket.h b/net/socket.h index 1751f358..8adb4cb3 100644 --- a/net/socket.h +++ b/net/socket.h @@ -204,10 +204,6 @@ namespace net { if (ret >= 0) *value = v; return ret; } - - // get/set default timeout, in us, (default +∞) - virtual uint64_t timeout() const = 0; - virtual void timeout(uint64_t tm) = 0; }; class ISocketName { @@ -254,6 +250,9 @@ namespace net { virtual ISocketStream* connect(const EndPoint& remote, const EndPoint* local = nullptr) = 0; // Connect to a Unix Domain Socket. virtual ISocketStream* connect(const char* path, size_t count = 0) = 0; + + virtual uint64_t timeout() const = 0; + virtual void timeout(uint64_t) = 0; }; class ISocketServer : public ISocketBase, public ISocketName, public Object { @@ -276,6 +275,9 @@ namespace net { virtual int start_loop(bool block = false) = 0; // Close the listening fd. It's the user's responsibility to close the active connections. 
virtual void terminate() = 0; + + virtual uint64_t timeout() const = 0; + virtual void timeout(uint64_t) = 0; }; extern "C" ISocketClient* new_tcp_socket_client(); diff --git a/net/test/zerocopy-client.cpp b/net/test/zerocopy-client.cpp index 0a28d7a9..03419c10 100644 --- a/net/test/zerocopy-client.cpp +++ b/net/test/zerocopy-client.cpp @@ -154,7 +154,7 @@ int main(int argc, char** argv) { DEFER(delete[] g_read_buffers); prepare_read_buffers(); - auto pool = rpc::new_stub_pool(60 * 1000 * 1000, 10 * 1000 * 1000, -1); + auto pool = rpc::new_stub_pool(60 * 1000 * 1000, 10 * 1000 * 1000); DEFER(delete pool); photon::thread_create11(show_performance_statis); diff --git a/rpc/out-of-order-execution.cpp b/rpc/out-of-order-execution.cpp index 04d653aa..4c7c9d90 100644 --- a/rpc/out-of-order-execution.cpp +++ b/rpc/out-of-order-execution.cpp @@ -28,12 +28,19 @@ namespace rpc { { public: unordered_map m_map; - condition_variable m_cond_collected; - mutex m_mutex_w, m_mutex_r; + condition_variable m_cond_collected, m_wait; + mutex m_mutex_w, m_mutex_r, m_mutex_map; uint64_t m_issuing = 0; uint64_t m_tag = 0; bool m_running = true; + // rlock used as both reader lock and wait notifier. + // add yield in lock will break the assumption that threads + // not holding lock should be kept in sleep. 
+ // so do not yield, just put into sleep when needed + // make sure it able to wake by interrupts + OooEngine(): m_mutex_r(0) {} + ~OooEngine() { shutdown(); } @@ -53,9 +60,9 @@ namespace rpc { } int issue_operation(OutOfOrderContext& args) //firing issue { + SCOPED_LOCK(m_mutex_w); m_issuing ++; DEFER(m_issuing --); - scoped_lock lock(m_mutex_w); if (!m_running) LOG_ERROR_RETURN(ESHUTDOWN, -1, "engine is been shuting down"); if (!args.flag_tag_valid) @@ -64,101 +71,158 @@ namespace rpc { args.tag = ++m_tag; // auto increase if it is not user defined tag } args.th = CURRENT; - args.collected = false; + { + SCOPED_LOCK(args.phaselock); + args.phase = OooPhase::BEFORE_ISSUE; + } args.ret = 0; - auto ret = m_map.insert({args.tag, &args}); //the return value is {iter, bool} - if (!ret.second) // means insert failed because of key already exists { - auto tag = args.tag; - auto th = (ret.first == m_map.end()) ? nullptr : ret.first->second->th; - LOG_ERROR("failed to insert record into unordered hash map", - VALUE(tag), VALUE(CURRENT), VALUE(th)); - if (args.flag_tag_valid) // user set tag, need to tell user it is a failure - LOG_ERROR_RETURN(EINVAL, -1, "the tag in argument is NOT valid"); - goto again; + SCOPED_LOCK(m_mutex_map); + auto ret = m_map.insert({args.tag, &args}); //the return value is {iter, bool} + if (!ret.second) // means insert failed because of key already exists + { + auto tag = args.tag; + auto th = (ret.first == m_map.end()) ? 
nullptr : ret.first->second->th; + LOG_ERROR("failed to insert record into unordered hash map", + VALUE(tag), VALUE(CURRENT), VALUE(th)); + if (args.flag_tag_valid) // user set tag, need to tell user it is a failure + LOG_ERROR_RETURN(EINVAL, -1, "the tag in argument is NOT valid"); + goto again; + } } int ret2 = args.do_issue(&args); if (ret2 < 0) { + SCOPED_LOCK(m_mutex_map); m_map.erase(args.tag); + m_cond_collected.notify_one(); LOG_ERROR_RETURN(0, -1, "failed to do_issue()"); } + { + SCOPED_LOCK(args.phaselock); + args.phase = OooPhase::ISSUED; + } return 0; } + + static void wait_check(void* args) { + OutOfOrderContext& ctx = *(OutOfOrderContext*)args; + ctx.phase = OooPhase::WAITING; + ctx.phaselock.unlock(); + }; int wait_completion(OutOfOrderContext& args) //recieving work { - // lock with param 1 means allow entry without lock - // when interuptted - scoped_lock lock(m_mutex_r, 1); - - // when wait_completion returned, - // always have tag removed from the map - // notify the waiting function (like shutdown()) - DEFER(m_cond_collected.notify_one()); - - auto o_tag = args.tag; { - auto o_it = m_map.find(o_tag); - if (o_it == m_map.end()) { - LOG_ERROR_RETURN(EINVAL, -1, "issue of ` not found", VALUE(args.tag)); - } - if (o_it->second->th != CURRENT) - { - LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT)); - } - if (o_it->second->collected) { - // my completion has been done - // just collect it, clear the trace, - // then return result - auto ret = o_it->second->ret; - m_map.erase(o_it); - return ret; + // check if context issued + SCOPED_LOCK(m_mutex_map); + if (m_map.find(args.tag) == m_map.end()) { + LOG_ERROR_RETURN(EINVAL, -1, + "context not found in map"); } } - //Hold the lock, but not get the result. - while (true) + DEFER(m_wait.notify_one()); { - int ret = args.do_completion(&args); //this do_completion may recieve results for other threads. 
+ SCOPED_LOCK(args.phaselock); + if (args.phase == OooPhase::BEFORE_ISSUE) + LOG_ERROR_RETURN(EINVAL, -1, "context not issued"); + if (args.phase == OooPhase::WAITING) + LOG_ERROR_RETURN(EINVAL, -1, "context already in waiting"); + for (bool hold_lock = false; !hold_lock;) { + switch (args.phase) { + case OooPhase::COLLECTED: + // result alread collected before wait + if (args.th != CURRENT) + LOG_ERROR_RETURN(EINVAL, -1, "context is not issued by current thread"); + return args.ret; + case OooPhase::ISSUED: + args.th = photon::CURRENT; + args.phase = OooPhase::WAITING; + case OooPhase::WAITING: + { + if (m_mutex_r.try_lock() == 0) { + hold_lock = true; + break; + } + auto ret = m_wait.wait(args.phaselock, args.timeout); + // Check if collected + if (args.phase == OooPhase::COLLECTED && + args.th == CURRENT) { + return args.ret; + } + if (ret == -1) { + // or just timed out + { + SCOPED_LOCK(m_mutex_map); + m_map.erase(args.tag); + m_cond_collected.notify_one(); + } + LOG_ERROR_RETURN(ETIMEDOUT, -1, "waiting for completion timeout"); + } + break; + } + default: + LOG_ERROR_RETURN(EINVAL, -1, "unexpected phase"); + } + } + } + + // Holding mutex_r + // My origin tag is o_tag + auto o_tag = args.tag; + DEFER(m_mutex_r.unlock()); + for (;;) { + int ret = args.do_completion(&args); + //this do_completion may recieve results for other threads. // but still works because even if tag of this issue have a unique do_completion // which make other threads never could recieve it's result // the thread will waiting till it hold the lock and get it by itself // Since thread may not know the result of an issue will recieve by which thread // User must make sure that the do_completion can atleast recieve the result of it's own issue. 
- if (ret < 0) { - // set with nullptr means the thread is once issued but failed when wait_completion - m_map.erase(o_tag); - LOG_ERROR_RETURN(0, -1, "failed to do_completion()"); - } - - if (o_tag == args.tag) { - m_map.erase(o_tag); - break; // it's my result, let's break, and collect it - } + OutOfOrderContext* targ = nullptr; + unordered_map::iterator it; + { + SCOPED_LOCK(m_mutex_map); + DEFER(m_cond_collected.notify_one()); + if (ret < 0) { + // set with nullptr means the thread is once issued but failed when wait_completion + m_map.erase(o_tag); + LOG_ERROR_RETURN(0, -1, "failed to do_completion()"); + } - auto it = m_map.find(args.tag); + it = m_map.find(args.tag); - if (it == m_map.end()) { - // response tag never issued - m_map.erase(o_tag); - LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag); + if (it == m_map.end()) { + // response tag never issued + m_map.erase(o_tag); + LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag); + } + targ = it->second; + m_map.erase(it); } - auto targ = it->second; - auto th = targ->th; + // collect with mutex_r + targ->ret = targ->do_collect(targ); - if (!th) - // issued but requesting thread just failed in completion when waiting - LOG_ERROR_RETURN(ENOENT, -2, "response recvd, but requesting thread is NULL!"); - - it->second->ret = targ->do_collect(targ); - it->second->collected = true; - thread_interrupt(th); // other threads' response, resume him + { + photon::thread *th; + { + SCOPED_LOCK(targ->phaselock); + th = targ->th; + targ->phase = OooPhase::COLLECTED; + } + if (o_tag == args.tag) { + if (th != CURRENT) { + LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT)); + } + return args.ret; // it's my result, let's break, and + // collect it + } + if (!th) + // issued but requesting thread just failed in completion when waiting + LOG_ERROR_RETURN(ENOENT, -2, "response recvd, 
but requesting thread is NULL!"); + thread_interrupt(th, EINTR); // other threads' response, resume him + } } - // only break can bring out the while-loop - // means my result has been completed, - // ready to collect - DEFER(thread_yield_to(nullptr)); - return args.do_collect(&args); } int issue_wait(OutOfOrderContext& args) { diff --git a/rpc/out-of-order-execution.h b/rpc/out-of-order-execution.h index 038541a4..5eca9779 100644 --- a/rpc/out-of-order-execution.h +++ b/rpc/out-of-order-execution.h @@ -25,8 +25,9 @@ collection of result. The first 2 parts are realized via callbacks. */ #pragma once -#include #include +#include +#include namespace photon{ @@ -41,6 +42,12 @@ namespace rpc { void delete_ooo_execution_engine(OutOfOrder_Execution_Engine* engine); + enum class OooPhase : int { + BEFORE_ISSUE = 0, + ISSUED = 1, + WAITING = 2, + COLLECTED = 3 + }; struct OutOfOrderContext { OutOfOrder_Execution_Engine* engine; @@ -48,7 +55,7 @@ namespace rpc { // an unique tag of the opeartion, which can be filled // by user (together with `flag_tag_valid` = true), // by the `engine`, or by `do_completion`. - uint64_t tag; + uint64_t tag = 0; // The `CallbackType` have an prototype of // either `int (*)(void*, OutOfOrderContext*)`, @@ -75,17 +82,36 @@ namespace rpc { // It's guaranteed not to be called concurrently. 
CallbackType do_collect; - // whether or not the `tag` field is valid - bool flag_tag_valid = false; - // thread that binding with this argument - thread * th; + thread * th = nullptr; + + // Timeout for wait + Timeout timeout; - // whether the context result is collected - bool collected; + // Context phase + photon::spinlock phaselock; + volatile OooPhase phase = OooPhase::BEFORE_ISSUE; // return value of collection - int ret; + int ret = -1; + + // whether or not the `tag` field is valid + bool flag_tag_valid = false; + + OutOfOrderContext() = default; + OutOfOrderContext& operator=(const OutOfOrderContext& rhs) { + engine = rhs.engine; + tag = rhs.tag; + do_issue = rhs.do_issue; + do_completion = rhs.do_completion; + do_collect = rhs.do_collect; + th = rhs.th; + timeout = rhs.timeout; + flag_tag_valid = rhs.flag_tag_valid; + phase = rhs.phase; + ret = rhs.ret; + return *this; + } }; diff --git a/rpc/rpc.cpp b/rpc/rpc.cpp index 01c6b88d..5bbdc594 100644 --- a/rpc/rpc.cpp +++ b/rpc/rpc.cpp @@ -80,6 +80,8 @@ namespace rpc { auto iov = args->request; auto ret = iov->push_front({&header, sizeof(header)}); if (ret != sizeof(header)) return -1; + m_stream->timeout(args->timeout.timeout()); + DEFER(m_stream->timeout(-1)); ret = args->RET = m_stream->writev(iov->iovec(), iov->iovcnt()); if (ret != header.size + sizeof(header)) { ERRNO err; @@ -96,6 +98,8 @@ namespace rpc { // m_stream->shutdown(ShutdownHow::ReadWrite); LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timeout before read header "); } + m_stream->timeout(args->timeout.timeout()); + DEFER(m_stream->timeout(-1)); auto ret = args->RET = m_stream->read(&m_header, sizeof(m_header)); args->tag = m_header.tag; if (ret != sizeof(m_header)) { @@ -119,6 +123,8 @@ namespace rpc { if (iov->iovcnt() == 0) { iov->malloc(m_header.size); } + m_stream->timeout(args->timeout.timeout()); + DEFER(m_stream->timeout(-1)); auto ret = m_stream->readv((const iovec*)iov->iovec(), iov->iovcnt()); // return 0 means it has been disconnected // 
should take as fault @@ -137,9 +143,7 @@ namespace rpc { FunctionID function; }; iovector *request, *response; - Timeout timeout; - OooArgs(StubImpl* stub, FunctionID function, iovector* req, iovector* resp, uint64_t timeout_): - timeout(timeout_) + OooArgs(StubImpl* stub, FunctionID function, iovector* req, iovector* resp, Timeout timeout_) { request = req; response = resp; @@ -148,14 +152,14 @@ namespace rpc { do_issue.bind(stub, &StubImpl::do_send); do_completion.bind(stub, &StubImpl::do_recv_header); do_collect.bind(stub, &StubImpl::do_recv_body); + timeout = timeout_; } }; - int do_call(FunctionID function, iovector* request, iovector* response, uint64_t timeout) override { + int do_call(FunctionID function, iovector* request, iovector* response, Timeout tmo) override { scoped_rwlock rl(m_rwlock, photon::RLOCK); - Timeout tmo(timeout); if (tmo.expiration() < photon::now) { - LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timed out before rpc start", VALUE(timeout), VALUE(tmo.timeout())); + LOG_ERROR_RETURN(ETIMEDOUT, -1, "Timed out before rpc start", VALUE(tmo.timeout())); } int ret = 0; OooArgs args(this, function, request, response, tmo.timeout()); @@ -421,12 +425,11 @@ namespace rpc { class StubPoolImpl : public StubPool { public: - explicit StubPoolImpl(uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout) { + explicit StubPoolImpl(uint64_t expiration, uint64_t connect_timeout) { tls_ctx = net::new_tls_context(nullptr, nullptr, nullptr); tcpclient = net::new_tcp_socket_client(); tcpclient->timeout(connect_timeout); m_pool = new ObjectCache(expiration); - m_rpc_timeout = rpc_timeout; } ~StubPoolImpl() { @@ -456,7 +459,7 @@ namespace rpc { } uint64_t get_timeout() const override { - return m_rpc_timeout; + return tcpclient->timeout(); } protected: @@ -465,7 +468,7 @@ namespace rpc { if (!sock) LOG_ERRNO_RETURN(0, nullptr, "failed to connect to ", ep); LOG_DEBUG("connected to ", ep); - sock->timeout(m_rpc_timeout); + sock->timeout(-1UL); if (tls) { sock = 
net::new_tls_stream(tls_ctx, sock, net::SecurityRole::Client, true); } @@ -475,7 +478,6 @@ namespace rpc { ObjectCache* m_pool; net::ISocketClient *tcpclient; net::TLSContext* tls_ctx = nullptr; - uint64_t m_rpc_timeout; }; // dummy pool, for unix domain socket connection to only one point only @@ -483,8 +485,8 @@ namespace rpc { class UDSStubPoolImpl : public StubPoolImpl { public: explicit UDSStubPoolImpl(const char* path, uint64_t expiration, - uint64_t connect_timeout, uint64_t rpc_timeout) - : StubPoolImpl(expiration, connect_timeout, rpc_timeout), + uint64_t connect_timeout) + : StubPoolImpl(expiration, connect_timeout), m_path(path), m_client(net::new_uds_client()) { m_client->timeout(connect_timeout); } @@ -500,7 +502,8 @@ namespace rpc { LOG_ERRNO_RETURN(0, nullptr, "Connect to unix domain socket failed"); } - sock->timeout(m_rpc_timeout); + // stub socket always set timeout for single action + sock->timeout(-1UL); return new_rpc_stub(sock, true); }); } @@ -510,15 +513,13 @@ namespace rpc { net::ISocketClient * m_client; }; - StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout) { - return new StubPoolImpl(expiration, connect_timeout, rpc_timeout); + StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout) { + return new StubPoolImpl(expiration, connect_timeout); } StubPool* new_uds_stub_pool(const char* path, uint64_t expiration, - uint64_t connect_timeout, - uint64_t rpc_timeout) { - return new UDSStubPoolImpl(path, expiration, connect_timeout, - rpc_timeout); + uint64_t connect_timeout) { + return new UDSStubPoolImpl(path, expiration, connect_timeout); } } // namespace rpc } diff --git a/rpc/rpc.h b/rpc/rpc.h index eb195342..f863368a 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -59,15 +59,18 @@ namespace rpc /** * @param req Request of Message * @param resp Response of Message + * @param timeout RPC timeout, counting from sending request to receiving response header * @return The number of bytes 
received, -1 for failure * @note Request and Response should assign to external memory buffers if they have variable-length fields. * Via this, we can achieve zero-copy send and receive. * For Response, there could be only 1 buffer at most. For Request, there is no limit. + * Attention: RPC stub do not support multi vCPU, when multiple vCPUs are used, the RPC stub should be + * vCPU local object. */ template int call(typename Operation::Request& req, typename Operation::Response& resp, - uint64_t timeout = -1UL) + Timeout timeout = {}) { SerializerIOV reqmsg; reqmsg.serialize(req); @@ -107,6 +110,7 @@ namespace rpc /** * @param req Request of Message * @param resp_iov iovector for the Response + * @param timeout timeout in microseconds, -1UL for no timeout. * @return Pointer of the Response. nullptr for failure. No need to delete. * @note For this call, we don't need to assign buffers for the Response any more. * `resp_iov` will use its internal allocator to fulfill the memory requirement. @@ -114,7 +118,7 @@ namespace rpc */ template typename Operation::Response* call(typename Operation::Request& req, iovector& resp_iov, - uint64_t timeout = -1UL) { + Timeout timeout = {}) { assert(resp_iov.iovcnt() == 0); SerializerIOV reqmsg; reqmsg.serialize(req); @@ -136,7 +140,7 @@ namespace rpc protected: // This call can be invoked concurrently, and may return out-of-order. // Return the number of bytes received. 
- virtual int do_call(FunctionID function, iovector* request, iovector* response, uint64_t timeout) = 0; + virtual int do_call(FunctionID function, iovector* request, iovector* response, Timeout timeout) = 0; }; class Skeleton : public Object @@ -243,10 +247,18 @@ namespace rpc }; extern "C" Stub* new_rpc_stub(IStream* stream, bool ownership = false); - extern "C" StubPool* new_stub_pool(uint64_t expiration, uint64_t connect_timeout, uint64_t rpc_timeout); - extern "C" StubPool* new_uds_stub_pool(const char* path, uint64_t expiration, - uint64_t connect_timeout, - uint64_t rpc_timeout); + /** + About timeout: + 1. When a socket/stub is not used by any caller for `expiration` microsecs, it will be dropped. + 2. When a socket is connecting, it will fail with a timeout after `connect_timeout` microsecs. + 3. `Stub::call` measures the time from invoking `call`, before sending the request, until the + response header is received. Receiving the response body is not considered. + **/ + extern "C" StubPool* new_stub_pool(uint64_t expiration, + uint64_t connect_timeout); + extern "C" StubPool* new_uds_stub_pool(const char* path, + uint64_t expiration, + uint64_t connect_timeout); extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128); __attribute__((deprecated)) diff --git a/rpc/test/test-rpc-message.cpp b/rpc/test/test-rpc-message.cpp index 1a632d04..4285602f 100644 --- a/rpc/test/test-rpc-message.cpp +++ b/rpc/test/test-rpc-message.cpp @@ -174,7 +174,7 @@ TEST(rpc, message) { TestRPCServer server; ASSERT_EQ(0, server.run()); - auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + auto pool = photon::rpc::new_stub_pool(-1, -1); DEFER(delete pool); photon::net::EndPoint ep; diff --git a/rpc/test/test.cpp b/rpc/test/test.cpp index 86bf890d..6e237408 100644 --- a/rpc/test/test.cpp +++ b/rpc/test/test.cpp @@ -80,7 +80,6 @@ rpc::Header rpc_server_read(IStream* s) { rpc::Header header; s->read(&header, sizeof(header)); -// EXPECT_EQ(header.tag, 1); IOVector iov; iov.push_back(header.size); @@ -132,7 
+131,6 @@ int server_function(void* instance, iovector* request, rpc::Skeleton::ResponseSe IOVector iov; iov.push_back(STR, LEN(STR)); sender(&iov); - // LOG_DEBUG("exit"); return 0; } @@ -174,9 +172,7 @@ void do_call(StubImpl& stub, uint64_t function) args.init(); args.serialize(req_iov.iov); - // LOG_DEBUG("before call"); stub.do_call(function, &req_iov.iov, &resp_iov.iov, -1); - // LOG_DEBUG("after call recvd: '`'", (char*)resp_iov.iov.back().iov_base); EXPECT_EQ(memcmp(STR, resp_iov.iov.back().iov_base, LEN(STR)), 0); } @@ -195,11 +191,9 @@ uint64_t ncallers; void* do_concurrent_call(void* arg) { ncallers++; - // LOG_DEBUG("enter"); auto stub = (StubImpl*)arg; for (int i = 0; i < 10; ++i) do_call(*stub, 234); - // LOG_DEBUG("exit"); ncallers--; return nullptr; } @@ -251,21 +245,17 @@ void do_call_timeout(StubImpl& stub, uint64_t function) args.init(); args.serialize(req_iov.iov); - // LOG_DEBUG("before call"); int ret = stub.do_call(function, &req_iov.iov, &resp_iov.iov, 1UL*1000*1000); if (ret >= 0) { - // LOG_DEBUG("after call recvd: '`'", (char*)resp_iov.iov.back().iov_base); } } void* do_concurrent_call_timeout(void* arg) { ncallers++; - // LOG_DEBUG("enter"); auto stub = (StubImpl*)arg; for (int i = 0; i < 10; ++i) do_call_timeout(*stub, 234); - // LOG_DEBUG("exit"); ncallers--; return nullptr; } @@ -357,7 +347,6 @@ class RpcServer { } int run() { if (m_socket->bind_v4localhost() != 0) - // if (m_socket->bind(9527, net::IPAddr::V6Any()) != 0) LOG_ERRNO_RETURN(0, -1, "bind failed"); if (m_socket->listen() != 0) LOG_ERRNO_RETURN(0, -1, "listen failed"); @@ -387,11 +376,10 @@ TEST_F(RpcTest, shutdown) { RpcServer rpc_server(sk, socket_server); GTEST_ASSERT_EQ(0, rpc_server.run()); - auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + auto pool = photon::rpc::new_stub_pool(-1, -1); DEFER(delete pool); auto& ep = rpc_server.m_endpoint; - // photon::net::EndPoint ep(net::IPAddr::V4Loopback(), 9527); auto stub = pool->get_stub(ep, false); ASSERT_NE(nullptr, 
stub); DEFER(pool->put_stub(ep, true)); @@ -428,11 +416,10 @@ TEST_F(RpcTest, passive_shutdown) { RpcServer rpc_server(sk, socket_server); GTEST_ASSERT_EQ(0, rpc_server.run()); - // photon::net::EndPoint ep(net::IPAddr::V4Loopback(), 9527); auto& ep = rpc_server.m_endpoint; photon::thread_create11([&]{ // Should always succeed in 3 seconds - auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + auto pool = photon::rpc::new_stub_pool(-1, -1); DEFER(delete pool); auto stub = pool->get_stub(ep, false); if (!stub) abort(); @@ -450,7 +437,7 @@ TEST_F(RpcTest, passive_shutdown) { photon::thread_create11([&]{ photon::thread_sleep(2); // Should get connection refused after 2 seconds. Because socket closed listen fd at 1 second. - auto pool = photon::rpc::new_stub_pool(-1, -1, -1); + auto pool = photon::rpc::new_stub_pool(-1, -1); DEFER(delete pool); auto stub = pool->get_stub(ep, false); if (stub) { @@ -485,6 +472,116 @@ TEST_F(RpcTest, passive_shutdown) { GTEST_ASSERT_LT(duration, 3500); } +class RpcServerTimeout { +public: + RpcServerTimeout(Skeleton* skeleton, net::ISocketServer* socket) : m_socket(socket), m_skeleton(skeleton) { + m_skeleton->register_service(this); + m_socket->set_handler({this, &RpcServerTimeout::serve}); + } + struct Operation { + const static uint32_t IID = 0x1; + const static uint32_t FID = 0x2; + struct Request : public photon::rpc::Message { + int code = 0; + PROCESS_FIELDS(code); + }; + struct Response : public photon::rpc::Message { + int code = 0; + PROCESS_FIELDS(code); + }; + }; + + struct OperationT { + const static uint32_t IID = 0x1; + const static uint32_t FID = 0x3; + struct Request : public photon::rpc::Message { + int code = 0; + PROCESS_FIELDS(code); + }; + struct Response : public photon::rpc::Message { + int code = 0; + PROCESS_FIELDS(code); + }; + }; + int do_rpc_service(Operation::Request* req, Operation::Response* resp, IOVector* iov, IStream* stream) { + resp->code = req->code; + return 0; + } + int 
do_rpc_service(OperationT::Request* req, OperationT::Response* resp, IOVector* iov, IStream* stream) { + resp->code = req->code; + photon::thread_usleep(5UL*1000*1000); + return 0; + } + + int serve(photon::net::ISocketStream* stream) { + return m_skeleton->serve(stream); + } + int run() { + if (m_socket->bind_v4localhost() != 0) + LOG_ERRNO_RETURN(0, -1, "bind failed"); + if (m_socket->listen() != 0) + LOG_ERRNO_RETURN(0, -1, "listen failed"); + m_endpoint = m_socket->getsockname(); + LOG_DEBUG("bound to ", m_endpoint); + return m_socket->start_loop(false); + } + net::ISocketServer* m_socket; + Skeleton* m_skeleton; + photon::net::EndPoint m_endpoint; +}; + +static uint64_t do_call_hb(Stub* stub) { + RpcServerTimeout::Operation::Request req; + RpcServerTimeout::Operation::Response resp; + stub->call(req, resp); + return 0; +} + +TEST_F(RpcTest, timeout_with_hb) { + auto socket_server = photon::net::new_tcp_socket_server(); + GTEST_ASSERT_NE(nullptr, socket_server); + DEFER(delete socket_server); + auto sk = photon::rpc::new_skeleton(); + GTEST_ASSERT_NE(nullptr, sk); + DEFER(delete sk); + + RpcServerTimeout rpc_server(sk, socket_server); + GTEST_ASSERT_EQ(0, rpc_server.run()); + + auto& ep = rpc_server.m_endpoint; + auto pool = photon::rpc::new_stub_pool(-1, -1); + DEFER(delete pool); + auto th1 = photon::thread_enable_join(photon::thread_create11([&]{ + // Should always succeed in 5 seconds + Timeout timeout(5'000'000); + while(timeout.expired() > photon::now) { + auto stub = pool->get_stub(ep, false); + int ret = do_call_hb(stub); + DEFER(pool->put_stub(ep, ret < 0)); + photon::thread_yield(); + } + })); + photon::thread_yield_to((photon::thread*)th1); + + auto th2 = photon::thread_enable_join(photon::thread_create11([&]{ + // The server handler for this operation sleeps 5 seconds, so the call below (1 second timeout) should fail within 2 seconds. 
+ auto stub = pool->get_stub(ep, false); + if (!stub) { + abort(); + } + DEFER(pool->put_stub(ep, true)); + RpcServerTimeout::OperationT::Request req; + RpcServerTimeout::OperationT::Response resp; + auto before = photon::now; + auto ret = stub->call(req, resp, 1'000'000); + ERRNO err; + EXPECT_EQ(ret, -1); + EXPECT_LE(photon::now - before, 2'000'000); + })); + + photon::thread_join(th2); + photon::thread_join(th1); +} int main(int argc, char** arg) { ::testing::InitGoogleTest(&argc, arg);