Skip to content

Commit

Permalink
fix RPC stop by timeout during calling (#552)
Browse files Browse the repository at this point in the history
* fix RPC stop by timeout during calling

* fix typo and make the stub call timeout parameter use Timeout instead of uint64

* Refactor OutOfOrderEngine so that it can be shared between vCPUs

* Remove unused code in test

* Add timeout to the IStream interface so that RPC is able to control the per-call timeout

* Add default timeout methods (Failure with ENOSYS) to IStream

* Since IStream already has a default implementation of timeout, remove the unused override
  • Loading branch information
Coldwings authored Sep 13, 2024
1 parent 10c0f0a commit 44abd5c
Show file tree
Hide file tree
Showing 12 changed files with 360 additions and 131 deletions.
13 changes: 12 additions & 1 deletion common/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ limitations under the License.
*/

#pragma once
#include <errno.h>
#include <photon/common/object.h>
#include <stdlib.h>
#include <sys/types.h>

#include <memory>
#include <photon/common/object.h>

struct iovec;

Expand Down Expand Up @@ -46,6 +48,15 @@ class IStream : public Object
return writev(iov, iovcnt);
}

// Getter for the default timeout, in microseconds (default +∞).
// This base implementation is a stub: it sets errno to ENOSYS and
// returns -1 converted to uint64_t.
virtual uint64_t timeout() const {
    errno = ENOSYS;
    return static_cast<uint64_t>(-1);
}
// Setter for the default timeout, in microseconds.
// This base implementation is a stub: the requested value is ignored
// and errno is set to ENOSYS.
virtual void timeout(uint64_t tm) {
    (void)tm;  // intentionally unused by the stub
    errno = ENOSYS;
}

