From 791ca91670816eb7c8ea1da2b00ffd8fc7809cc0 Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Fri, 17 Feb 2023 20:47:33 +0100 Subject: [PATCH] feat!: introduce structured runtime test config - Replace the usage of free-form `TomlMap` by a structured, typed `Config` struct - Remove `bin_path` from config - Require passing of `Config` and `path` to `test_provider` at runtime Breaking changes: - `load_config` is replaced by `Config::load` method - `test_provider` requires `path` and `Config` as arguments - `rust_backtrace` config field is renamed to `backtrace` and its type is changed from string to a boolean Signed-off-by: Roman Volosatovs --- wasmcloud-test-util/src/provider_test.rs | 409 ++++++++++++----------- 1 file changed, 220 insertions(+), 189 deletions(-) diff --git a/wasmcloud-test-util/src/provider_test.rs b/wasmcloud-test-util/src/provider_test.rs index 65f1c6a..9782546 100644 --- a/wasmcloud-test-util/src/provider_test.rs +++ b/wasmcloud-test-util/src/provider_test.rs @@ -2,15 +2,26 @@ //! //! simple test harness to load a capability provider and test it //! + +pub use log; + use crate::testing::TestResult; use anyhow::anyhow; use async_trait::async_trait; -use futures::future::BoxFuture; +use futures::{future::BoxFuture, lock::Mutex}; use nkeys::{KeyPair, KeyPairType}; -use serde::Serialize; -use std::{fs, io::Write, ops::Deref, path::PathBuf, sync::Arc}; +use serde::{de, Deserialize, Serialize}; +use std::{ + collections::{BTreeMap, HashMap}, + fs, + io::Write, + ops::Deref, + path::PathBuf, + str::FromStr, + sync::Arc, + time::Duration, +}; use tokio::sync::OnceCell; -use toml::value::Value as TomlValue; use wasmbus_rpc::{ common::{Context, Message, SendOpts, Transport}, core::{HealthCheckRequest, HealthCheckResponse, HostData, LinkDefinition, WasmCloudEntity}, @@ -18,42 +29,136 @@ use wasmbus_rpc::{ rpc_client::RpcClient, }; +pub type TestFunc = fn() -> BoxFuture<'static, RpcResult<()>>; + pub type SimpleValueMap = std::collections::HashMap; pub type TomlMap = std::collections::BTreeMap; pub type JsonMap = serde_json::Map; -const DEFAULT_START_DELAY_SEC: u64 = 1; -const DEFAULT_RPC_TIMEOUT_MILLIS: u64 = 2000; +const DEFAULT_START_DELAY: Duration = Duration::from_secs(1); +const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(2); const DEFAULT_NATS_URL: &str = "127.0.0.1:4222"; // use a unique lattice prefix to separate test traffic const TEST_LATTICE_PREFIX: &str = "TEST"; const TEST_HOST_ID: &str = "_TEST_"; -static ONCE: OnceCell = OnceCell::const_new(); -pub type TestFunc = fn() -> BoxFuture<'static, RpcResult<()>>; +static PROVIDERS: OnceCell>> = OnceCell::const_new(); -fn to_value_map(data: &toml::map::Map) -> RpcResult { - let mut map = SimpleValueMap::default(); - // copy simple values into the map - for (k, v) in data.iter() { - match v { - TomlValue::Integer(_) | TomlValue::Float(_) | TomlValue::Boolean(_) => { - map.insert(k.clone(), v.to_string()); - } - // string is handled separately because 'to_string()' adds quotes - TomlValue::String(s) => { - map.insert(k.clone(), s.clone()); - } - // intentionally omitted - TomlValue::Array(_) | TomlValue::Table(_) | TomlValue::Datetime(_) => {} +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct LogLevel(pub log::Level); + +impl Deref for LogLevel { + type Target = log::Level; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl FromStr for LogLevel { + type Err = ::Err; + + fn from_str(s: &str) -> Result { + s.parse::().map(Self) + } +} + +impl<'de> Deserialize<'de> for LogLevel { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + String::deserialize(deserializer)? + .parse() + .map_err(de::Error::custom) + } +} + +impl From for LogLevel { + fn from(level: log::Level) -> Self { + Self(level) + } +} + +impl From for log::Level { + fn from(LogLevel(level): LogLevel) -> Self { + level + } +} + +/// Provider test config +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq)] +pub struct Config { + /// Whether to enable Rust backtrace + pub backtrace: bool, + /// Contract ID + pub contract_id: String, + /// Lattice RPC prefix + pub lattice_rpc_prefix: String, + pub link_delay: Duration, + pub link_name: String, + /// [`LogLevel`] for capability provider set using "RUST_LOG" environment variable, defaults to [LogLevel::Info] + pub log_level: LogLevel, + /// URL at which NATS is accessible + pub nats_url: String, + /// URL at which Redis is accessible + pub redis_url: String, + pub start_delay: Duration, + // NOTE: `BTreeMap`, unlike `HashMap` implements `Hash` + pub values: BTreeMap, +} + +impl Default for Config { + fn default() -> Self { + Self { + backtrace: false, + contract_id: "wasmcloud:example".into(), + lattice_rpc_prefix: TEST_LATTICE_PREFIX.into(), + link_delay: Default::default(), + link_name: "default".into(), + log_level: log::Level::Info.into(), + nats_url: DEFAULT_NATS_URL.into(), + redis_url: "0.0.0.0:6379".into(), + start_delay: DEFAULT_START_DELAY, + values: Default::default(), + } + } +} + +impl Config { + /// Loads [Config] from path contained in environment variable `PROVIDER_TEST_CONFIG`, + /// otherwise attempts to use `provider_test_config.toml` in working directory. + pub fn load() -> Result { + let path = if let Ok(path) = std::env::var("PROVIDER_TEST_CONFIG") { + PathBuf::from(path) + } else { + PathBuf::from("./provider_test_config.toml") }; + let data = if !path.is_file() { + Err(RpcError::ProviderInit(format!( + "Missing configuration file '{}'. Config file should be 'provider_test_config.toml' \ + in the current directory, or a .toml file whose path is in the environment variable \ + 'PROVIDER_TEST_CONFIG'", + &path.display() + ))) + } else { + fs::read_to_string(&path).map_err(|e| { + RpcError::ProviderInit(format!( + "failed reading config from {}: {}", + &path.display(), + e + )) + }) + }?; + let config = toml::from_str(&data).map_err(|e| { + RpcError::ProviderInit(format!( + "parse error in configuration file loaded from {}: {}", + &path.display(), + e + )) + })?; + Ok(config) } - // copy the entire map as base64-encoded json with value "config_b64" - let json = serde_json::to_string(data) - .map_err(|e| RpcError::Ser(format!("invalid 'values' map: {}", e)))?; - let b64 = base64::encode_config(&json, base64::STANDARD_NO_PAD); - map.insert("config_b64".to_string(), b64); - Ok(map) } #[derive(Clone, Debug)] @@ -76,10 +181,10 @@ pub struct ProviderProcess { pub actor_id: String, pub path: PathBuf, pub proc: std::process::Child, - pub config: TomlMap, + pub config: Config, pub nats_client: async_nats::Client, pub rpc_client: RpcClient, - pub timeout_ms: std::sync::Mutex, + pub timeout_ms: std::sync::Mutex, } impl ProviderProcess { @@ -112,12 +217,7 @@ impl ProviderProcess { ); let mut ld = LinkDefinition::default(); ld.actor_id = self.actor_id.clone(); - ld.contract_id = self - .config - .get("contract_id") - .and_then(|v| v.as_str()) - .unwrap_or("wasmcloud:example") - .to_string(); + ld.contract_id = self.config.contract_id.clone(); ld.link_name = self.host_data.link_name.clone(); ld.provider_id = self.host_data.provider_key.clone(); ld.values = values; @@ -231,7 +331,7 @@ impl Transport for Provider { fn set_timeout(&self, interval: std::time::Duration) { let lock = self.timeout_ms.try_lock(); if let Ok(mut rg) = lock { - *rg = interval.as_millis() as u64 + *rg = interval.as_millis() } else { // only if mutex was poisioned, which would only happen // if somebody paniced while holding this mutex. @@ -248,90 +348,28 @@ impl Deref for Provider { } } -pub(crate) fn nats_url(config: &TomlMap) -> String { - config - .get("nats_url") - .and_then(|v| v.as_str()) - .unwrap_or(DEFAULT_NATS_URL) - .to_string() -} - -/// load toml configuration. looks for environment variable "PROVIDER_TEST_CONFIG", -/// otherwise loads defaults. -pub fn load_config() -> Result { - let path = if let Ok(path) = std::env::var("PROVIDER_TEST_CONFIG") { - PathBuf::from(path) - } else { - PathBuf::from("./provider_test_config.toml") - }; - let data = if !path.is_file() { - Err(RpcError::ProviderInit(format!( - "Missing configuration file '{}'. Config file should be 'provider_test_config.toml' \ - in the current directory, or a .toml file whose path is in the environment variable \ - 'PROVIDER_TEST_CONFIG'", - &path.display() - ))) - } else { - fs::read_to_string(&path).map_err(|e| { - RpcError::ProviderInit(format!( - "failed reading config from {}: {}", - &path.display(), - e - )) - }) - }?; - let map = toml::from_str(&data).map_err(|e| { - RpcError::ProviderInit(format!( - "parse error in configuration file loaded from {}: {}", - &path.display(), - e - )) - })?; - Ok(map) -} - -/// Starts a capability provider from its par file for testing. -/// Configuration file path should be in the environment variable PROVIDER_TEST_CONFIG -/// Par file path should be either in the environment variable PROVIDER_TEST_PAR -/// or in the config file as "par_file" +/// Starts a capability provider from its par file for testing pub async fn start_provider_test( - config: TomlMap, - exe_path: &std::path::Path, + config: Config, + exe_path: impl Into, ld: LinkDefinition, ) -> Result { - let exe_file = fs::File::open(exe_path)?; + let exe_path = exe_path.into(); + + let file = fs::File::open(&exe_path)?; // generate a fake host key, which we will use for cluster issuer and host id let host_key = KeyPair::new(KeyPairType::Server); eprintln!("host_id: {}", host_key.public_key()); eprintln!("contract_id: {}", &ld.contract_id); let actor_id = ld.actor_id.clone(); - // set logging level for capability provider with the "RUST_LOG" environment variable, - // default level is "info" - let log_level = match config.get("rust_log") { - Some(TomlValue::String(level)) => level.to_string(), - _ => "info".to_string(), - }; - eprintln!("log_level: {}", &log_level); - - // set RUST_BACKTRACE, if requested - // default is disabled - let enable_backtrace = match config.get("rust_backtrace") { - Some(TomlValue::String(sval)) if sval.as_str() == "1" => "1", - Some(TomlValue::Integer(num)) if *num > 0 => "1", - Some(TomlValue::Boolean(bval)) if *bval => "1", - _ => "0", - }; + eprintln!("log_level: {}", &config.log_level.as_str()); let mut host_data = HostData::default(); host_data.host_id = TEST_HOST_ID.to_string(); // host_key.public_key(); - host_data.lattice_rpc_prefix = config - .get("lattice_rpc_prefix") - .and_then(|v| v.as_str()) - .unwrap_or(TEST_LATTICE_PREFIX) - .to_string(); + host_data.lattice_rpc_prefix = config.lattice_rpc_prefix.clone(); host_data.link_name = ld.link_name.clone(); - host_data.lattice_rpc_url = nats_url(&config); + host_data.lattice_rpc_url = config.nats_url.clone(); host_data.provider_key = ld.provider_id.clone(); host_data.invocation_seed = host_key.seed().unwrap(); host_data.cluster_issuers = vec![host_key.public_key()]; @@ -343,11 +381,11 @@ pub async fn start_provider_test( encoded.push_str("\r\n"); // provider's stdout is piped through our stdout - let mut child_proc = std::process::Command::new(exe_path) + let mut child_proc = std::process::Command::new(&exe_path) .stdout(std::process::Stdio::piped()) .stdin(std::process::Stdio::piped()) - .env("RUST_LOG", &log_level) - .env("RUST_BACKTRACE", enable_backtrace) + .env("RUST_LOG", config.log_level.as_str()) + .env("RUST_BACKTRACE", if config.backtrace { "1" } else { "0" }) .spawn() .map_err(|e| { RpcError::Other(format!( @@ -370,8 +408,8 @@ pub async fn start_provider_test( Ok(Provider { inner: Arc::new(ProviderProcess { - file: exe_file, - path: exe_path.to_owned(), + file, + path: exe_path, proc: child_proc, actor_id, config, @@ -379,13 +417,14 @@ pub async fn start_provider_test( rpc_client: RpcClient::new( nats_client, host_key.public_key(), - Some(std::time::Duration::from_millis( - host_data.default_rpc_timeout_ms.unwrap_or(2000), - )), + host_data + .default_rpc_timeout_ms + .map(Duration::from_millis) + .or(Some(DEFAULT_RPC_TIMEOUT)), Arc::new(host_key), ), host_data, - timeout_ms: std::sync::Mutex::new(DEFAULT_RPC_TIMEOUT_MILLIS), + timeout_ms: std::sync::Mutex::new(DEFAULT_RPC_TIMEOUT.as_millis()), }), }) } @@ -447,6 +486,13 @@ macro_rules! run_selected_spawn { }}; } +pub struct TestCase { + pub name: String, + pub func: TestFunc, + pub bin_path: PathBuf, + pub config: Config, +} + /// Execute all tests. In the current implementation, /// all tests are run sequentially, and always in the same order. // A future version of this should take a parameter for a scheduling strategy, @@ -461,104 +507,89 @@ macro_rules! run_selected_spawn { // The reason I had put the spawn in was to catch panics from assert // calls that fail. pub async fn run_tests( - tests: Vec<(&'static str, TestFunc)>, + tests: impl IntoIterator, ) -> std::result::Result, Box> { let mut results: Vec = Vec::new(); let handle = tokio::runtime::Handle::current(); - for (name, tfunc) in tests.into_iter() { - let rc: RpcResult<()> = handle.spawn(tfunc()).await?; + let (tcs, confs): (Vec<_>, Vec<_>) = tests + .into_iter() + .map( + |TestCase { + name, + func, + bin_path, + config, + }| ((name, func), (bin_path, config)), + ) + .unzip(); + for (name, func) in tcs { + let rc = handle.spawn(func()).await?; results.push(TestResult { - name: name.to_string(), + name, passed: rc.is_ok(), ..Default::default() }); } - - let provider = test_provider().await; - let _ = provider.shutdown().await; // send shutdown message - + for (bin_path, conf) in confs { + let _ = test_provider(bin_path, conf).await.shutdown().await; // send shutdown message + } Ok(results) } -pub async fn test_provider() -> Provider { - ONCE.get_or_init(|| async { - match load_provider().await { - Ok(p) => p, - Err(e) => { - panic!("failed to load provider: {}", e); - } - } - }) - .await - .clone() +pub async fn test_provider(bin_path: impl Into, config: Config) -> Provider { + let mut providers = PROVIDERS + .get_or_init(|| async { Default::default() }) + .await + .lock() + .await; + if let Some(prov) = providers.get(&config) { + return prov.clone(); + } + let prov = load_provider(bin_path, config.clone()) + .await + .expect("failed to load provider"); + providers.insert(config, prov.clone()); + prov } -pub async fn load_provider() -> Result { - let mut conf = load_config()?; - let values = conf.remove("values"); - let link_values = if let Some(toml::Value::Table(map)) = values { - to_value_map(&map)? - } else { - SimpleValueMap::default() - }; - let exe_path = conf - .get("bin_path") - .and_then(|v| v.as_str()) - .map(PathBuf::from) - .ok_or_else(|| { - RpcError::ProviderInit("Must specifiy binary path in 'bin_path' in config file".into()) - })?; +pub async fn load_provider( + exe_path: impl Into, + conf: Config, +) -> Result { let actor_kp = KeyPair::new(KeyPairType::Module); eprintln!("actor_id: {}", actor_kp.public_key()); let provider_kp = KeyPair::new(KeyPairType::Service); eprintln!("provider_id: {}", provider_kp.public_key()); + let config_b64 = if !conf.values.is_empty() { + let json = serde_json::to_string(&conf.values) + .map_err(|e| RpcError::Ser(format!("invalid 'values' map: {}", e)))?; + Some(base64::encode_config(&json, base64::STANDARD_NO_PAD)) + } else { + None + }; + let link_values = conf + .values + .clone() + .into_iter() + .chain(config_b64.map(|v| ("config_b64".into(), v))) + .collect(); + let mut test_linkdef = LinkDefinition::default(); test_linkdef.actor_id = actor_kp.public_key(); - test_linkdef.contract_id = conf - .get("contract_id") - .and_then(|v| v.as_str()) - .unwrap_or("wasmcloud:example") - .to_string(); - test_linkdef.link_name = conf - .get("link_name") - .and_then(|v| v.as_str()) - .unwrap_or("default") - .to_string(); + test_linkdef.contract_id = conf.contract_id.clone(); + test_linkdef.link_name = conf.link_name.clone(); test_linkdef.provider_id = provider_kp.public_key(); test_linkdef.values = link_values; - let prov = start_provider_test(conf, &exe_path, test_linkdef.clone()).await?; + let prov = start_provider_test(conf.clone(), exe_path, test_linkdef.clone()).await?; // give it time to startup - let delay_time_sec = match prov.config.get("start_delay_sec") { - Some(n) => { - let n = n.as_integer().unwrap_or(0); - if !(0..=60).contains(&n) { - return Err(RpcError::InvalidParameter(format!( - "configuration value 'start_delay_sec' is too large: {}", - n - ))); - } - std::cmp::max(n as u64, DEFAULT_START_DELAY_SEC) - } - None => DEFAULT_START_DELAY_SEC, - }; - tokio::time::sleep(std::time::Duration::from_secs(delay_time_sec)).await; + tokio::time::sleep(prov.config.start_delay).await; // optionally, allow extra time to handle put_link - if let Some(n) = prov.config.get("link_delay_sec") { - if let Some(n) = n.as_integer() { - if n > 0 { - eprintln!("Pausing {} secs after put_link", n); - tokio::time::sleep(std::time::Duration::from_secs(n as u64)).await; - } - } else { - return Err(RpcError::InvalidParameter(format!( - "configuration value 'link_delay_sec={}' is not a valid integer", - n - ))); - } + if !conf.link_delay.is_zero() { + eprintln!("Pausing {:?} secs after put_link", conf.link_delay); + tokio::time::sleep(conf.link_delay).await; } - Ok(prov) }