Skip to content

Commit

Permalink
Join background threads when LOG4CXX_EVENTS_AT_EXIT=on (#381)
Browse files Browse the repository at this point in the history
  • Loading branch information
swebb2066 authored May 7, 2024
1 parent 78eee3f commit 16ff766
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 82 deletions.
40 changes: 17 additions & 23 deletions src/main/cpp/asyncappender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,24 +124,14 @@ struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkele
locationInfo(false),
blocking(true)
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{atExitActivated();})
, atExitRegistryRaii([this]{stopDispatcher();})
#endif
, eventCount(0)
, dispatchedCount(0)
, commitCount(0)
{
}

#if LOG4CXX_EVENTS_AT_EXIT
void atExitActivated()
{
std::unique_lock<std::mutex> lock(bufferMutex);
bufferNotFull.wait(lock, [this]() -> bool
{ return buffer.empty() || closed; }
);
}
#endif

/**
* Event buffer.
*/
Expand Down Expand Up @@ -180,6 +170,21 @@ struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkele
*/
std::thread dispatcher;

void stopDispatcher()
{
{
std::lock_guard<std::mutex> lock(bufferMutex);
closed = true;
}
bufferNotEmpty.notify_all();
bufferNotFull.notify_all();

if (dispatcher.joinable())
{
dispatcher.join();
}
}

/**
* Should location info be included in dispatched messages.
*/
Expand Down Expand Up @@ -350,18 +355,7 @@ void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p)

void AsyncAppender::close()
{
{
std::lock_guard<std::mutex> lock(priv->bufferMutex);
priv->closed = true;
priv->bufferNotEmpty.notify_all();
priv->bufferNotFull.notify_all();
}

if ( priv->dispatcher.joinable() )
{
priv->dispatcher.join();
}

priv->stopDispatcher();
for (auto item : priv->appenders.getAllAppenders())
{
item->close();
Expand Down
33 changes: 25 additions & 8 deletions src/main/cpp/filewatchdog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
#include <functional>
#include <chrono>

#if LOG4CXX_EVENTS_AT_EXIT
#include <log4cxx/private/atexitregistry.h>
#endif

using namespace LOG4CXX_NS;
using namespace LOG4CXX_NS::helpers;

Expand All @@ -33,8 +37,11 @@ long FileWatchdog::DEFAULT_DELAY = 60000;
struct FileWatchdog::FileWatchdogPrivate{
FileWatchdogPrivate(const File& file1) :
file(file1), delay(DEFAULT_DELAY), lastModif(0),
warnedAlready(false), interrupted(0), thread(){}

warnedAlready(false), interrupted(0), thread()
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopWatcher();})
#endif
{}
/**
The name of the file to observe for changes.
*/
Expand All @@ -51,6 +58,21 @@ struct FileWatchdog::FileWatchdogPrivate{
std::thread thread;
std::condition_variable interrupt;
std::mutex interrupt_mutex;

#if LOG4CXX_EVENTS_AT_EXIT
helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif

void stopWatcher()
{
{
std::lock_guard<std::mutex> lock(interrupt_mutex);
interrupted = 0xFFFF;
}
interrupt.notify_all();
if (thread.joinable())
thread.join();
}
};

FileWatchdog::FileWatchdog(const File& file1)
Expand All @@ -73,12 +95,7 @@ bool FileWatchdog::is_active()
void FileWatchdog::stop()
{
LogLog::debug(LOG4CXX_STR("Stopping file watchdog"));
{
std::lock_guard<std::mutex> lock(m_priv->interrupt_mutex);
m_priv->interrupted = 0xFFFF;
}
m_priv->interrupt.notify_all();
m_priv->thread.join();
m_priv->stopWatcher();
}

const File& FileWatchdog::file()
Expand Down
31 changes: 15 additions & 16 deletions src/main/cpp/socketappenderskeleton.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,8 @@ void SocketAppenderSkeleton::activateOptions(Pool& p)

void SocketAppenderSkeleton::close()
{
{
std::lock_guard<std::mutex> lock(_priv->interrupt_mutex);

if (_priv->closed)
{
return;
}

_priv->closed = true;
cleanUp(_priv->pool);
}
_priv->interrupt.notify_all();
if ( _priv->thread.joinable() )
{
_priv->thread.join();
}
_priv->stopMonitor();
cleanUp(_priv->pool);
}

void SocketAppenderSkeleton::connect(Pool& p)
Expand Down Expand Up @@ -208,6 +194,19 @@ void SocketAppenderSkeleton::monitor()
}
}

void SocketAppenderSkeleton::SocketAppenderSkeletonPriv::stopMonitor()
{
{
std::lock_guard<std::mutex> lock(this->interrupt_mutex);
if (this->closed)
return;
this->closed = true;
}
this->interrupt.notify_all();
if (this->thread.joinable())
this->thread.join();
}

bool SocketAppenderSkeleton::is_closed()
{
return _priv->closed;
Expand Down
62 changes: 34 additions & 28 deletions src/main/cpp/telnetappender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
#include <log4cxx/private/appenderskeleton_priv.h>
#include <mutex>

#if LOG4CXX_EVENTS_AT_EXIT
#include <log4cxx/private/atexitregistry.h>
#endif

using namespace LOG4CXX_NS;
using namespace LOG4CXX_NS::helpers;
using namespace LOG4CXX_NS::net;
Expand All @@ -38,8 +42,11 @@ struct TelnetAppender::TelnetAppenderPriv : public AppenderSkeletonPrivate
connections(maxConnections),
encoding(LOG4CXX_STR("UTF-8")),
encoder(CharsetEncoder::getUTF8Encoder()),
sh(),
activeConnections(0) {}
activeConnections(0)
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopAcceptingConnections();})
#endif
{}

