diff --git a/wasmcloud-test-util/src/provider_test.rs b/wasmcloud-test-util/src/provider_test.rs index 65f1c6a..a50cc0b 100644 --- a/wasmcloud-test-util/src/provider_test.rs +++ b/wasmcloud-test-util/src/provider_test.rs @@ -2,15 +2,29 @@ //! //! simple test harness to load a capability provider and test it //! + +pub use log; + use crate::testing::TestResult; use anyhow::anyhow; use async_trait::async_trait; use futures::future::BoxFuture; +use futures::lock::Mutex; use nkeys::{KeyPair, KeyPairType}; -use serde::Serialize; -use std::{fs, io::Write, ops::Deref, path::PathBuf, sync::Arc}; +use serde::{de, Deserialize, Serialize}; +use std::{ + collections::{BTreeMap, HashMap}, + env::var, + error::Error, + fs, + io::Write, + ops::Deref, + path::PathBuf, + str::FromStr, + sync::Arc, + time::Duration, +}; use tokio::sync::OnceCell; -use toml::value::Value as TomlValue; use wasmbus_rpc::{ common::{Context, Message, SendOpts, Transport}, core::{HealthCheckRequest, HealthCheckResponse, HostData, LinkDefinition, WasmCloudEntity}, @@ -18,42 +32,132 @@ use wasmbus_rpc::{ rpc_client::RpcClient, }; -pub type SimpleValueMap = std::collections::HashMap; -pub type TomlMap = std::collections::BTreeMap; -pub type JsonMap = serde_json::Map; +pub type TestFunc = fn() -> BoxFuture<'static, RpcResult<()>>; -const DEFAULT_START_DELAY_SEC: u64 = 1; -const DEFAULT_RPC_TIMEOUT_MILLIS: u64 = 2000; +const DEFAULT_START_DELAY: Duration = Duration::from_secs(1); +const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_millis(2000); const DEFAULT_NATS_URL: &str = "127.0.0.1:4222"; // use a unique lattice prefix to separate test traffic const TEST_LATTICE_PREFIX: &str = "TEST"; const TEST_HOST_ID: &str = "_TEST_"; -static ONCE: OnceCell = OnceCell::const_new(); -pub type TestFunc = fn() -> BoxFuture<'static, RpcResult<()>>; +static PROVIDERS: OnceCell>> = OnceCell::const_new(); -fn to_value_map(data: &toml::map::Map) -> RpcResult { - let mut map = SimpleValueMap::default(); - // copy simple values into the map - for (k, v) in data.iter() { - match v { - TomlValue::Integer(_) | TomlValue::Float(_) | TomlValue::Boolean(_) => { - map.insert(k.clone(), v.to_string()); - } - // string is handled separately because 'to_string()' adds quotes - TomlValue::String(s) => { - map.insert(k.clone(), s.clone()); - } - // intentionally omitted - TomlValue::Array(_) | TomlValue::Table(_) | TomlValue::Datetime(_) => {} +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct LogLevel(pub log::Level); + +impl Deref for LogLevel { + type Target = log::Level; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl FromStr for LogLevel { + type Err = ::Err; + + fn from_str(s: &str) -> Result { + s.parse::().map(Self) + } +} + +impl<'de> Deserialize<'de> for LogLevel { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + String::deserialize(deserializer)? + .parse() + .map_err(de::Error::custom) + } +} + +impl From for LogLevel { + fn from(level: log::Level) -> Self { + Self(level) + } +} + +impl From for log::Level { + fn from(LogLevel(level): LogLevel) -> Self { + level + } +} + +/// Provider test config +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq)] +pub struct Config { + /// Whether to enable Rust backtrace + pub backtrace: bool, + /// Contract ID + pub contract_id: String, + /// Lattice RPC prefix + pub lattice_rpc_prefix: String, + pub link_delay: Duration, + pub link_name: String, + /// [`LogLevel`] for capability provider set using "RUST_LOG" environment variable, defaults to [LogLevel::Info] + pub log_level: LogLevel, + /// URL at which NATS is accessible + pub nats_url: String, + /// URL at which Redis is accessible + pub redis_url: String, + pub start_delay: Duration, + // NOTE: `BTreeMap`, unlike `HashMap` implements `Hash` + pub values: BTreeMap, +} + +impl Default for Config { + fn default() -> Self { + Self { + backtrace: false, + contract_id: "wasmcloud:example".into(), + lattice_rpc_prefix: TEST_LATTICE_PREFIX.into(), + link_delay: Default::default(), + link_name: "default".into(), + log_level: log::Level::Info.into(), + nats_url: DEFAULT_NATS_URL.into(), + redis_url: "0.0.0.0:6379".into(), + start_delay: DEFAULT_START_DELAY, + values: Default::default(), + } + } +} + +impl Config { + /// Loads [Config] from path contained in environment variable `PROVIDER_TEST_CONFIG`, + /// otherwise attempts to use `provider_test_config.toml` in working directory. + pub fn load() -> Result { + let path = if let Ok(path) = var("PROVIDER_TEST_CONFIG") { + PathBuf::from(path) + } else { + PathBuf::from("./provider_test_config.toml") }; + let data = if !path.is_file() { + Err(RpcError::ProviderInit(format!( + "Missing configuration file '{}'. Config file should be 'provider_test_config.toml' \ + in the current directory, or a .toml file whose path is in the environment variable \ + 'PROVIDER_TEST_CONFIG'", + &path.display() + ))) + } else { + fs::read_to_string(&path).map_err(|e| { + RpcError::ProviderInit(format!( + "failed reading config from {}: {}", + &path.display(), + e + )) + }) + }?; + let config = toml::from_str(&data).map_err(|e| { + RpcError::ProviderInit(format!( + "parse error in configuration file loaded from {}: {}", + &path.display(), + e + )) + })?; + Ok(config) } - // copy the entire map as base64-encoded json with value "config_b64" - let json = serde_json::to_string(data) - .map_err(|e| RpcError::Ser(format!("invalid 'values' map: {}", e)))?; - let b64 = base64::encode_config(&json, base64::STANDARD_NO_PAD); - map.insert("config_b64".to_string(), b64); - Ok(map) } #[derive(Clone, Debug)] @@ -76,10 +180,10 @@ pub struct ProviderProcess { pub actor_id: String, pub path: PathBuf, pub proc: std::process::Child, - pub config: TomlMap, + pub config: Config, pub nats_client: async_nats::Client, pub rpc_client: RpcClient, - pub timeout_ms: std::sync::Mutex, + pub timeout_ms: std::sync::Mutex, } impl ProviderProcess { @@ -103,7 +207,7 @@ impl ProviderProcess { } /// link the test to the provider - pub async fn link_to_test(&self, values: SimpleValueMap) -> Result<(), anyhow::Error> { + pub async fn link_to_test(&self, values: impl IntoIterator) -> Result<(), anyhow::Error> { let topic = format!( "wasmbus.rpc.{}.{}.{}.linkdefs.put", &self.host_data.lattice_rpc_prefix, @@ -112,15 +216,10 @@ impl ProviderProcess { ); let mut ld = LinkDefinition::default(); ld.actor_id = self.actor_id.clone(); - ld.contract_id = self - .config - .get("contract_id") - .and_then(|v| v.as_str()) - .unwrap_or("wasmcloud:example") - .to_string(); + ld.contract_id = self.config.contract_id.clone(); ld.link_name = self.host_data.link_name.clone(); ld.provider_id = self.host_data.provider_key.clone(); - ld.values = values; + ld.values = values.into_iter().collect(); let bytes = serde_json::to_vec(&ld)?; self.rpc_client.publish(topic, bytes).await?; @@ -210,7 +309,7 @@ impl ProviderProcess { if !resp.is_empty() { eprintln!("shutdown response: {}", String::from_utf8_lossy(&resp)); } - tokio::time::sleep(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_secs(2)).await; Ok(()) } } @@ -222,16 +321,16 @@ impl Transport for Provider { _ctx: &Context, message: Message<'_>, _opts: Option, - ) -> std::result::Result, RpcError> { + ) -> Result, RpcError> { self.inner.send_rpc(message).await } /// sets the time period for an expected response to rpc messages, /// after which an RpcError::Timeout will occur. - fn set_timeout(&self, interval: std::time::Duration) { + fn set_timeout(&self, interval: Duration) { let lock = self.timeout_ms.try_lock(); if let Ok(mut rg) = lock { - *rg = interval.as_millis() as u64 + *rg = interval.as_millis() } else { // only if mutex was poisioned, which would only happen // if somebody paniced while holding this mutex. @@ -248,90 +347,28 @@ impl Deref for Provider { } } -pub(crate) fn nats_url(config: &TomlMap) -> String { - config - .get("nats_url") - .and_then(|v| v.as_str()) - .unwrap_or(DEFAULT_NATS_URL) - .to_string() -} - -/// load toml configuration. looks for environment variable "PROVIDER_TEST_CONFIG", -/// otherwise loads defaults. -pub fn load_config() -> Result { - let path = if let Ok(path) = std::env::var("PROVIDER_TEST_CONFIG") { - PathBuf::from(path) - } else { - PathBuf::from("./provider_test_config.toml") - }; - let data = if !path.is_file() { - Err(RpcError::ProviderInit(format!( - "Missing configuration file '{}'. Config file should be 'provider_test_config.toml' \ - in the current directory, or a .toml file whose path is in the environment variable \ - 'PROVIDER_TEST_CONFIG'", - &path.display() - ))) - } else { - fs::read_to_string(&path).map_err(|e| { - RpcError::ProviderInit(format!( - "failed reading config from {}: {}", - &path.display(), - e - )) - }) - }?; - let map = toml::from_str(&data).map_err(|e| { - RpcError::ProviderInit(format!( - "parse error in configuration file loaded from {}: {}", - &path.display(), - e - )) - })?; - Ok(map) -} - -/// Starts a capability provider from its par file for testing. -/// Configuration file path should be in the environment variable PROVIDER_TEST_CONFIG -/// Par file path should be either in the environment variable PROVIDER_TEST_PAR -/// or in the config file as "par_file" +/// Starts a capability provider from its par file for testing pub async fn start_provider_test( - config: TomlMap, - exe_path: &std::path::Path, + config: Config, + path: impl Into, ld: LinkDefinition, ) -> Result { - let exe_file = fs::File::open(exe_path)?; + let path = path.into(); + + let file = fs::File::open(&path)?; // generate a fake host key, which we will use for cluster issuer and host id let host_key = KeyPair::new(KeyPairType::Server); eprintln!("host_id: {}", host_key.public_key()); eprintln!("contract_id: {}", &ld.contract_id); let actor_id = ld.actor_id.clone(); - // set logging level for capability provider with the "RUST_LOG" environment variable, - // default level is "info" - let log_level = match config.get("rust_log") { - Some(TomlValue::String(level)) => level.to_string(), - _ => "info".to_string(), - }; - eprintln!("log_level: {}", &log_level); - - // set RUST_BACKTRACE, if requested - // default is disabled - let enable_backtrace = match config.get("rust_backtrace") { - Some(TomlValue::String(sval)) if sval.as_str() == "1" => "1", - Some(TomlValue::Integer(num)) if *num > 0 => "1", - Some(TomlValue::Boolean(bval)) if *bval => "1", - _ => "0", - }; + eprintln!("log_level: {}", &config.log_level.as_str()); let mut host_data = HostData::default(); host_data.host_id = TEST_HOST_ID.to_string(); // host_key.public_key(); - host_data.lattice_rpc_prefix = config - .get("lattice_rpc_prefix") - .and_then(|v| v.as_str()) - .unwrap_or(TEST_LATTICE_PREFIX) - .to_string(); + host_data.lattice_rpc_prefix = config.lattice_rpc_prefix.clone(); host_data.link_name = ld.link_name.clone(); - host_data.lattice_rpc_url = nats_url(&config); + host_data.lattice_rpc_url = config.nats_url.clone(); host_data.provider_key = ld.provider_id.clone(); host_data.invocation_seed = host_key.seed().unwrap(); host_data.cluster_issuers = vec![host_key.public_key()]; @@ -343,21 +380,21 @@ pub async fn start_provider_test( encoded.push_str("\r\n"); // provider's stdout is piped through our stdout - let mut child_proc = std::process::Command::new(exe_path) + let mut proc = std::process::Command::new(&path) .stdout(std::process::Stdio::piped()) .stdin(std::process::Stdio::piped()) - .env("RUST_LOG", &log_level) - .env("RUST_BACKTRACE", enable_backtrace) + .env("RUST_LOG", config.log_level.as_str()) + .env("RUST_BACKTRACE", if config.backtrace { "1" } else { "0" }) .spawn() .map_err(|e| { RpcError::Other(format!( "launching provider bin at {}: {}", - &exe_path.display(), + &path.display(), e )) })?; - let mut stdin = child_proc + let mut stdin = proc .stdin .take() .ok_or_else(|| RpcError::ProviderInit("failed to open child stdin".into()))?; @@ -368,26 +405,30 @@ pub async fn start_provider_test( let nats_client = host_data.nats_connect().await?; wasmbus_rpc::provider::init_host_bridge_for_test(nats_client.clone(), &host_data)?; - Ok(Provider { - inner: Arc::new(ProviderProcess { - file: exe_file, - path: exe_path.to_owned(), - proc: child_proc, - actor_id, - config, - nats_client: nats_client.clone(), - rpc_client: RpcClient::new( - nats_client, - host_key.public_key(), - Some(std::time::Duration::from_millis( - host_data.default_rpc_timeout_ms.unwrap_or(2000), - )), - Arc::new(host_key), - ), - host_data, - timeout_ms: std::sync::Mutex::new(DEFAULT_RPC_TIMEOUT_MILLIS), - }), - }) + let rpc_client_timeout = host_data + .default_rpc_timeout_ms + .map(Duration::from_millis) + .or(Some(DEFAULT_RPC_TIMEOUT)); + let rpc_client = RpcClient::new( + nats_client.clone(), + host_key.public_key(), + rpc_client_timeout, + Arc::new(host_key), + ); + + let timeout_ms = std::sync::Mutex::new(DEFAULT_RPC_TIMEOUT.as_millis()); + let inner = Arc::new(ProviderProcess { + file, + path, + proc, + actor_id, + config, + nats_client, + rpc_client, + host_data, + timeout_ms, + }); + Ok(Provider { inner }) } /// given a list of regex patterns for test cases, run all tests @@ -447,6 +488,13 @@ macro_rules! run_selected_spawn { }}; } +pub struct TestCase { + pub name: String, + pub func: TestFunc, + pub bin_path: PathBuf, + pub config: Config, +} + /// Execute all tests. In the current implementation, /// all tests are run sequentially, and always in the same order. // A future version of this should take a parameter for a scheduling strategy, @@ -461,104 +509,82 @@ macro_rules! run_selected_spawn { // The reason I had put the spawn in was to catch panics from assert // calls that fail. pub async fn run_tests( - tests: Vec<(&'static str, TestFunc)>, -) -> std::result::Result, Box> { + tests: impl IntoIterator, +) -> Result, Box> { let mut results: Vec = Vec::new(); let handle = tokio::runtime::Handle::current(); - for (name, tfunc) in tests.into_iter() { - let rc: RpcResult<()> = handle.spawn(tfunc()).await?; + for TestCase { + name, + func, + bin_path, + config, + } in tests + { + let rc: RpcResult<()> = handle.spawn(func()).await?; results.push(TestResult { - name: name.to_string(), + name, passed: rc.is_ok(), ..Default::default() }); + let _ = test_provider(bin_path, config).await.shutdown().await; // send shutdown message } - - let provider = test_provider().await; - let _ = provider.shutdown().await; // send shutdown message - Ok(results) } -pub async fn test_provider() -> Provider { - ONCE.get_or_init(|| async { - match load_provider().await { - Ok(p) => p, - Err(e) => { - panic!("failed to load provider: {}", e); - } - } - }) - .await - .clone() +pub async fn test_provider(bin_path: impl Into, config: Config) -> Provider { + let mut providers = PROVIDERS + .get_or_init(|| async { Default::default() }) + .await + .lock() + .await; + if let Some(prov) = providers.get(&config) { + return prov.clone(); + } + let prov = load_provider(bin_path, config.clone()) + .await + .expect("failed to load provider"); + providers.insert(config, prov.clone()); + prov } -pub async fn load_provider() -> Result { - let mut conf = load_config()?; - let values = conf.remove("values"); - let link_values = if let Some(toml::Value::Table(map)) = values { - to_value_map(&map)? - } else { - SimpleValueMap::default() - }; - let exe_path = conf - .get("bin_path") - .and_then(|v| v.as_str()) - .map(PathBuf::from) - .ok_or_else(|| { - RpcError::ProviderInit("Must specifiy binary path in 'bin_path' in config file".into()) - })?; +pub async fn load_provider( + bin_path: impl Into, + config: Config, +) -> Result { let actor_kp = KeyPair::new(KeyPairType::Module); eprintln!("actor_id: {}", actor_kp.public_key()); let provider_kp = KeyPair::new(KeyPairType::Service); eprintln!("provider_id: {}", provider_kp.public_key()); + let config_b64 = if !config.values.is_empty() { + let json = serde_json::to_string(&config.values) + .map_err(|e| RpcError::Ser(format!("invalid 'values' map: {}", e)))?; + Some(base64::encode_config(&json, base64::STANDARD_NO_PAD)) + } else { + None + }; + let values = config + .values + .clone() + .into_iter() + .chain(config_b64.map(|v| ("config_b64".into(), v))) + .collect(); + let mut test_linkdef = LinkDefinition::default(); test_linkdef.actor_id = actor_kp.public_key(); - test_linkdef.contract_id = conf - .get("contract_id") - .and_then(|v| v.as_str()) - .unwrap_or("wasmcloud:example") - .to_string(); - test_linkdef.link_name = conf - .get("link_name") - .and_then(|v| v.as_str()) - .unwrap_or("default") - .to_string(); + test_linkdef.contract_id = config.contract_id.clone(); + test_linkdef.link_name = config.link_name.clone(); test_linkdef.provider_id = provider_kp.public_key(); - test_linkdef.values = link_values; - let prov = start_provider_test(conf, &exe_path, test_linkdef.clone()).await?; + test_linkdef.values = values; + let prov = start_provider_test(config.clone(), bin_path, test_linkdef.clone()).await?; // give it time to startup - let delay_time_sec = match prov.config.get("start_delay_sec") { - Some(n) => { - let n = n.as_integer().unwrap_or(0); - if !(0..=60).contains(&n) { - return Err(RpcError::InvalidParameter(format!( - "configuration value 'start_delay_sec' is too large: {}", - n - ))); - } - std::cmp::max(n as u64, DEFAULT_START_DELAY_SEC) - } - None => DEFAULT_START_DELAY_SEC, - }; - tokio::time::sleep(std::time::Duration::from_secs(delay_time_sec)).await; + tokio::time::sleep(prov.config.start_delay).await; // optionally, allow extra time to handle put_link - if let Some(n) = prov.config.get("link_delay_sec") { - if let Some(n) = n.as_integer() { - if n > 0 { - eprintln!("Pausing {} secs after put_link", n); - tokio::time::sleep(std::time::Duration::from_secs(n as u64)).await; - } - } else { - return Err(RpcError::InvalidParameter(format!( - "configuration value 'link_delay_sec={}' is not a valid integer", - n - ))); - } + if !config.link_delay.is_zero() { + eprintln!("Pausing {:?} secs after put_link", config.link_delay); + tokio::time::sleep(config.link_delay).await; } - Ok(prov) }