Skip to content

Commit

Permalink
Add elapsed times to message filtering events
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Aug 23, 2024
1 parent dcc31e8 commit e4cd866
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 48 deletions.
3 changes: 3 additions & 0 deletions crates/common/src/telemetry/tracers/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,9 @@ impl StoreTracer {
| AuthEvent::Banned
| AuthEvent::Error
)
| EventType::Sieve(_)
| EventType::Milter(_)
| EventType::MtaHook(_)
)
})
}
Expand Down
14 changes: 9 additions & 5 deletions crates/jmap/src/api/management/sieve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ impl JMAP {
path: Vec<&str>,
body: Option<Vec<u8>>,
) -> trc::Result<HttpResponse> {
let script = match (
path.get(1)
.and_then(|name| self.core.sieve.scripts.get(*name))
.cloned(),
let (script, script_id) = match (
path.get(1).and_then(|name| {
self.core
.sieve
.scripts
.get(*name)
.map(|s| (s.clone(), name.to_string()))
}),
req.method(),
) {
(Some(script), &Method::POST) => script,
Expand Down Expand Up @@ -94,7 +98,7 @@ impl JMAP {
}

// Run script
let result = match self.smtp.run_script(script, params, 0).await {
let result = match self.smtp.run_script(script_id, script, params, 0).await {
ScriptResult::Accept { modifications } => Response::Accept { modifications },
ScriptResult::Replace {
message,
Expand Down
18 changes: 15 additions & 3 deletions crates/smtp/src/inbound/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ impl<T: SessionStream> Session<T> {
{
command.arg(argument);
}
let time = Instant::now();
match command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
Expand Down Expand Up @@ -475,20 +476,23 @@ impl<T: SessionStream> Session<T> {
SpanId = self.data.session_id,
Path = command_,
Result = output.status.to_string(),
Elapsed = time.elapsed(),
);
}
Ok(Err(err)) => {
trc::event!(
Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id,
Reason = err.to_string(),
Elapsed = time.elapsed(),
);
}
Err(_) => {
trc::event!(
Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id,
Reason = "Timeout",
Elapsed = time.elapsed(),
);
}
}
Expand All @@ -498,13 +502,15 @@ impl<T: SessionStream> Session<T> {
Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id,
Reason = err.to_string(),
Elapsed = time.elapsed(),
);
}
Err(_) => {
trc::event!(
Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id,
Reason = "Stdin timeout",
Elapsed = time.elapsed(),
);
}
}
Expand All @@ -513,6 +519,7 @@ impl<T: SessionStream> Session<T> {
Smtp(SmtpEvent::PipeError),
SpanId = self.data.session_id,
Reason = "Stdin not available",
Elapsed = time.elapsed(),
);
}
}
Expand All @@ -528,12 +535,17 @@ impl<T: SessionStream> Session<T> {
}

// Sieve filtering
if let Some(script) = self
if let Some((script, script_id)) = self
.core
.core
.eval_if::<String, _>(&dc.script, self, self.data.session_id)
.await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
.and_then(|name| {
self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s, name))
})
{
let params = self
.build_script_parameters("data")
Expand Down Expand Up @@ -584,7 +596,7 @@ impl<T: SessionStream> Session<T> {
.unwrap_or_default(),
);

let modifications = match self.run_script(script.clone(), params).await {
let modifications = match self.run_script(script_id, script.clone(), params).await {
ScriptResult::Accept { modifications } => modifications,
ScriptResult::Replace {
message,
Expand Down
15 changes: 12 additions & 3 deletions crates/smtp/src/inbound/ehlo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<T: SessionStream> Session<T> {
}

// Sieve filtering
if let Some(script) = self
if let Some((script, script_id)) = self
.core
.core
.eval_if::<String, _>(
Expand All @@ -84,10 +84,19 @@ impl<T: SessionStream> Session<T> {
self.data.session_id,
)
.await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
.and_then(|name| {
self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s, name))
})
{
if let ScriptResult::Reject(message) = self
.run_script(script.clone(), self.build_script_parameters("ehlo"))
.run_script(
script_id,
script.clone(),
self.build_script_parameters("ehlo"),
)
.await
{
self.data.mail_from = None;
Expand Down
10 changes: 5 additions & 5 deletions crates/smtp/src/inbound/hooks/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

use std::time::Instant;

use ahash::AHashMap;
use common::{
config::smtp::session::{MTAHook, Stage},
Expand Down Expand Up @@ -50,6 +52,7 @@ impl<T: SessionStream> Session<T> {
continue;
}

let time = Instant::now();
match self.run_mta_hook(stage, mta_hook, message).await {
Ok(response) => {
trc::event!(
Expand All @@ -61,11 +64,7 @@ impl<T: SessionStream> Session<T> {
}),
SpanId = self.data.session_id,
Id = mta_hook.id.clone(),
Contents = response
.modifications
.iter()
.map(|m| format!("{m:?}"))
.collect::<Vec<_>>()
Elapsed = time.elapsed(),
);

let mut new_modifications = Vec::with_capacity(response.modifications.len());
Expand Down Expand Up @@ -157,6 +156,7 @@ impl<T: SessionStream> Session<T> {
SpanId = self.data.session_id,
Id = mta_hook.id.clone(),
Reason = err,
Elapsed = time.elapsed(),
);

if mta_hook.tempfail_on_error {
Expand Down
15 changes: 12 additions & 3 deletions crates/smtp/src/inbound/mail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<T: SessionStream> Session<T> {
.into();

// Sieve filtering
if let Some(script) = self
if let Some((script, script_id)) = self
.core
.core
.eval_if::<String, _>(
Expand All @@ -130,10 +130,19 @@ impl<T: SessionStream> Session<T> {
self.data.session_id,
)
.await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
.and_then(|name| {
self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s, name))
})
{
match self
.run_script(script.clone(), self.build_script_parameters("mail"))
.run_script(
script_id,
script.clone(),
self.build_script_parameters("mail"),
)
.await
{
ScriptResult::Accept { modifications } => {
Expand Down
10 changes: 5 additions & 5 deletions crates/smtp/src/inbound/milter/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/

use std::borrow::Cow;
use std::{borrow::Cow, time::Instant};

use common::{
config::smtp::session::{Milter, Stage},
Expand Down Expand Up @@ -53,16 +53,14 @@ impl<T: SessionStream> Session<T> {
continue;
}

let time = Instant::now();
match self.connect_and_run(milter, message).await {
Ok(new_modifications) => {
trc::event!(
Milter(MilterEvent::ActionAccept),
SpanId = self.data.session_id,
Id = milter.id.to_string(),
Contents = new_modifications
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>(),
Elapsed = time.elapsed(),
);

if !modifications.is_empty() {
Expand Down Expand Up @@ -95,6 +93,7 @@ impl<T: SessionStream> Session<T> {
}),
SpanId = self.data.session_id,
Id = milter.id.to_string(),
Elapsed = time.elapsed(),
);

return Err(match action {
Expand Down Expand Up @@ -144,6 +143,7 @@ impl<T: SessionStream> Session<T> {
SpanId = self.data.session_id,
Id = milter.id.to_string(),
Details = details,
Elapsed = time.elapsed(),
);

if milter.tempfail_on_error {
Expand Down
16 changes: 12 additions & 4 deletions crates/smtp/src/inbound/rcpt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,12 @@ impl<T: SessionStream> Session<T> {
self.data.session_id,
)
.await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
.cloned();
.and_then(|name| {
self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s.clone(), name))
});

if rcpt_script.is_some()
|| !self.core.core.smtp.session.rcpt.rewrite.is_empty()
Expand All @@ -100,9 +104,13 @@ impl<T: SessionStream> Session<T> {
.any(|m| m.run_on_stage.contains(&Stage::Rcpt))
{
// Sieve filtering
if let Some(script) = rcpt_script {
if let Some((script, script_id)) = rcpt_script {
match self
.run_script(script.clone(), self.build_script_parameters("rcpt"))
.run_script(
script_id,
script.clone(),
self.build_script_parameters("rcpt"),
)
.await
{
ScriptResult::Accept { modifications } => {
Expand Down
15 changes: 12 additions & 3 deletions crates/smtp/src/inbound/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,24 @@ impl<T: SessionStream> Session<T> {
let config = &self.core.core.smtp.session.connect;

// Sieve filtering
if let Some(script) = self
if let Some((script, script_id)) = self
.core
.core
.eval_if::<String, _>(&config.script, self, self.data.session_id)
.await
.and_then(|name| self.core.core.get_sieve_script(&name, self.data.session_id))
.and_then(|name| {
self.core
.core
.get_sieve_script(&name, self.data.session_id)
.map(|s| (s, name))
})
{
if let ScriptResult::Reject(message) = self
.run_script(script.clone(), self.build_script_parameters("connect"))
.run_script(
script_id,
script.clone(),
self.build_script_parameters("connect"),
)
.await
{
let _ = self.write(message.as_bytes()).await;
Expand Down
Loading

0 comments on commit e4cd866

Please sign in to comment.