Skip to content

Commit

Permalink
Remove old locks
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Dec 22, 2024
1 parent 3d8f242 commit 25e7983
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
3 changes: 1 addition & 2 deletions crates/main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ trc = { path = "../trc" }
utils = { path = "../utils" }
tokio = { version = "1.23", features = ["full"] }


[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

[features]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "enterprise"]
#default = ["rocks"]
#default = ["rocks", "enterprise"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation", "common/foundation"]
postgres = ["store/postgres"]
Expand Down
17 changes: 10 additions & 7 deletions crates/smtp/src/inbound/spam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ use spam_filter::{
bayes::SpamFilterAnalyzeBayes, date::SpamFilterAnalyzeDate, dmarc::SpamFilterAnalyzeDmarc,
domain::SpamFilterAnalyzeDomain, ehlo::SpamFilterAnalyzeEhlo, from::SpamFilterAnalyzeFrom,
headers::SpamFilterAnalyzeHeaders, html::SpamFilterAnalyzeHtml, init::SpamFilterInit,
ip::SpamFilterAnalyzeIp, llm::SpamFilterAnalyzeLlm, messageid::SpamFilterAnalyzeMid,
mime::SpamFilterAnalyzeMime, pyzor::SpamFilterAnalyzePyzor,
received::SpamFilterAnalyzeReceived, recipient::SpamFilterAnalyzeRecipient,
replyto::SpamFilterAnalyzeReplyTo, reputation::SpamFilterAnalyzeReputation,
rules::SpamFilterAnalyzeRules, score::SpamFilterAnalyzeScore,
subject::SpamFilterAnalyzeSubject, trusted_reply::SpamFilterAnalyzeTrustedReply,
url::SpamFilterAnalyzeUrl,
ip::SpamFilterAnalyzeIp, messageid::SpamFilterAnalyzeMid, mime::SpamFilterAnalyzeMime,
pyzor::SpamFilterAnalyzePyzor, received::SpamFilterAnalyzeReceived,
recipient::SpamFilterAnalyzeRecipient, replyto::SpamFilterAnalyzeReplyTo,
reputation::SpamFilterAnalyzeReputation, rules::SpamFilterAnalyzeRules,
score::SpamFilterAnalyzeScore, subject::SpamFilterAnalyzeSubject,
trusted_reply::SpamFilterAnalyzeTrustedReply, url::SpamFilterAnalyzeUrl,
},
SpamFilterInput,
};

#[cfg(feature = "enterprise")]
use spam_filter::analysis::llm::SpamFilterAnalyzeLlm;

use crate::core::Session;

impl<T: SessionStream> Session<T> {
Expand Down Expand Up @@ -90,6 +92,7 @@ impl<T: SessionStream> Session<T> {
server.spam_filter_analyze_html(&mut ctx).await;

// LLM classification
#[cfg(feature = "enterprise")]
server.spam_filter_analyze_llm(&mut ctx).await;

// Trusted reply analysis
Expand Down
27 changes: 26 additions & 1 deletion crates/smtp/src/reporting/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
time::{Duration, SystemTime},
};
use store::{
write::{now, QueueClass, ReportEvent, ValueClass},
write::{now, BatchBuilder, QueueClass, ReportEvent, ValueClass},
Deserialize, IterateParams, Store, ValueKey,
};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -115,11 +115,23 @@ async fn next_report_event(store: Store) -> Vec<QueueClass> {
)));

let mut events = Vec::new();
let mut old_locks = Vec::new();
let result = store
.iterate(
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
let event = ReportEvent::deserialize(key)?;

// TODO - REMOVEME - Part of v0.11 migration
if event.seq_id == 0 {
old_locks.push(if *key.last().unwrap() == 0 {
QueueClass::DmarcReportHeader(event)
} else {
QueueClass::TlsReportHeader(event)
});
return Ok(true);
}

let do_continue = event.due <= now;
events.push(if *key.last().unwrap() == 0 {
QueueClass::DmarcReportHeader(event)
Expand All @@ -131,6 +143,19 @@ async fn next_report_event(store: Store) -> Vec<QueueClass> {
)
.await;

// TODO - REMOVEME - Part of v0.11 migration
if !old_locks.is_empty() {
let mut batch = BatchBuilder::new();
for event in old_locks {
batch.clear(ValueClass::Queue(event));
}
if let Err(err) = store.write(batch.build()).await {
trc::error!(err
.caused_by(trc::location!())
.details("Failed to remove old report events"));
}
}

if let Err(err) = result {
trc::error!(err
.caused_by(trc::location!())
Expand Down
2 changes: 1 addition & 1 deletion tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ resolver = "2"

[features]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "foundationdb"]
#default = ["rocks", "sqlite"]
#default = ["rocks"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation", "common/foundation"]
postgres = ["store/postgres"]
Expand Down

0 comments on commit 25e7983

Please sign in to comment.