Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/userguide/output/eve/eve-json-output.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ may be held in memory and written a short time later opening the possibility --
loss.

Hence, a heartbeat mechanism is introduced to limit the amount of time buffered data may exist before being
flushed. Control is provided to instruct Suricata's detection threads to flush their EVE output. With default
flushed. A heartbeat thread periodically flushes all active EVE log files directly. With default
values, there is no change in output buffering and flushing behavior. ``output-flush-interval`` controls
how often Suricata's detect threads will flush output in a heartbeat fashion. A value of ``0`` means
how often Suricata will flush EVE output in a heartbeat fashion. A value of ``0`` means
"never"; non-zero values must be in ``[1-60]`` seconds.

Flushing should be considered when ``outputs.buffer-size`` is greater than 0 to limit the amount and
Expand Down
10 changes: 5 additions & 5 deletions doc/userguide/partials/eve-log.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,12 @@ outputs:
# spurious-retransmission: false # log spurious retransmission packets
#
heartbeat:
# The output-flush-interval value governs how often Suricata will instruct the
# detection threads to flush their EVE output. Specify the value in seconds [1-60]
# and Suricata will initiate EVE log output flushes at that interval. A value
# of 0 means no EVE log output flushes are initiated. When the EVE output
# The output-flush-interval value governs how often Suricata will flush
# EVE log file output. Specify the value in seconds [1-60] and Suricata will
# flush all active EVE log files at that interval. A value of 0 means
# no EVE log output flushes are performed. When the EVE output
# buffer-size value is non-zero, some EVE output that was written may remain
# buffered. The output-flush-interval governs how much buffered data exists.
#
# The default value is: 0 (never instruct detection threads to flush output)
# The default value is: 0 (no periodic flushing)
#output-flush-interval: 0
19 changes: 19 additions & 0 deletions doc/userguide/upgrade.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ also check all the new features that have been added but are not covered by
this guide. Those features are either not enabled by default or require
dedicated new configuration.

Upgrading to 8.0.5
------------------

Other Changes
~~~~~~~~~~~~~

- We've made a change in the way that background EVE flushing driven by the
heartbeat mechanism operates. The heartbeat mechanism provides a way to
periodically flush EVE outputs when ``eve-log.buffer-size`` is non-zero and
``heartbeat.output-flush-interval`` is non-zero.

There is no change to the functionality enabled by the heartbeat mechanism; we
overhauled and simplified the implementation. Developers maintaining
out-of-tree output plugins may need to update their code in order for the
plugin to compile and load, due to the removal of the packet-logger
``FlushFunc`` registration field and related helpers (``OutputJsonFlush``,
``OutputJsonLogFlush``, ``OutputLoggerFlush``). Periodic heartbeat flushing is
specific to EVE output types and handled entirely within the EVE logic.

Upgrading to 8.0.3
------------------

Expand Down
1 change: 0 additions & 1 deletion src/alert-debuglog.c
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,6 @@ void AlertDebugLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = AlertDebugLogLogger,
.FlushFunc = NULL,
.ConditionFunc = AlertDebugLogCondition,
.ThreadInitFunc = AlertDebugLogThreadInit,
.ThreadDeinitFunc = AlertDebugLogThreadDeinit,
Expand Down
1 change: 0 additions & 1 deletion src/alert-fastlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ void AlertFastLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = AlertFastLogger,
.FlushFunc = NULL,
.ConditionFunc = AlertFastLogCondition,
.ThreadInitFunc = AlertFastLogThreadInit,
.ThreadDeinitFunc = AlertFastLogThreadDeinit,
Expand Down
1 change: 0 additions & 1 deletion src/alert-syslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ void AlertSyslogRegister (void)
#ifndef OS_WIN32
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = AlertSyslogLogger,
.FlushFunc = NULL,
.ConditionFunc = AlertSyslogCondition,
.ThreadInitFunc = AlertSyslogThreadInit,
.ThreadDeinitFunc = AlertSyslogThreadDeinit,
Expand Down
6 changes: 1 addition & 5 deletions src/decode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1315,12 +1315,8 @@ void DecodeUnregisterCounters(void);
#define PKT_FIRST_ALERTS BIT_U32(29)
#define PKT_FIRST_TAG BIT_U32(30)

#define PKT_PSEUDO_LOG_FLUSH BIT_U32(31) /**< Detect/log flush for protocol upgrade */

/** \brief return 1 if the packet is a pseudo packet */
#define PKT_IS_PSEUDOPKT(p) \
((p)->flags & (PKT_PSEUDO_STREAM_END|PKT_PSEUDO_DETECTLOG_FLUSH))
#define PKT_IS_FLUSHPKT(p) ((p)->flags & (PKT_PSEUDO_LOG_FLUSH))
#define PKT_IS_PSEUDOPKT(p) ((p)->flags & (PKT_PSEUDO_STREAM_END | PKT_PSEUDO_DETECTLOG_FLUSH))

