From d7fedd3ea2e588cf9e0832d7130ed878260d81e2 Mon Sep 17 00:00:00 2001 From: Dmitry Yemanov Date: Thu, 2 Mar 2023 17:41:58 +0300 Subject: [PATCH] Better shutdown approach for the replication server --- src/remote/server/ReplServer.cpp | 54 +++++++++++++++++----- src/remote/server/ReplServer.h | 2 +- src/remote/server/os/posix/inet_server.cpp | 4 +- src/remote/server/os/win32/srvr_w32.cpp | 2 +- 4 files changed, 45 insertions(+), 17 deletions(-) diff --git a/src/remote/server/ReplServer.cpp b/src/remote/server/ReplServer.cpp index 3411bb99c2d..a0665888dd1 100644 --- a/src/remote/server/ReplServer.cpp +++ b/src/remote/server/ReplServer.cpp @@ -75,8 +75,26 @@ namespace const USHORT CTL_VERSION1 = 1; const USHORT CTL_CURRENT_VERSION = CTL_VERSION1; - volatile bool* shutdownPtr = NULL; + volatile bool shutdownFlag = false; AtomicCounter activeThreads; + Semaphore shutdownSemaphore; + + int shutdownHandler(const int, const int, void*) + { + if (activeThreads.value()) + { + gds__log("Shutting down the replication server with %d replicated database(s)", + (int) activeThreads.value()); + + shutdownFlag = true; + shutdownSemaphore.release(activeThreads.value() + 1); + + while (activeThreads.value()) + Thread::sleep(10); + } + + return 0; + } struct ActiveTransaction { @@ -626,7 +644,7 @@ namespace } } - enum ProcessStatus { PROCESS_SUSPEND, PROCESS_CONTINUE, PROCESS_ERROR }; + enum ProcessStatus { PROCESS_SUSPEND, PROCESS_CONTINUE, PROCESS_ERROR, PROCESS_SHUTDOWN }; ProcessStatus process_archive(MemoryPool& pool, Target* target) { @@ -645,6 +663,9 @@ namespace for (iter = PathUtils::newDirIterator(pool, config->sourceDirectory); *iter; ++(*iter)) { + if (shutdownFlag) + return PROCESS_SHUTDOWN; + const auto filename = **iter; #ifdef PRESERVE_LOG @@ -755,6 +776,9 @@ namespace for (Segment** iter = queue.begin(); iter != queue.end(); ++iter) { + if (shutdownFlag) + return PROCESS_SHUTDOWN; + Segment* const segment = *iter; const FB_UINT64 sequence = segment->header.hdr_sequence; const Guid& guid = segment->header.hdr_guid; @@ -845,6 +869,9 @@ namespace ULONG totalLength = sizeof(SegmentHeader); while (totalLength < segment->header.hdr_length) { + if (shutdownFlag) + return PROCESS_SHUTDOWN; + Block header; if (read(file, &header, sizeof(Block)) != sizeof(Block)) raiseError("Journal file %s read failed (error %d)", segment->filename.c_str(), ERRNO); @@ -959,14 +986,12 @@ namespace THREAD_ENTRY_DECLARE process_thread(THREAD_ENTRY_PARAM arg) { - fb_assert(shutdownPtr); - AutoPtr target(static_cast(arg)); const auto config = target->getConfig(); target->verbose("Started replication thread"); - while (!*shutdownPtr) + while (!shutdownFlag) { AutoMemoryPool workingPool(MemoryPool::createPool()); ContextPoolHolder threadContext(workingPool); @@ -978,42 +1003,47 @@ namespace target->shutdown(); - if (!*shutdownPtr) + if (ret == PROCESS_SHUTDOWN) + break; + + if (!shutdownFlag) { const ULONG timeout = (ret == PROCESS_SUSPEND) ? config->applyIdleTimeout : config->applyErrorTimeout; - Thread::sleep(timeout * 1000); + shutdownSemaphore.tryEnter(timeout); } } target->verbose("Finished replication thread"); - --activeThreads; return 0; } } -bool REPL_server(CheckStatusWrapper* status, bool wait, bool* aShutdownPtr) + +bool REPL_server(CheckStatusWrapper* status, bool wait) { try { - shutdownPtr = aShutdownPtr; + fb_shutdown_callback(0, shutdownHandler, fb_shut_finish, 0); TargetList targets; readConfig(targets); for (auto target : targets) { + Thread::start(process_thread, target, THREAD_medium, NULL); ++activeThreads; - Thread::start((ThreadEntryPoint*) process_thread, target, THREAD_medium, NULL); } if (wait) { + shutdownSemaphore.enter(); + do { - Thread::sleep(100); + Thread::sleep(10); } while (activeThreads.value()); } } diff --git a/src/remote/server/ReplServer.h b/src/remote/server/ReplServer.h index 85007b9b978..81ae22b19bc 100644 --- a/src/remote/server/ReplServer.h +++ b/src/remote/server/ReplServer.h @@ -23,6 +23,6 @@ #ifndef UTIL_REPL_SERVER_H #define UTIL_REPL_SERVER_H -bool REPL_server(Firebird::CheckStatusWrapper*, bool, bool*); +bool REPL_server(Firebird::CheckStatusWrapper*, bool); #endif // UTIL_REPL_SERVER_H diff --git a/src/remote/server/os/posix/inet_server.cpp b/src/remote/server/os/posix/inet_server.cpp index e79e8cd1848..21045abacf5 100644 --- a/src/remote/server/os/posix/inet_server.cpp +++ b/src/remote/server/os/posix/inet_server.cpp @@ -119,8 +119,6 @@ static void signal_handler(int); static TEXT protocol[128]; static int INET_SERVER_start = 0; -static bool serverClosing = false; - #if defined(HAVE_SETRLIMIT) && defined(HAVE_GETRLIMIT) #define FB_RAISE_LIMITS 1 static void raiseLimit(int resource); @@ -483,7 +481,7 @@ int CLIB_ROUTINE main( int argc, char** argv) fb_shutdown_callback(NULL, closePort, fb_shut_exit, port); Firebird::FbLocalStatus localStatus; - if (!REPL_server(&localStatus, false, &serverClosing)) + if (!REPL_server(&localStatus, false)) { const char* errorMsg = "Replication server initialization error"; iscLogStatus(errorMsg, localStatus->getErrors()); diff --git a/src/remote/server/os/win32/srvr_w32.cpp b/src/remote/server/os/win32/srvr_w32.cpp index 680693333ff..be7a705cdd3 100644 --- a/src/remote/server/os/win32/srvr_w32.cpp +++ b/src/remote/server/os/win32/srvr_w32.cpp @@ -527,7 +527,7 @@ static THREAD_ENTRY_DECLARE start_connections_thread(THREAD_ENTRY_PARAM) } FbLocalStatus localStatus; - if (!REPL_server(&localStatus, false, &server_shutdown)) + if (!REPL_server(&localStatus, false)) { const char* errorMsg = "Replication server initialization error"; iscLogStatus(errorMsg, localStatus->getErrors());