Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 55 additions & 86 deletions rust/src/mqtt/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,11 @@ impl MQTTState {
}

fn new_tx(&mut self, msg: MQTTMessage, toclient: bool) -> MQTTTransaction {
let direction = if toclient {
Direction::ToClient
} else {
Direction::ToServer
};
let direction = if toclient {
Direction::ToClient
} else {
Direction::ToServer
};
let mut tx = MQTTTransaction::new(msg, direction);
self.tx_id += 1;
tx.tx_id = self.tx_id;
Expand Down Expand Up @@ -217,104 +217,82 @@ impl MQTTState {
match msg.op {
MQTTOperation::CONNECT(ref conn) => {
self.protocol_version = conn.protocol_version;
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
if self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
self.transactions.push_back(tx);
}
self.transactions.push_back(tx);
}
MQTTOperation::PUBLISH(ref publish) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
match msg.header.qos_level {
let qos = msg.header.qos_level;
let pkt_id = publish.message_id;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
if let Some(pkt_id) = publish.message_id {
let mut tx = self.new_tx(msg, toclient);
if let Some(pkt_id) = pkt_id {
tx.pkt_id = Some(pkt_id as u32);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
self.transactions.push_back(tx);
}
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::SUBSCRIBE(ref subscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
self.transactions.push_back(tx);
}
MQTTOperation::SUBSCRIBE(ref subscribe) => {
let pkt_id = subscribe.message_id as u32;
match msg.header.qos_level {
let qos = msg.header.qos_level;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
}
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
self.transactions.push_back(tx);
}
MQTTOperation::UNSUBSCRIBE(ref unsubscribe) => {
let pkt_id = unsubscribe.message_id as u32;
match msg.header.qos_level {
let qos = msg.header.qos_level;
let mut tx = self.new_tx(msg, toclient);
match qos {
0 => {
// with QOS level 0, we do not need to wait for a
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push_back(tx);
}
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push_back(tx);
}
}
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
self.transactions.push_back(tx);
}
MQTTOperation::CONNACK(ref _connack) => {
if let Some(tx) = self.get_tx_by_pkt_id(MQTT_CONNECT_PKT_ID) {
Expand All @@ -325,72 +303,65 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBREC(ref v) | MQTTOperation::PUBREL(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::PUBACK(ref v) | MQTTOperation::PUBCOMP(ref v) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::SUBACK(ref suback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
MQTTOperation::UNSUBACK(ref unsuback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
tx.msg.push(msg);
tx.complete = true;
tx.pkt_id = None;
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
if !self.connected {
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
}
tx.complete = true;
self.transactions.push_back(tx);
}
}
Expand All @@ -406,25 +377,19 @@ impl MQTTState {
self.transactions.push_back(tx);
}
MQTTOperation::AUTH(_) | MQTTOperation::DISCONNECT(_) => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
MQTTOperation::PINGREQ | MQTTOperation::PINGRESP => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push_back(tx);
}
}
Expand Down Expand Up @@ -608,7 +573,11 @@ impl MQTTState {
}

fn set_event_notx(&mut self, event: MQTTEvent, toclient: bool) {
let mut tx = MQTTTransaction::new_empty(if toclient { Direction::ToClient } else { Direction::ToServer });
let mut tx = MQTTTransaction::new_empty(if toclient {
Direction::ToClient
} else {
Direction::ToServer
});
self.tx_id += 1;
tx.tx_id = self.tx_id;
if toclient {
Expand Down
68 changes: 50 additions & 18 deletions src/detect.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,24 @@ static void DetectRun(ThreadVars *th_v,
DetectRunFrames(th_v, de_ctx, det_ctx, p, pflow, &scratch);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit message is very unclear, not understanding what we're doing and why

// PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX);
}
// no update to transactions
if (!PKT_IS_PSEUDOPKT(p) && p->app_update_direction == 0 &&
((PKT_IS_TOSERVER(p) && (p->flow->flags & FLOW_TS_APP_UPDATED) == 0) ||
(PKT_IS_TOCLIENT(p) && (p->flow->flags & FLOW_TC_APP_UPDATED) == 0))) {
goto end;
}
} else if (p->proto == IPPROTO_UDP) {
DetectRunFrames(th_v, de_ctx, det_ctx, p, pflow, &scratch);
}

PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX);
DetectRunTx(th_v, de_ctx, det_ctx, p, pflow, &scratch);
PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX);
/* see if we need to increment the inspect_id and reset the de_state */
PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX_UPDATE);
AppLayerParserSetTransactionInspectId(
pflow, pflow->alparser, pflow->alstate, scratch.flow_flags, (scratch.sgh == NULL));
PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX_UPDATE);
}

end:
Expand Down Expand Up @@ -919,14 +930,6 @@ static inline void DetectRunPostRules(
Flow * const pflow,
DetectRunScratchpad *scratch)
{
/* see if we need to increment the inspect_id and reset the de_state */
if (pflow && pflow->alstate) {
PACKET_PROFILING_DETECT_START(p, PROF_DETECT_TX_UPDATE);
AppLayerParserSetTransactionInspectId(pflow, pflow->alparser, pflow->alstate,
scratch->flow_flags, (scratch->sgh == NULL));
PACKET_PROFILING_DETECT_END(p, PROF_DETECT_TX_UPDATE);
}

/* so now let's iterate the alerts and remove the ones after a pass rule
* matched (if any). This is done inside PacketAlertFinalize() */
/* PR: installed "tag" keywords are handled after the threshold inspection */
Expand Down Expand Up @@ -1223,7 +1226,7 @@ static bool DetectRunTxInspectRule(ThreadVars *tv,
} else if ((inspect_flags & DE_STATE_FLAG_FULL_INSPECT) == 0 && mpm_in_progress) {
TRACE_SID_TXS(s->id, tx, "no need to store no-match sig, "
"mpm will revisit it");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit message not explaining why/how and problem this solves

} else {
} else if (inspect_flags != 0) {
TRACE_SID_TXS(s->id, tx, "storing state: flags %08x", inspect_flags);
DetectRunStoreStateTx(scratch->sgh, f, tx->tx_ptr, tx->tx_id, s,
inspect_flags, flow_flags, file_no_match);
Expand Down Expand Up @@ -1375,21 +1378,50 @@ static void DetectRunTx(ThreadVars *tv,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit message should explain why & how

/* merge 'state' rules from the regular prefilter */
uint32_t x = array_idx;
uint32_t j = x;
for (uint32_t i = 0; i < det_ctx->match_array_cnt; i++) {
const Signature *s = det_ctx->match_array[i];
if (s->app_inspect != NULL) {
const SigIntId id = s->num;
det_ctx->tx_candidates[array_idx].s = s;
det_ctx->tx_candidates[array_idx].id = id;
det_ctx->tx_candidates[array_idx].flags = NULL;
det_ctx->tx_candidates[array_idx].stream_reset = 0;
array_idx++;

SCLogDebug("%p/%"PRIu64" rule %u (%u) added from 'match' list",
tx.tx_ptr, tx.tx_id, s->id, id);
}
}
do_sort = (array_idx > x); // sort if match added anything
uint32_t k = array_idx;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move this into a static inline helper func? Code is getting too large here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

for (uint32_t i = det_ctx->match_array_cnt; i > 0;) {
const Signature *s = det_ctx->match_array[i - 1];
if (s->app_inspect != NULL) {
const SigIntId id = s->num;
if (j > 0) {
const RuleMatchCandidateTx *s0 = &det_ctx->tx_candidates[j - 1];
if (s->id > s0->id) {
det_ctx->tx_candidates[k - 1].s = s;
det_ctx->tx_candidates[k - 1].id = id;
det_ctx->tx_candidates[k - 1].flags = NULL;
det_ctx->tx_candidates[k - 1].stream_reset = 0;
i--;
} else {
// progress in the sorted list
det_ctx->tx_candidates[k - 1].s = det_ctx->tx_candidates[j - 1].s;
det_ctx->tx_candidates[k - 1].id = det_ctx->tx_candidates[j - 1].id;
det_ctx->tx_candidates[k - 1].flags = det_ctx->tx_candidates[j - 1].flags;
det_ctx->tx_candidates[k - 1].stream_reset =
det_ctx->tx_candidates[j - 1].stream_reset;
j--;
}
} else {
// simply append the end of sorted list
det_ctx->tx_candidates[k - 1].s = s;
det_ctx->tx_candidates[k - 1].id = id;
det_ctx->tx_candidates[k - 1].flags = NULL;
det_ctx->tx_candidates[k - 1].stream_reset = 0;
i--;
SCLogDebug("%p/%" PRIu64 " rule %u (%u) added from 'match' list", tx.tx_ptr,
tx.tx_id, s->id, id);
}
k--;
} else {
i--;
}
}
SCLogDebug("%p/%" PRIu64 " rules added from 'match' list: %u", tx.tx_ptr, tx.tx_id,
array_idx - x);

Expand Down
1 change: 1 addition & 0 deletions src/util-unittest-helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ Packet *UTHBuildPacketReal(uint8_t *payload, uint16_t payload_len,
}
SET_PKT_LEN(p, hdr_offset + payload_len);
p->payload = GET_PKT_DATA(p)+hdr_offset;
p->app_update_direction = UPDATE_DIR_BOTH;

return p;

Expand Down