#define PKT_SET_SRC(p, src_val) ((p)->pkt_src = src_val)

Expand Down
32 changes: 0 additions & 32 deletions src/detect-engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -2241,38 +2241,6 @@ int DetectEngineInspectPktBufferGeneric(
}
}

/** \internal
* \brief inject a pseudo packet into each detect thread
* if the thread should flush its output logs.
*/
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs)
{
/* inject a fake packet if the detect thread that needs it. This function
* is called when a heartbeat log-flush request has been made
* and it should process a pseudo packet and flush its output logs
* to speed the process. */
#if DEBUG
int count = 0;
#endif
for (int i = 0; i < no_of_detect_tvs; i++) {
if (detect_tvs[i]) { // && detect_tvs[i]->inq != NULL) {
Packet *p = PacketGetFromAlloc();
if (p != NULL) {
SCLogDebug("Injecting pkt for tv %s[i=%d] %d", detect_tvs[i]->name, i, count++);
p->flags |= PKT_PSEUDO_STREAM_END;
p->flags |= PKT_PSEUDO_LOG_FLUSH;
PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
PacketQueue *q = detect_tvs[i]->stream_pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
}
}
SCLogDebug("leaving: thread notification count = %d", count);
}

/** \internal
* \brief inject a pseudo packet into each detect thread
* -that doesn't use the new det_ctx yet
Expand Down
2 changes: 0 additions & 2 deletions src/detect-engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,4 @@ bool DetectMd5ValidateCallback(

void DeStateRegisterTests(void);

/* packet injection */
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs);
#endif /* SURICATA_DETECT_ENGINE_H */
23 changes: 0 additions & 23 deletions src/flow-worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ typedef struct FlowWorkerThreadData_ {

SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);

SC_ATOMIC_DECLARE(bool, flush_ack);

void *output_thread; /* Output thread data. */
void *output_thread_flow; /* Output thread data. */

Expand Down Expand Up @@ -564,15 +562,6 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)

SCLogDebug("packet %"PRIu64, p->pcap_cnt);

if ((PKT_IS_FLUSHPKT(p))) {
SCLogDebug("thread %s flushing", tv->printable_name);
OutputLoggerFlush(tv, p, fw->output_thread);
/* Ack if a flush was requested */
bool notset = false;
SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
return TM_ECODE_OK;
}

/* handle Flow */
if (det_ctx != NULL && det_ctx->de_ctx->PreFlowHook != NULL) {
const uint8_t action = det_ctx->de_ctx->PreFlowHook(tv, det_ctx, p);
Expand Down Expand Up @@ -756,18 +745,6 @@ void *FlowWorkerGetThreadData(void *flow_worker)
return (FlowWorkerThreadData *)flow_worker;
}

bool FlowWorkerGetFlushAck(void *flow_worker)
{
FlowWorkerThreadData *fw = flow_worker;
return SC_ATOMIC_GET(fw->flush_ack) == true;
}

void FlowWorkerSetFlushAck(void *flow_worker)
{
FlowWorkerThreadData *fw = flow_worker;
SC_ATOMIC_SET(fw->flush_ack, false);
}

const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
{
switch (fwi) {
Expand Down
2 changes: 0 additions & 2 deletions src/flow-worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi);
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx);
void *FlowWorkerGetDetectCtxPtr(void *flow_worker);
void *FlowWorkerGetThreadData(void *flow_worker);
bool FlowWorkerGetFlushAck(void *flow_worker);
void FlowWorkerSetFlushAck(void *flow_worker);

void TmModuleFlowWorkerRegister (void);

Expand Down
98 changes: 5 additions & 93 deletions src/log-flush.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2024 Open Information Security Foundation
/* Copyright (C) 2026 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
Expand All @@ -23,103 +23,15 @@

#include "suricata-common.h"
#include "suricata.h"
#include "detect.h"
#include "detect-engine.h"
#include "flow-worker.h"
#include "log-flush.h"
#include "util-logopenfile.h"
#include "tm-threads.h"
#include "conf.h"
#include "conf-yaml-loader.h"
#include "util-privs.h"
#include "util-logopenfile.h"

/**
* \brief Trigger detect threads to flush their output logs
*
* This function is intended to be called at regular intervals to force
* buffered log data to be persisted
*/
static void WorkerFlushLogs(void)
{
SCEnter();

/* count detect threads in use */
uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_FLOWWORKER_TM);
/* can be zero in unix socket mode */
if (no_of_detect_tvs == 0) {
return;
}

/* prepare swap structures */
void *fw_threads[no_of_detect_tvs];
ThreadVars *detect_tvs[no_of_detect_tvs];
memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *)));
memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));

/* start by initiating the log flushes */