struct ReadAll {
struct FreeDeleter {
void operator()(void* ptr) {
Expand Down
3 changes: 1 addition & 2 deletions examples/rpc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ struct ExampleClient {
// TCP connection will fail in 1 second (1UL*1000*1000) if not accepted,
// and connection send/recv will be treated as timed out after 5 seconds (5UL*1000*1000)
ExampleClient()
: pool(photon::rpc::new_stub_pool(10UL * 1000 * 1000, 1UL * 1000 * 1000,
5UL * 1000 * 1000)) {}
: pool(photon::rpc::new_stub_pool(10UL * 1000 * 1000, 1UL * 1000 * 1000)) {}

int64_t RPCHeartbeat(photon::net::EndPoint ep);

Expand Down
3 changes: 3 additions & 0 deletions net/datagram_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class IDatagramSocket : public IMessage,
}
using IMessage::recv;
using IMessage::send;

virtual uint64_t timeout() const = 0;
virtual void timeout(uint64_t) = 0;
};

class UDPSocket : public IDatagramSocket {
Expand Down
14 changes: 14 additions & 0 deletions net/http/body.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ class BodyWriteStream: public WOStream {
return wc;
}

// Returns the timeout of the underlying stream, or -1UL when there is
// no underlying stream.
// NOTE(review): -1UL equals UINT64_MAX only where unsigned long is
// 64 bits wide — confirm the supported platforms.
virtual uint64_t timeout() const override {
    if (!m_stream) return -1UL;
    return m_stream->timeout();
}
// Forwards the timeout to the underlying stream; does nothing when
// there is no underlying stream.
virtual void timeout(uint64_t timeout) override {
    if (m_stream == nullptr) return;
    m_stream->timeout(timeout);
}

protected:
net::ISocketStream *m_stream;
size_t m_size = 0;
Expand Down Expand Up @@ -297,6 +304,13 @@ class ChunkedBodyWriteStream: public WOStream {
return count;
}

// Returns the timeout of the underlying stream, or -1UL when there is
// no underlying stream.
// NOTE(review): -1UL equals UINT64_MAX only where unsigned long is
// 64 bits wide — confirm the supported platforms.
virtual uint64_t timeout() const override {
    if (!m_stream) return -1UL;
    return m_stream->timeout();
}
// Forwards the timeout to the underlying stream; does nothing when
// there is no underlying stream.
virtual void timeout(uint64_t timeout) override {
    if (m_stream == nullptr) return;
    m_stream->timeout(timeout);
}

protected:
net::ISocketStream *m_stream;
bool m_finish = false;
Expand Down
10 changes: 6 additions & 4 deletions net/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ namespace net {
if (ret >= 0) *value = v;
return ret;
}

// get/set default timeout, in us, (default +∞)
virtual uint64_t timeout() const = 0;
virtual void timeout(uint64_t tm) = 0;
};

class ISocketName {
Expand Down Expand Up @@ -254,6 +250,9 @@ namespace net {
virtual ISocketStream* connect(const EndPoint& remote, const EndPoint* local = nullptr) = 0;
// Connect to a Unix Domain Socket.
virtual ISocketStream* connect(const char* path, size_t count = 0) = 0;

virtual uint64_t timeout() const = 0;
virtual void timeout(uint64_t) = 0;
};

class ISocketServer : public ISocketBase, public ISocketName, public Object {
Expand All @@ -276,6 +275,9 @@ namespace net {
virtual int start_loop(bool block = false) = 0;
// Close the listening fd. It's the user's responsibility to close the active connections.
virtual void terminate() = 0;

virtual uint64_t timeout() const = 0;
virtual void timeout(uint64_t) = 0;
};

extern "C" ISocketClient* new_tcp_socket_client();
Expand Down
2 changes: 1 addition & 1 deletion net/test/zerocopy-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ int main(int argc, char** argv) {
DEFER(delete[] g_read_buffers);
prepare_read_buffers();

auto pool = rpc::new_stub_pool(60 * 1000 * 1000, 10 * 1000 * 1000, -1);
auto pool = rpc::new_stub_pool(60 * 1000 * 1000, 10 * 1000 * 1000);
DEFER(delete pool);

photon::thread_create11(show_performance_statis);
Expand Down
204 changes: 134 additions & 70 deletions rpc/out-of-order-execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ namespace rpc {
{
public:
unordered_map<uint64_t, OutOfOrderContext*> m_map;
condition_variable m_cond_collected;
mutex m_mutex_w, m_mutex_r;
condition_variable m_cond_collected, m_wait;
mutex m_mutex_w, m_mutex_r, m_mutex_map;
uint64_t m_issuing = 0;
uint64_t m_tag = 0;
bool m_running = true;

// rlock is used as both the reader lock and the wait notifier.
// Adding a yield in the lock would break the assumption that threads
// not holding the lock are kept asleep,
// so do not yield; just put the thread to sleep when needed,
// and make sure it can be woken by interrupts.
OooEngine(): m_mutex_r(0) {}

// Destructor: calls shutdown() before the engine's members are destroyed.
~OooEngine() {
shutdown();
}
Expand All @@ -53,9 +60,9 @@ namespace rpc {
}
int issue_operation(OutOfOrderContext& args) //firing issue
{
SCOPED_LOCK(m_mutex_w);
m_issuing ++;
DEFER(m_issuing --);
scoped_lock lock(m_mutex_w);
if (!m_running)
LOG_ERROR_RETURN(ESHUTDOWN, -1, "engine is been shuting down");
if (!args.flag_tag_valid)
Expand All @@ -64,101 +71,158 @@ namespace rpc {
args.tag = ++m_tag; // auto increase if it is not user defined tag
}
args.th = CURRENT;
args.collected = false;
{
SCOPED_LOCK(args.phaselock);
args.phase = OooPhase::BEFORE_ISSUE;
}
args.ret = 0;
auto ret = m_map.insert({args.tag, &args}); //the return value is {iter, bool}
if (!ret.second) // means insert failed because of key already exists
{
auto tag = args.tag;
auto th = (ret.first == m_map.end()) ? nullptr : ret.first->second->th;
LOG_ERROR("failed to insert record into unordered hash map",
VALUE(tag), VALUE(CURRENT), VALUE(th));
if (args.flag_tag_valid) // user set tag, need to tell user it is a failure
LOG_ERROR_RETURN(EINVAL, -1, "the tag in argument is NOT valid");
goto again;
SCOPED_LOCK(m_mutex_map);
auto ret = m_map.insert({args.tag, &args}); //the return value is {iter, bool}
if (!ret.second) // means insert failed because of key already exists
{
auto tag = args.tag;
auto th = (ret.first == m_map.end()) ? nullptr : ret.first->second->th;
LOG_ERROR("failed to insert record into unordered hash map",
VALUE(tag), VALUE(CURRENT), VALUE(th));
if (args.flag_tag_valid) // user set tag, need to tell user it is a failure
LOG_ERROR_RETURN(EINVAL, -1, "the tag in argument is NOT valid");
goto again;
}
}

int ret2 = args.do_issue(&args);
if (ret2 < 0) {
SCOPED_LOCK(m_mutex_map);
m_map.erase(args.tag);
m_cond_collected.notify_one();
LOG_ERROR_RETURN(0, -1, "failed to do_issue()");
}
{
SCOPED_LOCK(args.phaselock);
args.phase = OooPhase::ISSUED;
}
return 0;
}

// Callback taking an opaque pointer to an OutOfOrderContext:
// marks the context as WAITING and then releases its phaselock.
static void wait_check(void* args) {
    auto* ctx = (OutOfOrderContext*)args;
    ctx->phase = OooPhase::WAITING;
    ctx->phaselock.unlock();
};
int wait_completion(OutOfOrderContext& args) //recieving work
{
// locking with param 1 means entry is allowed without holding the lock
// when interrupted
scoped_lock lock(m_mutex_r, 1);

// when wait_completion returned,
// always have tag removed from the map
// notify the waiting function (like shutdown())
DEFER(m_cond_collected.notify_one());

auto o_tag = args.tag;
{
auto o_it = m_map.find(o_tag);
if (o_it == m_map.end()) {
LOG_ERROR_RETURN(EINVAL, -1, "issue of ` not found", VALUE(args.tag));
}
if (o_it->second->th != CURRENT)
{
LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT));
}
if (o_it->second->collected) {
// my completion has been done
// just collect it, clear the trace,
// then return result
auto ret = o_it->second->ret;
m_map.erase(o_it);
return ret;
// check if context issued
SCOPED_LOCK(m_mutex_map);
if (m_map.find(args.tag) == m_map.end()) {
LOG_ERROR_RETURN(EINVAL, -1,
"context not found in map");
}
}
//Hold the lock, but not get the result.
while (true)
DEFER(m_wait.notify_one());
{
int ret = args.do_completion(&args); //this do_completion may recieve results for other threads.
SCOPED_LOCK(args.phaselock);
if (args.phase == OooPhase::BEFORE_ISSUE)
LOG_ERROR_RETURN(EINVAL, -1, "context not issued");
if (args.phase == OooPhase::WAITING)
LOG_ERROR_RETURN(EINVAL, -1, "context already in waiting");
for (bool hold_lock = false; !hold_lock;) {
switch (args.phase) {
case OooPhase::COLLECTED:
// result already collected before wait
if (args.th != CURRENT)
LOG_ERROR_RETURN(EINVAL, -1, "context is not issued by current thread");
return args.ret;
case OooPhase::ISSUED:
args.th = photon::CURRENT;
args.phase = OooPhase::WAITING;
case OooPhase::WAITING:
{
if (m_mutex_r.try_lock() == 0) {
hold_lock = true;
break;
}
auto ret = m_wait.wait(args.phaselock, args.timeout);
// Check if collected
if (args.phase == OooPhase::COLLECTED &&
args.th == CURRENT) {
return args.ret;
}
if (ret == -1) {
// or just timed out
{
SCOPED_LOCK(m_mutex_map);
m_map.erase(args.tag);
m_cond_collected.notify_one();
}
LOG_ERROR_RETURN(ETIMEDOUT, -1, "waiting for completion timeout");
}
break;
}
default:
LOG_ERROR_RETURN(EINVAL, -1, "unexpected phase");
}
}
}

// Holding mutex_r
// My origin tag is o_tag
auto o_tag = args.tag;
DEFER(m_mutex_r.unlock());
for (;;) {
int ret = args.do_completion(&args);
// This do_completion may receive results destined for other threads.
// That still works: even if this issue's tag has a unique do_completion,
// so that other threads can never receive its result,
// the thread will wait until it holds the lock and then fetch the result itself.
// Since a thread cannot know which thread will receive the result of an issue,
// the user must make sure that do_completion can at least receive the result of its own issue.
if (ret < 0) {
// set with nullptr means the thread is once issued but failed when wait_completion
m_map.erase(o_tag);
LOG_ERROR_RETURN(0, -1, "failed to do_completion()");
}

if (o_tag == args.tag) {
m_map.erase(o_tag);
break; // it's my result, let's break, and collect it
}
OutOfOrderContext* targ = nullptr;
unordered_map<uint64_t, OutOfOrderContext*>::iterator it;
{
SCOPED_LOCK(m_mutex_map);
DEFER(m_cond_collected.notify_one());
if (ret < 0) {
// set with nullptr means the thread is once issued but failed when wait_completion
m_map.erase(o_tag);
LOG_ERROR_RETURN(0, -1, "failed to do_completion()");
}

auto it = m_map.find(args.tag);
it = m_map.find(args.tag);

if (it == m_map.end()) {
// response tag never issued
m_map.erase(o_tag);
LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag);
if (it == m_map.end()) {
// response tag never issued
m_map.erase(o_tag);
LOG_ERROR_RETURN(ENOENT, -2, "response's tag ` not found, response should be dropped", args.tag);
}
targ = it->second;
m_map.erase(it);
}

auto targ = it->second;
auto th = targ->th;
// collect with mutex_r
targ->ret = targ->do_collect(targ);

if (!th)
// issued but requesting thread just failed in completion when waiting
LOG_ERROR_RETURN(ENOENT, -2, "response recvd, but requesting thread is NULL!");

it->second->ret = targ->do_collect(targ);
it->second->collected = true;
thread_interrupt(th); // other threads' response, resume him
{
photon::thread *th;
{
SCOPED_LOCK(targ->phaselock);
th = targ->th;
targ->phase = OooPhase::COLLECTED;
}
if (o_tag == args.tag) {
if (th != CURRENT) {
LOG_ERROR_RETURN(EINVAL, -1, "args tag ` not belong to current thread `", VALUE(args.tag), VALUE(CURRENT));
}
return args.ret; // it's my result, let's break, and
// collect it
}
if (!th)
// issued but requesting thread just failed in completion when waiting
LOG_ERROR_RETURN(ENOENT, -2, "response recvd, but requesting thread is NULL!");
thread_interrupt(th, EINTR); // other threads' response, resume him
}
}
// only break can bring out the while-loop
// means my result has been completed,
// ready to collect
DEFER(thread_yield_to(nullptr));
return args.do_collect(&args);
}
int issue_wait(OutOfOrderContext& args)
{
Expand Down
Loading

0 comments on commit 44abd5c

Please sign in to comment.