Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 55 additions & 86 deletions rust/src/mqtt/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ impl MQTTState {
}

fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
let direction = if toclient {
Direction::ToClient
} else {
Direction::ToServer
};
let direction = if toclient {
Direction::ToClient
} else {
Direction::ToServer
};
let mut tx = MQTTTransaction::new(msg, direction);
self.tx_id += 1;
tx.tx_id = self.tx_id;
Expand Down Expand Up @@ -217,104 +217,82 @@ impl MQTTState {
match msg.op {
MQTTOperation::CONNECT(ref conn) => {
self.protocol_version = conn.protocol_version;
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
if self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
self.transactions.push_back(tx);
}
self.transactions.push_back(tx);
}
MQTTOperation::PUBLISH(ref publish) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
match msg.header.qos_level {
let qos = msg.header.qos_level;
let pkt_id = publish.message_id;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
if let Some(pkt_id) = publish.message_id {
let mut tx = self.new_tx(msg, toclient);
if let Some(pkt_id) = pkt_id {
tx.pkt_id = Some(pkt_id as u32);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
self.transactions.push_back(tx);
}
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::SUBSCRIBE(ref subscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
self.transactions.push_back(tx);
}
MQTTOperation::SUBSCRIBE(ref subscribe) => {
let pkt_id = subscribe.message_id as u32;
match msg.header.qos_level {
let qos = msg.header.qos_level;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
self.transactions.push_back(tx);
}
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
let pkt_id = unsubscribe.message_id as u32;
match msg.header.qos_level {
let qos = msg.header.qos_level;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
self.transactions.push_back(tx);
}
MQTTOperation::CONNACK(ref _connack) => {
if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
Expand All @@ -325,72 +303,65 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::SUBACK(ref suback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::UNSUBACK(ref unsuback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
Expand All @@ -406,25 +377,19 @@ impl MQTTState {
self.transactions.push_back(tx);
}
MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
}
Expand Down Expand Up @@ -608,7 +573,11 @@ impl MQTTState {
}

fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) {
let mut tx = MQTTTransaction::new_empty(if toclient { Direction::ToClient } else { Direction::ToServer });
let mut tx = MQTTTransaction::new_empty(if toclient {
Direction::ToClient
} else {
Direction::ToServer
});
self.tx_id += 1;
tx.tx_id = self.tx_id;
if toclient {
Expand Down
71 changes: 53 additions & 18 deletions src/detect.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,24 @@ static void DetectRun(ThreadVars *th_v,
DetectRunFrames(th_v, de_ctx, det_ctx, p, pflow, &scratch);
// PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX);
}
// no update to transactions
if (!PKT_IS_PSEUDOPKT(p) && p->app_update_direction == 0 &&
((PKT_IS_TOSERVER(p) && (p->flow->flags & FLOW_TS_APP_UPDATED) == 0) ||
(PKT_IS_TOCLIENT(p) && (p->flow->flags & FLOW_TC_APP_UPDATED) == 0))) {
goto end;
}
} else if (p->proto == IPPROTO_UDP) {
DetectRunFrames(th_v, de_ctx, det_ctx, p, pflow, &scratch);
}

PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX);
DetectRunTx(th_v, de_ctx, det_ctx, p, pflow, &scratch);
PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX);
/* see if we need to increment the inspect_id and reset the de_state */
PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX_UPDATE);
AppLayerParserSetTransactionInspectId(
pflow, pflow->alparser, pflow->alstate, scratch.flow_flags, (scratch.sgh == NULL));
PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX_UPDATE);
}

end:
Expand Down Expand Up @@ -911,14 +922,6 @@ static inline void DetectRunPostRules(
Flow * const pflow,
DetectRunScratchpad *scratch)
{
/* see if we need to increment the inspect_id and reset the de_state */
if (pflow && pflow->alstate) {
PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX_UPDATE);
AppLayerParserSetTransactionInspectId(pflow, pflow->alparser, pflow->alstate,
scratch->flow_flags, (scratch->sgh == NULL));
PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX_UPDATE);
}