uint32_t i = 0;
SCMutexLock(&tv_root_lock);
/* get reference to tv's and setup fw_threads array */
for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
if ((tv->tmm_flags & TM_FLAG_FLOWWORKER_TM) == 0) {
continue;
}
for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
TmModule *tm = TmModuleGetById(s->tm_id);
if (!(tm->flags & TM_FLAG_FLOWWORKER_TM)) {
continue;
}

if (suricata_ctl_flags != 0) {
SCMutexUnlock(&tv_root_lock);
goto error;
}

fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data));
if (fw_threads[i]) {
FlowWorkerSetFlushAck(fw_threads[i]);
SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i);
detect_tvs[i] = tv;
}

i++;
break;
}
}
BUG_ON(i != no_of_detect_tvs);

SCMutexUnlock(&tv_root_lock);

SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs);
InjectPacketsForFlush(detect_tvs, no_of_detect_tvs);

uint32_t threads_done = 0;
retry:
for (i = 0; i < no_of_detect_tvs; i++) {
if (suricata_ctl_flags != 0) {
threads_done = no_of_detect_tvs;
break;
}
SleepMsec(1);
if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) {
SCLogDebug("thread slot %d has ack'd flush request", i);
threads_done++;
} else if (detect_tvs[i]) {
SCLogDebug("thread slot %d not yet ack'd flush request", i);
TmThreadsCaptureBreakLoop(detect_tvs[i]);
}
}
if (threads_done < no_of_detect_tvs) {
threads_done = 0;
SleepMsec(250);
goto retry;
}

error:
return;
}

static int OutputFlushInterval(void)
int OutputFlushInterval(void)
{
intmax_t output_flush_interval = 0;
if (SCConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) {
Expand Down Expand Up @@ -168,7 +80,7 @@ static void *LogFlusherWakeupThread(void *arg)

if (++wait_count == flush_wait_count) {
worker_flush_count++;
WorkerFlushLogs();
LogFileFlushAll();
wait_count = 0;
}

Expand Down
1 change: 1 addition & 0 deletions src/log-flush.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@
#ifndef SURICATA_LOG_FLUSH_H__
#define SURICATA_LOG_FLUSH_H__
void LogFlushThreads(void);
int OutputFlushInterval(void);
#endif /* SURICATA_LOG_FLUSH_H__ */
1 change: 0 additions & 1 deletion src/log-pcap.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ void PcapLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = PcapLog,
.FlushFunc = NULL,
.ConditionFunc = PcapLogCondition,
.ThreadInitFunc = PcapLogDataInit,
.ThreadDeinitFunc = PcapLogDataDeinit,
Expand Down
1 change: 0 additions & 1 deletion src/output-eve-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ void EveStreamLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = EveStreamLogger,
.FlushFunc = OutputJsonLogFlush,
.ConditionFunc = EveStreamLogCondition,
.ThreadInitFunc = EveStreamLogThreadInit,
.ThreadDeinitFunc = EveStreamLogThreadDeinit,
Expand Down
9 changes: 0 additions & 9 deletions src/output-json-alert.c
Original file line number Diff line number Diff line change
Expand Up @@ -854,14 +854,6 @@ static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const
return TM_ECODE_OK;
}

static int JsonAlertFlush(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAlertLogThread *aft = thread_data;
SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
OutputJsonFlush(aft->ctx);
return 0;
}

static int JsonAlertLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAlertLogThread *aft = thread_data;
Expand Down Expand Up @@ -1109,7 +1101,6 @@ void JsonAlertLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonAlertLogger,
.FlushFunc = JsonAlertFlush,
.ConditionFunc = JsonAlertLogCondition,
.ThreadInitFunc = JsonAlertLogThreadInit,
.ThreadDeinitFunc = JsonAlertLogThreadDeinit,
Expand Down
9 changes: 0 additions & 9 deletions src/output-json-anomaly.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,6 @@ static int AnomalyJson(ThreadVars *tv, JsonAnomalyLogThread *aft, const Packet *
return rc;
}

static int JsonAnomalyFlush(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAnomalyLogThread *aft = thread_data;
SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
OutputJsonFlush(aft->ctx);
return 0;
}

static int JsonAnomalyLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAnomalyLogThread *aft = thread_data;
Expand Down Expand Up @@ -457,7 +449,6 @@ void JsonAnomalyLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonAnomalyLogger,
.FlushFunc = JsonAnomalyFlush,
.ConditionFunc = JsonAnomalyLogCondition,
.ThreadInitFunc = JsonAnomalyLogThreadInit,
.ThreadDeinitFunc = JsonAnomalyLogThreadDeinit,
Expand Down
1 change: 0 additions & 1 deletion src/output-json-arp.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ void JsonArpLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonArpLogger,
.FlushFunc = NULL,
.ConditionFunc = JsonArpLogCondition,
.ThreadInitFunc = JsonLogThreadInit,
.ThreadDeinitFunc = JsonLogThreadDeinit,
Expand Down
Loading
Loading