From d390045b00b96fb58732c17b04384a2282a562c7 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 24 Oct 2023 11:18:26 -0400 Subject: [PATCH] chore: test subscribers v0 --- Cargo.lock | 127 +++++++++++++++++++++++++- Cargo.toml | 1 + src/config.rs | 3 +- src/lib.rs | 2 +- src/main.rs | 2 +- tests/integration.rs | 207 ++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 334 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d4b0c15..619db923 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,6 +114,27 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-trait" version = "0.1.68" @@ -979,6 +1000,15 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "concurrent-queue" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.2" @@ -1735,6 +1765,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -1758,6 +1803,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.28" @@ -2049,6 +2100,27 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" +[[package]] +name = "http-types" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" +dependencies = [ + "anyhow", + "async-channel", + "base64 0.13.1", + "futures-lite", + "http 0.2.9", + "infer", + "pin-project-lite", + "rand 0.7.3", + "serde", + "serde_json", + "serde_qs 0.8.5", + "serde_urlencoded", + "url", +] + [[package]] name = "httparse" version = "1.8.0" @@ -2185,6 +2257,12 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "infer" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" + [[package]] name = "inout" version = "0.1.3" @@ -2707,6 +2785,7 @@ dependencies = [ "url", "uuid", "wc", + "wiremock", "x25519-dalek", ] @@ -2976,6 +3055,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -3428,7 +3513,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "serde_qs", + "serde_qs 0.10.1", "thiserror", "tokio", "tokio-stream", @@ -3872,6 +3957,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_qs" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" +dependencies = [ + "percent-encoding", + "serde", + "thiserror", +] + [[package]] name = "serde_qs" version = "0.10.1" @@ -4954,6 +5050,7 @@ dependencies = [ "form_urlencoded", "idna 0.3.0", "percent-encoding", + "serde", ] [[package]] @@ -5002,6 +5099,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "want" version = "0.3.0" @@ -5347,6 +5450,28 @@ dependencies = [ "winapi", ] +[[package]] +name = "wiremock" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6f71803d3a1c80377a06221e0530be02035d5b3e854af56c6ece7ac20ac441d" +dependencies = [ + "assert-json-diff", + "async-trait", + "base64 0.21.0", + "deadpool", + "futures", + "futures-timer", + "http-types", + "hyper", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 62eced31..a2432290 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ rmp-serde = "1.1.1" deadpool-redis = "0.12.0" rand_chacha = "0.3.1" sqlx = { version = "0.7.1", features = ["runtime-tokio-native-tls", "postgres", "chrono", "uuid"] } +wiremock = "0.5.19" [dev-dependencies] sha3 = "0.10.8" diff --git a/src/config.rs b/src/config.rs index bab24f80..f4ab80a6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -45,8 +45,7 @@ pub struct Configuration { impl Configuration { pub fn new() -> crate::Result { - let config = envy::from_env::()?; - Ok(config) + Ok(envy::from_env::()?) } pub fn log_level(&self) -> tracing::Level { diff --git a/src/lib.rs b/src/lib.rs index 3eaca533..edee4b10 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -141,7 +141,7 @@ pub async fn bootstrap(mut shutdown: broadcast::Receiver<()>, config: Configurat .route("/v1/:project_id/notify", post(handlers::notify_v1::handler)) .route( "/:project_id/subscribers", - get(handlers::get_subscribers_v1::handler), + get(handlers::get_subscribers_v0::handler), ) .route( "/v1/:project_id/subscribers", diff --git a/src/main.rs b/src/main.rs index 154125d1..eb160fe3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use { #[tokio::main] async fn main() -> Result<()> { let (_signal, shutdown) = broadcast::channel(1); - dotenv().ok(); + dotenv().expect("Failed to load .env file"); let config = Configuration::new().expect("Failed to load config!"); tracing_subscriber::fmt() diff --git a/tests/integration.rs b/tests/integration.rs index b49e2b4d..1b4b281f 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,10 +1,13 @@ use { + async_trait::async_trait, chrono::{Duration, Utc}, + hyper::StatusCode, mongodb::{ bson::doc, options::{ClientOptions, ResolverConfig}, }, notify_server::{ + config::Configuration, migrate::{self, ClientData, Keypair, LookupEntry, ProjectData}, model::{ helpers::{ @@ -17,11 +20,20 @@ use { }, types::AccountId, }, + registry::RegistryAuthResponse, }, rand_chacha::rand_core::OsRng, relay_rpc::domain::{ProjectId, Topic}, + reqwest::Response, sqlx::{postgres::PgPoolOptions, PgPool}, - std::collections::HashSet, + std::{ + collections::HashSet, + net::{IpAddr, Ipv4Addr, SocketAddrV4}, + }, + test_context::{test_context, AsyncTestContext}, + tokio::{net::TcpListener, sync::broadcast, time::error::Elapsed}, + tracing_subscriber::fmt::format::FmtSpan, + url::Url, }; async fn get_dbs() -> (mongodb::Database, PgPool) { @@ -1097,8 +1109,197 @@ async fn test_call_migrate_twice() { } } +async fn is_port_available(port: u16) -> bool { + TcpListener::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)) + .await + .is_ok() +} + +async fn find_free_port() -> u16 { + use std::sync::atomic::{AtomicU16, Ordering}; + static NEXT_PORT: AtomicU16 = AtomicU16::new(9000); + loop { + let port = NEXT_PORT.fetch_add(1, Ordering::SeqCst); + if is_port_available(port).await { + return port; + } + } +} + +async fn wait_for_port_to_be(port: u16, open: bool) -> Result<(), Elapsed> { + use {std::time::Duration, tokio::time}; + time::timeout(Duration::from_secs(3), async { + while is_port_available(port).await != open { + time::sleep(Duration::from_millis(10)).await; + } + }) + .await +} + +struct NotifyServerContext { + shutdown: broadcast::Sender<()>, + port: u16, + url: Url, + postgres: PgPool, +} + +#[async_trait] +impl AsyncTestContext for NotifyServerContext { + async fn setup() -> Self { + let mock_server = { + use wiremock::{ + http::Method, + matchers::{method, path}, + Mock, MockServer, ResponseTemplate, + }; + let mock_server = MockServer::start().await; + Mock::given(method(Method::Get)) + .and(path("/internal/project/validate-notify-keys")) + .respond_with( + ResponseTemplate::new(StatusCode::OK) + .set_body_json(RegistryAuthResponse { is_valid: true }), + ) + .mount(&mock_server) + .await; + mock_server + }; + + let public_ip = IpAddr::V4(Ipv4Addr::LOCALHOST); + let config = Configuration { + postgres_url: std::env::var("POSTGRES_URL").unwrap(), + log_level: "trace".to_string(), + public_ip, + port: find_free_port().await, + registry_url: mock_server.uri(), + database_url: std::env::var("DATABASE_URL").unwrap(), + keypair_seed: hex::encode(rand::Rng::gen::<[u8; 10]>(&mut rand::thread_rng())), + project_id: std::env::var("PROJECT_ID").unwrap(), + relay_url: std::env::var("RELAY_URL").unwrap(), + notify_url: format!("http://{public_ip}"), + registry_auth_token: "".to_owned(), + auth_redis_addr_read: None, + auth_redis_addr_write: None, + redis_pool_size: 1, + telemetry_prometheus_port: None, + s3_endpoint: None, + geoip_db_bucket: None, + geoip_db_key: None, + blocked_countries: vec![], + analytics_export_bucket: None, + }; + tracing_subscriber::fmt() + .with_env_filter(&config.log_level) + .with_span_events(FmtSpan::CLOSE) + .with_ansi(std::env::var("ANSI_LOGS").is_ok()) + .init(); + + let (signal, shutdown) = broadcast::channel(1); + tokio::task::spawn({ + let config = config.clone(); + async move { + notify_server::bootstrap(shutdown, config).await.unwrap(); + } + }); + + wait_for_port_to_be(config.port, false).await.unwrap(); + + let postgres = PgPoolOptions::new() + .connect(&config.postgres_url) + .await + .unwrap(); + + Self { + shutdown: signal, + port: config.port, + url: Url::parse(&format!("http://{}:{}", config.public_ip, config.port)).unwrap(), + postgres, + } + } + + async fn teardown(mut self) { + self.shutdown.send(()).unwrap(); + wait_for_port_to_be(self.port, true).await.unwrap(); + } +} + +async fn assert_successful_response(response: Response) -> Response { + let status = response.status(); + if !status.is_success() { + panic!( + "non-successful response {status}: {:?}", + response.text().await + ); + } + response +} + +#[test_context(NotifyServerContext)] +#[tokio::test] +async fn test_get_subscribers_v0(notify_server: &NotifyServerContext) { + let project_id = ProjectId::generate(); + let app_domain = &generate_app_domain(); + let topic = Topic::generate(); + let (signing_secret, signing_public) = generate_signing_keys(); + let (authentication_secret, authentication_public) = generate_authentication_keys(); + upsert_project( + project_id.clone(), + app_domain, + topic, + authentication_public, + authentication_secret, + signing_public, + signing_secret, + ¬ify_server.postgres, + ) + .await + .unwrap(); + let project = get_project_by_project_id(project_id.clone(), ¬ify_server.postgres) + .await + .unwrap(); + + let account = generate_account_id(); + let scope = HashSet::from(["scope1".to_string(), "scope2".to_string()]); + let notify_key = rand::Rng::gen::<[u8; 32]>(&mut rand::thread_rng()); + let notify_topic = sha256::digest(¬ify_key).into(); + upsert_subscriber( + project.id, + account.clone(), + scope, + ¬ify_key, + notify_topic, + ¬ify_server.postgres, + ) + .await + .unwrap(); + + let accounts = + get_subscriber_accounts_by_project_id(project_id.clone(), ¬ify_server.postgres) + .await + .unwrap(); + assert_eq!(accounts, vec![account.clone()]); + + let accounts = assert_successful_response( + reqwest::Client::new() + .get( + notify_server + .url + .join(&format!("/{project_id}/subscribers")) + .unwrap(), + ) + .header("Authorization", format!("Bearer {}", "f")) + .send() + .await + .unwrap(), + ) + .await + .json::>() + .await + .unwrap(); + assert_eq!(accounts, vec![account]); +} + #[tokio::test] -async fn test_get_subscribers_v0() { +async fn test_get_subscriber_accounts_by_project_id() { let (_, postgres) = get_dbs().await; let project_id = ProjectId::generate(); @@ -1144,7 +1345,7 @@ async fn test_get_subscribers_v0() { } #[tokio::test] -async fn test_get_subscribers_v1() { +async fn test_get_subscriber_accounts_and_scopes_by_project_id() { let (_, postgres) = get_dbs().await; let project_id = ProjectId::generate();