Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
563 changes: 188 additions & 375 deletions src/channels/relay/channel.rs

Large diffs are not rendered by default.

243 changes: 39 additions & 204 deletions src/channels/relay/client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
//! HTTP client for the channel-relay service.
//!
//! Wraps reqwest for all channel-relay API calls: OAuth initiation,
//! SSE streaming, token renewal, and Slack API proxy.
//! callback registration, and Slack API proxy.

Comment on lines 1 to 5
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

/// Known relay event types.
pub mod event_types {
Expand All @@ -18,7 +13,7 @@ pub mod event_types {
pub const MENTION: &str = "mention";
}

/// A parsed SSE event from the channel-relay stream.
/// A parsed event from the channel-relay webhook callback.
///
/// Field names match the channel-relay `ChannelEvent` struct exactly.
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -123,21 +118,13 @@ impl RelayClient {
///
/// Calls `GET /oauth/slack/auth` with `redirect(Policy::none())` and
/// returns the `Location` header (Slack OAuth URL) without following it.
pub async fn initiate_oauth(
&self,
instance_id: &str,
user_id: &str,
callback_url: &str,
) -> Result<String, RelayError> {
/// Initiate Slack OAuth. Channel-relay derives all URLs from the trusted
/// instance_url in chat-api. IronClaw supplies no URLs.
pub async fn initiate_oauth(&self) -> Result<String, RelayError> {
let resp = self
.http
.get(format!("{}/oauth/slack/auth", self.base_url))
.header("X-API-Key", self.api_key.expose_secret())
.query(&[
("instance_id", instance_id),
("user_id", user_id),
("callback", callback_url),
])
.bearer_auth(self.api_key.expose_secret())
.send()
.await
.map_err(|e| RelayError::Network(e.to_string()))?;
Expand Down Expand Up @@ -173,104 +160,71 @@ impl RelayClient {
}
}

/// Connect to the SSE event stream.
/// Proxy an API call through channel-relay for any provider.
///
/// Returns a stream of parsed `ChannelEvent`s and the `JoinHandle` of the
/// background SSE parser task. The caller is responsible for reconnection
/// logic on stream end/error and for aborting the handle on shutdown.
pub async fn connect_stream(
/// Calls `POST /proxy/{provider}/{method}?team_id=X&instance_id=Y` with the given JSON body.
/// Register a pending approval with channel-relay and receive an opaque token.
/// The token is embedded in Slack button values instead of routing fields.
pub async fn create_approval(
&self,
stream_token: &str,
stream_timeout_secs: u64,
) -> Result<(ChannelEventStream, tokio::task::JoinHandle<()>), RelayError> {
let resp = self
.http
.get(format!("{}/stream", self.base_url))
.query(&[("token", stream_token)])
.timeout(std::time::Duration::from_secs(stream_timeout_secs))
.send()
.await
.map_err(|e| RelayError::Network(e.to_string()))?;

let status = resp.status();
if status == reqwest::StatusCode::UNAUTHORIZED {
return Err(RelayError::TokenExpired);
}
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
return Err(RelayError::Api {
status: status.as_u16(),
message: body,
});
team_id: &str,
channel_id: &str,
thread_ts: Option<&str>,
request_id: &str,
sender_id: &str,
) -> Result<String, RelayError> {
let mut body = serde_json::json!({
"team_id": team_id,
"channel_id": channel_id,
"request_id": request_id,
"sender_id": sender_id,
});
if let Some(ts) = thread_ts {
body["thread_ts"] = serde_json::Value::String(ts.to_string());
}

// Spawn a background task that reads the SSE stream and sends parsed events
let (tx, rx) = mpsc::channel(64);
let byte_stream = resp.bytes_stream();
let handle = tokio::spawn(parse_sse_stream(byte_stream, tx));

Ok((ChannelEventStream { rx }, handle))
}

/// Renew an expired stream token.
///
/// Calls `POST /stream/renew` with API key auth, returns a new stream token.
pub async fn renew_token(
&self,
instance_id: &str,
user_id: &str,
) -> Result<String, RelayError> {
let resp = self
.http
.post(format!("{}/stream/renew", self.base_url))
.header("X-API-Key", self.api_key.expose_secret())
.json(&serde_json::json!({
"instance_id": instance_id,
"user_id": user_id,
}))
.post(format!("{}/approvals", self.base_url))
.bearer_auth(self.api_key.expose_secret())
.json(&body)
.send()
.await
.map_err(|e| RelayError::Network(e.to_string()))?;

let status = resp.status();
if !status.is_success() {
if !resp.status().is_success() {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
return Err(RelayError::Api {
status: status.as_u16(),
status,
message: body,
});
}

let body: serde_json::Value = resp
let result: serde_json::Value = resp
.json()
.await
.map_err(|e| RelayError::Protocol(e.to_string()))?;
body.get("stream_token")
.or_else(|| body.get("token"))

result
.get("approval_token")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| RelayError::Protocol("Response missing stream_token field".to_string()))
.ok_or_else(|| RelayError::Protocol("missing approval_token in response".to_string()))
}

/// Proxy an API call through channel-relay for any provider.
///
/// Calls `POST /proxy/{provider}/{method}?team_id=X&instance_id=Y` with the given JSON body.
pub async fn proxy_provider(
&self,
provider: &str,
team_id: &str,
method: &str,
body: serde_json::Value,
instance_id: Option<&str>,
) -> Result<serde_json::Value, RelayError> {
let mut query: Vec<(&str, &str)> = vec![("team_id", team_id)];
if let Some(iid) = instance_id {
query.push(("instance_id", iid));
}
let query: Vec<(&str, &str)> = vec![("team_id", team_id)];
let resp = self
.http
.post(format!("{}/proxy/{}/{}", self.base_url, provider, method))
.header("X-API-Key", self.api_key.expose_secret())
.bearer_auth(self.api_key.expose_secret())
.query(&query)
.json(&body)
.send()
Expand All @@ -296,7 +250,7 @@ impl RelayClient {
let resp = self
.http
.get(format!("{}/connections", self.base_url))
.header("X-API-Key", self.api_key.expose_secret())
.bearer_auth(self.api_key.expose_secret())
.query(&[("instance_id", instance_id)])
.send()
.await
Expand All @@ -317,91 +271,6 @@ impl RelayClient {
}
}

/// Async stream of parsed channel events from SSE.
pub struct ChannelEventStream {
rx: mpsc::Receiver<ChannelEvent>,
}

impl Stream for ChannelEventStream {
type Item = ChannelEvent;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx.poll_recv(cx)
}
}

/// Parse SSE format from a reqwest bytes stream.
///
/// SSE format:
/// ```text
/// event: message
/// data: {"key": "value"}
///
/// ```
/// Blank line terminates an event.
async fn parse_sse_stream(
byte_stream: impl futures::Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
tx: mpsc::Sender<ChannelEvent>,
) {
use futures::StreamExt;

let mut buffer = Vec::<u8>::new();
let mut event_type = String::new();
let mut data_lines = Vec::new();

let mut byte_stream = std::pin::pin!(byte_stream);
while let Some(chunk_result) = byte_stream.next().await {
let chunk = match chunk_result {
Ok(c) => c,
Err(e) => {
tracing::debug!(error = %e, "SSE stream chunk error");
break;
}
};

buffer.extend_from_slice(&chunk);

// Process complete lines (decode UTF-8 only on full lines to avoid
// corruption when multi-byte characters span chunk boundaries)
while let Some(newline_pos) = buffer.iter().position(|&b| b == b'\n') {
let line = String::from_utf8_lossy(&buffer[..newline_pos])
.trim_end_matches('\r')
.to_string();
buffer.drain(..=newline_pos);

if line.is_empty() {
// Blank line = end of event
if !data_lines.is_empty() {
let data = data_lines.join("\n");
if let Ok(mut event) = serde_json::from_str::<ChannelEvent>(&data) {
if event.event_type.is_empty() && !event_type.is_empty() {
event.event_type = event_type.clone();
}
if tx.send(event).await.is_err() {
return; // receiver dropped
}
} else {
tracing::debug!(
event_type = %event_type,
data_len = data.len(),
"Failed to parse SSE event data as ChannelEvent"
);
}
}
event_type.clear();
data_lines.clear();
} else if let Some(value) = line.strip_prefix("event:") {
event_type = value.trim().to_string();
} else if let Some(value) = line.strip_prefix("data:") {
data_lines.push(value.trim().to_string());
}
// Ignore other fields (id:, retry:, comments)
}
}

tracing::debug!("SSE stream ended");
}

/// Errors from relay client operations.
#[derive(Debug, thiserror::Error)]
pub enum RelayError {
Expand All @@ -413,9 +282,6 @@ pub enum RelayError {

#[error("Protocol error: {0}")]
Protocol(String),

#[error("Stream token expired")]
TokenExpired,
}

#[cfg(test)]
Expand Down Expand Up @@ -494,9 +360,6 @@ mod tests {
message: "unauthorized".into(),
};
assert_eq!(err.to_string(), "API error (HTTP 401): unauthorized");

let err = RelayError::TokenExpired;
assert_eq!(err.to_string(), "Stream token expired");
}

#[test]
Expand All @@ -518,32 +381,4 @@ mod tests {
assert!(make(event_types::DIRECT_MESSAGE).is_message());
assert!(make(event_types::MENTION).is_message());
}

#[tokio::test]
async fn parse_sse_handles_multibyte_utf8_across_chunks() {
// The crab emoji (🦀) is 4 bytes: [0xF0, 0x9F, 0xA6, 0x80].
// Split it across two chunks to verify no U+FFFD corruption.
let event_json = r#"{"event_type":"message","content":"hello 🦀 world","provider_scope":"T1","channel_id":"C1","sender_id":"U1"}"#;
let full = format!("event: message\ndata: {}\n\n", event_json);
let bytes = full.as_bytes();

// Find the crab emoji and split mid-character
let crab_pos = bytes
.windows(4)
.position(|w| w == [0xF0, 0x9F, 0xA6, 0x80])
.expect("crab emoji not found");
let split_at = crab_pos + 2; // split in the middle of the 4-byte emoji

let chunk1 = bytes::Bytes::copy_from_slice(&bytes[..split_at]);
let chunk2 = bytes::Bytes::copy_from_slice(&bytes[split_at..]);

let chunks: Vec<Result<bytes::Bytes, reqwest::Error>> = vec![Ok(chunk1), Ok(chunk2)];
let stream = futures::stream::iter(chunks);

let (tx, mut rx) = mpsc::channel(8);
parse_sse_stream(stream, tx).await;

let event = rx.recv().await.expect("should receive event");
assert_eq!(event.text(), "hello 🦀 world");
}
}
7 changes: 4 additions & 3 deletions src/channels/relay/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Channel-relay integration for connecting to external messaging platforms
//! (Slack) via the channel-relay service.
//!
//! The relay service handles OAuth, credential storage, webhook ingestion,
//! and SSE event streaming. IronClaw consumes the SSE stream and sends
//! messages via the relay's proxy API.
//! The relay service handles OAuth, credential storage, and webhook ingestion.
//! IronClaw receives events via webhook callbacks and sends messages via the
//! relay's proxy API.

pub mod channel;
pub mod client;
pub mod webhook;

pub use channel::{DEFAULT_RELAY_NAME, RelayChannel};
pub use client::RelayClient;
Loading
Loading