diff --git a/src/flow-manager.c b/src/flow-manager.c
index d9ef04da8660..45b3ef54223e 100644
--- a/src/flow-manager.c
+++ b/src/flow-manager.c
@@ -357,7 +357,12 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT
          * be modified when we have both the flow and hash row lock */
 
         /* timeout logic goes here */
-        if (!FlowManagerFlowTimeout(f, ts, next_ts, emergency)) {
+        bool bypass_shutdown = false;
+#ifdef CAPTURE_OFFLOAD
+        bypass_shutdown = (f->flow_state == FLOW_STATE_CAPTURE_BYPASSED) &&
+                          (SC_ATOMIC_GET(flow_flags) & FLOW_SHUTDOWN);
+#endif
+        if (!bypass_shutdown && !FlowManagerFlowTimeout(f, ts, next_ts, emergency)) {
             FLOWLOCK_UNLOCK(f);
             counters->flows_notimeout++;
 
@@ -448,21 +453,25 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
 #endif
 
     const uint32_t ts_secs = (uint32_t)SCTIME_SECS(ts);
+    const bool shutdown = (SC_ATOMIC_GET(flow_flags) & FLOW_SHUTDOWN);
     for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
         TYPE check_bits = 0;
         const uint32_t check = MIN(BITS, (hash_max - idx));
-        for (uint32_t i = 0; i < check; i++) {
-            FlowBucket *fb = &flow_hash[idx+i];
-            check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
-                                         fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
-                          << (TYPE)i;
+        if (!shutdown) {
+            for (uint32_t i = 0; i < check; i++) {
+                FlowBucket *fb = &flow_hash[idx + i];
+                check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts,
+                                             SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
+                              << (TYPE)i;
+            }
+            if (check_bits == 0)
+                continue;
         }
-        if (check_bits == 0)
-            continue;
-
         for (uint32_t i = 0; i < check; i++) {
             FlowBucket *fb = &flow_hash[idx+i];
-            if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
+            if (((check_bits & ((TYPE)1 << (TYPE)i)) != 0 &&
+                        SC_ATOMIC_GET(fb->next_ts) <= ts_secs) ||
+                    shutdown) {
                 FBLOCK_LOCK(fb);
                 Flow *evicted = NULL;
                 if (fb->evicted != NULL || fb->head != NULL) {
@@ -970,6 +979,13 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
         }
 
         if (TmThreadsCheckFlag(th_v, THV_KILL)) {
+            SC_ATOMIC_OR(flow_flags, FLOW_SHUTDOWN);
+#ifdef CAPTURE_OFFLOAD
+            /* Pass through all flows to gather counters from bypassed flows */
+            FlowTimeoutCounters counters = { 0 };
+            FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
+            FlowCountersUpdate(th_v, ftd, &counters);
+#endif /* CAPTURE_OFFLOAD */
             StatsSyncCounters(&th_v->stats);
             break;
         }
diff --git a/src/flow-private.h b/src/flow-private.h
index a7a4a011b5e0..c187b659251e 100644
--- a/src/flow-private.h
+++ b/src/flow-private.h
@@ -35,6 +35,9 @@
  * flows for new flows and/or it's memcap limit it reached. In this state the
  * flow engine with evaluate flows with lower timeout settings. */
 #define FLOW_EMERGENCY 0x01
+/** FlowManager is in shutdown stage. This results in a final pass of all
+ * flows in the flow table, during which statistics from flows are collected. */
+#define FLOW_SHUTDOWN 0x02
 
 /* Flow Time out values */
 #define FLOW_DEFAULT_NEW_TIMEOUT 30