Skip to content

Commit 57d432e

Browse files
committed
Fixed race condition in file cache and async observer under load
1 parent 8896bdc commit 57d432e

File tree

4 files changed

+41
-33
lines changed

4 files changed

+41
-33
lines changed

simulator/run_scenario.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
end
3131

3232
opts.on('--heartbeat [interval]', OptionParser::DecimalInteger, "Heartbeat Frequency") do |v|
33-
$heartbeat = v.to_f / 1000.0
33+
$heartbeat = v.to_i
3434
end
3535

3636
opts.on('-t', '--[no-]scenario', 'Run scenario or log') do |v|
@@ -79,7 +79,7 @@ def heartbeat(socket)
7979

8080
puts "Received #{r.strip}, responding with pong" if $verbose
8181
$mutex.synchronize {
82-
socket.puts "* PONG #{($heartbeat * 1000).to_i}"
82+
socket.puts "* PONG #{$heartbeat}"
8383
socket.flush
8484
}
8585
else

src/mtconnect/observation/change_observer.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ namespace mtconnect::observation {
4747

4848
void ChangeObserver::handler(boost::system::error_code ec)
4949
{
50-
boost::asio::dispatch(m_strand, boost::bind(m_handler, ec));
50+
boost::asio::post(m_strand, boost::bind(m_handler, ec));
5151
}
5252

5353
// Signaler Management

src/mtconnect/sink/rest_sink/file_cache.cpp

+34-30
Original file line numberDiff line numberDiff line change
@@ -311,43 +311,47 @@ namespace mtconnect::sink::rest_sink {
311311
{
312312
CachedFilePtr file;
313313

314-
auto cached = m_fileCache.find(name);
315-
if (cached != m_fileCache.end())
316314
{
317-
auto fp = cached->second;
318-
if (!fp->m_cached || fp->m_redirect)
315+
std::lock_guard<std::recursive_mutex> lock(m_cacheLock);
316+
317+
auto cached = m_fileCache.find(name);
318+
if (cached != m_fileCache.end())
319319
{
320-
file = fp;
321-
}
322-
else
323-
{
324-
// Cleanup files if they have changed since last cached
325-
// Also remove any gzipped content as well
326-
auto lastWrite = std::filesystem::last_write_time(fp->m_path);
327-
if (lastWrite == fp->m_lastWrite)
320+
auto fp = cached->second;
321+
if (!fp->m_cached || fp->m_redirect)
322+
{
328323
file = fp;
329-
else if (fp->m_pathGz && fs::exists(*fp->m_pathGz))
330-
fs::remove(*fp->m_pathGz);
324+
}
325+
else
326+
{
327+
// Cleanup files if they have changed since last cached
328+
// Also remove any gzipped content as well
329+
auto lastWrite = std::filesystem::last_write_time(fp->m_path);
330+
if (lastWrite == fp->m_lastWrite)
331+
file = fp;
332+
else if (fp->m_pathGz && fs::exists(*fp->m_pathGz))
333+
fs::remove(*fp->m_pathGz);
334+
}
331335
}
332-
}
333-
334-
if (!file)
335-
{
336-
auto path = m_fileMap.find(name);
337-
if (path != m_fileMap.end())
336+
337+
if (!file)
338338
{
339-
auto ext = path->second.extension().string();
340-
auto size = fs::file_size(path->second);
341-
file = make_shared<CachedFile>(path->second, getMimeType(ext),
342-
size <= m_maxCachedFileSize, size);
343-
m_fileCache.insert_or_assign(name, file);
344-
}
345-
else
346-
{
347-
file = findFileInDirectories(name);
339+
auto path = m_fileMap.find(name);
340+
if (path != m_fileMap.end())
341+
{
342+
auto ext = path->second.extension().string();
343+
auto size = fs::file_size(path->second);
344+
file = make_shared<CachedFile>(path->second, getMimeType(ext),
345+
size <= m_maxCachedFileSize, size);
346+
m_fileCache.insert_or_assign(name, file);
347+
}
348+
else
349+
{
350+
file = findFileInDirectories(name);
351+
}
348352
}
349353
}
350-
354+
351355
if (file)
352356
{
353357
if (acceptEncoding && acceptEncoding->find("gzip") != string::npos &&

src/mtconnect/sink/rest_sink/file_cache.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -148,5 +148,9 @@ namespace mtconnect::sink::rest_sink {
148148
std::map<std::string, std::string> m_mimeTypes;
149149
size_t m_maxCachedFileSize;
150150
size_t m_minCompressedFileSize;
151+
152+
// Access control to the buffer
153+
mutable std::recursive_mutex m_cacheLock;
154+
151155
};
152156
} // namespace mtconnect::sink::rest_sink

0 commit comments

Comments
 (0)