Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/migrations/20251028105101_mailbox.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Add down migration script here
DROP TABLE IF EXISTS mailbox;
DROP TYPE IF EXISTS mailbox_type;
DROP SEQUENCE IF EXISTS mailbox_id_seq;
DROP INDEX IF EXISTS idx_mailbox_type_mailbox_id_message_id;
19 changes: 19 additions & 0 deletions backend/migrations/20251028105101_mailbox.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Add up migration script here
CREATE SEQUENCE IF NOT EXISTS mailbox_id_seq;

CREATE TYPE mailbox_type AS ENUM (
'trigger',
'debouncing_stale_data'
);

CREATE TABLE mailbox(
message_id BIGINT DEFAULT nextval('mailbox_id_seq') PRIMARY KEY, -- Also indicates position in stack
mailbox_id TEXT, -- Can be NULL
workspace_id character varying(50) NOT NULL,
type mailbox_type NOT NULL, -- Type of mailbox
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
payload JSONB NOT NULL -- Payload of specific message
);

CREATE INDEX idx_mailbox_type_mailbox_id_message_id
ON mailbox(type, mailbox_id, message_id ASC);
1 change: 1 addition & 0 deletions backend/windmill-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub mod git_sync_ee;
pub mod git_sync_oss;
pub mod jobs;
pub mod jwt;
pub mod mailbox;
pub mod mcp_client;
pub mod more_serde;
pub mod oauth2;
Expand Down
302 changes: 302 additions & 0 deletions backend/windmill-common/src/mailbox.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
use sqlx::Postgres;

use crate::error;

#[derive(Clone)]
pub struct Mailbox {
mailbox_id: Option<String>,
mailbox_type: MailboxType,
workspace_id: String,
}

pub type MsgPayload = serde_json::Value;

#[derive(sqlx::FromRow, Debug, Clone)]
pub struct MailboxMsg {
pub id: i64,
pub payload: MsgPayload,
pub created_at: chrono::DateTime<chrono::Utc>,
}

#[derive(sqlx::Type, Clone, Copy)]
#[sqlx(rename_all = "snake_case", type_name = "mailbox_type")]
pub enum MailboxType {
Trigger,
DebouncingStaleData,
}

impl Mailbox {
pub fn open(mailbox_id: Option<&str>, mailbox_type: MailboxType, workspace_id: &str) -> Self {
Self {
mailbox_id: mailbox_id.map(str::to_owned),
mailbox_type,
workspace_id: workspace_id.to_owned(),
}
}

pub async fn push<'c>(
&self,
payload: MsgPayload,
e: impl sqlx::Executor<'c, Database = Postgres>,
) -> error::Result<()> {
sqlx::query!(
r#"INSERT INTO mailbox(mailbox_id, type, payload, workspace_id) VALUES ($1, $2, $3, $4)"#,
self.mailbox_id.as_ref(),
self.mailbox_type as MailboxType,
payload,
self.workspace_id
)
.execute(e)
.await?;

Ok(())
}

pub async fn pull<'c>(
&self,
e: impl sqlx::Executor<'c, Database = Postgres>,
) -> error::Result<Option<MailboxMsg>> {
sqlx::query_as!(
MailboxMsg,
r#"
DELETE FROM mailbox
WHERE message_id = ( SELECT message_id ║
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the stray '║' character in the SQL query and consider using 'IS NOT DISTINCT FROM' for mailbox_id (to correctly match NULL values). Also, add an ORDER BY (e.g., ORDER BY message_id ASC) in the subquery for deterministic behavior.

Suggested change
WHERE message_id = ( SELECT message_id
WHERE message_id = ( SELECT message_id

FROM mailbox
WHERE type = $1 AND mailbox_id = $2 AND workspace_id = $3
LIMIT 1
)
RETURNING payload, created_at, message_id as id;
"#,
self.mailbox_type as MailboxType,
self.mailbox_id.as_ref(),
&self.workspace_id,
)
.fetch_optional(e)
.await
.map_err(error::Error::from)
}

pub async fn pull_all<'c>(
&self,
e: impl sqlx::Executor<'c, Database = Postgres>,
) -> error::Result<Vec<MailboxMsg>> {
sqlx::query_as!(
MailboxMsg,
r#"
DELETE FROM mailbox
WHERE type = $1 AND mailbox_id IS NOT DISTINCT FROM $2 AND workspace_id = $3
RETURNING payload, created_at, message_id as id;
"#,
self.mailbox_type as MailboxType,
self.mailbox_id.as_ref(),
&self.workspace_id,
)
.fetch_all(e)
.await
.map_err(error::Error::from)
}

pub async fn delete<'c>(
&self,
message_id: i64,
e: impl sqlx::Executor<'c, Database = Postgres>,
) -> error::Result<()> {
sqlx::query!(
r#"
DELETE FROM mailbox
WHERE message_id = $1
AND workspace_id = $2
AND type = $3
AND mailbox_id IS NOT DISTINCT FROM $4
"#,
message_id,
&self.workspace_id,
self.mailbox_type as MailboxType,
self.mailbox_id.as_ref(),
)
.fetch_all(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use .execute() instead of .fetch_all() in the delete() method since the returned rows are not used.

Suggested change
.fetch_all(e)
.execute(e)

.await?;
Ok(())
}

pub async fn delete_batch<'c>(
&self,
message_ids: Vec<i64>,
e: impl sqlx::Executor<'c, Database = Postgres>,
) -> error::Result<()> {
sqlx::query!(
r#"
DELETE FROM mailbox
WHERE message_id = ANY($1)
AND workspace_id = $2
AND type = $3
AND mailbox_id IS NOT DISTINCT FROM $4
"#,
&message_ids,
&self.workspace_id,
self.mailbox_type as MailboxType,
self.mailbox_id.as_ref(),
)
.fetch_all(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use .execute() instead of .fetch_all() in the delete_batch() method to improve performance by avoiding retrieval of result rows.

Suggested change
.fetch_all(e)
.execute(e)

.await?;
Ok(())
}

pub async fn read<'c>(
&self,
e: impl sqlx::Executor<'c, Database = Postgres>,
) -> error::Result<Option<MailboxMsg>> {
sqlx::query_as!(
MailboxMsg,
r#"
SELECT payload, created_at, message_id as id
FROM mailbox
WHERE type = $1 AND mailbox_id IS NOT DISTINCT FROM $2 AND workspace_id = $3
"#,
self.mailbox_type as MailboxType,
self.mailbox_id.as_ref(),
&self.workspace_id,
)
.fetch_optional(e)
.await
.map_err(error::Error::from)
}