/* so now let's iterate the alerts and remove the ones after a pass rule
* matched (if any). This is done inside PacketAlertFinalize() */
/* PR: installed "tag" keywords are handled after the threshold inspection */
Expand Down Expand Up @@ -1215,7 +1218,7 @@ static bool DetectRunTxInspectRule(ThreadVars *tv,
} else if ((inspect_flags & DE_STATE_FLAG_FULL_INSPECT) == 0 && mpm_in_progress) {
TRACE_SID_TXS(s->id, tx, "no need to store no-match sig, "
"mpm will revisit it");
} else {
} else if (inspect_flags != 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we store more than just the flags, we also store file_no_match. Can we have a case where we'd need that stored still?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point from your knowledge and wisdom. Will look that through

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note, feels weird to see file_no_match as u16 when it is only 0 or 1 up to its use in StoreFileNoMatchCnt as += file_no_match;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of the logic, but I wouldn't be surprised if it is broken, is to stop tracking the files if all sigs that need it definitively failed to match. So it should increment this for each unique sig that fails to match.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of the logic, but I wouldn't be surprised if it is broken, is to stop tracking the files if all sigs that need it definitively failed to match. So it should increment this for each unique sig that fails to match.

That is what I understood from reading the code

TRACE_SID_TXS(s->id, tx, "storing state: flags %08x", inspect_flags);
DetectRunStoreStateTx(scratch->sgh, f, tx->tx_ptr, tx->tx_id, s,
inspect_flags, flow_flags, file_no_match);
Expand Down Expand Up @@ -1293,6 +1296,46 @@ static inline void StoreDetectFlags(DetectTransaction *tx, const uint8_t flow_fl
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we merge this one, could we have a comment indicating what params j and k are?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, will do. IIRC they are size of both sorted lists to merge into a big one

static inline RuleMatchCandidateMergeSorted(DetectEngineThreadCtx *det_ctx, uint32_t j, uint32_t k)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having trouble understanding this. When we get called we have have 0-N entries in tx_candidates from prefilter. These are already ordered. The candidates coming from the match_array are merged into this, and the result has to be a sorted list by the candidates s->num (internal sig id). I guess I'm not seeing how we insert sort things into the already existing list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a matter of where this has been called, together with the fact that we still have the qsort called later on, (and we're not taking num into consideration when doing the ordering comparison), or are these factors irrelevant here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the fact that we still have the qsort called later on

The point is to avoid the call to qsort (in the case we do not have match candidates added after by stored flags) cf removal of do_sort = (array_idx > x); // sort if match added anything

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having trouble understanding this

Will put more comments.

Quick and dirty answer :
This is basically merging two sorted lists.
The trick is do it in place, without shifting all the elements.
So we start from the end (if we start from the beginning, and the new element should be first, we would have to shift all j elements already in place)

{
for (uint32_t i = det_ctx->match_array_cnt; i > 0;) {
const Signature *s = det_ctx->match_array[i - 1];
if (s->app_inspect != NULL) {
const SigIntId id = s->num;
if (j > 0) {
const RuleMatchCandidateTx *s0 = &det_ctx->tx_candidates[j - 1];
if (s->id > s0->id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this comparison be with num instead of id, since num is what's used for sorting?

Copy link
Member

@inashivb inashivb Jan 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id is correct. I do think the names of objects should probably have been consistent in these structs.
Edit: Just realized what you meant, indeed the first item should be id or s->num.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about renaming Signature::num to Signature::iid (internal id)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cf DetectRunTxSortHelper

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch Juliana

det_ctx->tx_candidates[k - 1].s = s;
det_ctx->tx_candidates[k - 1].id = id;
det_ctx->tx_candidates[k - 1].flags = NULL;
det_ctx->tx_candidates[k - 1].stream_reset = 0;
i--;
} else {
// progress in the sorted list
det_ctx->tx_candidates[k - 1].s = det_ctx->tx_candidates[j - 1].s;
det_ctx->tx_candidates[k - 1].id = det_ctx->tx_candidates[j - 1].id;
det_ctx->tx_candidates[k - 1].flags = det_ctx->tx_candidates[j - 1].flags;
det_ctx->tx_candidates[k - 1].stream_reset =
det_ctx->tx_candidates[j - 1].stream_reset;
j--;
}
} else {
// simply append the end of sorted list
det_ctx->tx_candidates[k - 1].s = s;
det_ctx->tx_candidates[k - 1].id = id;
det_ctx->tx_candidates[k - 1].flags = NULL;
det_ctx->tx_candidates[k - 1].stream_reset = 0;
i--;
SCLogDebug("%p/%" PRIu64 " rule %u (%u) added from 'match' list", tx.tx_ptr,
tx.tx_id, s->id, id);
}
k--;
} else {
i--;
}
}
}

static void DetectRunTx(ThreadVars *tv,
DetectEngineCtx *de_ctx,
DetectEngineThreadCtx *det_ctx,
Expand Down Expand Up @@ -1370,18 +1413,10 @@ static void DetectRunTx(ThreadVars *tv,
for (uint32_t i = 0; i < det_ctx->match_array_cnt; i++) {
const Signature *s = det_ctx->match_array[i];
if (s->app_inspect != NULL) {
const SigIntId id = s->num;
det_ctx->tx_candidates[array_idx].s = s;
det_ctx->tx_candidates[array_idx].id = id;
det_ctx->tx_candidates[array_idx].flags = NULL;
det_ctx->tx_candidates[array_idx].stream_reset = 0;
array_idx++;

SCLogDebug("%p/%"PRIu64" rule %u (%u) added from 'match' list",
tx.tx_ptr, tx.tx_id, s->id, id);
}
}
do_sort = (array_idx > x); // sort if match added anything
RuleMatchCandidateMergeSorted(det_ctx, x, array_idx);
SCLogDebug("%p/%" PRIu64 " rules added from 'match' list: %u", tx.tx_ptr, tx.tx_id,
array_idx - x);

Expand Down
Loading