Skip to content

Commit

Permalink
Message delivery history + Live tracing 💎
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Aug 14, 2024
1 parent be656dd commit abd318b
Show file tree
Hide file tree
Showing 72 changed files with 5,558 additions and 2,897 deletions.
2 changes: 1 addition & 1 deletion crates/common/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl Core {
// SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <[email protected]>
// SPDX-License-Identifier: LicenseRef-SEL
#[cfg(feature = "enterprise")]
let enterprise = crate::enterprise::Enterprise::parse(config, &data).await;
let enterprise = crate::enterprise::Enterprise::parse(config, &stores, &data).await;

#[cfg(feature = "enterprise")]
if enterprise.is_none() {
Expand Down
13 changes: 0 additions & 13 deletions crates/common/src/config/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,3 @@ impl Display for ServerProtocol {
f.write_str(self.as_str())
}
}

impl From<ServerProtocol> for trc::Value {
fn from(value: ServerProtocol) -> Self {
trc::Value::Protocol(match value {
ServerProtocol::Smtp => trc::Protocol::Smtp,
ServerProtocol::Lmtp => trc::Protocol::Lmtp,
ServerProtocol::Imap => trc::Protocol::Imap,
ServerProtocol::Pop3 => trc::Protocol::Pop3,
ServerProtocol::Http => trc::Protocol::Http,
ServerProtocol::ManageSieve => trc::Protocol::ManageSieve,
})
}
}
51 changes: 47 additions & 4 deletions crates/common/src/config/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use opentelemetry_sdk::{
Resource,
};
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
use trc::{subscriber::Interests, EventType, Level, TelemetryEvent};
use store::Stores;
use trc::{ipc::subscriber::Interests, EventType, Level, TelemetryEvent};
use utils::config::{utils::ParseValue, Config};

#[derive(Debug)]
Expand All @@ -42,6 +43,8 @@ pub enum TelemetrySubscriberType {
Webhook(WebhookTracer),
#[cfg(unix)]
JournalTracer(crate::telemetry::tracers::journald::Subscriber),
#[cfg(feature = "enterprise")]
StoreTracer(StoreTracer),
}

#[derive(Debug)]
Expand Down Expand Up @@ -87,6 +90,12 @@ pub struct WebhookTracer {
pub headers: HeaderMap,
}

#[derive(Debug)]
#[cfg(feature = "enterprise")]
pub struct StoreTracer {
pub store: store::Store,
}

#[derive(Debug)]
pub enum RotationStrategy {
Daily,
Expand Down Expand Up @@ -120,9 +129,9 @@ pub struct PrometheusMetrics {
}

impl Telemetry {
pub fn parse(config: &mut Config) -> Self {
pub fn parse(config: &mut Config, stores: &Stores) -> Self {
let mut telemetry = Telemetry {
tracers: Tracers::parse(config),
tracers: Tracers::parse(config, stores),
metrics: Interests::default(),
};

Expand Down Expand Up @@ -155,7 +164,7 @@ impl Telemetry {
}

impl Tracers {
pub fn parse(config: &mut Config) -> Self {
pub fn parse(config: &mut Config, stores: &Stores) -> Self {
// Parse custom logging levels
let mut custom_levels = AHashMap::new();
for event_name in config
Expand Down Expand Up @@ -485,6 +494,8 @@ impl Tracers {
TelemetrySubscriberType::JournalTracer(_) => {
EventType::Telemetry(TelemetryEvent::JournalError).into()
}
#[cfg(feature = "enterprise")]
TelemetrySubscriberType::StoreTracer(_) => None,
};

// Parse disabled events
Expand Down Expand Up @@ -515,6 +526,38 @@ impl Tracers {
}
}

// Parse tracing history
#[cfg(feature = "enterprise")]
{
if config
.property_or_default("tracing.history.enable", "false")
.unwrap_or(false)
{
if let Some(store_id) = config.value_require("tracing.history.store") {
if let Some(store) = stores.stores.get(store_id) {
let mut tracer = TelemetrySubscriber {
id: "history".to_string(),
interests: Default::default(),
lossy: false,
typ: TelemetrySubscriberType::StoreTracer(StoreTracer {
store: store.clone(),
}),
};

for event_type in StoreTracer::default_events() {
tracer.interests.set(event_type);
global_interests.set(event_type);
}

tracers.push(tracer);
} else {
let err = format!("Store {store_id} not found");
config.new_build_error("tracing.history.store", err);
}
}
}
}

// Parse webhooks
for id in config
.sub_keys("webhook", ".url")
Expand Down
13 changes: 10 additions & 3 deletions crates/common/src/enterprise/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
use std::time::Duration;

use jmap_proto::types::collection::Collection;
use store::{BitmapKey, Store};
use store::{BitmapKey, Store, Stores};
use utils::config::Config;

use super::{license::LicenseValidator, Enterprise};

impl Enterprise {
pub async fn parse(config: &mut Config, data: &Store) -> Option<Self> {
pub async fn parse(config: &mut Config, stores: &Stores, data: &Store) -> Option<Self> {
let license = match LicenseValidator::new()
.try_parse(config.value("enterprise.license-key")?)
.and_then(|key| {
Expand Down Expand Up @@ -57,8 +57,15 @@ impl Enterprise {
Some(Enterprise {
license,
undelete_period: config
.property_or_default::<Option<Duration>>("enterprise.undelete-period", "false")
.property_or_default::<Option<Duration>>("storage.undelete.hold-for", "false")
.unwrap_or_default(),
trace_hold_period: config
.property_or_default::<Option<Duration>>("tracing.history.hold-for", "90d")
.unwrap_or(Some(Duration::from_secs(90 * 24 * 60 * 60))),
trace_store: config
.value("tracing.history.store")
.and_then(|name| stores.stores.get(name))
.cloned(),
})
}
}
3 changes: 3 additions & 0 deletions crates/common/src/enterprise/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ use std::time::Duration;

use license::LicenseKey;
use mail_parser::DateTime;
use store::Store;

use crate::Core;

#[derive(Clone)]
pub struct Enterprise {
pub license: LicenseKey,
pub undelete_period: Option<Duration>,
pub trace_hold_period: Option<Duration>,
pub trace_store: Option<Store>,
}

impl Core {
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/listener/acme/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde::Deserialize;
use serde_json::json;
use store::write::Bincode;
use store::Serialize;
use trc::conv::AssertSuccess;
use trc::event::conv::AssertSuccess;

use super::jose::{
key_authorization, key_authorization_sha256, key_authorization_sha256_base64, sign,
Expand Down
40 changes: 28 additions & 12 deletions crates/common/src/listener/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::{
sync::watch,
};
use tokio_rustls::server::TlsStream;
use trc::{EventType, HttpEvent, ImapEvent, ManageSieveEvent, Pop3Event, SmtpEvent};
use utils::{config::Config, UnwrapFailure};

use crate::{
Expand Down Expand Up @@ -69,7 +70,6 @@ impl Server {
trc::event!(
Network(trc::NetworkEvent::ListenStart),
ListenerId = instance.id.clone(),
Protocol = instance.protocol,
LocalIp = local_addr.ip(),
LocalPort = local_addr.port(),
Tls = is_tls,
Expand All @@ -81,7 +81,6 @@ impl Server {
trc::event!(
Network(trc::NetworkEvent::ListenError),
ListenerId = instance.id.clone(),
Protocol = instance.protocol,
LocalIp = local_addr.ip(),
LocalPort = local_addr.port(),
Tls = is_tls,
Expand All @@ -98,6 +97,29 @@ impl Server {
let instance = instance.clone();
let core = core.clone();
tokio::spawn(async move {
let (span_start, span_end) = match self.protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => (
EventType::Smtp(SmtpEvent::ConnectionStart),
EventType::Smtp(SmtpEvent::ConnectionEnd),
),
ServerProtocol::Imap => (
EventType::Imap(ImapEvent::ConnectionStart),
EventType::Imap(ImapEvent::ConnectionEnd),
),
ServerProtocol::Pop3 => (
EventType::Pop3(Pop3Event::ConnectionStart),
EventType::Pop3(Pop3Event::ConnectionEnd),
),
ServerProtocol::Http => (
EventType::Http(HttpEvent::ConnectionStart),
EventType::Http(HttpEvent::ConnectionEnd),
),
ServerProtocol::ManageSieve => (
EventType::ManageSieve(ManageSieveEvent::ConnectionStart),
EventType::ManageSieve(ManageSieveEvent::ConnectionEnd),
),
};

loop {
tokio::select! {
stream = listener.accept() => {
Expand All @@ -122,14 +144,13 @@ impl Server {
.unwrap_or(remote_addr);
if let Some(session) = instance.build_session(stream, local_addr, remote_addr, &core) {
// Spawn session
manager.spawn(session, is_tls, enable_acme);
manager.spawn(session, is_tls, enable_acme, span_start, span_end);
}
}
Err(err) => {
trc::event!(
Network(trc::NetworkEvent::ProxyError),
ListenerId = instance.id.clone(),
Protocol = instance.protocol,
LocalIp = local_addr.ip(),
LocalPort = local_addr.port(),
Tls = is_tls,
Expand All @@ -143,14 +164,13 @@ impl Server {
opts.apply(&session.stream);

// Spawn session
manager.spawn(session, is_tls, enable_acme);
manager.spawn(session, is_tls, enable_acme, span_start, span_end);
}
}
Err(err) => {
trc::event!(
Network(trc::NetworkEvent::AcceptError),
ListenerId = instance.id.clone(),
Protocol = instance.protocol,
LocalIp = local_addr.ip(),
LocalPort = local_addr.port(),
Tls = is_tls,
Expand All @@ -164,7 +184,6 @@ impl Server {
trc::event!(
Network(trc::NetworkEvent::ListenStop),
ListenerId = instance.id.clone(),
Protocol = instance.protocol,
LocalIp = local_addr.ip(),
Tls = is_tls,
LocalPort = local_addr.port(),
Expand Down Expand Up @@ -213,7 +232,7 @@ impl BuildSession for Arc<ServerInstance> {
trc::event!(
Network(trc::NetworkEvent::DropBlocked),
ListenerId = self.id.clone(),
Protocol = self.protocol,
LocalPort = local_addr.port(),
RemoteIp = remote_ip,
RemotePort = remote_port,
);
Expand All @@ -236,7 +255,7 @@ impl BuildSession for Arc<ServerInstance> {
trc::event!(
Limit(trc::LimitEvent::ConcurrentConnection),
ListenerId = self.id.clone(),
Protocol = self.protocol,
LocalPort = local_addr.port(),
RemoteIp = remote_ip,
RemotePort = remote_port,
Limit = self.limiter.max_concurrent,
Expand Down Expand Up @@ -349,7 +368,6 @@ impl ServerInstance {
trc::event!(
Tls(trc::TlsEvent::Handshake),
ListenerId = self.id.clone(),
Protocol = self.protocol,
SpanId = session_id,
Version = format!(
"{:?}",
Expand All @@ -374,7 +392,6 @@ impl ServerInstance {
trc::event!(
Tls(trc::TlsEvent::HandshakeError),
ListenerId = self.id.clone(),
Protocol = self.protocol,
SpanId = session_id,
Reason = err.to_string(),
);
Expand All @@ -385,7 +402,6 @@ impl ServerInstance {
trc::event!(
Tls(trc::TlsEvent::NotConfigured),
ListenerId = self.id.clone(),
Protocol = self.protocol,
SpanId = session_id,
);
Err(())
Expand Down
Loading

0 comments on commit abd318b

Please sign in to comment.