From a5c5d84243f6fd8017b90fe2467a0b7277934d43 Mon Sep 17 00:00:00 2001 From: Jeff Lucovsky Date: Sat, 14 Feb 2026 09:26:03 -0500 Subject: [PATCH 1/4] output/flush: Add EVE flushing logic Add flushing logic driven off of the file contexts. This is a simpler solution that removes the need for logger registration changes. Overview: Use the heartbeat-driven thread to periodically flush all registered EVE contexts via a global flush list. The global flush list is a mutex-protected TAILQ of LogFileFlushEntry nodes; each node points to a LogFileCtx. Mutex = log_file_flush_mutex Periodic flushing performed by a thread according to the heartbeat.output-flush-interval [1,60]. LogFileFlushAll() is invoked to initiate flushing of registered LogFileCtx structs; each struct's fp_mutex is obtained while the flush occurs to synchronize with LogFileWrite activity. Interacts with file-rotation via the fp_mutex. Deadlock prevention: the log_file_flush_mutex must be obtained before the fp_mutex. Issue: 8286 (cherry picked from commit a78911fce7467c8356c1b580298cb37e21b63685) --- src/log-flush.c | 5 +- src/log-flush.h | 1 + src/util-logopenfile.c | 107 +++++++++++++++++++++++++++++++++++++++-- src/util-logopenfile.h | 12 ++++- 4 files changed, 119 insertions(+), 6 deletions(-) diff --git a/src/log-flush.c b/src/log-flush.c index 8da1157df7d5..cebfa44ebe6f 100644 --- a/src/log-flush.c +++ b/src/log-flush.c @@ -31,6 +31,7 @@ #include "conf.h" #include "conf-yaml-loader.h" #include "util-privs.h" +#include "util-logopenfile.h" /** * \brief Trigger detect threads to flush their output logs @@ -119,7 +120,7 @@ static void WorkerFlushLogs(void) return; } -static int OutputFlushInterval(void) +int OutputFlushInterval(void) { intmax_t output_flush_interval = 0; if (SCConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) { @@ -168,7 +169,7 @@ static void *LogFlusherWakeupThread(void *arg) if (++wait_count == flush_wait_count) { worker_flush_count++; - WorkerFlushLogs(); + LogFileFlushAll(); wait_count = 0; } diff --git a/src/log-flush.h b/src/log-flush.h index e201942b2eb6..8676e75745ba 100644 --- a/src/log-flush.h +++ b/src/log-flush.h @@ -23,4 +23,5 @@ #ifndef SURICATA_LOG_FLUSH_H__ #define SURICATA_LOG_FLUSH_H__ void LogFlushThreads(void); +int OutputFlushInterval(void); #endif /* SURICATA_LOG_FLUSH_H__ */ diff --git a/src/util-logopenfile.c b/src/util-logopenfile.c index 52170113a0db..4e8511b63f38 100644 --- a/src/util-logopenfile.c +++ b/src/util-logopenfile.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2022 Open Information Security Foundation +/* Copyright (C) 2007-2026 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -33,6 +33,7 @@ #include "util-path.h" #include "util-misc.h" #include "util-time.h" +#include "log-flush.h" #if defined(HAVE_SYS_UN_H) && defined(HAVE_SYS_SOCKET_H) && defined(HAVE_SYS_TYPES_H) #define BUILD_WITH_UNIXSOCKET @@ -53,6 +54,11 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, // Threaded eve.json identifier static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint16_t, eve_file_id, 1); +/* Flush list for heartbeat-triggered flushing */ +static SCMutex log_file_flush_mutex = SCMUTEX_INITIALIZER; +static TAILQ_HEAD(, LogFileFlushEntry_) log_file_flush_list = TAILQ_HEAD_INITIALIZER( + log_file_flush_list); + #ifdef BUILD_WITH_UNIXSOCKET /** \brief connect to the indicated local stream socket, logging any errors * \param path filesystem path to connect to @@ -640,6 +646,10 @@ int SCConfLogOpenGeneric( if (rotate) { OutputRegisterFileRotationFlag(&log_ctx->rotation_flag); } + /* Register non-threaded regular files for direct heartbeat flushing */ + if (!log_ctx->threaded && log_ctx->is_regular) { + LogFileRegisterForFlush(log_ctx); + } } else { SCLogError("Invalid entry for " "%s.filetype. Expected \"regular\" (default), \"unix_stream\", " @@ -882,9 +892,15 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, goto error; } thread->is_regular = true; - thread->Write = SCLogFileWriteNoLock; - thread->Close = SCLogFileCloseNoLock; + if (OutputFlushInterval() > 0) { + thread->Write = SCLogFileWrite; + thread->Close = SCLogFileClose; + } else { + thread->Write = SCLogFileWriteNoLock; + thread->Close = SCLogFileCloseNoLock; + } OutputRegisterFileRotationFlag(&thread->rotation_flag); + LogFileRegisterForFlush(thread); } else if (parent_ctx->type == LOGFILE_TYPE_FILETYPE) { entry->slot_number = SC_ATOMIC_ADD(eve_file_id, 1); SCLogDebug("%s - thread %d [slot %d]", log_path, entry->internal_thread_id, @@ -923,6 +939,11 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) SCReturnInt(0); } + /* Unregister from flush list first, before closing files. + * This ensures the heartbeat thread won't try to flush a context + * that's being destroyed. */ + LogFileUnregisterForFlush(lf_ctx); + if (lf_ctx->type == LOGFILE_TYPE_FILETYPE && lf_ctx->filetype.filetype->ThreadDeinit) { lf_ctx->filetype.filetype->ThreadDeinit( lf_ctx->filetype.init_data, lf_ctx->filetype.thread_data); @@ -988,6 +1009,86 @@ void LogFileFlush(LogFileCtx *file_ctx) file_ctx->Flush(file_ctx); } +/** + * \brief Register a LogFileCtx for flush operations + * + * Adds a LogFileCtx to the global flush list so the heartbeat thread + * can flush it directly without using pseudo packets. + * + * \param ctx The LogFileCtx to register (must be LOGFILE_TYPE_FILE) + */ +void LogFileRegisterForFlush(LogFileCtx *ctx) +{ + if (!OutputFlushInterval()) { + SCLogDebug("heartbeat disabled; skipping flush registration"); + return; + } + + if (ctx == NULL || ctx->type != LOGFILE_TYPE_FILE) { + return; + } + + LogFileFlushEntry *entry = SCMalloc(sizeof(LogFileFlushEntry)); + if (entry == NULL) { + SCLogError("Unable to allocate memory for flush entry"); + return; + } + + entry->ctx = ctx; + + SCMutexLock(&log_file_flush_mutex); + TAILQ_INSERT_TAIL(&log_file_flush_list, entry, entries); + SCMutexUnlock(&log_file_flush_mutex); +} + +/** + * \brief Unregister a LogFileCtx from flush operations + * + * Removes a LogFileCtx from the global flush list. + * + * \param ctx The LogFileCtx to unregister + */ +void LogFileUnregisterForFlush(LogFileCtx *ctx) +{ + if (!OutputFlushInterval()) { + SCLogDebug("heartbeat disabled; skipping flush deregistration"); + return; + } + + if (ctx == NULL) { + return; + } + + SCMutexLock(&log_file_flush_mutex); + LogFileFlushEntry *entry, *safe; + TAILQ_FOREACH_SAFE (entry, &log_file_flush_list, entries, safe) { + if (entry->ctx == ctx) { + TAILQ_REMOVE(&log_file_flush_list, entry, entries); + SCFree(entry); + break; + } + } + SCMutexUnlock(&log_file_flush_mutex); +} + +/** + * \brief Flush all registered LogFileCtx instances + * + * Called by the heartbeat thread to flush all active file-based loggers. + * Iterates through the flush list and calls LogFileFlush on each context. + */ +void LogFileFlushAll(void) +{ + SCMutexLock(&log_file_flush_mutex); + LogFileFlushEntry *entry; + TAILQ_FOREACH (entry, &log_file_flush_list, entries) { + if (entry->ctx != NULL) { + LogFileFlush(entry->ctx); + } + } + SCMutexUnlock(&log_file_flush_mutex); +} + int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer) { if (file_ctx->type == LOGFILE_TYPE_FILE || file_ctx->type == LOGFILE_TYPE_UNIX_DGRAM || diff --git a/src/util-logopenfile.h b/src/util-logopenfile.h index e12df01af6b8..09846ce869e6 100644 --- a/src/util-logopenfile.h +++ b/src/util-logopenfile.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2021 Open Information Security Foundation +/* Copyright (C) 2007-2026 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -68,6 +68,11 @@ typedef struct LogFileTypeCtx_ { void *thread_data; } LogFileTypeCtx; +typedef struct LogFileFlushEntry_ { + struct LogFileCtx_ *ctx; + TAILQ_ENTRY(LogFileFlushEntry_) entries; +} LogFileFlushEntry; + /** Global structure for Output Context */ typedef struct LogFileCtx_ { union { @@ -184,4 +189,9 @@ int SCConfLogOpenGeneric(SCConfNode *conf, LogFileCtx *, const char *, int); int SCConfLogReopen(LogFileCtx *); bool SCLogOpenThreadedFile(const char *log_path, const char *append, LogFileCtx *parent_ctx); +/* Flush list management functions */ +void LogFileRegisterForFlush(LogFileCtx *ctx); +void LogFileUnregisterForFlush(LogFileCtx *ctx); +void LogFileFlushAll(void); + #endif /* SURICATA_UTIL_LOGOPENFILE_H */ From 912a07d654ab9c6edc37e45ed8b1e982910f99d9 Mon Sep 17 00:00:00 2001 From: Jeff Lucovsky Date: Sat, 14 Feb 2026 09:24:41 -0500 Subject: [PATCH 2/4] output/flush: Remove pkt-based flush logic Remove packet-based flush logic in favor of simpler solution Issue: 8286 (cherry picked from commit d0ba1c4c5e590815d5aa225464735884d6ee5b96) --- src/decode.h | 6 +-- src/detect-engine.c | 32 ---------------- src/detect-engine.h | 2 - src/flow-worker.c | 23 ------------ src/flow-worker.h | 2 - src/log-flush.c | 89 +++------------------------------------------ 6 files changed, 7 insertions(+), 147 deletions(-) diff --git a/src/decode.h b/src/decode.h index 62c699d7b2d5..917f09a6145a 100644 --- a/src/decode.h +++ b/src/decode.h @@ -1315,12 +1315,8 @@ void DecodeUnregisterCounters(void); #define PKT_FIRST_ALERTS BIT_U32(29) #define PKT_FIRST_TAG BIT_U32(30) -#define PKT_PSEUDO_LOG_FLUSH BIT_U32(31) /**< Detect/log flush for protocol upgrade */ - /** \brief return 1 if the packet is a pseudo packet */ -#define PKT_IS_PSEUDOPKT(p) \ - ((p)->flags & (PKT_PSEUDO_STREAM_END|PKT_PSEUDO_DETECTLOG_FLUSH)) -#define PKT_IS_FLUSHPKT(p) ((p)->flags & (PKT_PSEUDO_LOG_FLUSH)) +#define PKT_IS_PSEUDOPKT(p) ((p)->flags & (PKT_PSEUDO_STREAM_END | PKT_PSEUDO_DETECTLOG_FLUSH)) #define PKT_SET_SRC(p, src_val) ((p)->pkt_src = src_val) diff --git a/src/detect-engine.c b/src/detect-engine.c index 95d2c53d7091..850d1d8f7456 100644 --- a/src/detect-engine.c +++ b/src/detect-engine.c @@ -2241,38 +2241,6 @@ int DetectEngineInspectPktBufferGeneric( } } -/** \internal - * \brief inject a pseudo packet into each detect thread - * if the thread should flush its output logs. - */ -void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs) -{ - /* inject a fake packet if the detect thread that needs it. This function - * is called when a heartbeat log-flush request has been made - * and it should process a pseudo packet and flush its output logs - * to speed the process. */ -#if DEBUG - int count = 0; -#endif - for (int i = 0; i < no_of_detect_tvs; i++) { - if (detect_tvs[i]) { // && detect_tvs[i]->inq != NULL) { - Packet *p = PacketGetFromAlloc(); - if (p != NULL) { - SCLogDebug("Injecting pkt for tv %s[i=%d] %d", detect_tvs[i]->name, i, count++); - p->flags |= PKT_PSEUDO_STREAM_END; - p->flags |= PKT_PSEUDO_LOG_FLUSH; - PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH); - PacketQueue *q = detect_tvs[i]->stream_pq; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - } - } - } - SCLogDebug("leaving: thread notification count = %d", count); -} - /** \internal * \brief inject a pseudo packet into each detect thread * -that doesn't use the new det_ctx yet diff --git a/src/detect-engine.h b/src/detect-engine.h index c1e03425de10..822f40a9b271 100644 --- a/src/detect-engine.h +++ b/src/detect-engine.h @@ -209,6 +209,4 @@ bool DetectMd5ValidateCallback( void DeStateRegisterTests(void); -/* packet injection */ -void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs); #endif /* SURICATA_DETECT_ENGINE_H */ diff --git a/src/flow-worker.c b/src/flow-worker.c index 0bd4ae278595..49f144a9efa7 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -73,8 +73,6 @@ typedef struct FlowWorkerThreadData_ { SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread); - SC_ATOMIC_DECLARE(bool, flush_ack); - void *output_thread; /* Output thread data. */ void *output_thread_flow; /* Output thread data. */ @@ -564,15 +562,6 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) SCLogDebug("packet %"PRIu64, p->pcap_cnt); - if ((PKT_IS_FLUSHPKT(p))) { - SCLogDebug("thread %s flushing", tv->printable_name); - OutputLoggerFlush(tv, p, fw->output_thread); - /* Ack if a flush was requested */ - bool notset = false; - SC_ATOMIC_CAS(&fw->flush_ack, notset, true); - return TM_ECODE_OK; - } - /* handle Flow */ if (det_ctx != NULL && det_ctx->de_ctx->PreFlowHook != NULL) { const uint8_t action = det_ctx->de_ctx->PreFlowHook(tv, det_ctx, p); @@ -756,18 +745,6 @@ void *FlowWorkerGetThreadData(void *flow_worker) return (FlowWorkerThreadData *)flow_worker; } -bool FlowWorkerGetFlushAck(void *flow_worker) -{ - FlowWorkerThreadData *fw = flow_worker; - return SC_ATOMIC_GET(fw->flush_ack) == true; -} - -void FlowWorkerSetFlushAck(void *flow_worker) -{ - FlowWorkerThreadData *fw = flow_worker; - SC_ATOMIC_SET(fw->flush_ack, false); -} - const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi) { switch (fwi) { diff --git a/src/flow-worker.h b/src/flow-worker.h index 6bdea551935d..476f296ff4c5 100644 --- a/src/flow-worker.h +++ b/src/flow-worker.h @@ -33,8 +33,6 @@ const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi); void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx); void *FlowWorkerGetDetectCtxPtr(void *flow_worker); void *FlowWorkerGetThreadData(void *flow_worker); -bool FlowWorkerGetFlushAck(void *flow_worker); -void FlowWorkerSetFlushAck(void *flow_worker); void TmModuleFlowWorkerRegister (void); diff --git a/src/log-flush.c b/src/log-flush.c index cebfa44ebe6f..bfe22e1a0cc8 100644 --- a/src/log-flush.c +++ b/src/log-flush.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2024 Open Information Security Foundation +/* Copyright (C) 2026 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -23,10 +23,8 @@ #include "suricata-common.h" #include "suricata.h" -#include "detect.h" -#include "detect-engine.h" -#include "flow-worker.h" #include "log-flush.h" +#include "util-logopenfile.h" #include "tm-threads.h" #include "conf.h" #include "conf-yaml-loader.h" @@ -34,90 +32,15 @@ #include "util-logopenfile.h" /** - * \brief Trigger detect threads to flush their output logs + * \brief Trigger flush of all registered log files * * This function is intended to be called at regular intervals to force - * buffered log data to be persisted + * buffered log data to be persisted. With the new design, this simply calls + * LogFileFlushAll() which directly flushes all registered file contexts. */ static void WorkerFlushLogs(void) { - SCEnter(); - - /* count detect threads in use */ - uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_FLOWWORKER_TM); - /* can be zero in unix socket mode */ - if (no_of_detect_tvs == 0) { - return; - } - - /* prepare swap structures */ - void *fw_threads[no_of_detect_tvs]; - ThreadVars *detect_tvs[no_of_detect_tvs]; - memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *))); - memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *))); - - /* start by initiating the log flushes */ - - uint32_t i = 0; - SCMutexLock(&tv_root_lock); - /* get reference to tv's and setup fw_threads array */ - for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) { - if ((tv->tmm_flags & TM_FLAG_FLOWWORKER_TM) == 0) { - continue; - } - for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) { - TmModule *tm = TmModuleGetById(s->tm_id); - if (!(tm->flags & TM_FLAG_FLOWWORKER_TM)) { - continue; - } - - if (suricata_ctl_flags != 0) { - SCMutexUnlock(&tv_root_lock); - goto error; - } - - fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data)); - if (fw_threads[i]) { - FlowWorkerSetFlushAck(fw_threads[i]); - SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i); - detect_tvs[i] = tv; - } - - i++; - break; - } - } - BUG_ON(i != no_of_detect_tvs); - - SCMutexUnlock(&tv_root_lock); - - SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs); - InjectPacketsForFlush(detect_tvs, no_of_detect_tvs); - - uint32_t threads_done = 0; -retry: - for (i = 0; i < no_of_detect_tvs; i++) { - if (suricata_ctl_flags != 0) { - threads_done = no_of_detect_tvs; - break; - } - SleepMsec(1); - if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) { - SCLogDebug("thread slot %d has ack'd flush request", i); - threads_done++; - } else if (detect_tvs[i]) { - SCLogDebug("thread slot %d not yet ack'd flush request", i); - TmThreadsCaptureBreakLoop(detect_tvs[i]); - } - } - if (threads_done < no_of_detect_tvs) { - threads_done = 0; - SleepMsec(250); - goto retry; - } - -error: - return; + LogFileFlushAll(); } int OutputFlushInterval(void) From 4278cb1eaecd23bc9bd575090b98019b626882dc Mon Sep 17 00:00:00 2001 From: Jeff Lucovsky Date: Sat, 14 Feb 2026 09:37:32 -0500 Subject: [PATCH 3/4] output/flush: Remove flush functions/registration Remove log flush functions and update registration logic as context-based flushing doesn't require it. Issue: 8286 (cherry picked from commit 1923ca1aa0def3419e988b3763f9d085b2703037) --- src/alert-debuglog.c | 1 - src/alert-fastlog.c | 1 - src/alert-syslog.c | 1 - src/log-flush.c | 12 ------------ src/log-pcap.c | 1 - src/output-eve-stream.c | 1 - src/output-json-alert.c | 9 --------- src/output-json-anomaly.c | 9 --------- src/output-json-arp.c | 1 - src/output-json-common.c | 9 --------- src/output-json-drop.c | 1 - src/output-json-frame.c | 1 - src/output-json-metadata.c | 1 - src/output-json.c | 6 ------ src/output-json.h | 2 -- src/output.c | 18 ------------------ src/output.h | 4 ---- 17 files changed, 78 deletions(-) diff --git a/src/alert-debuglog.c b/src/alert-debuglog.c index e3e07be2fe05..8888f477820f 100644 --- a/src/alert-debuglog.c +++ b/src/alert-debuglog.c @@ -482,7 +482,6 @@ void AlertDebugLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = AlertDebugLogLogger, - .FlushFunc = NULL, .ConditionFunc = AlertDebugLogCondition, .ThreadInitFunc = AlertDebugLogThreadInit, .ThreadDeinitFunc = AlertDebugLogThreadDeinit, diff --git a/src/alert-fastlog.c b/src/alert-fastlog.c index 75854900ba36..398ce8d5e698 100644 --- a/src/alert-fastlog.c +++ b/src/alert-fastlog.c @@ -78,7 +78,6 @@ void AlertFastLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = AlertFastLogger, - .FlushFunc = NULL, .ConditionFunc = AlertFastLogCondition, .ThreadInitFunc = AlertFastLogThreadInit, .ThreadDeinitFunc = AlertFastLogThreadDeinit, diff --git a/src/alert-syslog.c b/src/alert-syslog.c index 7beaaa1be765..e0e3d260c202 100644 --- a/src/alert-syslog.c +++ b/src/alert-syslog.c @@ -386,7 +386,6 @@ void AlertSyslogRegister (void) #ifndef OS_WIN32 OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = AlertSyslogLogger, - .FlushFunc = NULL, .ConditionFunc = AlertSyslogCondition, .ThreadInitFunc = AlertSyslogThreadInit, .ThreadDeinitFunc = AlertSyslogThreadDeinit, diff --git a/src/log-flush.c b/src/log-flush.c index bfe22e1a0cc8..7022e1359bf9 100644 --- a/src/log-flush.c +++ b/src/log-flush.c @@ -31,18 +31,6 @@ #include "util-privs.h" #include "util-logopenfile.h" -/** - * \brief Trigger flush of all registered log files - * - * This function is intended to be called at regular intervals to force - * buffered log data to be persisted. With the new design, this simply calls - * LogFileFlushAll() which directly flushes all registered file contexts. - */ -static void WorkerFlushLogs(void) -{ - LogFileFlushAll(); -} - int OutputFlushInterval(void) { intmax_t output_flush_interval = 0; diff --git a/src/log-pcap.c b/src/log-pcap.c index 1cb97fa0a72b..e76d62a6dcc5 100644 --- a/src/log-pcap.c +++ b/src/log-pcap.c @@ -220,7 +220,6 @@ void PcapLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = PcapLog, - .FlushFunc = NULL, .ConditionFunc = PcapLogCondition, .ThreadInitFunc = PcapLogDataInit, .ThreadDeinitFunc = PcapLogDataDeinit, diff --git a/src/output-eve-stream.c b/src/output-eve-stream.c index 88d8bf50b93d..da41ef5ea6a8 100644 --- a/src/output-eve-stream.c +++ b/src/output-eve-stream.c @@ -454,7 +454,6 @@ void EveStreamLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = EveStreamLogger, - .FlushFunc = OutputJsonLogFlush, .ConditionFunc = EveStreamLogCondition, .ThreadInitFunc = EveStreamLogThreadInit, .ThreadDeinitFunc = EveStreamLogThreadDeinit, diff --git a/src/output-json-alert.c b/src/output-json-alert.c index 7d2c6b822275..8cf5679ff7b2 100644 --- a/src/output-json-alert.c +++ b/src/output-json-alert.c @@ -854,14 +854,6 @@ static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const return TM_ECODE_OK; } -static int JsonAlertFlush(ThreadVars *tv, void *thread_data, const Packet *p) -{ - JsonAlertLogThread *aft = thread_data; - SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename); - OutputJsonFlush(aft->ctx); - return 0; -} - static int JsonAlertLogger(ThreadVars *tv, void *thread_data, const Packet *p) { JsonAlertLogThread *aft = thread_data; @@ -1109,7 +1101,6 @@ void JsonAlertLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonAlertLogger, - .FlushFunc = JsonAlertFlush, .ConditionFunc = JsonAlertLogCondition, .ThreadInitFunc = JsonAlertLogThreadInit, .ThreadDeinitFunc = JsonAlertLogThreadDeinit, diff --git a/src/output-json-anomaly.c b/src/output-json-anomaly.c index 5c8551df436b..4392820106fd 100644 --- a/src/output-json-anomaly.c +++ b/src/output-json-anomaly.c @@ -271,14 +271,6 @@ static int AnomalyJson(ThreadVars *tv, JsonAnomalyLogThread *aft, const Packet * return rc; } -static int JsonAnomalyFlush(ThreadVars *tv, void *thread_data, const Packet *p) -{ - JsonAnomalyLogThread *aft = thread_data; - SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename); - OutputJsonFlush(aft->ctx); - return 0; -} - static int JsonAnomalyLogger(ThreadVars *tv, void *thread_data, const Packet *p) { JsonAnomalyLogThread *aft = thread_data; @@ -457,7 +449,6 @@ void JsonAnomalyLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonAnomalyLogger, - .FlushFunc = JsonAnomalyFlush, .ConditionFunc = JsonAnomalyLogCondition, .ThreadInitFunc = JsonAnomalyLogThreadInit, .ThreadDeinitFunc = JsonAnomalyLogThreadDeinit, diff --git a/src/output-json-arp.c b/src/output-json-arp.c index 6b1414dbc09a..a9c35577d527 100644 --- a/src/output-json-arp.c +++ b/src/output-json-arp.c @@ -105,7 +105,6 @@ void JsonArpLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonArpLogger, - .FlushFunc = NULL, .ConditionFunc = JsonArpLogCondition, .ThreadInitFunc = JsonLogThreadInit, .ThreadDeinitFunc = JsonLogThreadDeinit, diff --git a/src/output-json-common.c b/src/output-json-common.c index 3ac80774b4d6..02e9a19bf04c 100644 --- a/src/output-json-common.c +++ b/src/output-json-common.c @@ -70,15 +70,6 @@ static void OutputJsonLogDeInitCtxSub(OutputCtx *output_ctx) SCFree(output_ctx); } -int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p) -{ - OutputJsonThreadCtx *aft = thread_data; - LogFileCtx *file_ctx = aft->ctx->file_ctx; - SCLogDebug("%s flushing %s", tv->name, file_ctx->filename); - LogFileFlush(file_ctx); - return 0; -} - OutputInitResult OutputJsonLogInitSub(SCConfNode *conf, OutputCtx *parent_ctx) { OutputInitResult result = { NULL, false }; diff --git a/src/output-json-drop.c b/src/output-json-drop.c index ae18ac991873..5d42e907386f 100644 --- a/src/output-json-drop.c +++ b/src/output-json-drop.c @@ -392,7 +392,6 @@ void JsonDropLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonDropLogger, - .FlushFunc = OutputJsonLogFlush, .ConditionFunc = JsonDropLogCondition, .ThreadInitFunc = JsonDropLogThreadInit, .ThreadDeinitFunc = JsonDropLogThreadDeinit, diff --git a/src/output-json-frame.c b/src/output-json-frame.c index e5353aa3a9b4..24a9538385ee 100644 --- a/src/output-json-frame.c +++ b/src/output-json-frame.c @@ -558,7 +558,6 @@ void JsonFrameLogRegister(void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonFrameLogger, - .FlushFunc = OutputJsonLogFlush, .ConditionFunc = JsonFrameLogCondition, .ThreadInitFunc = JsonFrameLogThreadInit, .ThreadDeinitFunc = JsonFrameLogThreadDeinit, diff --git a/src/output-json-metadata.c b/src/output-json-metadata.c index ab94550fc8ce..2b93e3740774 100644 --- a/src/output-json-metadata.c +++ b/src/output-json-metadata.c @@ -96,7 +96,6 @@ void JsonMetadataLogRegister (void) { OutputPacketLoggerFunctions output_logger_functions = { .LogFunc = JsonMetadataLogger, - .FlushFunc = OutputJsonLogFlush, .ConditionFunc = JsonMetadataLogCondition, .ThreadInitFunc = JsonLogThreadInit, .ThreadDeinitFunc = JsonLogThreadDeinit, diff --git a/src/output-json.c b/src/output-json.c index f496df192f14..1a3d141aadb4 100644 --- a/src/output-json.c +++ b/src/output-json.c @@ -988,12 +988,6 @@ int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer **buffer) return 0; } -void OutputJsonFlush(OutputJsonThreadCtx *ctx) -{ - LogFileCtx *file_ctx = ctx->file_ctx; - LogFileFlush(file_ctx); -} - void OutputJsonBuilderBuffer( ThreadVars *tv, const Packet *p, Flow *f, SCJsonBuilder *js, OutputJsonThreadCtx *ctx) { diff --git a/src/output-json.h b/src/output-json.h index 1f4fec70d041..a3b6006a496b 100644 --- a/src/output-json.h +++ b/src/output-json.h @@ -108,7 +108,6 @@ TmEcode JsonLogThreadDeinit(ThreadVars *t, void *data); void EveAddCommonOptions(const OutputJsonCommonSettings *cfg, const Packet *p, const Flow *f, SCJsonBuilder *js, enum SCOutputJsonLogDirection dir); -int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p); void EveAddMetadata(const Packet *p, const Flow *f, SCJsonBuilder *js); int OutputJSONMemBufferCallback(const char *str, size_t size, void *data); @@ -116,6 +115,5 @@ int OutputJSONMemBufferCallback(const char *str, size_t size, void *data); OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx); void FreeEveThreadCtx(OutputJsonThreadCtx *ctx); void JSONFormatAndAddMACAddr(SCJsonBuilder *js, const char *key, const uint8_t *val, bool is_array); -void OutputJsonFlush(OutputJsonThreadCtx *ctx); #endif /* SURICATA_OUTPUT_JSON_H */ diff --git a/src/output.c b/src/output.c index 5d3794a4e0e9..f870eec39e69 100644 --- a/src/output.c +++ b/src/output.c @@ -87,7 +87,6 @@ typedef struct RootLogger_ { OutputLogFunc LogFunc; - OutputFlushFunc FlushFunc; ThreadInitFunc ThreadInit; ThreadDeinitFunc ThreadDeinit; OutputGetActiveCountFunc ActiveCntFunc; @@ -211,7 +210,6 @@ void OutputRegisterPacketModule(LoggerId id, const char *name, const char *conf_ module->conf_name = conf_name; module->InitFunc = InitFunc; module->PacketLogFunc = output_module_functions->LogFunc; - module->PacketFlushFunc = output_module_functions->FlushFunc; module->PacketConditionFunc = output_module_functions->ConditionFunc; module->ThreadInit = output_module_functions->ThreadInitFunc; module->ThreadDeinit = output_module_functions->ThreadDeinitFunc; @@ -251,7 +249,6 @@ void OutputRegisterPacketSubModule(LoggerId id, const char *parent_name, const c module->parent_name = parent_name; module->InitSubFunc = InitFunc; module->PacketLogFunc = output_logger_functions->LogFunc; - module->PacketFlushFunc = output_logger_functions->FlushFunc; module->PacketConditionFunc = output_logger_functions->ConditionFunc; module->ThreadInit = output_logger_functions->ThreadInitFunc; module->ThreadDeinit = output_logger_functions->ThreadDeinitFunc; @@ -785,21 +782,6 @@ void SCOnLoggingReady(void) } } -TmEcode OutputLoggerFlush(ThreadVars *tv, Packet *p, void *thread_data) -{ - LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data; - RootLogger *logger = TAILQ_FIRST(&active_loggers); - LoggerThreadStoreNode *thread_store_node = TAILQ_FIRST(thread_store); - while (logger && thread_store_node) { - if (logger->FlushFunc) - logger->FlushFunc(tv, p, thread_store_node->thread_data); - - logger = TAILQ_NEXT(logger, entries); - thread_store_node = TAILQ_NEXT(thread_store_node, entries); - } - return TM_ECODE_OK; -} - TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data) { LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data; diff --git a/src/output.h b/src/output.h index dd47fa488046..c09e13a48d7b 100644 --- a/src/output.h +++ b/src/output.h @@ -51,7 +51,6 @@ typedef struct OutputInitResult_ { typedef OutputInitResult (*OutputInitFunc)(SCConfNode *); typedef OutputInitResult (*OutputInitSubFunc)(SCConfNode *, OutputCtx *); typedef TmEcode (*OutputLogFunc)(ThreadVars *, Packet *, void *); -typedef TmEcode (*OutputFlushFunc)(ThreadVars *, Packet *, void *); typedef uint32_t (*OutputGetActiveCountFunc)(void); typedef struct OutputModule_ { @@ -66,7 +65,6 @@ typedef struct OutputModule_ { ThreadDeinitFunc ThreadDeinit; PacketLogger PacketLogFunc; - PacketLogger PacketFlushFunc; PacketLogCondition PacketConditionFunc; TxLogger TxLogFunc; TxLoggerCondition TxLogCondition; @@ -86,7 +84,6 @@ typedef struct OutputModule_ { /* struct for packet module and packet sub-module registration */ typedef struct OutputPacketLoggerFunctions_ { PacketLogger LogFunc; - PacketLogger FlushFunc; PacketLogCondition ConditionFunc; ThreadInitFunc ThreadInitFunc; ThreadDeinitFunc ThreadDeinitFunc; @@ -168,7 +165,6 @@ void OutputRegisterRootLogger(ThreadInitFunc ThreadInit, ThreadDeinitFunc Thread void TmModuleLoggerRegister(void); TmEcode OutputLoggerLog(ThreadVars *, Packet *, void *); -TmEcode OutputLoggerFlush(ThreadVars *, Packet *, void *); TmEcode OutputLoggerThreadInit(ThreadVars *, const void *, void **); TmEcode OutputLoggerThreadDeinit(ThreadVars *, void *); void OutputLoggerExitPrintStats(ThreadVars *, void *); From 8d30e2b61a0008319b2ea595dc52e192f2174715 Mon Sep 17 00:00:00 2001 From: Jeff Lucovsky Date: Sat, 14 Feb 2026 10:03:29 -0500 Subject: [PATCH 4/4] doc/config: Update flushing description Update output flushing description to reflect EVE based approach in documentation and config template. Added: Provide update callout for out-of-tree output plugins. Issue: 8286 (cherry picked from commit e7dc0d885bede2b58094c46ff886e4e6a3a95575) --- doc/userguide/output/eve/eve-json-output.rst | 4 ++-- doc/userguide/partials/eve-log.yaml | 10 +++++----- doc/userguide/upgrade.rst | 19 +++++++++++++++++++ suricata.yaml.in | 10 +++++----- 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/doc/userguide/output/eve/eve-json-output.rst b/doc/userguide/output/eve/eve-json-output.rst index a6c88d1e640a..7272fb982b34 100644 --- a/doc/userguide/output/eve/eve-json-output.rst +++ b/doc/userguide/output/eve/eve-json-output.rst @@ -32,9 +32,9 @@ may be held in memory and written a short time later opening the possibility -- loss. Hence, a heartbeat mechanism is introduced to limit the amount of time buffered data may exist before being -flushed. Control is provided to instruct Suricata's detection threads to flush their EVE output. With default +flushed. A heartbeat thread periodically flushes all active EVE log files directly. With default values, there is no change in output buffering and flushing behavior. ``output-flush-interval`` controls -how often Suricata's detect threads will flush output in a heartbeat fashion. A value of ``0`` means +how often Suricata will flush EVE output in a heartbeat fashion. A value of ``0`` means "never"; non-zero values must be in ``[1-60]`` seconds. Flushing should be considered when ``outputs.buffer-size`` is greater than 0 to limit the amount and diff --git a/doc/userguide/partials/eve-log.yaml b/doc/userguide/partials/eve-log.yaml index 845d7e1157c1..f9c579670be7 100644 --- a/doc/userguide/partials/eve-log.yaml +++ b/doc/userguide/partials/eve-log.yaml @@ -286,12 +286,12 @@ outputs: # spurious-retransmission: false # log spurious retransmission packets # heartbeat: - # The output-flush-interval value governs how often Suricata will instruct the - # detection threads to flush their EVE output. Specify the value in seconds [1-60] - # and Suricata will initiate EVE log output flushes at that interval. A value - # of 0 means no EVE log output flushes are initiated. When the EVE output + # The output-flush-interval value governs how often Suricata will flush + # EVE log file output. Specify the value in seconds [1-60] and Suricata will + # flush all active EVE log files at that interval. A value of 0 means + # no EVE log output flushes are performed. When the EVE output # buffer-size value is non-zero, some EVE output that was written may remain # buffered. The output-flush-interval governs how much buffered data exists. # - # The default value is: 0 (never instruct detection threads to flush output) + # The default value is: 0 (no periodic flushing) #output-flush-interval: 0 diff --git a/doc/userguide/upgrade.rst b/doc/userguide/upgrade.rst index 5ab7e2cb03eb..c75858805727 100644 --- a/doc/userguide/upgrade.rst +++ b/doc/userguide/upgrade.rst @@ -34,6 +34,25 @@ also check all the new features that have been added but are not covered by this guide. Those features are either not enabled by default or require dedicated new configuration. +Upgrading to 8.0.5 +------------------ + +Other Changes +~~~~~~~~~~~~~ + +- We've made a change in the way that background EVE flushing driven by the + heartbeat mechanism operates. The heartbeat mechanism provides a way to + periodically flush EVE outputs when ``eve-log.buffer-size`` is non-zero and + ``heartbeat.output-flush-interval`` is non-zero. + + There is no change to the functionality enabled by the heartbeat mechanism; we + overhauled and simplified the implementation. Developers maintaining + out-of-tree output plugins may need to update their code in order for the + plugin to compile and load, due to the removal of the packet-logger + ``FlushFunc`` registration field and related helpers (``OutputJsonFlush``, + ``OutputJsonLogFlush``, ``OutputLoggerFlush``). Periodic heartbeat flushing is + specific to EVE output types and handled entirely within the EVE logic. + Upgrading to 8.0.3 ------------------ diff --git a/suricata.yaml.in b/suricata.yaml.in index c40013f9c0c2..5eeb6b585282 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -588,14 +588,14 @@ outputs: # - script1.lua heartbeat: - # The output-flush-interval value governs how often Suricata will instruct the - # detection threads to flush their EVE output. Specify the value in seconds [1-60] - # and Suricata will initiate EVE log output flushes at that interval. A value - # of 0 means no EVE log output flushes are initiated. When the EVE output + # The output-flush-interval value governs how often Suricata will flush + # EVE log file output. Specify the value in seconds [1-60] and Suricata will + # flush all active EVE log files at that interval. A value of 0 means + # no EVE log output flushes are performed. When the EVE output # buffer-size value is non-zero, some EVE output that was written may remain # buffered. The output-flush-interval governs how much buffered data exists. # - # The default value is: 0 (never instruct detection threads to flush output) + # The default value is: 0 (no periodic flushing) #output-flush-interval: 0 # Logging configuration. This is not about logging IDS alerts/events, but