Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions src/flow-manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,12 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT
* be modified when we have both the flow and hash row lock */

/* timeout logic goes here */
if (!FlowManagerFlowTimeout(f, ts, next_ts, emergency)) {
bool bypass_shutdown = false;
#ifdef CAPTURE_OFFLOAD
bypass_shutdown = (f->flow_state == FLOW_STATE_CAPTURE_BYPASSED) &&
(SC_ATOMIC_GET(flow_flags) & FLOW_SHUTDOWN);
#endif
if (!bypass_shutdown && !FlowManagerFlowTimeout(f, ts, next_ts, emergency)) {
FLOWLOCK_UNLOCK(f);
counters->flows_notimeout++;

Expand Down Expand Up @@ -448,21 +453,25 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
#endif

const uint32_t ts_secs = (uint32_t)SCTIME_SECS(ts);
const bool shutdown = (SC_ATOMIC_GET(flow_flags) & FLOW_SHUTDOWN);
for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
TYPE check_bits = 0;
const uint32_t check = MIN(BITS, (hash_max - idx));
for (uint32_t i = 0; i < check; i++) {
FlowBucket *fb = &flow_hash[idx+i];
check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
<< (TYPE)i;
if (!shutdown) {
for (uint32_t i = 0; i < check; i++) {
FlowBucket *fb = &flow_hash[idx + i];
check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts,
SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
<< (TYPE)i;
}
if (check_bits == 0)
continue;
}
if (check_bits == 0)
continue;

for (uint32_t i = 0; i < check; i++) {
FlowBucket *fb = &flow_hash[idx+i];
if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
if (((check_bits & ((TYPE)1 << (TYPE)i)) != 0 &&
SC_ATOMIC_GET(fb->next_ts) <= ts_secs) ||
shutdown) {
FBLOCK_LOCK(fb);
Flow *evicted = NULL;
if (fb->evicted != NULL || fb->head != NULL) {
Expand Down Expand Up @@ -970,6 +979,13 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
}

if (TmThreadsCheckFlag(th_v, THV_KILL)) {
SC_ATOMIC_OR(flow_flags, FLOW_SHUTDOWN);
#ifdef CAPTURE_OFFLOAD
/* Pass through all flows to gather counters from bypassed flows */
FlowTimeoutCounters counters = { 0 };
FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
FlowCountersUpdate(th_v, ftd, &counters);
#endif /* CAPTURE_OFFLOAD */
StatsSyncCounters(&th_v->stats);
break;
}
Expand Down
3 changes: 3 additions & 0 deletions src/flow-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
* flows for new flows and/or it's memcap limit it reached. In this state the
* flow engine with evaluate flows with lower timeout settings. */
#define FLOW_EMERGENCY 0x01
/** FlowManager is in shutdown stage. This results in a final pass of all
* flows in the flow table, during which statistics from flows are collected. */
#define FLOW_SHUTDOWN 0x02

/* Flow Time out values */
#define FLOW_DEFAULT_NEW_TIMEOUT 30
Expand Down
Loading