diff --git a/rust/src/mqtt/mqtt.rs b/rust/src/mqtt/mqtt.rs index f1c37d83c881..3f110dfc038c 100644 --- a/rust/src/mqtt/mqtt.rs +++ b/rust/src/mqtt/mqtt.rs @@ -183,11 +183,11 @@ impl MQTTState { } fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction { - let direction = if toclient { - Direction::ToClient - } else { - Direction::ToServer - }; + let direction = if toclient { + Direction::ToClient + } else { + Direction::ToServer + }; let mut tx = MQTTTransaction::new(msg, direction); self.tx_id += 1; tx.tx_id = self.tx_id; @@ -217,104 +217,82 @@ impl MQTTState { match msg.op { MQTTOperation::CONNECT(ref conn) => { self.protocol_version = conn.protocol_version; + let mut tx = self.new_tx(msg, toclient); + tx.pkt_id = Some(MQTT_CONNECT_PKT_ID); if self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect); - self.transactions.push_back(tx); - } else { - let mut tx = self.new_tx(msg, toclient); - tx.pkt_id = Some(MQTT_CONNECT_PKT_ID); - self.transactions.push_back(tx); } + self.transactions.push_back(tx); } MQTTOperation::PUBLISH(ref publish) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } - match msg.header.qos_level { + let qos = msg.header.qos_level; + let pkt_id = publish.message_id; + let mut tx = self.new_tx(msg, toclient); + match qos { 0 => { // with QOS level 0, we do not need to wait for a // response - let mut tx = self.new_tx(msg, toclient); tx.complete = true; - self.transactions.push_back(tx); } 1..=2 => { - if let Some(pkt_id) = publish.message_id { - let mut tx = self.new_tx(msg, toclient); + if let Some(pkt_id) = pkt_id { tx.pkt_id = Some(pkt_id as u32); - self.transactions.push_back(tx); } else { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId); - self.transactions.push_back(tx); } } _ => { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); - self.transactions.push_back(tx); } } - } - MQTTOperation::SUBSCRIBE(ref subscribe) => { if !self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; } + self.transactions.push_back(tx); + } + MQTTOperation::SUBSCRIBE(ref subscribe) => { let pkt_id = subscribe.message_id as u32; - match msg.header.qos_level { + let qos = msg.header.qos_level; + let mut tx = self.new_tx(msg, toclient); + match qos { 0 => { // with QOS level 0, we do not need to wait for a // response - let mut tx = self.new_tx(msg, toclient); tx.complete = true; - self.transactions.push_back(tx); } 1..=2 => { - let mut tx = self.new_tx(msg, toclient); tx.pkt_id = Some(pkt_id); - self.transactions.push_back(tx); } _ => { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); - self.transactions.push_back(tx); } } - } - MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => { if !self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; } + self.transactions.push_back(tx); + } + MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => { let pkt_id = unsubscribe.message_id as u32; - match msg.header.qos_level { + let qos = msg.header.qos_level; + let mut tx = self.new_tx(msg, toclient); + match qos { 0 => { // with QOS level 0, we do not need to wait for a // response - let mut tx = self.new_tx(msg, toclient); tx.complete = true; - self.transactions.push_back(tx); } 1..=2 => { - let mut tx = self.new_tx(msg, toclient); tx.pkt_id = Some(pkt_id); - self.transactions.push_back(tx); } _ => { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel); - self.transactions.push_back(tx); } } + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + self.transactions.push_back(tx); } MQTTOperation::CONNACK(ref _connack) => { if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) { @@ -325,31 +303,24 @@ impl MQTTState { } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect); + tx.complete = true; self.transactions.push_back(tx); } } MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { tx.msg.push(msg); } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; self.transactions.push_back(tx); } } MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) { tx.msg.push(msg); tx.complete = true; @@ -357,16 +328,14 @@ impl MQTTState { } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; self.transactions.push_back(tx); } } MQTTOperation::SUBACK(ref suback) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) { tx.msg.push(msg); tx.complete = true; @@ -374,16 +343,14 @@ impl MQTTState { } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; self.transactions.push_back(tx); } } MQTTOperation::UNSUBACK(ref unsuback) => { - if !self.connected { - let mut tx = self.new_tx(msg, toclient); - MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; - } if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) { tx.msg.push(msg); tx.complete = true; @@ -391,6 +358,10 @@ impl MQTTState { } else { let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe); + if !self.connected { + MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); + } + tx.complete = true; self.transactions.push_back(tx); } } @@ -406,25 +377,19 @@ impl MQTTState { self.transactions.push_back(tx); } MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => { + let mut tx = self.new_tx(msg, toclient); + tx.complete = true; if !self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; } - let mut tx = self.new_tx(msg, toclient); - tx.complete = true; self.transactions.push_back(tx); } MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => { + let mut tx = self.new_tx(msg, toclient); + tx.complete = true; if !self.connected { - let mut tx = self.new_tx(msg, toclient); MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage); - self.transactions.push_back(tx); - return; } - let mut tx = self.new_tx(msg, toclient); - tx.complete = true; self.transactions.push_back(tx); } } @@ -608,7 +573,11 @@ impl MQTTState { } fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) { - let mut tx = MQTTTransaction::new_empty(if toclient { Direction::ToClient } else { Direction::ToServer }); + let mut tx = MQTTTransaction::new_empty(if toclient { + Direction::ToClient + } else { + Direction::ToServer + }); self.tx_id += 1; tx.tx_id = self.tx_id; if toclient { diff --git a/src/detect.c b/src/detect.c index d671a3866fa5..40b46dde1e1c 100644 --- a/src/detect.c +++ b/src/detect.c @@ -152,6 +152,12 @@ static void DetectRun(ThreadVars *th_v, DetectRunFrames(th_v, de_ctx, det_ctx, p, pflow, &scratch); // PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX); } + // no update to transactions + if (!PKT_IS_PSEUDOPKT(p) && p->app_update_direction == 0 && + ((PKT_IS_TOSERVER(p) && (p->flow->flags & FLOW_TS_APP_UPDATED) == 0) || + (PKT_IS_TOCLIENT(p) && (p->flow->flags & FLOW_TC_APP_UPDATED) == 0))) { + goto end; + } } else if (p->proto == IPPROTO_UDP) { DetectRunFrames(th_v, de_ctx, det_ctx, p, pflow, &scratch); } @@ -159,6 +165,11 @@ static void DetectRun(ThreadVars *th_v, PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX); DetectRunTx(th_v, de_ctx, det_ctx, p, pflow, &scratch); PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX); + /* see if we need to increment the inspect_id and reset the de_state */ + PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX_UPDATE); + AppLayerParserSetTransactionInspectId( + pflow, pflow->alparser, pflow->alstate, scratch.flow_flags, (scratch.sgh == NULL)); + PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX_UPDATE); } end: @@ -919,14 +930,6 @@ static inline void DetectRunPostRules( Flow * const pflow, DetectRunScratchpad *scratch) { - /* see if we need to increment the inspect_id and reset the de_state */ - if (pflow && pflow->alstate) { - PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX_UPDATE); - AppLayerParserSetTransactionInspectId(pflow, pflow->alparser, pflow->alstate, - scratch->flow_flags, (scratch->sgh == NULL)); - PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX_UPDATE); - } - /* so now let's iterate the alerts and remove the ones after a pass rule * matched (if any). This is done inside PacketAlertFinalize() */ /* PR: installed "tag" keywords are handled after the threshold inspection */ @@ -1301,6 +1304,81 @@ static inline void StoreDetectFlags(DetectTransaction *tx, const uint8_t flow_fl } } +// Merge 'state' rules from the regular prefilter +// updates array_idx on the way +static inline void RuleMatchCandidateMergeStateRules( + DetectEngineThreadCtx *det_ctx, uint32_t *array_idx) +{ + // Now, we will merge 2 sorted lists : + // the one in det_ctx->tx_candidates + // and the one in det_ctx->match_array + // For match_array, we take only the relevant elements where s->app_inspect != NULL + + // Basically, we iterate at the same time over the 2 lists + // comparing and taking an element from either. + + // Trick is to do so in place in det_ctx->tx_candidates, + // so as to minimize the number of moves in det_ctx->tx_candidates. + // For this, the algorithm traverses the lists in reverse order. + // Otherwise, if the first element of match_array was to be put before + // all tx_candidates, we would need to shift all tx_candidates + + // Retain the number of elements sorted in tx_candidates before merge + uint32_t j = *array_idx; + // First loop only counting the number of elements to add + for (uint32_t i = 0; i < det_ctx->match_array_cnt; i++) { + const Signature *s = det_ctx->match_array[i]; + if (s->app_inspect != NULL) { + (*array_idx)++; + } + } + // Future number of elements in tx_candidates after merge + uint32_t k = *array_idx; + + if (k == j) { + // no new element from match_array to merge in tx_candidates + return; + } + + // variable i is for all elements of match_array (even not relevant ones) + // variable j is for elements of tx_candidates before merge + // variable k is for elements of tx_candidates after merge + for (uint32_t i = det_ctx->match_array_cnt; i > 0;) { + const Signature *s = det_ctx->match_array[i - 1]; + if (s->app_inspect == NULL) { + // no relevant element, get the next one from match_array + i--; + continue; + } + // we have one element from match_array to merge in tx_candidates + k--; + if (j > 0) { + // j > 0 means there is still at least one element in tx_candidates to merge + const RuleMatchCandidateTx *s0 = &det_ctx->tx_candidates[j - 1]; + if (s->num <= s0->id) { + // get next element from previous tx_candidates + j--; + // take the element from tx_candidates before merge + det_ctx->tx_candidates[k].s = det_ctx->tx_candidates[j].s; + det_ctx->tx_candidates[k].id = det_ctx->tx_candidates[j].id; + det_ctx->tx_candidates[k].flags = det_ctx->tx_candidates[j].flags; + det_ctx->tx_candidates[k].stream_reset = det_ctx->tx_candidates[j].stream_reset; + continue; + } + } // otherwise + // get next element from match_array + i--; + // take the element from match_array + det_ctx->tx_candidates[k].s = s; + det_ctx->tx_candidates[k].id = s->num; + det_ctx->tx_candidates[k].flags = NULL; + det_ctx->tx_candidates[k].stream_reset = 0; + } + // Even if k > 0 or j > 0, the loop is over. (Note that j == k now) + // The remaining elements in tx_candidates up to k were already sorted + // and come before any other element later in the list +} + static void DetectRunTx(ThreadVars *tv, DetectEngineCtx *de_ctx, DetectEngineThreadCtx *det_ctx, @@ -1374,24 +1452,10 @@ static void DetectRunTx(ThreadVars *tv, } /* merge 'state' rules from the regular prefilter */ +#ifdef PROFILING uint32_t x = array_idx; - for (uint32_t i = 0; i < det_ctx->match_array_cnt; i++) { - const Signature *s = det_ctx->match_array[i]; - if (s->app_inspect != NULL) { - const SigIntId id = s->num; - det_ctx->tx_candidates[array_idx].s = s; - det_ctx->tx_candidates[array_idx].id = id; - det_ctx->tx_candidates[array_idx].flags = NULL; - det_ctx->tx_candidates[array_idx].stream_reset = 0; - array_idx++; - - SCLogDebug("%p/%"PRIu64" rule %u (%u) added from 'match' list", - tx.tx_ptr, tx.tx_id, s->id, id); - } - } - do_sort = (array_idx > x); // sort if match added anything - SCLogDebug("%p/%" PRIu64 " rules added from 'match' list: %u", tx.tx_ptr, tx.tx_id, - array_idx - x); +#endif + RuleMatchCandidateMergeStateRules(det_ctx, &array_idx); /* merge stored state into results */ if (tx.de_state != NULL) { diff --git a/src/util-unittest-helper.c b/src/util-unittest-helper.c index 80356cf82e2b..414f5054b9df 100644 --- a/src/util-unittest-helper.c +++ b/src/util-unittest-helper.c @@ -316,6 +316,7 @@ Packet *UTHBuildPacketReal(uint8_t *payload, uint16_t payload_len, } SET_PKT_LEN(p, hdr_offset + payload_len); p->payload = GET_PKT_DATA(p)+hdr_offset; + p->app_update_direction = UPDATE_DIR_BOTH; return p;