pub async fn read_all<'c>(
&self,
e: impl sqlx::Executor<'c, Database = Postgres>,
) -> error::Result<Vec<MailboxMsg>> {
sqlx::query_as!(
MailboxMsg,
r#"
SELECT payload, created_at, message_id as id
FROM mailbox
WHERE type = $1 AND mailbox_id IS NOT DISTINCT FROM $2 AND workspace_id = $3
"#,
self.mailbox_type as MailboxType,
self.mailbox_id.as_ref(),
&self.workspace_id
)
.fetch_all(e)
.await
.map_err(error::Error::from)
}
}

#[cfg(test)]
mod mailbox_tests {
use serde_json::json;

use crate::mailbox::Mailbox;

#[sqlx::test(fixtures("../../migrations/20251028105101_mailbox.up.sql"))]
async fn test_mailbox(db: sqlx::Pool<sqlx::Postgres>) -> anyhow::Result<()> {
let db = &db;
let push = async |mbox: Mailbox| {
mbox.push(json!(1), db).await.unwrap();
mbox.push(json!(2), db).await.unwrap();
mbox.push(json!(3), db).await.unwrap();
};

let assert_read = async |mbox: Mailbox| {
assert_eq!(mbox.read(db).await.unwrap().unwrap().payload, json!(1));
};

let assert_read_all = async |mbox: Mailbox| {
let all = mbox.read_all(db).await.unwrap();
assert_eq!(all.len(), 3);
assert_eq!(all[0].payload, json!(1));
assert_eq!(all[1].payload, json!(2));
assert_eq!(all[2].payload, json!(3));
};

let assert_pull = async |mbox: Mailbox| {
assert_eq!(mbox.pull(db).await.unwrap().unwrap().payload, json!(1));
assert_eq!(mbox.pull(db).await.unwrap().unwrap().payload, json!(2));
assert_eq!(mbox.pull(db).await.unwrap().unwrap().payload, json!(3));
assert!(mbox.pull(db).await.unwrap().is_none());
};

let assert_pull_all = async |mbox: Mailbox| {
let all = mbox.pull_all(db).await.unwrap();
assert_eq!(all.len(), 3);
assert_eq!(all[0].payload, json!(1));
assert_eq!(all[1].payload, json!(2));
assert_eq!(all[2].payload, json!(3));
};

// Run those in parallel to make sure they are not conflicting
tokio::join!(
// Main body
// All others will be small deviations from this one
async {
let mbox = Mailbox::open(
Some("mymailbox"),
crate::mailbox::MailboxType::Trigger,
"test-workspace_id",
);
push(mbox.clone()).await;
assert_read(mbox.clone()).await;
assert_read_all(mbox.clone()).await;
assert_pull(mbox.clone()).await;
},
// Same as above, but different workspace_id
async {
let mbox = Mailbox::open(
Some("mymailbox"),
crate::mailbox::MailboxType::Trigger,
"another-workspace_id",
);
push(mbox.clone()).await;
assert_read(mbox.clone()).await;
assert_read_all(mbox.clone()).await;
assert_pull(mbox.clone()).await;
},
// Different id
async {
let mbox = Mailbox::open(
Some("another id"),
crate::mailbox::MailboxType::Trigger,
"test-workspace_id",
);
push(mbox.clone()).await;
assert_read(mbox.clone()).await;
assert_read_all(mbox.clone()).await;
assert_pull(mbox.clone()).await;
},
// Different kind
async {
let mbox = Mailbox::open(
Some("mymailbox"),
crate::mailbox::MailboxType::DebouncingStaleData,
"test-workspace_id",
);
push(mbox.clone()).await;
assert_read(mbox.clone()).await;
assert_read_all(mbox.clone()).await;
assert_pull(mbox.clone()).await;
},
// Global mailboix
async {
let mbox = Mailbox::open(
None,
crate::mailbox::MailboxType::Trigger,
"test-workspace_id",
);
push(mbox.clone()).await;
dbg!(
sqlx::query!("SELECT mailbox_id, payload, workspace_id FROM mailbox")
.fetch_all(db)
.await
.unwrap()
);
assert_read(mbox.clone()).await;
assert_read_all(mbox.clone()).await;
// Also test pull_all
assert_pull_all(mbox.clone()).await;
},
);

Ok(())
}
}
Loading