diff --git a/Cargo.lock b/Cargo.lock index 4b5b776e18..175e2cf68a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -557,17 +557,15 @@ dependencies = [ name = "api-calendar" version = "0.1.0" dependencies = [ - "api-env", + "api-nango", "axum 0.8.8", "chrono", "google-calendar", "nango", - "reqwest 0.13.2", "sentry", "serde", "serde_json", "thiserror 2.0.18", - "tokio", "tracing", "utoipa", ] @@ -586,6 +584,7 @@ dependencies = [ "api-auth", "api-env", "axum 0.8.8", + "chrono", "nango", "reqwest 0.13.2", "sentry", @@ -618,16 +617,14 @@ dependencies = [ name = "api-storage" version = "0.1.0" dependencies = [ - "api-env", + "api-nango", "axum 0.8.8", "google-drive", "nango", - "reqwest 0.13.2", "sentry", "serde", "serde_json", "thiserror 2.0.18", - "tokio", "tracing", "utoipa", ] diff --git a/apps/api/src/env.rs b/apps/api/src/env.rs index 4bc2e6daa9..2bf233384b 100644 --- a/apps/api/src/env.rs +++ b/apps/api/src/env.rs @@ -15,6 +15,8 @@ pub struct Env { pub sentry_dsn: Option, #[serde(default, deserialize_with = "hypr_api_env::filter_empty")] pub posthog_api_key: Option, + #[serde(default, deserialize_with = "hypr_api_env::filter_empty")] + pub supabase_service_role_key: Option, #[serde(flatten)] pub supabase: hypr_api_env::SupabaseEnv, diff --git a/apps/api/src/main.rs b/apps/api/src/main.rs index 94cdc67b3a..2f0c92daec 100644 --- a/apps/api/src/main.rs +++ b/apps/api/src/main.rs @@ -71,8 +71,12 @@ async fn app() -> Router { let auth_state_basic = AuthState::new(&env.supabase.supabase_url); let auth_state_support = AuthState::new(&env.supabase.supabase_url); - let calendar_config = hypr_api_calendar::CalendarConfig::new(&env.nango); - let nango_config = hypr_api_nango::NangoConfig::new(&env.nango); + let nango_config = hypr_api_nango::NangoConfig::new( + &env.nango, + &env.supabase, + env.supabase_service_role_key.clone(), + ); + let nango_connection_state = hypr_api_nango::NangoConnectionState::from_config(&nango_config); let subscription_config = hypr_api_subscription::SubscriptionConfig::new(&env.supabase, &env.stripe); let support_config = hypr_api_support::SupportConfig::new( @@ -101,8 +105,9 @@ async fn app() -> Router { let pro_routes = Router::new() .merge(hypr_api_research::router(research_config)) - .nest("/calendar", hypr_api_calendar::router(calendar_config)) + .nest("/calendar", hypr_api_calendar::router()) .nest("/nango", hypr_api_nango::router(nango_config.clone())) + .layer(axum::Extension(nango_connection_state)) .route_layer(middleware::from_fn(auth::sentry_and_analytics)) .route_layer(middleware::from_fn_with_state( auth_state_pro, diff --git a/crates/api-calendar/Cargo.toml b/crates/api-calendar/Cargo.toml index 074dd83bfb..1e8406fe96 100644 --- a/crates/api-calendar/Cargo.toml +++ b/crates/api-calendar/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -hypr-api-env = { workspace = true } +hypr-api-nango = { workspace = true } hypr-google-calendar = { workspace = true } hypr-nango = { workspace = true } @@ -13,9 +13,7 @@ chrono = { workspace = true, features = ["serde"] } utoipa = { workspace = true } axum = { workspace = true } -reqwest = { workspace = true, features = ["json"] } sentry = { workspace = true } -tokio = { workspace = true } tracing = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/crates/api-calendar/src/config.rs b/crates/api-calendar/src/config.rs deleted file mode 100644 index 542158ad7f..0000000000 --- a/crates/api-calendar/src/config.rs +++ /dev/null @@ -1,14 +0,0 @@ -use hypr_api_env::NangoEnv; - -#[derive(Clone)] -pub struct CalendarConfig { - pub nango: NangoEnv, -} - -impl CalendarConfig { - pub fn new(nango: &NangoEnv) -> Self { - Self { - nango: nango.clone(), - } - } -} diff --git a/crates/api-calendar/src/error.rs b/crates/api-calendar/src/error.rs index 9272c4d54f..04a722ac8a 100644 --- a/crates/api-calendar/src/error.rs +++ b/crates/api-calendar/src/error.rs @@ -30,6 +30,9 @@ pub enum CalendarError { #[error("Internal error: {0}")] Internal(String), + + #[error(transparent)] + NangoConnection(#[from] hypr_api_nango::NangoConnectionError), } impl IntoResponse for CalendarError { @@ -48,6 +51,7 @@ impl IntoResponse for CalendarError { internal_message, ) } + Self::NangoConnection(err) => return err.into_response(), }; let body = Json(ErrorResponse { diff --git a/crates/api-calendar/src/lib.rs b/crates/api-calendar/src/lib.rs index e115d87118..c9b1108350 100644 --- a/crates/api-calendar/src/lib.rs +++ b/crates/api-calendar/src/lib.rs @@ -1,9 +1,6 @@ -mod config; mod error; mod openapi; mod routes; -mod state; -pub use config::CalendarConfig; pub use openapi::openapi; pub use routes::router; diff --git a/crates/api-calendar/src/openapi.rs b/crates/api-calendar/src/openapi.rs index 8f90083ba4..ef9cb8ce40 100644 --- a/crates/api-calendar/src/openapi.rs +++ b/crates/api-calendar/src/openapi.rs @@ -11,7 +11,6 @@ use crate::routes::ListEventsResponse; ), components( schemas( - crate::routes::calendar::ListCalendarsRequest, crate::routes::calendar::ListCalendarsResponse, crate::routes::calendar::ListEventsRequest, ListEventsResponse, diff --git a/crates/api-calendar/src/routes/calendar.rs b/crates/api-calendar/src/routes/calendar.rs index da040d4914..ad0c991a78 100644 --- a/crates/api-calendar/src/routes/calendar.rs +++ b/crates/api-calendar/src/routes/calendar.rs @@ -1,14 +1,9 @@ -use axum::{Json, extract::State}; +use axum::Json; +use hypr_api_nango::{GoogleCalendar, NangoConnection}; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use crate::error::{CalendarError, Result}; -use crate::state::AppState; - -#[derive(Debug, Deserialize, ToSchema)] -pub struct ListCalendarsRequest { - pub connection_id: String, -} #[derive(Debug, Serialize, ToSchema)] pub struct ListCalendarsResponse { @@ -17,7 +12,6 @@ pub struct ListCalendarsResponse { #[derive(Debug, Deserialize, ToSchema)] pub struct ListEventsRequest { - pub connection_id: String, pub calendar_id: String, #[serde(default)] pub time_min: Option, @@ -42,7 +36,6 @@ pub struct ListEventsResponse { #[derive(Debug, Deserialize, ToSchema)] pub struct CreateEventRequest { - pub connection_id: String, pub calendar_id: String, pub summary: String, pub start: EventDateTime, @@ -82,7 +75,6 @@ pub struct CreateEventResponse { #[utoipa::path( post, path = "/calendars", - request_body = ListCalendarsRequest, responses( (status = 200, description = "Calendars fetched", body = ListCalendarsResponse), (status = 401, description = "Unauthorized"), @@ -91,15 +83,9 @@ pub struct CreateEventResponse { tag = "calendar", )] pub async fn list_calendars( - State(state): State, - Json(payload): Json, + nango: NangoConnection, ) -> Result> { - let proxy = state - .nango - .integration("google-calendar") - .connection(&payload.connection_id); - let http = hypr_nango::NangoHttpClient::new(proxy); - let client = hypr_google_calendar::GoogleCalendarClient::new(http); + let client = hypr_google_calendar::GoogleCalendarClient::new(nango.into_http()); let response = client .list_calendars() @@ -127,15 +113,10 @@ pub async fn list_calendars( tag = "calendar", )] pub async fn list_events( - State(state): State, + nango: NangoConnection, Json(payload): Json, ) -> Result> { - let proxy = state - .nango - .integration("google-calendar") - .connection(&payload.connection_id); - let http = hypr_nango::NangoHttpClient::new(proxy); - let client = hypr_google_calendar::GoogleCalendarClient::new(http); + let client = hypr_google_calendar::GoogleCalendarClient::new(nango.into_http()); let time_min = payload .time_min @@ -196,15 +177,10 @@ pub async fn list_events( tag = "calendar", )] pub async fn create_event( - State(state): State, + nango: NangoConnection, Json(payload): Json, ) -> Result> { - let proxy = state - .nango - .integration("google-calendar") - .connection(&payload.connection_id); - let http = hypr_nango::NangoHttpClient::new(proxy); - let client = hypr_google_calendar::GoogleCalendarClient::new(http); + let client = hypr_google_calendar::GoogleCalendarClient::new(nango.into_http()); let req = hypr_google_calendar::CreateEventRequest { calendar_id: payload.calendar_id, diff --git a/crates/api-calendar/src/routes/mod.rs b/crates/api-calendar/src/routes/mod.rs index b5dd43b4d9..99d8d71ddd 100644 --- a/crates/api-calendar/src/routes/mod.rs +++ b/crates/api-calendar/src/routes/mod.rs @@ -2,17 +2,11 @@ pub(crate) mod calendar; use axum::{Router, routing::post}; -use crate::config::CalendarConfig; -use crate::state::AppState; - pub use calendar::ListEventsResponse; -pub fn router(config: CalendarConfig) -> Router { - let state = AppState::new(config); - +pub fn router() -> Router { Router::new() .route("/calendars", post(calendar::list_calendars)) .route("/events", post(calendar::list_events)) .route("/events/create", post(calendar::create_event)) - .with_state(state) } diff --git a/crates/api-calendar/src/state.rs b/crates/api-calendar/src/state.rs deleted file mode 100644 index 36ea71b745..0000000000 --- a/crates/api-calendar/src/state.rs +++ /dev/null @@ -1,20 +0,0 @@ -use hypr_nango::NangoClient; - -use crate::config::CalendarConfig; - -#[derive(Clone)] -pub(crate) struct AppState { - pub(crate) nango: NangoClient, -} - -impl AppState { - pub(crate) fn new(config: CalendarConfig) -> Self { - let mut builder = hypr_nango::NangoClient::builder().api_key(&config.nango.nango_api_key); - if let Some(api_base) = &config.nango.nango_api_base { - builder = builder.api_base(api_base); - } - let nango = builder.build().expect("failed to build NangoClient"); - - Self { nango } - } -} diff --git a/crates/api-nango/Cargo.toml b/crates/api-nango/Cargo.toml index 2dd987887e..6c6e1573b4 100644 --- a/crates/api-nango/Cargo.toml +++ b/crates/api-nango/Cargo.toml @@ -8,6 +8,8 @@ hypr-api-auth = { workspace = true } hypr-api-env = { workspace = true } hypr-nango = { workspace = true } +chrono = { workspace = true } +urlencoding = { workspace = true } utoipa = { workspace = true } axum = { workspace = true } diff --git a/crates/api-nango/src/config.rs b/crates/api-nango/src/config.rs index 754e2aeb1e..45a2eaab81 100644 --- a/crates/api-nango/src/config.rs +++ b/crates/api-nango/src/config.rs @@ -1,14 +1,24 @@ -use hypr_api_env::NangoEnv; +use hypr_api_env::{NangoEnv, SupabaseEnv}; #[derive(Clone)] pub struct NangoConfig { pub nango: NangoEnv, + pub supabase_url: String, + pub supabase_anon_key: String, + pub supabase_service_role_key: Option, } impl NangoConfig { - pub fn new(nango: &NangoEnv) -> Self { + pub fn new( + nango: &NangoEnv, + supabase: &SupabaseEnv, + supabase_service_role_key: Option, + ) -> Self { Self { nango: nango.clone(), + supabase_url: supabase.supabase_url.clone(), + supabase_anon_key: supabase.supabase_anon_key.clone(), + supabase_service_role_key, } } } diff --git a/crates/api-nango/src/extractor.rs b/crates/api-nango/src/extractor.rs new file mode 100644 index 0000000000..d3af42655e --- /dev/null +++ b/crates/api-nango/src/extractor.rs @@ -0,0 +1,207 @@ +use std::marker::PhantomData; + +use axum::{ + Json, + extract::FromRequestParts, + http::{StatusCode, request::Parts}, + response::{IntoResponse, Response}, +}; +use hypr_api_auth::AuthContext; +use hypr_nango::{NangoClient, OwnedNangoHttpClient, OwnedNangoProxy}; +use serde::Serialize; + +use crate::integrations::NangoIntegrationId; + +#[derive(Clone)] +pub struct NangoConnectionState { + nango: NangoClient, + http_client: reqwest::Client, + supabase_url: String, + supabase_anon_key: String, +} + +impl NangoConnectionState { + pub fn new( + nango: NangoClient, + supabase_url: impl Into, + supabase_anon_key: impl Into, + ) -> Self { + Self { + nango, + http_client: reqwest::Client::new(), + supabase_url: supabase_url.into().trim_end_matches('/').to_string(), + supabase_anon_key: supabase_anon_key.into(), + } + } + + pub fn from_config(config: &crate::config::NangoConfig) -> Self { + let mut builder = hypr_nango::NangoClient::builder().api_key(&config.nango.nango_api_key); + if let Some(api_base) = &config.nango.nango_api_base { + builder = builder.api_base(api_base); + } + let nango = builder.build().expect("failed to build NangoClient"); + + Self::new(nango, &config.supabase_url, &config.supabase_anon_key) + } + + async fn get_connection_id( + &self, + auth_token: &str, + user_id: &str, + integration_id: &str, + ) -> Result { + let encoded_user_id = urlencoding::encode(user_id); + let encoded_integration_id = urlencoding::encode(integration_id); + let url = format!( + "{}/rest/v1/nango_connections?select=connection_id&user_id=eq.{}&integration_id=eq.{}", + self.supabase_url, encoded_user_id, encoded_integration_id, + ); + + let response = self + .http_client + .get(&url) + .header("Authorization", format!("Bearer {}", auth_token)) + .header("apikey", &self.supabase_anon_key) + .send() + .await + .map_err(|e| NangoConnectionError::Database(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(NangoConnectionError::Database(format!( + "query failed: {} - {}", + status, body + ))); + } + + #[derive(serde::Deserialize)] + struct Row { + connection_id: String, + } + + let rows: Vec = response + .json() + .await + .map_err(|e| NangoConnectionError::Database(e.to_string()))?; + + rows.into_iter() + .next() + .map(|r| r.connection_id) + .ok_or_else(|| NangoConnectionError::NotConnected(integration_id.to_string())) + } +} + +pub struct NangoConnection { + http: OwnedNangoHttpClient, + _marker: PhantomData, +} + +impl NangoConnection { + pub fn into_http(self) -> OwnedNangoHttpClient { + self.http + } +} + +#[derive(Debug)] +pub enum NangoConnectionError { + NotAuthenticated, + NotConnected(String), + MissingState, + Database(String), +} + +#[derive(Serialize)] +struct ErrorDetails { + code: String, + message: String, +} + +#[derive(Serialize)] +struct ErrorResponse { + error: ErrorDetails, +} + +impl IntoResponse for NangoConnectionError { + fn into_response(self) -> Response { + let (status, code, message) = match &self { + Self::NotAuthenticated => ( + StatusCode::UNAUTHORIZED, + "unauthorized", + "not authenticated".to_string(), + ), + Self::NotConnected(integration_id) => ( + StatusCode::BAD_REQUEST, + "not_connected", + format!("no connection found for integration: {}", integration_id), + ), + Self::MissingState => { + tracing::error!("NangoConnectionState not found in request extensions"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "internal_server_error", + "internal server error".to_string(), + ) + } + Self::Database(msg) => { + tracing::error!(error = %msg, "nango_connection_db_error"); + sentry::capture_message(msg, sentry::Level::Error); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "internal_server_error", + "internal server error".to_string(), + ) + } + }; + + let body = Json(ErrorResponse { + error: ErrorDetails { + code: code.to_string(), + message, + }, + }); + + (status, body).into_response() + } +} + +impl std::fmt::Display for NangoConnectionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::NotAuthenticated => write!(f, "not authenticated"), + Self::NotConnected(id) => write!(f, "not connected: {}", id), + Self::MissingState => write!(f, "missing NangoConnectionState"), + Self::Database(msg) => write!(f, "database error: {}", msg), + } + } +} + +impl std::error::Error for NangoConnectionError {} + +impl FromRequestParts for NangoConnection { + type Rejection = NangoConnectionError; + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { + let auth = parts + .extensions + .get::() + .ok_or(NangoConnectionError::NotAuthenticated)?; + + let nango_state = parts + .extensions + .get::() + .ok_or(NangoConnectionError::MissingState)?; + + let connection_id = nango_state + .get_connection_id(&auth.token, &auth.claims.sub, I::ID) + .await?; + + let proxy = OwnedNangoProxy::new(&nango_state.nango, I::ID.to_string(), connection_id); + let http = OwnedNangoHttpClient::new(proxy); + + Ok(NangoConnection { + http, + _marker: PhantomData, + }) + } +} diff --git a/crates/api-nango/src/integrations.rs b/crates/api-nango/src/integrations.rs new file mode 100644 index 0000000000..b4ba6c5d70 --- /dev/null +++ b/crates/api-nango/src/integrations.rs @@ -0,0 +1,15 @@ +pub trait NangoIntegrationId: Send + Sync + 'static { + const ID: &'static str; +} + +pub struct GoogleCalendar; + +impl NangoIntegrationId for GoogleCalendar { + const ID: &'static str = "google-calendar"; +} + +pub struct GoogleDrive; + +impl NangoIntegrationId for GoogleDrive { + const ID: &'static str = "google-drive"; +} diff --git a/crates/api-nango/src/lib.rs b/crates/api-nango/src/lib.rs index 64962e88b7..4ec6c957b1 100644 --- a/crates/api-nango/src/lib.rs +++ b/crates/api-nango/src/lib.rs @@ -1,9 +1,14 @@ mod config; mod error; +pub mod extractor; +pub mod integrations; mod openapi; mod routes; mod state; +mod supabase; pub use config::NangoConfig; +pub use extractor::{NangoConnection, NangoConnectionError, NangoConnectionState}; +pub use integrations::{GoogleCalendar, GoogleDrive, NangoIntegrationId}; pub use openapi::openapi; pub use routes::{router, webhook_router}; diff --git a/crates/api-nango/src/routes/webhook.rs b/crates/api-nango/src/routes/webhook.rs index f2f76a93bb..b8579d9387 100644 --- a/crates/api-nango/src/routes/webhook.rs +++ b/crates/api-nango/src/routes/webhook.rs @@ -50,6 +50,35 @@ pub async fn nango_webhook( "nango webhook received" ); + if payload.r#type == "auth" && payload.success && payload.operation != "deletion" { + state + .supabase + .upsert_connection( + &payload.end_user.end_user_id, + &payload.provider_config_key, + &payload.connection_id, + &payload.provider, + ) + .await + .map_err(|e| { + tracing::error!(error = %e, "failed to upsert nango connection"); + NangoError::Internal(e.to_string()) + })?; + } + + // Nango sends deletion webhooks with `success: true` on successful revocation. + // We gate on `success` to avoid deleting local state if revocation failed on Nango's side. + if payload.r#type == "auth" && payload.success && payload.operation == "deletion" { + state + .supabase + .delete_connection(&payload.end_user.end_user_id, &payload.provider_config_key) + .await + .map_err(|e| { + tracing::error!(error = %e, "failed to delete nango connection"); + NangoError::Internal(e.to_string()) + })?; + } + Ok(Json(WebhookResponse { status: "ok".to_string(), })) diff --git a/crates/api-nango/src/state.rs b/crates/api-nango/src/state.rs index 181026275f..e4de6f2f8f 100644 --- a/crates/api-nango/src/state.rs +++ b/crates/api-nango/src/state.rs @@ -1,11 +1,13 @@ use hypr_nango::NangoClient; use crate::config::NangoConfig; +use crate::supabase::SupabaseClient; #[derive(Clone)] pub(crate) struct AppState { pub(crate) config: NangoConfig, pub(crate) nango: NangoClient, + pub(crate) supabase: SupabaseClient, } impl AppState { @@ -16,6 +18,15 @@ impl AppState { } let nango = builder.build().expect("failed to build NangoClient"); - Self { config, nango } + let supabase = SupabaseClient::new( + &config.supabase_url, + config.supabase_service_role_key.clone(), + ); + + Self { + config, + nango, + supabase, + } } } diff --git a/crates/api-nango/src/supabase.rs b/crates/api-nango/src/supabase.rs new file mode 100644 index 0000000000..7613f302b7 --- /dev/null +++ b/crates/api-nango/src/supabase.rs @@ -0,0 +1,108 @@ +#[derive(Clone)] +pub(crate) struct SupabaseClient { + supabase_url: String, + supabase_service_role_key: Option, + http_client: reqwest::Client, +} + +impl SupabaseClient { + pub(crate) fn new( + supabase_url: impl Into, + supabase_service_role_key: Option, + ) -> Self { + Self { + supabase_url: supabase_url.into().trim_end_matches('/').to_string(), + supabase_service_role_key, + http_client: reqwest::Client::new(), + } + } + + fn service_role_key(&self) -> Result<&str, crate::error::NangoError> { + self.supabase_service_role_key.as_deref().ok_or_else(|| { + crate::error::NangoError::Internal( + "supabase_service_role_key not configured".to_string(), + ) + }) + } + + pub(crate) async fn upsert_connection( + &self, + user_id: &str, + integration_id: &str, + connection_id: &str, + provider: &str, + ) -> Result<(), crate::error::NangoError> { + let service_role_key = self.service_role_key()?; + + let url = format!( + "{}/rest/v1/nango_connections?on_conflict=user_id,integration_id", + self.supabase_url, + ); + + let body = serde_json::json!({ + "user_id": user_id, + "integration_id": integration_id, + "connection_id": connection_id, + "provider": provider, + "updated_at": chrono::Utc::now().to_rfc3339(), + }); + + let response = self + .http_client + .post(&url) + .header("Authorization", format!("Bearer {}", service_role_key)) + .header("apikey", service_role_key) + .header("Content-Type", "application/json") + .header("Prefer", "resolution=merge-duplicates") + .json(&body) + .send() + .await + .map_err(|e| crate::error::NangoError::Internal(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(crate::error::NangoError::Internal(format!( + "upsert failed: {} - {}", + status, body + ))); + } + + Ok(()) + } + + pub(crate) async fn delete_connection( + &self, + user_id: &str, + integration_id: &str, + ) -> Result<(), crate::error::NangoError> { + let service_role_key = self.service_role_key()?; + + let encoded_user_id = urlencoding::encode(user_id); + let encoded_integration_id = urlencoding::encode(integration_id); + let url = format!( + "{}/rest/v1/nango_connections?user_id=eq.{}&integration_id=eq.{}", + self.supabase_url, encoded_user_id, encoded_integration_id, + ); + + let response = self + .http_client + .delete(&url) + .header("Authorization", format!("Bearer {}", service_role_key)) + .header("apikey", service_role_key) + .send() + .await + .map_err(|e| crate::error::NangoError::Internal(e.to_string()))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(crate::error::NangoError::Internal(format!( + "delete failed: {} - {}", + status, body + ))); + } + + Ok(()) + } +} diff --git a/crates/api-storage/Cargo.toml b/crates/api-storage/Cargo.toml index f397b036a7..2a51787e57 100644 --- a/crates/api-storage/Cargo.toml +++ b/crates/api-storage/Cargo.toml @@ -4,16 +4,14 @@ version = "0.1.0" edition = "2024" [dependencies] -hypr-api-env = { workspace = true } +hypr-api-nango = { workspace = true } hypr-google-drive = { workspace = true } hypr-nango = { workspace = true } utoipa = { workspace = true } axum = { workspace = true } -reqwest = { workspace = true, features = ["json"] } sentry = { workspace = true } -tokio = { workspace = true } tracing = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/crates/api-storage/src/config.rs b/crates/api-storage/src/config.rs deleted file mode 100644 index 1d4ae7735f..0000000000 --- a/crates/api-storage/src/config.rs +++ /dev/null @@ -1,14 +0,0 @@ -use hypr_api_env::NangoEnv; - -#[derive(Clone)] -pub struct StorageConfig { - pub nango: NangoEnv, -} - -impl StorageConfig { - pub fn new(nango: &NangoEnv) -> Self { - Self { - nango: nango.clone(), - } - } -} diff --git a/crates/api-storage/src/error.rs b/crates/api-storage/src/error.rs index 66289920d8..a9bd0404ad 100644 --- a/crates/api-storage/src/error.rs +++ b/crates/api-storage/src/error.rs @@ -26,10 +26,14 @@ pub enum StorageError { Auth(String), #[error("Invalid request: {0}")] + #[allow(dead_code)] BadRequest(String), #[error("Internal error: {0}")] Internal(String), + + #[error(transparent)] + NangoConnection(#[from] hypr_api_nango::NangoConnectionError), } impl IntoResponse for StorageError { @@ -48,6 +52,7 @@ impl IntoResponse for StorageError { internal_message, ) } + Self::NangoConnection(err) => return err.into_response(), }; let body = Json(ErrorResponse { diff --git a/crates/api-storage/src/lib.rs b/crates/api-storage/src/lib.rs index a6f848c3eb..c9b1108350 100644 --- a/crates/api-storage/src/lib.rs +++ b/crates/api-storage/src/lib.rs @@ -1,9 +1,6 @@ -mod config; mod error; mod openapi; mod routes; -mod state; -pub use config::StorageConfig; pub use openapi::openapi; pub use routes::router; diff --git a/crates/api-storage/src/routes/mod.rs b/crates/api-storage/src/routes/mod.rs index 5d2ed180ac..55aea8929a 100644 --- a/crates/api-storage/src/routes/mod.rs +++ b/crates/api-storage/src/routes/mod.rs @@ -2,15 +2,9 @@ pub(crate) mod storage; use axum::{Router, routing::post}; -use crate::config::StorageConfig; -use crate::state::AppState; - -pub fn router(config: StorageConfig) -> Router { - let state = AppState::new(config); - +pub fn router() -> Router { Router::new() .route("/files", post(storage::list_files)) .route("/files/get", post(storage::get_file)) .route("/files/download", post(storage::download_file)) - .with_state(state) } diff --git a/crates/api-storage/src/routes/storage.rs b/crates/api-storage/src/routes/storage.rs index 707c93defb..372a1b33fc 100644 --- a/crates/api-storage/src/routes/storage.rs +++ b/crates/api-storage/src/routes/storage.rs @@ -1,13 +1,12 @@ -use axum::{Json, extract::State}; +use axum::Json; +use hypr_api_nango::{GoogleDrive, NangoConnection}; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use crate::error::{Result, StorageError}; -use crate::state::AppState; #[derive(Debug, Deserialize, ToSchema)] pub struct ListFilesRequest { - pub connection_id: String, #[serde(default)] pub q: Option, #[serde(default)] @@ -27,7 +26,6 @@ pub struct ListFilesResponse { #[derive(Debug, Deserialize, ToSchema)] pub struct GetFileRequest { - pub connection_id: String, pub file_id: String, } @@ -38,7 +36,6 @@ pub struct GetFileResponse { #[derive(Debug, Deserialize, ToSchema)] pub struct DownloadFileRequest { - pub connection_id: String, pub file_id: String, } @@ -59,15 +56,10 @@ pub struct DownloadFileResponse { tag = "storage", )] pub async fn list_files( - State(state): State, + nango: NangoConnection, Json(payload): Json, ) -> Result> { - let proxy = state - .nango - .integration("google-drive") - .connection(&payload.connection_id); - let http = hypr_nango::NangoHttpClient::new(proxy); - let client = hypr_google_drive::GoogleDriveClient::new(http); + let client = hypr_google_drive::GoogleDriveClient::new(nango.into_http()); let req = hypr_google_drive::ListFilesRequest { q: payload.q, @@ -106,15 +98,10 @@ pub async fn list_files( tag = "storage", )] pub async fn get_file( - State(state): State, + nango: NangoConnection, Json(payload): Json, ) -> Result> { - let proxy = state - .nango - .integration("google-drive") - .connection(&payload.connection_id); - let http = hypr_nango::NangoHttpClient::new(proxy); - let client = hypr_google_drive::GoogleDriveClient::new(http); + let client = hypr_google_drive::GoogleDriveClient::new(nango.into_http()); let req = hypr_google_drive::GetFileRequest { file_id: payload.file_id, @@ -143,15 +130,10 @@ pub async fn get_file( tag = "storage", )] pub async fn download_file( - State(state): State, + nango: NangoConnection, Json(payload): Json, ) -> Result> { - let proxy = state - .nango - .integration("google-drive") - .connection(&payload.connection_id); - let http = hypr_nango::NangoHttpClient::new(proxy); - let client = hypr_google_drive::GoogleDriveClient::new(http); + let client = hypr_google_drive::GoogleDriveClient::new(nango.into_http()); let data = client .download_file(&payload.file_id) diff --git a/crates/api-storage/src/state.rs b/crates/api-storage/src/state.rs deleted file mode 100644 index b2b5881233..0000000000 --- a/crates/api-storage/src/state.rs +++ /dev/null @@ -1,20 +0,0 @@ -use hypr_nango::NangoClient; - -use crate::config::StorageConfig; - -#[derive(Clone)] -pub(crate) struct AppState { - pub(crate) nango: NangoClient, -} - -impl AppState { - pub(crate) fn new(config: StorageConfig) -> Self { - let mut builder = hypr_nango::NangoClient::builder().api_key(&config.nango.nango_api_key); - if let Some(api_base) = &config.nango.nango_api_base { - builder = builder.api_base(api_base); - } - let nango = builder.build().expect("failed to build NangoClient"); - - Self { nango } - } -} diff --git a/crates/nango/src/http.rs b/crates/nango/src/http.rs index 76f6a537d5..2ae887e7c2 100644 --- a/crates/nango/src/http.rs +++ b/crates/nango/src/http.rs @@ -1,4 +1,4 @@ -use crate::proxy::NangoProxy; +use crate::proxy::{NangoProxy, OwnedNangoProxy}; pub struct NangoHttpClient<'a> { proxy: NangoProxy<'a>, @@ -59,3 +59,64 @@ impl<'a> hypr_http::HttpClient for NangoHttpClient<'a> { Ok(bytes.to_vec()) } } + +#[derive(Clone)] +pub struct OwnedNangoHttpClient { + proxy: OwnedNangoProxy, +} + +impl OwnedNangoHttpClient { + pub fn new(proxy: OwnedNangoProxy) -> Self { + Self { proxy } + } +} + +impl hypr_http::HttpClient for OwnedNangoHttpClient { + async fn get(&self, path: &str) -> Result, Box> { + let response = self.proxy.get(path)?.send().await?; + let bytes = response.error_for_status()?.bytes().await?; + Ok(bytes.to_vec()) + } + + async fn post( + &self, + path: &str, + body: Vec, + ) -> Result, Box> { + let json_value: serde_json::Value = serde_json::from_slice(&body)?; + let response = self.proxy.post(path, &json_value)?.send().await?; + let bytes = response.error_for_status()?.bytes().await?; + Ok(bytes.to_vec()) + } + + async fn put( + &self, + path: &str, + body: Vec, + ) -> Result, Box> { + let json_value: serde_json::Value = serde_json::from_slice(&body)?; + let response = self.proxy.put(path, &json_value)?.send().await?; + let bytes = response.error_for_status()?.bytes().await?; + Ok(bytes.to_vec()) + } + + async fn patch( + &self, + path: &str, + body: Vec, + ) -> Result, Box> { + let json_value: serde_json::Value = serde_json::from_slice(&body)?; + let response = self.proxy.patch(path, &json_value)?.send().await?; + let bytes = response.error_for_status()?.bytes().await?; + Ok(bytes.to_vec()) + } + + async fn delete( + &self, + path: &str, + ) -> Result, Box> { + let response = self.proxy.delete(path)?.send().await?; + let bytes = response.error_for_status()?.bytes().await?; + Ok(bytes.to_vec()) + } +} diff --git a/crates/nango/src/lib.rs b/crates/nango/src/lib.rs index aa24e72e05..28c28c8749 100644 --- a/crates/nango/src/lib.rs +++ b/crates/nango/src/lib.rs @@ -15,8 +15,10 @@ pub use connect_session::*; pub use connection::*; pub use error::*; pub use http::NangoHttpClient; +pub use http::OwnedNangoHttpClient; pub use integration::*; pub use proxy::NangoProxy; +pub use proxy::OwnedNangoProxy; pub use sync::*; pub use trigger::*; pub use webhook::*; diff --git a/crates/nango/src/proxy.rs b/crates/nango/src/proxy.rs index f1c8bd90f1..f13945497f 100644 --- a/crates/nango/src/proxy.rs +++ b/crates/nango/src/proxy.rs @@ -49,28 +49,15 @@ impl<'a> NangoProxy<'a> { } fn apply_headers(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder { - let mut builder = builder - .header("Connection-Id", &self.connection_id) - .header("Provider-Config-Key", &self.integration_id); - - if let Some(retries) = self.retries { - builder = builder.header("Retries", retries.to_string()); - } - - if let Some(ref retry_on) = self.retry_on { - let codes: Vec = retry_on.iter().map(|c| c.to_string()).collect(); - builder = builder.header("Retry-On", codes.join(",")); - } - - if let Some(ref base_url) = self.base_url_override { - builder = builder.header("Base-Url-Override", base_url); - } - - if let Some(decompress) = self.decompress { - builder = builder.header("Decompress", decompress.to_string()); - } - - builder + apply_proxy_headers( + builder, + &self.connection_id, + &self.integration_id, + self.retries, + self.retry_on.as_ref(), + self.base_url_override.as_deref(), + self.decompress, + ) } pub fn get( @@ -135,6 +122,136 @@ impl<'a> NangoProxy<'a> { } } +#[derive(Clone)] +pub struct OwnedNangoProxy { + client: reqwest::Client, + api_base: url::Url, + integration_id: String, + connection_id: String, + retries: Option, + retry_on: Option>, + base_url_override: Option, + decompress: Option, +} + +impl OwnedNangoProxy { + pub fn new(nango: &NangoClient, integration_id: String, connection_id: String) -> Self { + Self { + client: nango.client.clone(), + api_base: nango.api_base.clone(), + integration_id, + connection_id, + retries: Some(3), + retry_on: Some(vec![429, 500, 502, 503, 504]), + base_url_override: None, + decompress: None, + } + } + + fn apply_headers(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder { + apply_proxy_headers( + builder, + &self.connection_id, + &self.integration_id, + self.retries, + self.retry_on.as_ref(), + self.base_url_override.as_deref(), + self.decompress, + ) + } + + pub fn get( + &self, + path: impl std::fmt::Display, + ) -> Result { + let url = make_proxy_url(&self.api_base, path)?; + Ok(self.apply_headers(self.client.get(url))) + } + + pub fn post( + &self, + path: impl std::fmt::Display, + data: &T, + ) -> Result { + let url = make_proxy_url(&self.api_base, path)?; + Ok(self.apply_headers( + self.client + .post(url) + .header("Content-Type", "application/json") + .json(data), + )) + } + + pub fn put( + &self, + path: impl std::fmt::Display, + data: &T, + ) -> Result { + let url = make_proxy_url(&self.api_base, path)?; + Ok(self.apply_headers( + self.client + .put(url) + .header("Content-Type", "application/json") + .json(data), + )) + } + + pub fn patch( + &self, + path: impl std::fmt::Display, + data: &T, + ) -> Result { + let url = make_proxy_url(&self.api_base, path)?; + Ok(self.apply_headers( + self.client + .patch(url) + .header("Content-Type", "application/json") + .json(data), + )) + } + + pub fn delete( + &self, + path: impl std::fmt::Display, + ) -> Result { + let url = make_proxy_url(&self.api_base, path)?; + Ok(self.apply_headers(self.client.delete(url))) + } +} + +fn apply_proxy_headers( + builder: reqwest::RequestBuilder, + connection_id: &str, + integration_id: &str, + retries: Option, + retry_on: Option<&Vec>, + base_url_override: Option<&str>, + decompress: Option, +) -> reqwest::RequestBuilder { + let mut builder = builder + .header("Connection-Id", connection_id) + .header("Provider-Config-Key", integration_id); + + if let Some(retries) = retries { + builder = builder.header("Retries", retries.to_string()); + } + + if let Some(retry_on) = retry_on { + let codes: Vec = retry_on.iter().map(|c| c.to_string()).collect(); + builder = builder.header("Retry-On", codes.join(",")); + } + + if let Some(base_url) = base_url_override { + builder = builder.header("Base-Url-Override", base_url); + } + + if let Some(decompress) = decompress { + builder = builder.header("Decompress", decompress.to_string()); + } + + builder +} + fn make_proxy_url(base: &url::Url, path: impl std::fmt::Display) -> Result { let mut url = base.clone(); let path_str = path.to_string(); diff --git a/supabase/migrations/20250214000000_create_nango_connections.sql b/supabase/migrations/20250214000000_create_nango_connections.sql new file mode 100644 index 0000000000..255aba413c --- /dev/null +++ b/supabase/migrations/20250214000000_create_nango_connections.sql @@ -0,0 +1,29 @@ +CREATE TABLE "nango_connections" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid(), + "user_id" uuid NOT NULL, + "integration_id" text NOT NULL, + "connection_id" text NOT NULL, + "provider" text NOT NULL, + "created_at" timestamptz NOT NULL DEFAULT now(), + "updated_at" timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE "nango_connections" ENABLE ROW LEVEL SECURITY; + +ALTER TABLE "nango_connections" + ADD CONSTRAINT "nango_connections_user_id_fk" + FOREIGN KEY ("user_id") REFERENCES "auth"."users"("id") ON DELETE CASCADE; + +CREATE UNIQUE INDEX "nango_connections_user_integration_idx" + ON "nango_connections" ("user_id", "integration_id"); + +CREATE INDEX "nango_connections_connection_id_idx" + ON "nango_connections" ("connection_id"); + +CREATE POLICY "nango_connections_select_owner" ON "nango_connections" + AS PERMISSIVE FOR SELECT TO "authenticated" + USING ((select auth.uid()) = user_id); + +CREATE POLICY "nango_connections_service_all" ON "nango_connections" + AS PERMISSIVE FOR ALL TO "service_role" + USING (true) WITH CHECK (true);