diff --git a/channels-src/discord/discord.capabilities.json b/channels-src/discord/discord.capabilities.json index 9ff7a8905d..0e7d38083b 100644 --- a/channels-src/discord/discord.capabilities.json +++ b/channels-src/discord/discord.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "type": "channel", "name": "discord", "description": "Discord webhook channel for slash commands, components, and optional mention polling", diff --git a/channels-src/discord/src/lib.rs b/channels-src/discord/src/lib.rs index cdb6c51507..7035105efc 100644 --- a/channels-src/discord/src/lib.rs +++ b/channels-src/discord/src/lib.rs @@ -32,6 +32,7 @@ use exports::near::agent::channel::{ AgentResponse, ChannelConfig, Guest, HttpEndpointConfig, IncomingHttpRequest, OutgoingHttpResponse, PollConfig, StatusUpdate, }; +use exports::near::agent::channel_persistence; use near::agent::channel_host::{self, EmittedMessage}; /// Discord interaction wrapper. @@ -1205,6 +1206,12 @@ fn json_response(status: u16, value: serde_json::Value) -> OutgoingHttpResponse } } +impl channel_persistence::Guest for DiscordChannel { + fn on_message_persisted(_metadata_json: String) -> Result<(), String> { + Ok(()) // No-op: Discord does not support read receipts + } +} + export!(DiscordChannel); fn truncate_message(content: &str) -> String { diff --git a/channels-src/feishu/src/lib.rs b/channels-src/feishu/src/lib.rs index 2e7261d811..e41a99a5e5 100644 --- a/channels-src/feishu/src/lib.rs +++ b/channels-src/feishu/src/lib.rs @@ -36,6 +36,7 @@ use exports::near::agent::channel::{ AgentResponse, ChannelConfig, Guest, HttpEndpointConfig, IncomingHttpRequest, OutgoingHttpResponse, StatusUpdate, }; +use exports::near::agent::channel_persistence; use near::agent::channel_host::{self, EmittedMessage}; // ============================================================================ @@ -268,6 +269,12 @@ fn default_api_base() -> String { struct FeishuChannel; +impl channel_persistence::Guest for FeishuChannel { + fn on_message_persisted(_metadata_json: String) -> Result<(), String> { + Ok(()) // No-op: Feishu does not require post-persistence actions + } +} + export!(FeishuChannel); impl Guest for FeishuChannel { diff --git a/channels-src/slack/slack.capabilities.json b/channels-src/slack/slack.capabilities.json index 7035d9253b..5fa5ced4d0 100644 --- a/channels-src/slack/slack.capabilities.json +++ b/channels-src/slack/slack.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "type": "channel", "name": "slack", "description": "Slack Events API channel for receiving and responding to Slack messages", diff --git a/channels-src/slack/src/lib.rs b/channels-src/slack/src/lib.rs index 24f01df393..8341cdc7cf 100644 --- a/channels-src/slack/src/lib.rs +++ b/channels-src/slack/src/lib.rs @@ -29,6 +29,7 @@ use exports::near::agent::channel::{ AgentResponse, ChannelConfig, Guest, HttpEndpointConfig, IncomingHttpRequest, OutgoingHttpResponse, StatusUpdate, }; +use exports::near::agent::channel_persistence; use near::agent::channel_host::{self, EmittedMessage, InboundAttachment}; /// Slack event wrapper. @@ -712,6 +713,12 @@ fn json_response(status: u16, value: serde_json::Value) -> OutgoingHttpResponse } // Export the component +impl channel_persistence::Guest for SlackChannel { + fn on_message_persisted(_metadata_json: String) -> Result<(), String> { + Ok(()) // No-op: Slack does not require post-persistence actions + } +} + export!(SlackChannel); #[cfg(test)] diff --git a/channels-src/telegram/src/lib.rs b/channels-src/telegram/src/lib.rs index a095ccb3a2..2126de6e05 100644 --- a/channels-src/telegram/src/lib.rs +++ b/channels-src/telegram/src/lib.rs @@ -33,6 +33,7 @@ use exports::near::agent::channel::{ AgentResponse, Attachment, ChannelConfig, Guest, HttpEndpointConfig, IncomingHttpRequest, OutgoingHttpResponse, PollConfig, StatusType, StatusUpdate, }; +use exports::near::agent::channel_persistence; use near::agent::channel_host::{self, EmittedMessage, InboundAttachment}; // ============================================================================ @@ -2033,6 +2034,12 @@ fn json_response(status: u16, value: serde_json::Value) -> OutgoingHttpResponse } // Export the component +impl channel_persistence::Guest for TelegramChannel { + fn on_message_persisted(_metadata_json: String) -> Result<(), String> { + Ok(()) // No-op: Telegram does not require post-persistence actions + } +} + export!(TelegramChannel); // ============================================================================ diff --git a/channels-src/telegram/telegram.capabilities.json b/channels-src/telegram/telegram.capabilities.json index 1526762ded..a44de8c985 100644 --- a/channels-src/telegram/telegram.capabilities.json +++ b/channels-src/telegram/telegram.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.2", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "type": "channel", "name": "telegram", "description": "Telegram Bot API channel for receiving and responding to Telegram messages", diff --git a/channels-src/whatsapp/Cargo.lock b/channels-src/whatsapp/Cargo.lock index 0e55d1e532..adefa9aa3b 100644 --- a/channels-src/whatsapp/Cargo.lock +++ b/channels-src/whatsapp/Cargo.lock @@ -269,7 +269,7 @@ dependencies = [ [[package]] name = "whatsapp-channel" -version = "0.1.0" +version = "0.2.0" dependencies = [ "serde", "serde_json", diff --git a/channels-src/whatsapp/src/lib.rs b/channels-src/whatsapp/src/lib.rs index c69a9b9f90..1650f6d601 100644 --- a/channels-src/whatsapp/src/lib.rs +++ b/channels-src/whatsapp/src/lib.rs @@ -32,6 +32,7 @@ use exports::near::agent::channel::{ AgentResponse, ChannelConfig, Guest, HttpEndpointConfig, IncomingHttpRequest, OutgoingHttpResponse, StatusUpdate, }; +use exports::near::agent::channel_persistence; use near::agent::channel_host::{self, EmittedMessage, InboundAttachment}; // ============================================================================ @@ -290,6 +291,14 @@ struct WhatsAppConfig { #[serde(default)] allow_from: Option>, + + /// Whether to mark incoming messages as read (default: true) + #[serde(default = "default_mark_as_read")] + mark_as_read: bool, +} + +fn default_mark_as_read() -> bool { + true } fn default_api_version() -> String { @@ -321,6 +330,7 @@ impl Guest for WhatsAppChannel { owner_id: None, dm_policy: None, allow_from: None, + mark_as_read: default_mark_as_read(), } } }; @@ -354,6 +364,10 @@ impl Guest for WhatsAppChannel { .unwrap_or_else(|_| "[]".to_string()); let _ = channel_host::workspace_write(ALLOW_FROM_PATH, &allow_from_json); + // Persist mark_as_read setting for on_message_persisted + let mark_as_read_str = if config.mark_as_read { "true" } else { "false" }; + let _ = channel_host::workspace_write("channels/whatsapp/mark_as_read", mark_as_read_str); + // WhatsApp Cloud API is webhook-only, no polling available Ok(ChannelConfig { display_name: "WhatsApp".to_string(), @@ -524,6 +538,73 @@ impl Guest for WhatsAppChannel { } } +// ============================================================================ +// Persistence Callback Implementation +// ============================================================================ + +/// Metadata extracted from persisted message for mark_as_read callback. +#[derive(Debug, Deserialize)] +struct PersistedMessageMetadata { + phone_number_id: String, + message_id: String, +} + +impl channel_persistence::Guest for WhatsAppChannel { + /// Called after a message has been persisted to the database. + /// + /// This callback is used to mark the message as read in WhatsApp, + /// removing the "typing..." indicator from the sender's view. + fn on_message_persisted(metadata_json: String) -> Result<(), String> { + // Check if mark_as_read is enabled + let mark_as_read_enabled = match channel_host::workspace_read("channels/whatsapp/mark_as_read") { + Some(s) => s == "true", + None => return Ok(()), // Default to disabled if not set + }; + + if !mark_as_read_enabled { + channel_host::log( + channel_host::LogLevel::Debug, + "mark_as_read disabled, skipping callback", + ); + return Ok(()); + } + + // Parse metadata to extract phone_number_id and message_id + let metadata: PersistedMessageMetadata = match serde_json::from_str(&metadata_json) { + Ok(m) => m, + Err(e) => { + // Metadata parsing failed - log but don't fail the callback + // This can happen for messages without proper routing metadata + channel_host::log( + channel_host::LogLevel::Debug, + &format!("Failed to parse metadata for mark_as_read: {}", e), + ); + return Ok(()); // Return Ok to not block ACK + } + }; + + // Mark the message as read via WhatsApp Cloud API + match mark_message_as_read(&metadata.phone_number_id, &metadata.message_id) { + Ok(()) => { + channel_host::log( + channel_host::LogLevel::Debug, + &format!("Marked message {} as read", metadata.message_id), + ); + Ok(()) + } + Err(e) => { + // Log error but return Ok to not block message ACK + // The message was already persisted, so we don't want to fail the callback + channel_host::log( + channel_host::LogLevel::Warn, + &format!("Failed to mark message as read: {}", e), + ); + Ok(()) // Always return Ok - mark_as_read failure is non-blocking + } + } + } +} + // ============================================================================ // Webhook Verification // ============================================================================ @@ -945,6 +1026,53 @@ fn send_pairing_reply( } } +/// Mark a WhatsApp message as read via the Cloud API. +/// +/// https://developers.facebook.com/docs/whatsapp/cloud-api/guides/mark-message-as-read +fn mark_message_as_read(phone_number_id: &str, message_id: &str) -> Result<(), String> { + // Read api_version from workspace (set during on_start), fallback to default + let api_version = channel_host::workspace_read("channels/whatsapp/api_version") + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| "v18.0".to_string()); + + let url = format!( + "https://graph.facebook.com/{}/{}/messages", + api_version, phone_number_id + ); + + // Mark as read payload + let payload = serde_json::json!({ + "messaging_product": "whatsapp", + "status": "read", + "message_id": message_id + }); + + let payload_bytes = + serde_json::to_vec(&payload).map_err(|e| format!("Failed to serialize: {}", e))?; + + let headers = serde_json::json!({ + "Content-Type": "application/json", + "Authorization": "Bearer {WHATSAPP_ACCESS_TOKEN}" + }); + + let result = channel_host::http_request( + "POST", + &url, + &headers.to_string(), + Some(&payload_bytes), + None, + ); + + match result { + Ok(response) if response.status >= 200 && response.status < 300 => Ok(()), + Ok(response) => { + let body_str = String::from_utf8_lossy(&response.body); + Err(format!("WhatsApp API error: {} - {}", response.status, body_str)) + } + Err(e) => Err(format!("HTTP request failed: {}", e)), + } +} + /// Create a JSON HTTP response. fn json_response(status: u16, value: serde_json::Value) -> OutgoingHttpResponse { let body = serde_json::to_vec(&value).unwrap_or_default(); diff --git a/channels-src/whatsapp/whatsapp.capabilities.json b/channels-src/whatsapp/whatsapp.capabilities.json index a0115d7902..bafc072583 100644 --- a/channels-src/whatsapp/whatsapp.capabilities.json +++ b/channels-src/whatsapp/whatsapp.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "type": "channel", "name": "whatsapp", "description": "WhatsApp Cloud API channel for receiving and responding to WhatsApp messages", @@ -11,6 +11,11 @@ "prompt": "Enter your WhatsApp Cloud API permanent access token (from the Meta Developer Portal under your app's WhatsApp > API Setup).", "validation": "^[A-Za-z0-9_-]+$" }, + { + "name": "whatsapp_app_secret", + "prompt": "Enter your WhatsApp App Secret (from the Meta Developer Portal under your app's WhatsApp > Configuration). Used for webhook signature verification.", + "validation": "^[a-fA-F0-9]{32}$" + }, { "name": "whatsapp_verify_token", "prompt": "Webhook verify token (leave empty to auto-generate)", @@ -45,7 +50,9 @@ "webhook": { "secret_header": "X-Hub-Signature-256", "secret_name": "whatsapp_verify_token", - "verify_token_param": "hub.verify_token" + "verify_token_param": "hub.verify_token", + "verification_mode": "query_param", + "hmac_secret_name": "whatsapp_app_secret" } } }, @@ -54,6 +61,7 @@ "reply_to_message": true, "owner_id": null, "dm_policy": "pairing", - "allow_from": [] + "allow_from": [], + "mark_as_read": true } } diff --git a/registry/channels/discord.json b/registry/channels/discord.json index dc545d75d7..eec357f530 100644 --- a/registry/channels/discord.json +++ b/registry/channels/discord.json @@ -2,8 +2,8 @@ "name": "discord", "display_name": "Discord Channel", "kind": "channel", - "version": "0.2.1", - "wit_version": "0.3.0", + "version": "0.2.3", + "wit_version": "0.4.0", "description": "Talk to your agent in Discord", "keywords": [ "messaging", diff --git a/registry/channels/feishu.json b/registry/channels/feishu.json index 66cecf1dd2..df718a40eb 100644 --- a/registry/channels/feishu.json +++ b/registry/channels/feishu.json @@ -2,8 +2,8 @@ "name": "feishu", "display_name": "Feishu / Lark Channel", "kind": "channel", - "version": "0.1.1", - "wit_version": "0.3.0", + "version": "0.1.2", + "wit_version": "0.4.0", "description": "Talk to your agent through a Feishu or Lark bot", "keywords": [ "messaging", diff --git a/registry/channels/slack.json b/registry/channels/slack.json index e6d3660484..5390b66743 100644 --- a/registry/channels/slack.json +++ b/registry/channels/slack.json @@ -2,8 +2,8 @@ "name": "slack", "display_name": "Slack Channel", "kind": "channel", - "version": "0.2.1", - "wit_version": "0.3.0", + "version": "0.2.3", + "wit_version": "0.4.0", "description": "Talk to your agent in Slack", "keywords": [ "messaging", diff --git a/registry/channels/telegram.json b/registry/channels/telegram.json index bd07208f7d..197962907c 100644 --- a/registry/channels/telegram.json +++ b/registry/channels/telegram.json @@ -2,8 +2,8 @@ "name": "telegram", "display_name": "Telegram Channel", "kind": "channel", - "version": "0.2.4", - "wit_version": "0.3.0", + "version": "0.2.5", + "wit_version": "0.4.0", "description": "Talk to your agent through a Telegram bot", "keywords": [ "messaging", diff --git a/registry/channels/whatsapp.json b/registry/channels/whatsapp.json index be3faf0dc9..823077ef2b 100644 --- a/registry/channels/whatsapp.json +++ b/registry/channels/whatsapp.json @@ -2,8 +2,8 @@ "name": "whatsapp", "display_name": "WhatsApp Channel", "kind": "channel", - "version": "0.2.0", - "wit_version": "0.3.0", + "version": "0.2.2", + "wit_version": "0.4.0", "description": "Talk to your agent through WhatsApp", "keywords": [ "messaging", diff --git a/registry/tools/github.json b/registry/tools/github.json index e760c4df0a..d13ad8870a 100644 --- a/registry/tools/github.json +++ b/registry/tools/github.json @@ -2,8 +2,8 @@ "name": "github", "display_name": "GitHub", "kind": "tool", - "version": "0.2.1", - "wit_version": "0.3.0", + "version": "0.2.2", + "wit_version": "0.4.0", "description": "GitHub integration for issues, PRs, repos, and code search", "keywords": [ "git", diff --git a/registry/tools/gmail.json b/registry/tools/gmail.json index 08913ce697..169c991d0a 100644 --- a/registry/tools/gmail.json +++ b/registry/tools/gmail.json @@ -2,8 +2,8 @@ "name": "gmail", "display_name": "Gmail", "kind": "tool", - "version": "0.2.0", - "wit_version": "0.3.0", + "version": "0.2.1", + "wit_version": "0.4.0", "description": "Read, send, and manage Gmail messages and threads", "keywords": [ "email", diff --git a/registry/tools/google-calendar.json b/registry/tools/google-calendar.json index c43112d33b..62f1d90d46 100644 --- a/registry/tools/google-calendar.json +++ b/registry/tools/google-calendar.json @@ -2,8 +2,8 @@ "name": "google-calendar", "display_name": "Google Calendar", "kind": "tool", - "version": "0.2.0", - "wit_version": "0.3.0", + "version": "0.2.1", + "wit_version": "0.4.0", "description": "Create, read, update, and delete Google Calendar events", "keywords": [ "calendar", diff --git a/registry/tools/google-docs.json b/registry/tools/google-docs.json index 9f1ab133f0..ef5ac3ee26 100644 --- a/registry/tools/google-docs.json +++ b/registry/tools/google-docs.json @@ -2,8 +2,8 @@ "name": "google-docs", "display_name": "Google Docs", "kind": "tool", - "version": "0.2.0", - "wit_version": "0.3.0", + "version": "0.2.1", + "wit_version": "0.4.0", "description": "Create and edit Google Docs documents", "keywords": [ "documents", diff --git a/registry/tools/google-drive.json b/registry/tools/google-drive.json index 9766e555d9..7e88e42af5 100644 --- a/registry/tools/google-drive.json +++ b/registry/tools/google-drive.json @@ -2,8 +2,8 @@ "name": "google-drive", "display_name": "Google Drive", "kind": "tool", - "version": "0.2.0", - "wit_version": "0.3.0", + "version": "0.2.1", + "wit_version": "0.4.0", "description": "Upload, download, search, and manage Google Drive files and folders", "keywords": [ "storage", diff --git a/registry/tools/google-sheets.json b/registry/tools/google-sheets.json index b63265e1c8..3f4fca2565 100644 --- a/registry/tools/google-sheets.json +++ b/registry/tools/google-sheets.json @@ -2,8 +2,8 @@ "name": "google-sheets", "display_name": "Google Sheets", "kind": "tool", - "version": "0.2.0", - "wit_version": "0.3.0", + "version": "0.2.1", + "wit_version": "0.4.0", "description": "Read and write Google Sheets spreadsheet data", "keywords": [ "spreadsheets", diff --git a/registry/tools/google-slides.json b/registry/tools/google-slides.json index 54187531f8..29a211096f 100644 --- a/registry/tools/google-slides.json +++ b/registry/tools/google-slides.json @@ -2,8 +2,8 @@ "name": "google-slides", "display_name": "Google Slides", "kind": "tool", - "version": "0.2.0", - "wit_version": "0.3.0", + "version": "0.2.1", + "wit_version": "0.4.0", "description": "Create and edit Google Slides presentations", "keywords": [ "presentations", diff --git a/registry/tools/llm-context.json b/registry/tools/llm-context.json index e4e9808c5f..0d3bcb6b98 100644 --- a/registry/tools/llm-context.json +++ b/registry/tools/llm-context.json @@ -2,8 +2,8 @@ "name": "llm-context", "display_name": "LLM Context", "kind": "tool", - "version": "0.1.0", - "wit_version": "0.3.0", + "version": "0.1.1", + "wit_version": "0.4.0", "description": "Fetch pre-extracted web content from Brave Search for grounding LLM answers (RAG, fact-checking)", "keywords": [ "search", diff --git a/registry/tools/slack.json b/registry/tools/slack.json index 8e1df98968..7e0e9e2d6b 100644 --- a/registry/tools/slack.json +++ b/registry/tools/slack.json @@ -2,8 +2,8 @@ "name": "slack-tool", "display_name": "Slack Tool", "kind": "tool", - "version": "0.2.0", - "wit_version": "0.3.0", + "version": "0.2.1", + "wit_version": "0.4.0", "description": "Your agent uses Slack to post and read messages in your workspace", "keywords": [ "messaging", diff --git a/registry/tools/telegram.json b/registry/tools/telegram.json index 12e58c684d..627709e0d6 100644 --- a/registry/tools/telegram.json +++ b/registry/tools/telegram.json @@ -2,8 +2,8 @@ "name": "telegram-mtproto", "display_name": "Telegram Tool", "kind": "tool", - "version": "0.2.0", - "wit_version": "0.3.0", + "version": "0.2.1", + "wit_version": "0.4.0", "description": "Your agent uses your Telegram account to read and send messages", "keywords": [ "messaging", diff --git a/registry/tools/web-search.json b/registry/tools/web-search.json index 5c1dedefde..caf13b3814 100644 --- a/registry/tools/web-search.json +++ b/registry/tools/web-search.json @@ -2,8 +2,8 @@ "name": "web-search", "display_name": "Web Search", "kind": "tool", - "version": "0.2.1", - "wit_version": "0.3.0", + "version": "0.2.2", + "wit_version": "0.4.0", "description": "Search the web using Brave Search API", "keywords": [ "search", diff --git a/src/agent/thread_ops.rs b/src/agent/thread_ops.rs index 877a4e2777..89f45f5cff 100644 --- a/src/agent/thread_ops.rs +++ b/src/agent/thread_ops.rs @@ -383,6 +383,8 @@ impl Agent { &message.channel, &message.user_id, effective_content, + message.id, + &message.metadata, ) .await; @@ -574,12 +576,18 @@ impl Agent { /// /// This ensures the user message is durable even if the process crashes /// mid-response. Call this right after `thread.start_turn()`. + /// + /// After persistence, signals ACK to the WASM channel router so that + /// webhook handlers can return 200 OK and `on_message_persisted` callbacks + /// can fire (e.g., for mark_as_read in WhatsApp). pub(super) async fn persist_user_message( &self, thread_id: Uuid, channel: &str, user_id: &str, user_input: &str, + message_id: Uuid, + metadata: &serde_json::Value, ) { let store = match self.store() { Some(s) => Arc::clone(s), @@ -598,6 +606,20 @@ impl Agent { .await { tracing::warn!("Failed to persist user message: {}", e); + return; + } + + // Signal ACK to WASM channels via hook after successful persistence + use crate::hooks::hook::HookEvent; + let hooks = self.hooks(); + let event = HookEvent::MessagePersisted { + user_id: user_id.to_string(), + channel: channel.to_string(), + message_id: message_id.to_string(), + metadata: metadata.clone(), + }; + if let Err(e) = hooks.run(&event).await { + tracing::warn!("MessagePersisted hook failed: {}", e); } } diff --git a/src/channels/wasm/hook.rs b/src/channels/wasm/hook.rs new file mode 100644 index 0000000000..860ee8b039 --- /dev/null +++ b/src/channels/wasm/hook.rs @@ -0,0 +1,135 @@ +//! Hook for signaling message persistence ACK to WASM channels. +//! +//! This hook is registered by the WASM channel subsystem to listen for +//! `OnMessagePersisted` events. When a user message is successfully persisted +//! to the database, this hook signals the WASM router, which then: +//! 1. Unblocks any pending webhook handlers waiting for ACK +//! 2. Calls the `on_message_persisted` callback on the appropriate WASM channel +//! +//! # Performance Consideration +//! +//! The hook serializes the entire `metadata` JSON object on every message +//! persistence. This is required by the WASM `on_message_persisted` interface +//! which expects a JSON string. For most messages, metadata is small (a few +//! fields like `message_id`, `chat_type`, etc.). If metadata grows large +//! (hundreds of KB), this could impact throughput. +//! +//! The serialization is synchronous in the hook execution path but runs +//! asynchronously via the hook registry, so it doesn't block message +//! persistence itself. + +use super::WasmChannelRouter; +use crate::hooks::hook::{Hook, HookContext, HookError, HookEvent, HookOutcome, HookPoint}; +use async_trait::async_trait; +use std::sync::Arc; + +pub struct MessagePersistedHook { + router: Arc, +} + +impl MessagePersistedHook { + pub fn new(router: Arc) -> Self { + Self { router } + } +} + +#[async_trait] +impl Hook for MessagePersistedHook { + fn name(&self) -> &str { + "wasm_message_persisted" + } + + fn hook_points(&self) -> &[HookPoint] { + &[HookPoint::OnMessagePersisted] + } + + async fn execute( + &self, + event: &HookEvent, + _ctx: &HookContext, + ) -> Result { + if let HookEvent::MessagePersisted { + user_id: _, + channel, + message_id, + metadata, + } = event + { + let ack_key = format!("{}:{}", channel, message_id); + let metadata_json = metadata.to_string(); + tracing::debug!(ack_key = %ack_key, "Signaling ACK via hook"); + self.router.ack_message(&ack_key, &metadata_json).await; + } + Ok(HookOutcome::ok()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::hooks::hook::HookPoint; + use serde_json::json; + + #[test] + fn test_hook_name() { + let router = WasmChannelRouter::new(); + let hook = MessagePersistedHook::new(Arc::new(router)); + assert_eq!(hook.name(), "wasm_message_persisted"); + } + + #[test] + fn test_hook_points() { + let router = WasmChannelRouter::new(); + let hook = MessagePersistedHook::new(Arc::new(router)); + assert_eq!(hook.hook_points(), &[HookPoint::OnMessagePersisted]); + } + + #[test] + fn test_hook_formats_ack_key_correctly() { + // Test that the hook formats the ack_key as "channel:message_id" + let channel = "test-channel"; + let message_id = "msg-123"; + let expected_key = format!("{}:{}", channel, message_id); + assert_eq!(expected_key, "test-channel:msg-123"); + } + + #[tokio::test] + async fn test_hook_returns_ok_even_for_non_message_persisted_events() { + // Test that hook handles non-MessagePersisted events gracefully + let router = WasmChannelRouter::new(); + let hook = MessagePersistedHook::new(Arc::new(router)); + + // Create a different event type (Inbound) + let event = HookEvent::Inbound { + user_id: "user-1".to_string(), + channel: "test".to_string(), + content: "hello".to_string(), + thread_id: None, + }; + let ctx = HookContext::default(); + + // Hook should return ok() even though it ignores non-MessagePersisted events + let result = hook.execute(&event, &ctx).await; + assert!(result.is_ok()); + assert!(matches!( + result.unwrap(), + HookOutcome::Continue { modified: None } + )); + } + + #[tokio::test] + async fn test_hook_serializes_metadata_to_json() { + // Test that metadata is correctly serialized to JSON + let metadata = json!({ + "message_id": "msg-123", + "chat_type": "group", + "timestamp": 1234567890 + }); + let metadata_json = metadata.to_string(); + + // Verify the JSON is valid and contains expected fields + assert!(metadata_json.contains("msg-123")); + assert!(metadata_json.contains("group")); + assert!(metadata_json.contains("1234567890")); + } +} diff --git a/src/channels/wasm/loader.rs b/src/channels/wasm/loader.rs index 6329428fea..622e326f94 100644 --- a/src/channels/wasm/loader.rs +++ b/src/channels/wasm/loader.rs @@ -317,6 +317,13 @@ impl LoadedChannel { .map(|f| f.webhook_secret_name()) .unwrap_or_else(|| format!("{}_webhook_secret", self.channel.channel_name())) } + + /// Get the webhook verification mode from capabilities. + pub fn webhook_verification_mode(&self) -> Option { + self.capabilities_file + .as_ref() + .and_then(|f| f.webhook_verification_mode().map(|s| s.to_string())) + } } /// Results from loading multiple channels. diff --git a/src/channels/wasm/mod.rs b/src/channels/wasm/mod.rs index 882709a967..ae5bc2ff04 100644 --- a/src/channels/wasm/mod.rs +++ b/src/channels/wasm/mod.rs @@ -81,13 +81,14 @@ mod bundled; mod capabilities; mod error; +mod hook; mod host; mod loader; mod router; mod runtime; mod schema; pub mod setup; -pub(crate) mod signature; +pub mod signature; #[allow(dead_code)] pub(crate) mod storage; mod telegram_host_config; @@ -97,6 +98,7 @@ mod wrapper; pub use bundled::{available_channel_names, bundled_channel_names, install_bundled_channel}; pub use capabilities::{ChannelCapabilities, EmitRateLimitConfig, HttpEndpointConfig, PollConfig}; pub use error::WasmChannelError; +pub use hook::MessagePersistedHook; pub use host::{ChannelEmitRateLimiter, ChannelHostState, EmittedMessage}; pub use loader::{ DiscoveredChannel, LoadResults, LoadedChannel, WasmChannelLoader, default_channels_dir, diff --git a/src/channels/wasm/router.rs b/src/channels/wasm/router.rs index 8005ccea56..122a30c824 100644 --- a/src/channels/wasm/router.rs +++ b/src/channels/wasm/router.rs @@ -6,6 +6,14 @@ use std::collections::HashMap; use std::sync::Arc; +/// Maximum time to wait for ACK from agent before returning HTTP response. +/// If the agent doesn't persist the message within this time, the webhook +/// returns 200 OK anyway (best-effort reliability). +/// +/// WhatsApp Cloud API expects responses in 5-15 seconds, so we use 10 seconds +/// as a conservative value within that range. +const ACK_TIMEOUT_SECS: u64 = 10; + use axum::{ Json, Router, body::Bytes, @@ -15,7 +23,7 @@ use axum::{ routing::{get, post}, }; use serde::{Deserialize, Serialize}; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, oneshot}; use crate::channels::wasm::wrapper::WasmChannel; @@ -46,6 +54,11 @@ pub struct WasmChannelRouter { signature_keys: RwLock>, /// HMAC-SHA256 signing secrets for signature verification by channel name (Slack-style). hmac_secrets: RwLock>, + /// Verification mode per channel: "query_param", "signature", etc. + verification_modes: RwLock>, + /// Pending webhook ACKs - keyed by "channel:message_id", value is signaled when + /// ack_message() is called after message persistence. + pending_acks: RwLock>>, } impl WasmChannelRouter { @@ -58,6 +71,76 @@ impl WasmChannelRouter { secret_headers: RwLock::new(HashMap::new()), signature_keys: RwLock::new(HashMap::new()), hmac_secrets: RwLock::new(HashMap::new()), + verification_modes: RwLock::new(HashMap::new()), + pending_acks: RwLock::new(HashMap::new()), + } + } + + // ======================================================================== + // Webhook Acknowledgment (reliable message processing) + // ======================================================================== + + /// Register a pending acknowledgment for a webhook message. + /// + /// Call this before processing a webhook message. The returned receiver + /// will be signaled when the message has been persisted to the database. + /// The webhook handler should wait on this receiver before returning 200 OK. + /// + /// # Arguments + /// * `key` - Unique identifier for the message, typically "channel:message_id" + /// + /// # Returns + /// A oneshot receiver that will be signaled when ack_message() is called. + pub async fn register_pending_ack(&self, key: String) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel(); + self.pending_acks.write().await.insert(key.clone(), tx); + tracing::debug!(key = %key, "Registered pending webhook ACK"); + rx + } + + /// Signal that a message has been persisted and the webhook can return 200 OK. + /// + /// Called by the agent loop after persist_user_message() completes. + /// Also triggers the on_message_persisted WASM callback for channels that + /// implement it (e.g., WhatsApp for mark_as_read). + /// + /// Note: Deduplication recording happens at webhook handler level (before + /// sending to agent) to prevent race conditions with concurrent webhooks. + /// + /// # Arguments + /// * `key` - The same key passed to register_pending_ack() (format: "channel:message_id") + /// * `message_metadata` - JSON metadata for channel-specific post-persistence actions + pub async fn ack_message(&self, key: &str, message_metadata: &str) { + if let Some(tx) = self.pending_acks.write().await.remove(key) { + // Signal the webhook handler to return 200 OK + let _ = tx.send(()); + tracing::debug!(key = %key, "Webhook ACK signaled"); + + // Parse key to get channel name for callback + let channel_name = key.split(':').next().unwrap_or(""); + + // Look up the channel and call on_message_persisted + if let Some(channel) = self.channels.read().await.get(channel_name) + && let Err(e) = channel.call_on_message_persisted(message_metadata).await + { + tracing::warn!( + channel = %channel_name, + error = %e, + "on_message_persisted callback failed (best-effort)" + ); + } + } else { + tracing::debug!(key = %key, "No pending ACK found (may have timed out)"); + } + } + + /// Clean up an orphaned pending ACK entry after timeout. + /// + /// Called by the webhook handler when an ACK times out or the sender is dropped. + /// This prevents memory leaks from accumulating orphaned entries. + pub async fn cleanup_pending_ack(&self, key: &str) { + if self.pending_acks.write().await.remove(key).is_some() { + tracing::debug!(key = %key, "Cleaned up orphaned pending ACK"); } } @@ -69,12 +152,14 @@ impl WasmChannelRouter { /// * `secret` - Optional webhook secret for validation /// * `secret_header` - Optional HTTP header name for secret validation /// (e.g., "X-Telegram-Bot-Api-Secret-Token"). Defaults to "X-Webhook-Secret". + /// * `verification_mode` - Optional verification mode: "query_param", "signature", etc. pub async fn register( &self, channel: Arc, endpoints: Vec, secret: Option, secret_header: Option, + verification_mode: Option, ) { let name = channel.channel_name().to_string(); @@ -92,6 +177,7 @@ impl WasmChannelRouter { "Registered WASM channel HTTP endpoint" ); } + drop(path_map); // Store secret if provided if let Some(s) = secret { @@ -100,7 +186,15 @@ impl WasmChannelRouter { // Store secret header if provided if let Some(h) = secret_header { - self.secret_headers.write().await.insert(name, h); + self.secret_headers.write().await.insert(name.clone(), h); + } + + // Store verification mode if provided + if let Some(m) = verification_mode { + self.verification_modes + .write() + .await + .insert(name.clone(), m); } } @@ -138,6 +232,7 @@ impl WasmChannelRouter { self.secret_headers.write().await.remove(channel_name); self.signature_keys.write().await.remove(channel_name); self.hmac_secrets.write().await.remove(channel_name); + self.verification_modes.write().await.remove(channel_name); // Remove all paths for this channel self.path_to_channel @@ -145,6 +240,12 @@ impl WasmChannelRouter { .await .retain(|_, name| name != channel_name); + // Remove pending ACKs for this channel + self.pending_acks + .write() + .await + .retain(|key, _| !key.starts_with(&format!("{}:", channel_name))); + tracing::info!( channel = %channel_name, "Unregistered WASM channel" @@ -230,6 +331,17 @@ impl WasmChannelRouter { pub async fn get_hmac_secret(&self, channel_name: &str) -> Option { self.hmac_secrets.read().await.get(channel_name).cloned() } + + /// Get the verification mode for a channel. + /// + /// Returns `None` if no mode is configured (default behavior applies). + pub async fn get_verification_mode(&self, channel_name: &str) -> Option { + self.verification_modes + .read() + .await + .get(channel_name) + .cloned() + } } impl Default for WasmChannelRouter { @@ -333,70 +445,81 @@ async fn webhook_handler( let channel_name = channel.channel_name(); - // Check if secret is required - if state.router.requires_secret(channel_name).await { - // Get the secret header name for this channel (from capabilities or default) - let secret_header_name = state.router.get_secret_header(channel_name).await; - - // Try to get secret from query param or the channel's configured header - let provided_secret = query - .get("secret") - .cloned() - .or_else(|| { - headers - .get(&secret_header_name) - .and_then(|v| v.to_str().ok()) - .map(|s| s.to_string()) - }) - .or_else(|| { - // Fallback to generic header if different from configured - if secret_header_name != "X-Webhook-Secret" { + // Check verification mode for GET requests (WhatsApp-style query param verification) + // In query_param mode, the WASM channel validates via hub.verify_token query param + let verification_mode = state.router.get_verification_mode(channel_name).await; + if method == Method::GET && verification_mode.as_deref() == Some("query_param") { + tracing::debug!( + channel = %channel_name, + "Skipping host-level secret validation for GET request (query_param verification mode)" + ); + // Skip directly to WASM call - the channel validates via query param + } else { + // Check if secret is required + if state.router.requires_secret(channel_name).await { + // Get the secret header name for this channel (from capabilities or default) + let secret_header_name = state.router.get_secret_header(channel_name).await; + + // Try to get secret from query param or the channel's configured header + let provided_secret = query + .get("secret") + .cloned() + .or_else(|| { headers - .get("X-Webhook-Secret") + .get(&secret_header_name) .and_then(|v| v.to_str().ok()) .map(|s| s.to_string()) - } else { - None - } - }); + }) + .or_else(|| { + // Fallback to generic header if different from configured + if secret_header_name != "X-Webhook-Secret" { + headers + .get("X-Webhook-Secret") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()) + } else { + None + } + }); - tracing::debug!( - channel = %channel_name, - has_provided_secret = provided_secret.is_some(), - provided_secret_len = provided_secret.as_ref().map(|s| s.len()), - "Checking webhook secret" - ); + tracing::debug!( + channel = %channel_name, + has_provided_secret = provided_secret.is_some(), + provided_secret_len = provided_secret.as_ref().map(|s| s.len()), + "Checking webhook secret" + ); - match provided_secret { - Some(secret) => { - if !state.router.validate_secret(channel_name, &secret).await { + match provided_secret { + Some(secret) => { + if !state.router.validate_secret(channel_name, &secret).await { + tracing::warn!( + channel = %channel_name, + "Webhook secret validation failed" + ); + return ( + StatusCode::UNAUTHORIZED, + Json(serde_json::json!({ + "error": "Invalid webhook secret" + })), + ); + } + tracing::debug!(channel = %channel_name, "Webhook secret validated"); + } + None => { tracing::warn!( channel = %channel_name, - "Webhook secret validation failed" + "Webhook secret required but not provided" ); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({ - "error": "Invalid webhook secret" + "error": "Webhook secret required" })), ); } - tracing::debug!(channel = %channel_name, "Webhook secret validated"); - } - None => { - tracing::warn!( - channel = %channel_name, - "Webhook secret required but not provided" - ); - return ( - StatusCode::UNAUTHORIZED, - Json(serde_json::json!({ - "error": "Webhook secret required" - })), - ); } } - } + } // end of verification_mode else block // Ed25519 signature verification (Discord-style) if let Some(pub_key_hex) = state.router.get_signature_key(channel_name).await { @@ -449,17 +572,24 @@ async fn webhook_handler( } } - // HMAC-SHA256 signature verification (Slack-style) + // HMAC-SHA256 signature verification (Slack-style or WhatsApp-style) if let Some(hmac_secret) = state.router.get_hmac_secret(channel_name).await { - let timestamp = headers + // Try Slack-style headers first + let slack_timestamp = headers .get("x-slack-request-timestamp") .and_then(|v| v.to_str().ok()); - let sig_header = headers + let slack_sig = headers .get("x-slack-signature") .and_then(|v| v.to_str().ok()); - match (timestamp, sig_header) { - (Some(ts), Some(sig)) => { + // Try WhatsApp-style header + let whatsapp_sig = headers + .get("x-hub-signature-256") + .and_then(|v| v.to_str().ok()); + + match (slack_timestamp, slack_sig, whatsapp_sig) { + // Slack-style verification + (Some(ts), Some(sig), _) => { let now_secs = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -474,7 +604,7 @@ async fn webhook_handler( ) { tracing::warn!( channel = %channel_name, - "HMAC-SHA256 signature verification failed" + "HMAC-SHA256 signature verification failed (Slack-style)" ); return ( StatusCode::UNAUTHORIZED, @@ -483,17 +613,34 @@ async fn webhook_handler( })), ); } - tracing::debug!(channel = %channel_name, "HMAC-SHA256 signature verified"); + tracing::debug!(channel = %channel_name, "HMAC-SHA256 signature verified (Slack-style)"); } + // WhatsApp-style verification + (_, _, Some(sig)) => { + if !crate::channels::wasm::signature::verify_hmac_sha256(&hmac_secret, sig, &body) { + tracing::warn!( + channel = %channel_name, + "HMAC-SHA256 signature verification failed (WhatsApp-style)" + ); + return ( + StatusCode::UNAUTHORIZED, + Json(serde_json::json!({ + "error": "Invalid signature" + })), + ); + } + tracing::debug!(channel = %channel_name, "HMAC-SHA256 signature verified (WhatsApp-style)"); + } + // No recognized signature headers _ => { tracing::warn!( channel = %channel_name, - "Slack signature headers missing but secret is registered" + "HMAC signature headers missing but secret is registered" ); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({ - "error": "Missing Slack signature headers" + "error": "Missing signature headers" })), ); } @@ -530,7 +677,26 @@ async fn webhook_handler( ) .await { - Ok(response) => { + Ok((response, emitted_info)) => { + // Register pending ACKs for emitted messages + // Track both keys and receivers so we can clean up timed-out entries + let mut ack_keys: Vec = Vec::new(); + let mut ack_receivers: Vec> = Vec::new(); + for (message_id, _metadata) in &emitted_info { + let ack_key = format!("{}:{}", channel_name, message_id); + let rx = state.router.register_pending_ack(ack_key.clone()).await; + ack_keys.push(ack_key); + ack_receivers.push(rx); + + // Metadata will be passed to ack_message() by the agent after persistence, + // which triggers on_message_persisted callback (e.g., for mark_as_read) + tracing::debug!( + channel = %channel_name, + message_id = %message_id, + "Registered pending ACK for webhook message" + ); + } + let status = StatusCode::from_u16(response.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); @@ -538,6 +704,7 @@ async fn webhook_handler( channel = %channel_name, status = %status, body_len = response.body.len(), + emitted_count = emitted_info.len(), "WASM channel on_http_request completed successfully" ); @@ -549,6 +716,51 @@ async fn webhook_handler( }) }); + // Wait for ACKs with timeout + // If no messages were emitted, return immediately + if !ack_receivers.is_empty() { + let ack_timeout = tokio::time::Duration::from_secs(ACK_TIMEOUT_SECS); + let ack_results: Vec<_> = futures::future::join_all( + ack_receivers + .into_iter() + .map(|rx| tokio::time::timeout(ack_timeout, rx)), + ) + .await; + + let mut acked = 0; + let mut timed_out = 0; + for (i, result) in ack_results.into_iter().enumerate() { + match result { + Ok(Ok(())) => acked += 1, + Ok(Err(_)) => { + // Sender dropped - clean up the orphaned entry + timed_out += 1; + state.router.cleanup_pending_ack(&ack_keys[i]).await; + } + Err(_) => { + // Timeout - clean up the orphaned entry + timed_out += 1; + state.router.cleanup_pending_ack(&ack_keys[i]).await; + } + } + } + + if timed_out > 0 { + tracing::warn!( + channel = %channel_name, + acked = acked, + timed_out = timed_out, + "Some webhook ACKs timed out (cleaned up orphaned entries)" + ); + } else { + tracing::debug!( + channel = %channel_name, + acked = acked, + "All webhook ACKs received" + ); + } + } + (status, Json(body_json)) } Err(e) => { @@ -692,7 +904,13 @@ mod tests { }]; router - .register(channel, endpoints, Some("secret123".to_string()), None) + .register( + channel, + endpoints, + Some("secret123".to_string()), + None, + None, + ) .await; // Should find channel by path @@ -711,7 +929,7 @@ mod tests { let channel = create_test_channel("slack"); router - .register(channel, vec![], Some("secret123".to_string()), None) + .register(channel, vec![], Some("secret123".to_string()), None, None) .await; // Correct secret @@ -722,7 +940,7 @@ mod tests { // Channel without secret always validates let channel2 = create_test_channel("telegram"); - router.register(channel2, vec![], None, None).await; + router.register(channel2, vec![], None, None, None).await; assert!(router.validate_secret("telegram", "anything").await); } @@ -738,7 +956,7 @@ mod tests { require_secret: false, }]; - router.register(channel, endpoints, None, None).await; + router.register(channel, endpoints, None, None, None).await; // Should exist assert!( @@ -767,8 +985,8 @@ mod tests { let channel1 = create_test_channel("slack"); let channel2 = create_test_channel("telegram"); - router.register(channel1, vec![], None, None).await; - router.register(channel2, vec![], None, None).await; + router.register(channel1, vec![], None, None, None).await; + router.register(channel2, vec![], None, None, None).await; let channels = router.list_channels().await; assert_eq!(channels.len(), 2); @@ -788,6 +1006,7 @@ mod tests { vec![], Some("secret123".to_string()), Some("X-Telegram-Bot-Api-Secret-Token".to_string()), + None, ) .await; @@ -800,7 +1019,7 @@ mod tests { // Channel without custom header should use default let channel2 = create_test_channel("slack"); router - .register(channel2, vec![], Some("secret456".to_string()), None) + .register(channel2, vec![], Some("secret456".to_string()), None, None) .await; assert_eq!(router.get_secret_header("slack").await, "X-Webhook-Secret"); } @@ -812,7 +1031,7 @@ mod tests { let router = WasmChannelRouter::new(); let channel = create_test_channel("slack"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; let hmac_secret = "my-slack-signing-secret"; router.register_hmac_secret("slack", hmac_secret).await; @@ -825,7 +1044,7 @@ mod tests { async fn test_no_hmac_secret_returns_none() { let router = WasmChannelRouter::new(); let channel = create_test_channel("slack"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; // Slack has no HMAC secret registered let secret = router.get_hmac_secret("slack").await; @@ -844,7 +1063,7 @@ mod tests { require_secret: false, }]; - router.register(channel, endpoints, None, None).await; + router.register(channel, endpoints, None, None, None).await; router.register_hmac_secret("slack", "signing-secret").await; // Secret should exist @@ -864,7 +1083,7 @@ mod tests { let router = WasmChannelRouter::new(); let channel = create_test_channel("discord"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; let fake_pub_key = "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2"; router @@ -880,7 +1099,7 @@ mod tests { async fn test_no_signature_key_returns_none() { let router = WasmChannelRouter::new(); let channel = create_test_channel("slack"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; // Slack has no signature key registered let key = router.get_signature_key("slack").await; @@ -899,7 +1118,7 @@ mod tests { require_secret: false, }]; - router.register(channel, endpoints, None, None).await; + router.register(channel, endpoints, None, None, None).await; // Use a valid 32-byte Ed25519 key for this test let valid_key = "d75a980182b10ab7d54bfed3c964073a0ee172f3daa3f4a18446b7e8c7ac6602"; router @@ -923,7 +1142,7 @@ mod tests { async fn test_register_valid_signature_key_succeeds() { let router = WasmChannelRouter::new(); let channel = create_test_channel("discord"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; // Valid 32-byte Ed25519 public key (from test keypair) let valid_key = "d75a980182b10ab7d54bfed3c964073a0ee172f3daa3f4a18446b7e8c7ac6602"; @@ -935,7 +1154,7 @@ mod tests { async fn test_register_invalid_hex_key_fails() { let router = WasmChannelRouter::new(); let channel = create_test_channel("discord"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; let result = router .register_signature_key("discord", "not-valid-hex-zzz") @@ -947,7 +1166,7 @@ mod tests { async fn test_register_wrong_length_key_fails() { let router = WasmChannelRouter::new(); let channel = create_test_channel("discord"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; // 16 bytes instead of 32 let short_key = hex::encode([0u8; 16]); @@ -959,7 +1178,7 @@ mod tests { async fn test_register_empty_key_fails() { let router = WasmChannelRouter::new(); let channel = create_test_channel("discord"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; let result = router.register_signature_key("discord", "").await; assert!(result.is_err(), "Empty key should be rejected"); @@ -969,7 +1188,7 @@ mod tests { async fn test_valid_key_is_retrievable() { let router = WasmChannelRouter::new(); let channel = create_test_channel("discord"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; let valid_key = "d75a980182b10ab7d54bfed3c964073a0ee172f3daa3f4a18446b7e8c7ac6602"; router @@ -985,7 +1204,7 @@ mod tests { async fn test_invalid_key_does_not_store() { let router = WasmChannelRouter::new(); let channel = create_test_channel("discord"); - router.register(channel, vec![], None, None).await; + router.register(channel, vec![], None, None, None).await; // Attempt to register invalid key let _ = router @@ -1019,7 +1238,9 @@ mod tests { require_secret: false, }]; - wasm_router.register(channel, endpoints, None, None).await; + wasm_router + .register(channel, endpoints, None, None, None) + .await; let app = create_wasm_channel_router(wasm_router.clone(), None); (wasm_router, app) @@ -1246,7 +1467,13 @@ mod tests { // Register with BOTH secret and signature key wasm_router - .register(channel, endpoints, Some("my-secret".to_string()), None) + .register( + channel, + endpoints, + Some("my-secret".to_string()), + None, + None, + ) .await; let signing_key = test_signing_key(); @@ -1304,7 +1531,9 @@ mod tests { require_secret: false, }]; - wasm_router.register(channel, endpoints, None, None).await; + wasm_router + .register(channel, endpoints, None, None, None) + .await; let app = create_wasm_channel_router(wasm_router.clone(), None); (wasm_router, app) @@ -1500,4 +1729,136 @@ mod tests { "Signature with mismatched timestamp should return 401" ); } + + // ── Verification Mode Tests ───────────────────────────────────────── + + /// Helper to create a router with a WhatsApp-style channel using query_param verification. + async fn setup_whatsapp_router() -> (Arc, AxumRouter) { + let wasm_router = Arc::new(WasmChannelRouter::new()); + let channel = create_test_channel("whatsapp"); + + // Register with verification_mode = "query_param" for GET webhook verification + let endpoints = vec![RegisteredEndpoint { + channel_name: "whatsapp".to_string(), + path: "/webhook/whatsapp".to_string(), + methods: vec!["GET".to_string(), "POST".to_string()], + require_secret: true, + }]; + + wasm_router + .register( + channel, + endpoints, + Some("verify_token_123".to_string()), + Some("X-Hub-Signature-256".to_string()), + Some("query_param".to_string()), + ) + .await; + + let app = create_wasm_channel_router(wasm_router.clone(), None); + (wasm_router, app) + } + + #[tokio::test] + async fn test_get_request_with_query_param_mode_skips_secret_check() { + let (_wasm_router, app) = setup_whatsapp_router().await; + + // GET request without secret header but with query param + // In query_param mode, the WASM channel validates the hub.verify_token + let req = Request::builder() + .method("GET") + .uri("/webhook/whatsapp?hub.mode=subscribe&hub.challenge=test&hub.verify_token=verify_token_123") + .body(Body::empty()) + .unwrap(); + + let resp = app.oneshot(req).await.unwrap(); + // Should NOT be 401 - query_param mode skips host-level secret validation + // (may be 500 since no real WASM module, but not auth failure) + assert_ne!( + resp.status(), + StatusCode::UNAUTHORIZED, + "GET request with query_param mode should skip host-level secret validation" + ); + } + + #[tokio::test] + async fn test_post_request_with_query_param_mode_still_requires_hmac() { + let (wasm_router, app) = setup_whatsapp_router().await; + + // Register HMAC secret for POST verification + wasm_router + .register_hmac_secret("whatsapp", "app_secret_123") + .await; + + // POST request without HMAC signature should fail + let req = Request::builder() + .method("POST") + .uri("/webhook/whatsapp") + .header("content-type", "application/json") + .body(Body::from(r#"{"entry":[]}"#)) + .unwrap(); + + let resp = app.oneshot(req).await.unwrap(); + // Should be 401 - HMAC is required for POST even in query_param mode + assert_eq!( + resp.status(), + StatusCode::UNAUTHORIZED, + "POST request without HMAC signature should return 401" + ); + } + + // ── Webhook ACK Mechanism Tests ───────────────────────────────────── + + #[tokio::test] + async fn test_register_and_ack_message() { + let router = WasmChannelRouter::new(); + let channel = create_test_channel("test"); + + router.register(channel, vec![], None, None, None).await; + + // Register pending ACK + let key = "test:message123".to_string(); + let rx = router.register_pending_ack(key.clone()).await; + + // ACK the message with metadata + let metadata = r#"{"phone_number_id":"123","message_id":"msg456"}"#; + router.ack_message(&key, metadata).await; + + // Receiver should be signaled + let result = rx.await; + assert!(result.is_ok(), "ACK receiver should be signaled"); + } + + #[tokio::test] + async fn test_ack_nonexistent_key_is_safe() { + let router = WasmChannelRouter::new(); + + // ACK a key that was never registered (should not panic) + router.ack_message("nonexistent:key", "{}").await; + } + + #[tokio::test] + async fn test_unregister_clears_pending_acks() { + let router = WasmChannelRouter::new(); + let channel = create_test_channel("test"); + + router.register(channel, vec![], None, None, None).await; + + // Register pending ACK + let key = "test:message123".to_string(); + let rx = router.register_pending_ack(key.clone()).await; + + // Unregister the channel + router.unregister("test").await; + + // ACK the message (should be no-op since channel was unregistered) + router.ack_message(&key, "{}").await; + + // Receiver should NOT be signaled (sender was dropped during retain) + let result = rx.await; + assert!( + result.is_err(), + "ACK receiver should not be signaled after unregister" + ); + } } diff --git a/src/channels/wasm/schema.rs b/src/channels/wasm/schema.rs index b508142661..820d2bbdab 100644 --- a/src/channels/wasm/schema.rs +++ b/src/channels/wasm/schema.rs @@ -185,6 +185,20 @@ impl ChannelCapabilitiesFile { .and_then(|w| w.secret_name.clone()) .unwrap_or_else(|| format!("{}_webhook_secret", self.name)) } + + /// Get the webhook verification mode for this channel. + /// + /// Returns the verification mode declared in `webhook.verification_mode`: + /// - None/default: Require secret header for all requests + /// - "query_param": Skip host-level secret validation for GET, WASM validates via query param + /// - "signature": Always require signature validation + pub fn webhook_verification_mode(&self) -> Option<&str> { + self.capabilities + .channel + .as_ref() + .and_then(|c| c.webhook.as_ref()) + .and_then(|w| w.verification_mode.as_deref()) + } } /// Schema for channel capabilities. @@ -302,6 +316,14 @@ pub struct WebhookSchema { /// Secret name in secrets store for HMAC-SHA256 signing (Slack-style). #[serde(default)] pub hmac_secret_name: Option, + + /// How to handle GET request validation: + /// - None/default: Require secret header for all requests (current behavior) + /// - "query_param": Skip host-level secret validation for GET requests; + /// the WASM module validates via query param (e.g., WhatsApp hub.verify_token) + /// - "signature": Always require signature validation (for Discord-style Ed25519) + #[serde(default)] + pub verification_mode: Option, } /// Setup configuration schema. @@ -806,4 +828,38 @@ mod tests { "discord_public_key must be in the secrets allowlist" ); } + + #[test] + fn test_webhook_verification_mode_parsing() { + let json = r#"{ + "name": "test", + "capabilities": { + "channel": { + "webhook": { + "verification_mode": "query_param" + } + } + } + }"#; + + let cap: ChannelCapabilitiesFile = serde_json::from_str(json).unwrap(); + assert_eq!(cap.webhook_verification_mode(), Some("query_param")); + } + + #[test] + fn test_webhook_hmac_secret_name_parsing() { + let json = r#"{ + "name": "test", + "capabilities": { + "channel": { + "webhook": { + "hmac_secret_name": "whatsapp_app_secret" + } + } + } + }"#; + + let cap: ChannelCapabilitiesFile = serde_json::from_str(json).unwrap(); + assert_eq!(cap.hmac_secret_name(), Some("whatsapp_app_secret")); + } } diff --git a/src/channels/wasm/setup.rs b/src/channels/wasm/setup.rs index 2b9703dc6f..91007a054c 100644 --- a/src/channels/wasm/setup.rs +++ b/src/channels/wasm/setup.rs @@ -140,6 +140,9 @@ async fn register_channel( let secret_header = loaded.webhook_secret_header().map(|s| s.to_string()); + // Extract verification mode before moving loaded.channel + let verification_mode = loaded.webhook_verification_mode(); + let webhook_path = format!("/webhook/{}", channel_name); let endpoints = vec![RegisteredEndpoint { channel_name: channel_name.clone(), @@ -216,6 +219,7 @@ async fn register_channel( endpoints, webhook_secret.clone(), secret_header, + verification_mode, ) .await; diff --git a/src/channels/wasm/signature.rs b/src/channels/wasm/signature.rs index 2253bff570..51f9b86fbb 100644 --- a/src/channels/wasm/signature.rs +++ b/src/channels/wasm/signature.rs @@ -1,11 +1,19 @@ -//! Webhook signature verification (Discord Ed25519 and Slack HMAC-SHA256). +//! Webhook signature verification (Discord Ed25519, Slack HMAC-SHA256, WhatsApp HMAC-SHA256). //! //! Validates request signatures for incoming webhooks: //! - Discord: `X-Signature-Ed25519` and `X-Signature-Timestamp` headers //! - Slack: `X-Slack-Signature` and `X-Slack-Request-Timestamp` headers +//! - WhatsApp: `X-Hub-Signature-256` header (simple body-only HMAC) //! //! See: //! See: +//! See: + +use hmac::{Hmac, Mac}; +use sha2::Sha256; +use subtle::ConstantTimeEq; + +type HmacSha256 = Hmac; /// Verify a Discord interaction signature. /// @@ -52,6 +60,47 @@ pub fn verify_discord_signature( verifying_key.verify_strict(&message, &signature).is_ok() } +/// Verify HMAC-SHA256 signature (WhatsApp style, simple body-only). +/// +/// # Arguments +/// * `secret` - The HMAC secret (App Secret) +/// * `signature_header` - Value from X-Hub-Signature-256 header (format: "sha256=") +/// * `body` - Raw request body bytes +/// +/// # Returns +/// `true` if signature is valid, `false` otherwise +pub fn verify_hmac_sha256(secret: &str, signature_header: &str, body: &[u8]) -> bool { + // Parse header format: "sha256=" + let Some(hex_signature) = signature_header.strip_prefix("sha256=") else { + return false; + }; + + // Decode expected signature + let Ok(expected_sig) = hex::decode(hex_signature) else { + return false; + }; + + // SHA-256 produces 32-byte signatures - reject wrong lengths early + if expected_sig.len() != 32 { + return false; + } + + // Compute HMAC-SHA256 + let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) { + Ok(m) => m, + Err(_) => return false, + }; + mac.update(body); + let result = mac.finalize(); + let computed_sig = result.into_bytes(); + + // Constant-time comparison to prevent timing attacks + computed_sig + .as_slice() + .ct_eq(expected_sig.as_slice()) + .into() +} + /// Verify a Slack webhook signature using HMAC-SHA256. /// /// Slack signs each webhook request with HMAC-SHA256 using: @@ -69,9 +118,6 @@ pub fn verify_slack_signature( signature_header: &str, now_secs: i64, ) -> bool { - use hmac::{Hmac, Mac}; - use sha2::Sha256; - // 1. Parse and check staleness (5-minute window) let ts: i64 = match timestamp.parse() { Ok(v) => v, @@ -89,7 +135,7 @@ pub fn verify_slack_signature( basestring.extend_from_slice(body); // 3. Compute HMAC-SHA256 - let mut mac = match Hmac::::new_from_slice(signing_secret.as_bytes()) { + let mut mac = match HmacSha256::new_from_slice(signing_secret.as_bytes()) { Ok(m) => m, Err(_) => return false, }; @@ -99,7 +145,6 @@ pub fn verify_slack_signature( let expected = format!("v0={}", computed_hex); // 4. Constant-time compare (avoids timing side-channels) - use subtle::ConstantTimeEq; expected .as_bytes() .ct_eq(signature_header.as_bytes()) @@ -116,11 +161,7 @@ pub fn verify_hmac_sha256_prefixed( signature_header: &str, prefix: &str, ) -> bool { - use hmac::{Hmac, Mac}; - use sha2::Sha256; - use subtle::ConstantTimeEq; - - let mut mac = match Hmac::::new_from_slice(secret.as_bytes()) { + let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) { Ok(m) => m, Err(_) => return false, }; @@ -700,4 +741,76 @@ mod tests { "Empty timestamp should be rejected" ); } + + // ── Category: HMAC-SHA256 Signature Verification (WhatsApp/Meta) ──────────── + + /// Helper: compute HMAC-SHA256 signature in WhatsApp/Meta format (`sha256=`). + fn compute_whatsapp_style_hmac_signature(secret: &str, body: &[u8]) -> String { + use hmac::Mac; + let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).unwrap(); + mac.update(body); + let result = mac.finalize(); + format!("sha256={}", hex::encode(result.into_bytes())) + } + + #[test] + fn test_hmac_valid_signature_succeeds() { + let secret = "my_app_secret"; + let body = br#"{"entry":[{"id":"123"}]}"#; + let sig_header = compute_whatsapp_style_hmac_signature(secret, body); + + assert!( + verify_hmac_sha256(secret, &sig_header, body), + "Valid HMAC signature should verify" + ); + } + + #[test] + fn test_hmac_wrong_secret_fails() { + let secret = "correct_secret"; + let wrong_secret = "wrong_secret"; + let body = br#"{"test":"data"}"#; + let sig_header = compute_whatsapp_style_hmac_signature(secret, body); + + assert!( + !verify_hmac_sha256(wrong_secret, &sig_header, body), + "Signature with wrong secret should fail" + ); + } + + #[test] + fn test_hmac_tampered_body_fails() { + let secret = "my_secret"; + let body = br#"original body"#; + let tampered = br#"tampered body"#; + let sig_header = compute_whatsapp_style_hmac_signature(secret, body); + + assert!( + !verify_hmac_sha256(secret, &sig_header, tampered), + "Tampered body should fail verification" + ); + } + + #[test] + fn test_hmac_invalid_header_format_fails() { + let secret = "secret"; + let body = br#"data"#; + + assert!(!verify_hmac_sha256(secret, "invalid", body)); + assert!(!verify_hmac_sha256(secret, "sha256=not_hex!", body)); + assert!(!verify_hmac_sha256(secret, "", body)); + } + + #[test] + fn test_hmac_wrong_length_fails() { + let secret = "secret"; + let body = br#"data"#; + // 16 bytes instead of 32 + let short_sig = format!("sha256={}", "a".repeat(16)); + + assert!( + !verify_hmac_sha256(secret, &short_sig, body), + "Wrong-length signature should fail" + ); + } } diff --git a/src/channels/wasm/wrapper.rs b/src/channels/wasm/wrapper.rs index 6ca798318c..ddbfd2c4c8 100644 --- a/src/channels/wasm/wrapper.rs +++ b/src/channels/wasm/wrapper.rs @@ -37,7 +37,7 @@ use tokio::sync::{RwLock, mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; use uuid::Uuid; use wasmtime::Store; -use wasmtime::component::Linker; +use wasmtime::component::{Linker, TypedFunc}; use wasmtime_wasi::{ResourceTable, WasiCtx, WasiCtxBuilder, WasiView}; use crate::channels::wasm::capabilities::ChannelCapabilities; @@ -1140,6 +1140,91 @@ impl WasmChannel { Ok(instance) } + /// Call the optional on_message_persisted callback using raw component model API. + /// + /// This function: + /// 1. Instantiates the component with raw Instance access + /// 2. Tries to get the optional `on-message-persisted` function + /// 3. If it exists, calls it with the metadata_json argument + /// 4. Returns Ok(()) if the export doesn't exist (backward compatibility) + /// + /// This is best-effort - errors are logged but don't propagate to block ACKs. + #[allow(clippy::too_many_arguments)] + fn call_optional_persistence_callback( + runtime: &WasmChannelRuntime, + prepared: &PreparedChannelModule, + store: &mut Store, + metadata_json: &str, + ) -> Result<(), WasmChannelError> { + let engine = runtime.engine(); + + // Get the compiled component + let component = prepared + .component() + .ok_or_else(|| { + WasmChannelError::Compilation("No compiled component available".to_string()) + })? + .clone(); + + // Create linker and add host functions + let mut linker = Linker::new(engine); + Self::add_host_functions(&mut linker)?; + + // Instantiate with raw Instance access (use reborrow to avoid moving store) + let instance = linker.instantiate(&mut *store, &component).map_err(|e| { + let msg = e.to_string(); + if msg.contains("near:agent") || msg.contains("import") { + WasmChannelError::Instantiation(format!( + "{msg}. This may indicate a WIT version mismatch — \ + the channel was compiled against a different WIT than the host supports \ + (host WIT: {}). Rebuild the channel against the current WIT.", + crate::tools::wasm::WIT_CHANNEL_VERSION + )) + } else { + WasmChannelError::Instantiation(msg) + } + })?; + + // The optional export function name in WIT format + // Format: "[interface-name]::[function-name]" + // For "near:agent/channel-persistence" interface with "on-message-persisted" function: + const PERSISTENCE_FUNC: &str = "near:agent/channel-persistence::on-message-persisted"; + + // Try to get the optional function - returns None if not exported (backward compatible) + // Component model uses tuples for params/results: (String,) -> (Result<(), String>,) + let typed_func: TypedFunc<(String,), (Result<(), String>,)> = + match instance.get_typed_func(&mut *store, PERSISTENCE_FUNC) { + Ok(func) => func, + Err(_) => { + // Channel doesn't export the optional function - backward compatible + tracing::trace!( + channel = %prepared.name, + "on_message_persisted callback not supported (function not found)" + ); + return Ok(()); + } + }; + + // Call the function with the metadata_json argument + let (result,) = typed_func + .call(&mut *store, (metadata_json.to_string(),)) + .map_err(|e| WasmChannelError::Trapped { + name: prepared.name.clone(), + reason: e.to_string(), + })?; + + // Handle the result + if let Err(e) = result { + tracing::warn!( + channel = %prepared.name, + error = %e, + "on_message_persisted callback returned error (best-effort)" + ); + } + + Ok(()) + } + /// Map WASM execution errors to our error types. fn map_wasm_error(e: anyhow::Error, name: &str, fuel_limit: u64) -> WasmChannelError { let error_str = e.to_string(); @@ -1290,6 +1375,10 @@ impl WasmChannel { /// Execute the on_http_request callback. /// /// Called when an HTTP request arrives at a registered endpoint. + /// + /// Returns the HTTP response and a list of (message_id, metadata) tuples for + /// messages that were emitted during processing. This enables ACK tracking + /// for reliable webhook processing. pub async fn call_on_http_request( &self, method: &str, @@ -1298,7 +1387,7 @@ impl WasmChannel { query: &HashMap, body: &[u8], secret_validated: bool, - ) -> Result { + ) -> Result<(HttpResponse, Vec<(uuid::Uuid, serde_json::Value)>), WasmChannelError> { tracing::info!( channel = %self.name, method = method, @@ -1334,7 +1423,7 @@ impl WasmChannel { path = path, "WASM channel on_http_request called (no WASM module)" ); - return Ok(HttpResponse::ok()); + return Ok((HttpResponse::ok(), Vec::new())); } let runtime = Arc::clone(&self.runtime); @@ -1410,16 +1499,17 @@ impl WasmChannel { let channel_name = self.name.clone(); match result { Ok(Ok((response, mut host_state))) => { - // Process emitted messages + // Process emitted messages and collect (message_id, metadata) for ACK tracking let emitted = host_state.take_emitted_messages(); - self.process_emitted_messages(emitted).await?; + let emitted_info = self.process_emitted_messages(emitted).await?; tracing::debug!( channel = %channel_name, status = response.status, + emitted_count = emitted_info.len(), "WASM channel on_http_request completed" ); - Ok(response) + Ok((response, emitted_info)) } Ok(Err(e)) => Err(e), Err(_) => Err(WasmChannelError::Timeout { @@ -1864,6 +1954,85 @@ impl WasmChannel { } } + /// Execute the on_message_persisted callback. + /// + /// Called after a message has been successfully persisted to the database. + /// Channels can use this for follow-up actions like WhatsApp mark_as_read. + /// + /// Returns Ok(()) even on failure - this is best-effort and should not block ACKs. + pub async fn call_on_message_persisted( + &self, + metadata_json: &str, + ) -> Result<(), WasmChannelError> { + // If no WASM bytes, return Ok (for testing) + if self.prepared.component().is_none() { + tracing::debug!( + channel = %self.name, + "on_message_persisted called (no WASM module)" + ); + return Ok(()); + } + + let runtime = Arc::clone(&self.runtime); + let prepared = Arc::clone(&self.prepared); + let capabilities = Self::inject_workspace_reader(&self.capabilities, &self.workspace_store); + let timeout = self.runtime.config().callback_timeout; + let credentials = self.get_credentials().await; + let host_credentials = resolve_channel_host_credentials( + &self.capabilities, + self.secrets_store.as_deref(), + &self.owner_scope_id, + ) + .await; + let pairing_store = self.pairing_store.clone(); + let metadata_json = metadata_json.to_string(); + let channel_name = self.name.clone(); + + let result = tokio::time::timeout(timeout, async move { + tokio::task::spawn_blocking(move || { + let mut store = Self::create_store( + &runtime, + &prepared, + &capabilities, + credentials, + host_credentials, + pairing_store, + )?; + + // Try to call the optional on_message_persisted callback + // Returns Ok(()) if the export doesn't exist (backward compatibility) + Self::call_optional_persistence_callback( + &runtime, + &prepared, + &mut store, + &metadata_json, + ) + }) + .await + .map_err(|e| WasmChannelError::ExecutionPanicked { + name: channel_name, + reason: e.to_string(), + })? + }) + .await; + + match result { + Ok(Ok(())) => { + tracing::debug!(channel = %self.name, "on_message_persisted completed"); + Ok(()) + } + Ok(Err(e)) => { + // Log but don't fail - this is best-effort + tracing::warn!(channel = %self.name, error = %e, "on_message_persisted failed"); + Ok(()) + } + Err(_timeout) => { + tracing::warn!(channel = %self.name, "on_message_persisted timed out"); + Ok(()) + } + } + } + /// Execute a single on_status callback with a fresh WASM instance. /// /// Static method for use by the background typing repeat task (which @@ -2137,10 +2306,12 @@ impl WasmChannel { } /// Process emitted messages from a callback. + /// + /// Returns a vector of (message_id, metadata) for ACK tracking. async fn process_emitted_messages( &self, messages: Vec, - ) -> Result<(), WasmChannelError> { + ) -> Result, WasmChannelError> { tracing::info!( channel = %self.name, message_count = messages.len(), @@ -2149,7 +2320,7 @@ impl WasmChannel { if messages.is_empty() { tracing::debug!(channel = %self.name, "No messages emitted"); - return Ok(()); + return Ok(Vec::new()); } // Clone sender to avoid holding RwLock read guard across send().await in the loop @@ -2161,11 +2332,13 @@ impl WasmChannel { count = messages.len(), "Messages emitted but no sender available - channel may not be started!" ); - return Ok(()); + return Ok(Vec::new()); }; tx.clone() }; + let mut emitted_info = Vec::new(); + for emitted in messages { // Check rate limit — acquire and release the write lock before send().await { @@ -2191,6 +2364,7 @@ impl WasmChannel { let mut msg = IncomingMessage::new(&self.name, &resolved_user_id, &emitted.content) .with_owner_id(&self.owner_scope_id) .with_sender_id(&emitted.user_id); + let message_id = msg.id; // Capture ID before moving msg (for ACK tracking) if let Some(name) = emitted.user_name { msg = msg.with_user_name(name); @@ -2228,6 +2402,9 @@ impl WasmChannel { self.update_broadcast_metadata(&emitted.metadata_json).await; } + // Extract metadata for ACK mechanism (after apply_emitted_metadata) + let metadata = msg.metadata.clone(); + // Send to stream — no locks held across this await tracing::info!( channel = %self.name, @@ -2249,9 +2426,12 @@ impl WasmChannel { channel = %self.name, "Message successfully sent to agent queue" ); + + // Track for ACK mechanism + emitted_info.push((message_id, metadata)); } - Ok(()) + Ok(emitted_info) } /// Start the polling loop if configured. diff --git a/src/db/postgres.rs b/src/db/postgres.rs index 8c18e25288..05ccdef0a1 100644 --- a/src/db/postgres.rs +++ b/src/db/postgres.rs @@ -707,3 +707,7 @@ impl WorkspaceStore for PgBackend { .await } } + +// ==================== Tests ==================== +// Webhook dedup tests are in the libsql module which supports in-memory testing. +// PostgreSQL tests require a running database with migrations applied. diff --git a/src/extensions/manager.rs b/src/extensions/manager.rs index 00d787a5a3..4ee3f050cf 100644 --- a/src/extensions/manager.rs +++ b/src/extensions/manager.rs @@ -3594,6 +3594,7 @@ impl ExtensionManager { endpoints, webhook_secret, secret_header, + None, // verification_mode - not supported for hot-activated channels yet ) .await; tracing::info!(channel = %channel_name, "Registered hot-activated channel with webhook router"); diff --git a/src/hooks/bundled.rs b/src/hooks/bundled.rs index 9ca1fe9299..6aff04af50 100644 --- a/src/hooks/bundled.rs +++ b/src/hooks/bundled.rs @@ -21,13 +21,14 @@ const DEFAULT_WEBHOOK_TIMEOUT_MS: u64 = 2000; const DEFAULT_WEBHOOK_MAX_IN_FLIGHT: usize = 32; const MAX_HOOK_TIMEOUT_MS: u64 = 30_000; -const ALL_HOOK_POINTS: [HookPoint; 6] = [ +const ALL_HOOK_POINTS: [HookPoint; 7] = [ HookPoint::BeforeInbound, HookPoint::BeforeToolCall, HookPoint::BeforeOutbound, HookPoint::OnSessionStart, HookPoint::OnSessionEnd, HookPoint::TransformResponse, + HookPoint::OnMessagePersisted, ]; /// Errors while parsing or compiling declarative hook bundles. @@ -515,6 +516,11 @@ enum OutboundWebhookEventSummary { ResponseTransform { response_length: usize, }, + MessagePersisted { + user_id: String, + channel: String, + message_id: String, + }, } #[async_trait] @@ -634,6 +640,16 @@ fn summarize_webhook_event(event: &HookEvent) -> OutboundWebhookEventSummary { response_length: response.len(), } } + HookEvent::MessagePersisted { + user_id, + channel, + message_id, + .. + } => OutboundWebhookEventSummary::MessagePersisted { + user_id: user_id.clone(), + channel: channel.clone(), + message_id: message_id.clone(), + }, } } @@ -879,7 +895,8 @@ fn event_user_id(event: &HookEvent) -> &str { | HookEvent::Outbound { user_id, .. } | HookEvent::SessionStart { user_id, .. } | HookEvent::SessionEnd { user_id, .. } - | HookEvent::ResponseTransform { user_id, .. } => user_id, + | HookEvent::ResponseTransform { user_id, .. } + | HookEvent::MessagePersisted { user_id, .. } => user_id, } } @@ -893,6 +910,11 @@ fn extract_primary_content(event: &HookEvent) -> String { session_id.clone() } HookEvent::ResponseTransform { response, .. } => response.clone(), + HookEvent::MessagePersisted { + channel, + message_id, + .. + } => format!("{}:{}", channel, message_id), } } diff --git a/src/hooks/hook.rs b/src/hooks/hook.rs index 9c5670f3d7..67cee0015d 100644 --- a/src/hooks/hook.rs +++ b/src/hooks/hook.rs @@ -21,6 +21,10 @@ pub enum HookPoint { OnSessionEnd, /// Transform the final response before completing a turn. TransformResponse, + /// After a user message is persisted to the database. + /// Fired after successful DB persistence. Used by WASM channels to signal + /// ACK for webhooks (e.g., WhatsApp mark_as_read callback). + OnMessagePersisted, } impl HookPoint { @@ -33,6 +37,7 @@ impl HookPoint { HookPoint::OnSessionStart => "onSessionStart", HookPoint::OnSessionEnd => "onSessionEnd", HookPoint::TransformResponse => "transformResponse", + HookPoint::OnMessagePersisted => "onMessagePersisted", } } } @@ -72,6 +77,13 @@ pub enum HookEvent { thread_id: String, response: String, }, + /// A user message was persisted to the database. + MessagePersisted { + user_id: String, + channel: String, + message_id: String, + metadata: serde_json::Value, + }, } impl HookEvent { @@ -84,6 +96,7 @@ impl HookEvent { HookEvent::SessionStart { .. } => HookPoint::OnSessionStart, HookEvent::SessionEnd { .. } => HookPoint::OnSessionEnd, HookEvent::ResponseTransform { .. } => HookPoint::TransformResponse, + HookEvent::MessagePersisted { .. } => HookPoint::OnMessagePersisted, } } @@ -108,6 +121,9 @@ impl HookEvent { HookEvent::SessionStart { .. } | HookEvent::SessionEnd { .. } => { // Session events don't have modifiable content } + HookEvent::MessagePersisted { .. } => { + // MessagePersisted events don't have modifiable content + } } } } diff --git a/src/hooks/registry.rs b/src/hooks/registry.rs index d20788bbd6..13058631b4 100644 --- a/src/hooks/registry.rs +++ b/src/hooks/registry.rs @@ -179,6 +179,13 @@ fn extract_content(event: &HookEvent) -> String { HookEvent::SessionStart { session_id, .. } | HookEvent::SessionEnd { session_id, .. } => { session_id.clone() } + HookEvent::MessagePersisted { + channel, + message_id, + .. + } => { + format!("{}:{}", channel, message_id) + } } } diff --git a/src/main.rs b/src/main.rs index 745cae09b4..44df32379e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -349,6 +349,14 @@ async fn async_main() -> anyhow::Result<()> { } } + // Register WASM message persisted hook if WASM channels are loaded + if let Some(ref state) = wasm_channel_runtime_state { + use ironclaw::channels::wasm::MessagePersistedHook; + let hook = std::sync::Arc::new(MessagePersistedHook::new(Arc::clone(&state.2))); + components.hooks.register(hook).await; + tracing::debug!("Registered MessagePersisted hook for WASM channel ACK signaling"); + } + // Add Signal channel if configured and not CLI-only mode. if !cli.cli_only && let Some(ref signal_config) = config.channels.signal diff --git a/src/tools/wasm/mod.rs b/src/tools/wasm/mod.rs index 1998e801b6..f9981dc17e 100644 --- a/src/tools/wasm/mod.rs +++ b/src/tools/wasm/mod.rs @@ -77,10 +77,10 @@ /// /// Extensions declaring a `wit_version` in their capabilities file are checked /// against this at load time: same major, not greater than host. -pub const WIT_TOOL_VERSION: &str = "0.3.0"; +pub const WIT_TOOL_VERSION: &str = "0.4.0"; /// Host WIT version for channel extensions. -pub const WIT_CHANNEL_VERSION: &str = "0.3.0"; +pub const WIT_CHANNEL_VERSION: &str = "0.4.0"; mod allowlist; mod capabilities; diff --git a/tests/telegram_auth_integration.rs b/tests/telegram_auth_integration.rs index 9299962b46..312db2b8ae 100644 --- a/tests/telegram_auth_integration.rs +++ b/tests/telegram_auth_integration.rs @@ -200,7 +200,7 @@ async fn test_group_message_unauthorized_user_blocked_with_allowlist() { .expect("HTTP callback failed"); // Should return 200 OK (always respond quickly to Telegram) - assert_eq!(response.status, 200); + assert_eq!(response.0.status, 200); // REGRESSION TEST: The fix ensures the message is dropped // Before the fix: group messages bypassed the allow_from check when owner_id=null @@ -252,7 +252,7 @@ async fn test_group_message_authorized_user_allowed() { .expect("HTTP callback failed"); // Should return 200 OK - assert_eq!(response.status, 200); + assert_eq!(response.0.status, 200); // REGRESSION TEST: Authorized users pass through the authorization check // The fix ensures that group messages now properly check allow_from when owner_id=null @@ -298,7 +298,7 @@ async fn test_private_message_with_owner_id_set_uses_guest_pairing_flow() { .await .expect("HTTP callback failed"); - assert_eq!(response.status, 200); + assert_eq!(response.0.status, 200); let pending = pairing_store .list_pending("telegram") @@ -351,7 +351,7 @@ async fn test_private_messages_use_chat_id_as_thread_scope() { .await .expect("HTTP callback failed"); - assert_eq!(response.status, 200); + assert_eq!(response.0.status, 200); let msg = timeout(Duration::from_secs(1), stream.next()) .await @@ -398,7 +398,7 @@ async fn test_private_message_without_owner_id_with_pairing_policy() { .await .expect("HTTP callback failed"); - assert_eq!(response.status, 200); + assert_eq!(response.0.status, 200); // REGRESSION TEST: Private messages with pairing policy still emit // (pairing and message emission are independent flows) @@ -444,7 +444,7 @@ async fn test_open_dm_policy_allows_all_users() { .await .expect("HTTP callback failed"); - assert_eq!(response.status, 200); + assert_eq!(response.0.status, 200); // REGRESSION TEST: Open policy should allow all users // With dm_policy="open", authorization checks are skipped for all users @@ -489,7 +489,7 @@ async fn test_bot_mention_detection_case_insensitive() { .await .expect("HTTP callback failed"); - assert_eq!(response.status, 200); + assert_eq!(response.0.status, 200); // REGRESSION TEST: Bot mentions should be case-insensitive // Case-insensitive detection allows @mybot and @MyBot to both trigger the bot diff --git a/tests/wasm_channel_integration.rs b/tests/wasm_channel_integration.rs index 7e05c0f397..dfc21fc500 100644 --- a/tests/wasm_channel_integration.rs +++ b/tests/wasm_channel_integration.rs @@ -72,7 +72,7 @@ mod router_tests { }]; router - .register(channel.clone(), endpoints, None, None) + .register(channel.clone(), endpoints, None, None, None) .await; // Verify channel is found by path @@ -97,7 +97,13 @@ mod router_tests { )); router - .register(channel, vec![], Some("my-secret-123".to_string()), None) + .register( + channel, + vec![], + Some("my-secret-123".to_string()), + None, + None, + ) .await; // Correct secret validates @@ -136,7 +142,7 @@ mod router_tests { require_secret: false, }]; - router.register(channel, endpoints, None, None).await; + router.register(channel, endpoints, None, None, None).await; // Channel exists assert!(router.get_channel_for_path("/webhook/temp").await.is_some()); @@ -168,7 +174,7 @@ mod router_tests { require_secret: false, }]; - router.register(channel, endpoints, None, None).await; + router.register(channel, endpoints, None, None, None).await; } // Verify all channels are registered @@ -232,7 +238,7 @@ mod channel_lifecycle_tests { .await .expect("HTTP callback failed"); - assert_eq!(response.status, 200); + assert_eq!(response.0.status, 200); // Cleanup channel.shutdown().await.expect("Shutdown failed"); @@ -455,3 +461,135 @@ mod message_emission_tests { assert_eq!(state.emits_dropped(), 1); } } + +mod hmac_signature_tests { + use super::*; + + /// Test HMAC-SHA256 signature verification (WhatsApp-style) + #[test] + fn test_verify_hmac_sha256_valid_signature() { + let secret = "my_app_secret"; + let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"id":"wamid.123","from":"15551234567","type":"text","text":{"body":"Hello"}}]}}]}]}"#; + + // Compute expected signature + let expected_sig = { + use hmac::{Hmac, Mac}; + use sha2::Sha256; + type HmacSha256 = Hmac; + let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).unwrap(); + mac.update(body); + let result = mac.finalize(); + format!("sha256={}", hex::encode(result.into_bytes())) + }; + + // Verify using the signature module + let result = + ironclaw::channels::wasm::signature::verify_hmac_sha256(secret, &expected_sig, body); + assert!(result, "Valid signature should verify"); + } + + #[test] + fn test_verify_hmac_sha256_wrong_secret() { + let secret = "my_app_secret"; + let wrong_secret = "wrong_secret"; + let body = br#"{"test": "data"}"#; + + // Compute signature with correct secret + let sig = { + use hmac::{Hmac, Mac}; + use sha2::Sha256; + type HmacSha256 = Hmac; + let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).unwrap(); + mac.update(body); + let result = mac.finalize(); + format!("sha256={}", hex::encode(result.into_bytes())) + }; + + // Verify with wrong secret should fail + let result = + ironclaw::channels::wasm::signature::verify_hmac_sha256(wrong_secret, &sig, body); + assert!(!result, "Wrong secret should fail verification"); + } + + #[test] + fn test_verify_hmac_sha256_tampered_body() { + let secret = "my_app_secret"; + let body = br#"{"test": "data"}"#; + let tampered = br#"{"test": "tampered"}"#; + + // Compute signature for original body + let sig = { + use hmac::{Hmac, Mac}; + use sha2::Sha256; + type HmacSha256 = Hmac; + let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).unwrap(); + mac.update(body); + let result = mac.finalize(); + format!("sha256={}", hex::encode(result.into_bytes())) + }; + + // Verify with tampered body should fail + let result = + ironclaw::channels::wasm::signature::verify_hmac_sha256(secret, &sig, tampered); + assert!(!result, "Tampered body should fail verification"); + } + + #[test] + fn test_verify_hmac_sha256_invalid_header_format() { + let secret = "my_app_secret"; + let body = br#"{"test": "data"}"#; + + // Invalid formats + assert!(!ironclaw::channels::wasm::signature::verify_hmac_sha256( + secret, "invalid", body + )); + assert!(!ironclaw::channels::wasm::signature::verify_hmac_sha256( + secret, + "sha256=not_hex!", + body + )); + assert!(!ironclaw::channels::wasm::signature::verify_hmac_sha256( + secret, "", body + )); + } + + /// Test that router properly registers and retrieves HMAC secrets + #[tokio::test] + async fn test_router_hmac_secret_registration() { + let router = WasmChannelRouter::new(); + let runtime = create_test_runtime(); + let channel = Arc::new(create_test_channel( + runtime, + "whatsapp", + vec!["/webhook/whatsapp"], + )); + + // Register channel with HMAC secret + router + .register( + channel, + vec![], + Some("verify_token_123".to_string()), + Some("X-Hub-Signature-256".to_string()), + Some("query_param".to_string()), + ) + .await; + + // Register HMAC secret separately (simulating setup.rs behavior) + router + .register_hmac_secret("whatsapp", "my_app_secret") + .await; + + // Verify HMAC secret is registered + assert_eq!( + router.get_hmac_secret("whatsapp").await, + Some("my_app_secret".to_string()) + ); + + // Verify verification mode is stored + assert_eq!( + router.get_verification_mode("whatsapp").await, + Some("query_param".to_string()) + ); + } +} diff --git a/tests/wit_compat.rs b/tests/wit_compat.rs index 4dcacf4eb1..1e47ec789d 100644 --- a/tests/wit_compat.rs +++ b/tests/wit_compat.rs @@ -214,9 +214,8 @@ fn instantiate_tool_component( // If the WIT added/removed/renamed a function, stub registration // or instantiation will fail. - // Register stubs for both versioned (0.3.0+) and unversioned (pre-0.3.0) interface - // paths so that both old and new WASM artifacts can instantiate. - for interface in &["near:agent/host", "near:agent/host@0.3.0"] { + // Register stubs for versioned (0.4.0) and unversioned interface paths. + for interface in &["near:agent/host", "near:agent/host@0.4.0"] { let mut root = linker.root(); if let Ok(mut host) = root.instance(interface) { stub_shared_host_functions(&mut host)?; @@ -252,9 +251,7 @@ fn instantiate_channel_component( wasmtime_wasi::add_to_linker_sync(&mut linker) .map_err(|e| format!("WASI linker failed: {e}"))?; - // Register stubs for both versioned (0.3.0+) and unversioned (pre-0.3.0) interface - // paths so that both old and new WASM artifacts can instantiate. - // Register stubs under both versioned and unversioned interface paths. + // Register stubs for versioned (0.4.0) and unversioned interface paths. // This helper avoids repeating the stub registration code. fn stub_channel_host( host: &mut wasmtime::component::LinkerInstance<'_, TestStoreData>, @@ -313,8 +310,8 @@ fn instantiate_channel_component( { let mut root = linker.root(); let mut host = root - .instance("near:agent/channel-host@0.3.0") - .map_err(|e| format!("failed to create versioned channel-host@0.3.0: {e}"))?; + .instance("near:agent/channel-host@0.4.0") + .map_err(|e| format!("failed to create versioned channel-host@0.4.0: {e}"))?; stub_channel_host(&mut host)?; } diff --git a/tools-src/github/github-tool.capabilities.json b/tools-src/github/github-tool.capabilities.json index 61bbd55fff..b24148ff78 100644 --- a/tools-src/github/github-tool.capabilities.json +++ b/tools-src/github/github-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.1", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "capabilities": { "webhook": { "hmac_secret_name": "github_webhook_secret", diff --git a/tools-src/gmail/gmail-tool.capabilities.json b/tools-src/gmail/gmail-tool.capabilities.json index 2e11d32b7c..54a80a223c 100644 --- a/tools-src/gmail/gmail-tool.capabilities.json +++ b/tools-src/gmail/gmail-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "http": { "allowlist": [ { diff --git a/tools-src/google-calendar/google-calendar-tool.capabilities.json b/tools-src/google-calendar/google-calendar-tool.capabilities.json index 15e756aeee..c2ccf906d3 100644 --- a/tools-src/google-calendar/google-calendar-tool.capabilities.json +++ b/tools-src/google-calendar/google-calendar-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "http": { "allowlist": [ { diff --git a/tools-src/google-docs/google-docs-tool.capabilities.json b/tools-src/google-docs/google-docs-tool.capabilities.json index 7a365c1d07..2c60c389b1 100644 --- a/tools-src/google-docs/google-docs-tool.capabilities.json +++ b/tools-src/google-docs/google-docs-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "http": { "allowlist": [ { diff --git a/tools-src/google-drive/google-drive-tool.capabilities.json b/tools-src/google-drive/google-drive-tool.capabilities.json index 5366793374..0ab617e853 100644 --- a/tools-src/google-drive/google-drive-tool.capabilities.json +++ b/tools-src/google-drive/google-drive-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "http": { "allowlist": [ { diff --git a/tools-src/google-sheets/google-sheets-tool.capabilities.json b/tools-src/google-sheets/google-sheets-tool.capabilities.json index 624c43810e..f693b46852 100644 --- a/tools-src/google-sheets/google-sheets-tool.capabilities.json +++ b/tools-src/google-sheets/google-sheets-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "http": { "allowlist": [ { diff --git a/tools-src/google-slides/google-slides-tool.capabilities.json b/tools-src/google-slides/google-slides-tool.capabilities.json index 17334bc09c..2d0d1fa915 100644 --- a/tools-src/google-slides/google-slides-tool.capabilities.json +++ b/tools-src/google-slides/google-slides-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "http": { "allowlist": [ { diff --git a/tools-src/llm-context/llm-context-tool.capabilities.json b/tools-src/llm-context/llm-context-tool.capabilities.json index 72061eaa5d..04081a6b5c 100644 --- a/tools-src/llm-context/llm-context-tool.capabilities.json +++ b/tools-src/llm-context/llm-context-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.1.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "capabilities": { "http": { "allowlist": [ diff --git a/tools-src/slack/slack-tool.capabilities.json b/tools-src/slack/slack-tool.capabilities.json index 8b9060d7ae..74a9e68f08 100644 --- a/tools-src/slack/slack-tool.capabilities.json +++ b/tools-src/slack/slack-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "http": { "allowlist": [ { diff --git a/tools-src/telegram/telegram-tool.capabilities.json b/tools-src/telegram/telegram-tool.capabilities.json index 665baedd56..18ea5cf468 100644 --- a/tools-src/telegram/telegram-tool.capabilities.json +++ b/tools-src/telegram/telegram-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "http": { "allowlist": [ { diff --git a/tools-src/web-search/web-search-tool.capabilities.json b/tools-src/web-search/web-search-tool.capabilities.json index 9c2559ab52..c02cbc5ed6 100644 --- a/tools-src/web-search/web-search-tool.capabilities.json +++ b/tools-src/web-search/web-search-tool.capabilities.json @@ -1,6 +1,6 @@ { "version": "0.2.0", - "wit_version": "0.3.0", + "wit_version": "0.4.0", "description": "Search the web using Brave Search. Returns titles, URLs, descriptions, and publication dates for matching web pages. Supports filtering by country, language, and freshness. Authentication is handled via the 'brave_api_key' secret injected by the host.", "parameters": { "type": "object", diff --git a/wit/channel.wit b/wit/channel.wit index c0eb451045..bc4b9d12f3 100644 --- a/wit/channel.wit +++ b/wit/channel.wit @@ -1,4 +1,4 @@ -package near:agent@0.3.0; +package near:agent@0.4.0; // WASM Channel Sandbox Interface // @@ -420,10 +420,38 @@ interface channel { on-shutdown: func(); } +/// Persistence callbacks for channels. +/// +/// All channels must implement this interface. Channels that don't need +/// post-persistence actions (e.g., Discord, Slack) can return Ok(()). +/// Channels like WhatsApp use this to call mark_as_read after persistence. +/// +/// Note: WIT does not support optional exports, so this interface is mandatory +/// in the world definition. Channels implement no-op versions if unused. +interface channel-persistence { + /// Called after a message has been persisted to the database. + /// + /// Channels can use this to perform follow-up actions like + /// calling external APIs (e.g., WhatsApp mark_as_read). + /// + /// Arguments: + /// - metadata-json: The metadata from the persisted message + /// + /// Returns: + /// - Ok: Post-persistence action completed successfully + /// - Err(string): Action failure message (does not block the ACK) + on-message-persisted: func(metadata-json: string) -> result<_, string>; +} + /// World definition for sandboxed channels. /// /// Channels import host capabilities and export the channel interface. +/// +/// Note: WIT does not support optional exports. All channels must implement +/// channel-persistence, even if just a no-op (return Ok(())). WhatsApp uses +/// this for mark_as_read; other channels return Ok(()) immediately. world sandboxed-channel { import channel-host; export channel; + export channel-persistence; } diff --git a/wit/tool.wit b/wit/tool.wit index cfe2b591af..a4ef994c45 100644 --- a/wit/tool.wit +++ b/wit/tool.wit @@ -1,4 +1,4 @@ -package near:agent@0.3.0; +package near:agent@0.4.0; // WASM Tool Sandbox Interface //