Skip to content

Commit

Permalink
Fix tracking locked queue ids (closes #1066)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Jan 11, 2025
1 parent fcbf58f commit dc19f20
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 126 deletions.
15 changes: 10 additions & 5 deletions crates/common/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,21 @@ pub struct EncryptionKeys {
pub enum QueueEvent {
Refresh(Option<u64>),
WorkerDone(u64),
OnHold(OnHold<QueuedMessage>),
OnHold { queue_id: u64, status: OnHold },
Paused(bool),
Stop,
}

#[derive(Debug)]
pub struct OnHold<T> {
pub next_due: Option<u64>,
pub limiters: Vec<ConcurrencyLimiter>,
pub message: T,
pub enum OnHold {
InFlight,
Locked {
until: u64,
},
ConcurrencyLimited {
limiters: Vec<ConcurrencyLimiter>,
next_due: Option<u64>,
},
}

#[derive(Debug, Clone, Copy)]
Expand Down
4 changes: 2 additions & 2 deletions crates/main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ tokio = { version = "1.23", features = ["full"] }
jemallocator = "0.5.0"

[features]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "enterprise"]
#default = ["rocks", "enterprise"]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "enterprise"]
default = ["rocks", "enterprise"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation", "common/foundation"]
postgres = ["store/postgres"]
Expand Down
50 changes: 22 additions & 28 deletions crates/smtp/src/outbound/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common::config::{
server::ServerProtocol,
smtp::{queue::RequireOptional, report::AggregateFrequency},
};
use common::ipc::{OnHold, PolicyType, QueueEvent, QueuedMessage, TlsEvent};
use common::ipc::{OnHold, PolicyType, QueueEvent, TlsEvent};
use common::Server;
use mail_auth::{
mta_sts::TlsRpt,
Expand All @@ -41,16 +41,7 @@ use super::{lookup::ToNextHop, mta_sts, session::SessionParams, NextHop, TlsStra
use crate::queue::{throttle, DeliveryAttempt, Domain, Error, QueueEnvelope, Status};

impl DeliveryAttempt {
pub fn try_deliver(mut self, server: Server) -> Option<OnHold<QueuedMessage>> {
// Global concurrency limiter
if let Err(limiter) = server.is_outbound_allowed(&mut self.in_flight) {
return Some(OnHold {
next_due: None,
limiters: vec![limiter],
message: self.event,
});
}

pub fn try_deliver(self, server: Server) {
tokio::spawn(async move {
// Lock queue event
let queue_id = self.event.queue_id;
Expand Down Expand Up @@ -123,11 +114,12 @@ impl DeliveryAttempt {
QueueEvent::WorkerDone(queue_id)
}
} else {
QueueEvent::OnHold(OnHold {
next_due: Some(LOCK_EXPIRY + 1),
limiters: vec![],
message: self.event,
})
QueueEvent::OnHold {
queue_id: self.event.queue_id,
status: OnHold::Locked {
until: now() + LOCK_EXPIRY + 1,
},
}
};

// Notify queue manager
Expand All @@ -139,8 +131,6 @@ impl DeliveryAttempt {
);
}
});

None
}

async fn deliver_task(mut self, server: Server, mut message: Message) -> QueueEvent {
Expand Down Expand Up @@ -193,11 +183,13 @@ impl DeliveryAttempt {
SpanId = span_id,
);

QueueEvent::OnHold(OnHold {
next_due,
limiters: vec![limiter],
message: self.event,
})
QueueEvent::OnHold {
queue_id: self.event.queue_id,
status: OnHold::ConcurrencyLimited {
next_due,
limiters: vec![limiter],
},
}
}
throttle::Error::Rate { retry_at } => {
// Save changes to disk
Expand Down Expand Up @@ -1339,11 +1331,13 @@ impl DeliveryAttempt {
SpanId = span_id,
);

QueueEvent::OnHold(OnHold {
next_due,
limiters: on_hold,
message: self.event,
})
QueueEvent::OnHold {
queue_id: self.event.queue_id,
status: OnHold::ConcurrencyLimited {
next_due,
limiters: on_hold,
},
}
} else if let Some(due) = message.next_event() {
trc::event!(
Queue(trc::QueueEvent::Rescheduled),
Expand Down
Loading

0 comments on commit dc19f20

Please sign in to comment.