int port;
ConnectionList connections;
Expand All @@ -48,6 +55,30 @@ struct TelnetAppender::TelnetAppenderPriv : public AppenderSkeletonPrivate
std::unique_ptr<helpers::ServerSocket> serverSocket;
std::thread sh;
size_t activeConnections;

#if LOG4CXX_EVENTS_AT_EXIT
helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif

void stopAcceptingConnections()
{
{
std::lock_guard<std::recursive_mutex> lock(this->mutex);
if (!this->serverSocket || this->closed)
return;
this->closed = true;
// Interrupt accept()
try
{
this->serverSocket->close();
}
catch (Exception&)
{
}
}
if (this->sh.joinable())
this->sh.join();
}
};

#define _priv static_cast<TelnetAppenderPriv*>(m_priv.get())
Expand Down Expand Up @@ -112,17 +143,9 @@ void TelnetAppender::setEncoding(const LogString& value)

void TelnetAppender::close()
{
_priv->stopAcceptingConnections();
std::lock_guard<std::recursive_mutex> lock(_priv->mutex);

if (_priv->closed)
{
return;
}

_priv->closed = true;

SocketPtr nullSocket;

for (auto& item : _priv->connections)
{
if (item)
Expand All @@ -131,23 +154,6 @@ void TelnetAppender::close()
item = nullSocket;
}
}

if (_priv->serverSocket != NULL)
{
try
{
_priv->serverSocket->close();
}
catch (Exception&)
{
}
}

if ( _priv->sh.joinable() )
{
_priv->sh.join();
}

_priv->activeConnections = 0;
}

Expand Down
31 changes: 25 additions & 6 deletions src/main/include/log4cxx/private/socketappenderskeleton_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
#include <log4cxx/private/appenderskeleton_priv.h>
#include <log4cxx/helpers/inetaddress.h>

#if LOG4CXX_EVENTS_AT_EXIT
#include <log4cxx/private/atexitregistry.h>
#endif

namespace LOG4CXX_NS
{
namespace net
Expand All @@ -34,26 +38,35 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
address(),
port(defaultPort),
reconnectionDelay(reconnectionDelay),
locationInfo(false),
thread() {}
locationInfo(false)
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopMonitor();})
#endif
{}

SocketAppenderSkeletonPriv(helpers::InetAddressPtr address, int defaultPort, int reconnectionDelay) :
AppenderSkeletonPrivate(),
remoteHost(),
address(address),
port(defaultPort),
reconnectionDelay(reconnectionDelay),
locationInfo(false),
thread() {}
locationInfo(false)
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopMonitor();})
#endif
{}

SocketAppenderSkeletonPriv(const LogString& host, int port, int delay) :
AppenderSkeletonPrivate(),
remoteHost(host),
address(helpers::InetAddress::getByName(host)),
port(port),
reconnectionDelay(delay),
locationInfo(false),
thread() {}
locationInfo(false)
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopMonitor();})
#endif
{}

/**
host name
Expand All @@ -71,6 +84,12 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv : public AppenderSkele
std::thread thread;
std::condition_variable interrupt;
std::mutex interrupt_mutex;

#if LOG4CXX_EVENTS_AT_EXIT
helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif

void stopMonitor();
};

} // namespace net
Expand Down
20 changes: 19 additions & 1 deletion src/test/cpp/net/telnetappendertestcase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class TelnetAppenderTestCase : public AppenderSkeletonTestCase
LOGUNIT_TEST(testActivateClose);
LOGUNIT_TEST(testActivateSleepClose);
LOGUNIT_TEST(testActivateWriteClose);
LOGUNIT_TEST(testActivateWriteNoClose);

LOGUNIT_TEST_SUITE_END();

Expand Down Expand Up @@ -75,7 +76,7 @@ class TelnetAppenderTestCase : public AppenderSkeletonTestCase
appender->setPort(TEST_PORT);
Pool p;
appender->activateOptions(p);
std::this_thread::sleep_for( std::chrono::milliseconds( 1000 ) );
std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
appender->close();
}

Expand All @@ -95,6 +96,23 @@ class TelnetAppenderTestCase : public AppenderSkeletonTestCase
}

appender->close();
root->removeAppender(appender);
}

void testActivateWriteNoClose()
{
TelnetAppenderPtr appender(new TelnetAppender());
appender->setLayout(createLayout());
appender->setPort(TEST_PORT);
Pool p;
appender->activateOptions(p);
LoggerPtr root(Logger::getRootLogger());
root->addAppender(appender);

for (int i = 0; i < 50; i++)
{
LOG4CXX_INFO(root, "Hello, World " << i);
}
}

};
Expand Down
2 changes: 2 additions & 0 deletions src/test/cpp/terminationtestcase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ LOGUNIT_CLASS(TerminationTestCase)
static struct initializer
{
initializer() { setDefaultAppender(); }
#if !LOG4CXX_EVENTS_AT_EXIT
~initializer() { LogManager::shutdown(); }
#endif
} x;
auto r = LogManager::getLoggerRepository();
return name.empty() ? r->getLogger(name) : r->getRootLogger();
Expand Down

0 comments on commit 16ff766

Please sign in to comment.