From 12fb2d52d2b4ec73561b854827a05910c6cef348 Mon Sep 17 00:00:00 2001 From: jiaxiao zhou Date: Tue, 10 Sep 2024 09:48:12 +0000 Subject: [PATCH 1/2] shim: organize code to different modules this commit organized code to different modules: - constants: to handling constants across entire crate - engine: the execution engine - source: manages application sources - stdio_hook: a trigger hook for stdio - trigger: manages spin triggers - utils: utility functions it also removes ResolvedAppSource as it's not needed Signed-off-by: jiaxiao zhou --- containerd-shim-spin/src/constants.rs | 22 + containerd-shim-spin/src/engine.rs | 628 ++++--------------------- containerd-shim-spin/src/main.rs | 5 + containerd-shim-spin/src/source.rs | 115 +++++ containerd-shim-spin/src/stdio_hook.rs | 25 + containerd-shim-spin/src/trigger.rs | 116 +++++ containerd-shim-spin/src/utils.rs | 158 +++++++ 7 files changed, 541 insertions(+), 528 deletions(-) create mode 100644 containerd-shim-spin/src/constants.rs create mode 100644 containerd-shim-spin/src/source.rs create mode 100644 containerd-shim-spin/src/stdio_hook.rs create mode 100644 containerd-shim-spin/src/trigger.rs create mode 100644 containerd-shim-spin/src/utils.rs diff --git a/containerd-shim-spin/src/constants.rs b/containerd-shim-spin/src/constants.rs new file mode 100644 index 00000000..8bb6e6cc --- /dev/null +++ b/containerd-shim-spin/src/constants.rs @@ -0,0 +1,22 @@ +/// SPIN_ADDR_DEFAULT is the default address and port that the Spin HTTP trigger +/// listens on. +pub(crate) const SPIN_ADDR_DEFAULT: &str = "0.0.0.0:80"; +/// SPIN_HTTP_LISTEN_ADDR_ENV is the environment variable that can be used to +/// override the default address and port that the Spin HTTP trigger listens on. +pub(crate) const SPIN_HTTP_LISTEN_ADDR_ENV: &str = "SPIN_HTTP_LISTEN_ADDR"; +/// RUNTIME_CONFIG_PATH specifies the expected location and name of the runtime +/// config for a Spin application. The runtime config should be loaded into the +/// root `/` of the container. +pub(crate) const RUNTIME_CONFIG_PATH: &str = "/runtime-config.toml"; +/// Describes an OCI layer with Wasm content +pub(crate) const OCI_LAYER_MEDIA_TYPE_WASM: &str = "application/vnd.wasm.content.layer.v1+wasm"; +/// Expected location of the Spin manifest when loading from a file rather than +/// an OCI image +pub(crate) const SPIN_MANIFEST_FILE_PATH: &str = "/spin.toml"; +/// Known prefix for the Spin application variables environment variable +/// provider: https://github.com/fermyon/spin/blob/436ad589237c02f7aa4693e984132808fd80b863/crates/variables/src/provider/env.rs#L9 +pub(crate) const SPIN_APPLICATION_VARIABLE_PREFIX: &str = "SPIN_VARIABLE"; +/// Working directory for Spin applications +pub(crate) const SPIN_TRIGGER_WORKING_DIR: &str = "/"; +/// Name of the Spin lock file +pub(crate) const SPIN_LOCK_FILE_NAME: &str = "spin.lock"; diff --git a/containerd-shim-spin/src/engine.rs b/containerd-shim-spin/src/engine.rs index b0b2f7af..f35b003f 100644 --- a/containerd-shim-spin/src/engine.rs +++ b/containerd-shim-spin/src/engine.rs @@ -1,14 +1,10 @@ use std::{ - collections::{hash_map::DefaultHasher, HashSet}, + collections::hash_map::DefaultHasher, env, - fs::File, hash::{Hash, Hasher}, - io::Write, - net::{SocketAddr, ToSocketAddrs}, - path::{Path, PathBuf}, }; -use anyhow::{anyhow, ensure, Context, Result}; +use anyhow::{Context, Result}; use containerd_shim_wasm::{ container::{Engine, RuntimeContext, Stdio}, sandbox::WasmLayer, @@ -16,37 +12,24 @@ use containerd_shim_wasm::{ }; use futures::future; use log::info; -use oci_spec::image::MediaType; use spin_app::locked::LockedApp; -use spin_loader::{cache::Cache, FilesMountStrategy}; -use spin_manifest::schema::v2::AppManifest; -use spin_trigger::{loader, RuntimeConfig, TriggerExecutor, TriggerExecutorBuilder, TriggerHooks}; +use spin_trigger::TriggerExecutor; use spin_trigger_http::HttpTrigger; use spin_trigger_redis::RedisTrigger; use tokio::runtime::Runtime; use trigger_command::CommandTrigger; use trigger_mqtt::MqttTrigger; use trigger_sqs::SqsTrigger; -use url::Url; -/// SPIN_ADDR_DEFAULT is the default address and port that the Spin HTTP trigger -/// listens on. -const SPIN_ADDR_DEFAULT: &str = "0.0.0.0:80"; -/// SPIN_HTTP_LISTEN_ADDR_ENV is the environment variable that can be used to -/// override the default address and port that the Spin HTTP trigger listens on. -const SPIN_HTTP_LISTEN_ADDR_ENV: &str = "SPIN_HTTP_LISTEN_ADDR"; -/// RUNTIME_CONFIG_PATH specifies the expected location and name of the runtime -/// config for a Spin application. The runtime config should be loaded into the -/// root `/` of the container. -const RUNTIME_CONFIG_PATH: &str = "/runtime-config.toml"; -/// Describes an OCI layer with Wasm content -const OCI_LAYER_MEDIA_TYPE_WASM: &str = "application/vnd.wasm.content.layer.v1+wasm"; -/// Expected location of the Spin manifest when loading from a file rather than -/// an OCI image -const SPIN_MANIFEST_FILE_PATH: &str = "/spin.toml"; -/// Known prefix for the Spin application variables environment variable -/// provider: https://github.com/fermyon/spin/blob/436ad589237c02f7aa4693e984132808fd80b863/crates/variables/src/provider/env.rs#L9 -const SPIN_APPLICATION_VARIABLE_PREFIX: &str = "SPIN_VARIABLE"; +use crate::{ + constants, + source::Source, + trigger::{build_trigger, get_supported_triggers}, + utils::{ + configure_application_variables_from_environment_variables, initialize_cache, + is_wasm_content, parse_addr, + }, +}; #[derive(Clone)] pub struct SpinEngine { @@ -65,354 +48,6 @@ impl Default for SpinEngine { } } -struct StdioTriggerHook; -impl TriggerHooks for StdioTriggerHook { - fn app_loaded( - &mut self, - _app: &spin_app::App, - _runtime_config: &RuntimeConfig, - _resolver: &std::sync::Arc, - ) -> Result<()> { - Ok(()) - } - - fn component_store_builder( - &self, - _component: &spin_app::AppComponent, - builder: &mut spin_core::StoreBuilder, - ) -> Result<()> { - builder.inherit_stdout(); - builder.inherit_stderr(); - Ok(()) - } -} - -#[derive(Clone)] -enum AppSource { - File(PathBuf), - Oci, -} - -impl std::fmt::Debug for AppSource { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - AppSource::File(path) => write!(f, "File({})", path.display()), - AppSource::Oci => write!(f, "Oci"), - } - } -} - -impl SpinEngine { - async fn app_source(&self, ctx: &impl RuntimeContext, cache: &Cache) -> Result { - match ctx.entrypoint().source { - containerd_shim_wasm::container::Source::File(_) => { - Ok(AppSource::File(SPIN_MANIFEST_FILE_PATH.into())) - } - containerd_shim_wasm::container::Source::Oci(layers) => { - info!(" >>> configuring spin oci application {}", layers.len()); - - for layer in layers { - log::debug!("<<< layer config: {:?}", layer.config); - } - - for artifact in layers { - match artifact.config.media_type() { - MediaType::Other(name) - if name == spin_oci::client::SPIN_APPLICATION_MEDIA_TYPE => - { - let path = PathBuf::from("/spin.json"); - log::info!("writing spin oci config to {:?}", path); - File::create(&path) - .context("failed to create spin.json")? - .write_all(&artifact.layer) - .context("failed to write spin.json")?; - } - MediaType::Other(name) if name == OCI_LAYER_MEDIA_TYPE_WASM => { - log::info!( - "<<< writing wasm artifact with length {:?} config to cache, near {:?}", - artifact.layer.len(), cache.manifests_dir() - ); - cache - .write_wasm(&artifact.layer, &artifact.config.digest()) - .await?; - } - MediaType::Other(name) if name == spin_oci::client::DATA_MEDIATYPE => { - log::debug!( - "<<< writing data layer to cache, near {:?}", - cache.manifests_dir() - ); - cache - .write_data(&artifact.layer, &artifact.config.digest()) - .await?; - } - MediaType::Other(name) if name == spin_oci::client::ARCHIVE_MEDIATYPE => { - log::debug!( - "<<< writing archive layer and unpacking contents to cache, near {:?}", - cache.manifests_dir() - ); - self.handle_archive_layer( - cache, - &artifact.layer, - &artifact.config.digest(), - ) - .await - .context("unable to unpack archive layer")?; - } - _ => { - log::debug!( - "<<< unknown media type {:?}", - artifact.config.media_type() - ); - } - } - } - Ok(AppSource::Oci) - } - } - } - - async fn resolve_app_source( - &self, - app_source: AppSource, - cache: &Cache, - ) -> Result { - let resolve_app_source = match app_source { - AppSource::File(source) => ResolvedAppSource::File { - manifest_path: source.clone(), - manifest: spin_manifest::manifest_from_file(source.clone())?, - }, - AppSource::Oci => { - let working_dir = PathBuf::from("/"); - let loader = spin_oci::OciLoader::new(working_dir); - - // TODO: what is the best way to get this info? It isn't used only saved in the locked file - let reference = "docker.io/library/wasmtest_spin:latest"; - - let locked_app = loader - .load_from_cache(PathBuf::from("/spin.json"), reference, cache) - .await?; - ResolvedAppSource::OciRegistry { locked_app } - } - }; - Ok(resolve_app_source) - } - - async fn wasm_exec_async(&self, ctx: &impl RuntimeContext) -> Result<()> { - // create a cache directory at /.cache - // this is needed for the spin LocalLoader to work - // TODO: spin should provide a more flexible `loader::from_file` that - // does not assume the existence of a cache directory - let cache_dir = PathBuf::from("/.cache"); - let cache = Cache::new(Some(cache_dir.clone())) - .await - .context("failed to create cache")?; - env::set_var("XDG_CACHE_HOME", &cache_dir); - let app_source = self.app_source(ctx, &cache).await?; - let resolved_app_source = self.resolve_app_source(app_source.clone(), &cache).await?; - configure_application_variables_from_environment_variables(&resolved_app_source)?; - let trigger_cmds = trigger_command_for_resolved_app_source(&resolved_app_source) - .with_context(|| format!("Couldn't find trigger executor for {app_source:?}"))?; - let locked_app = self.load_resolved_app_source(resolved_app_source).await?; - - let _telemetry_guard = spin_telemetry::init(version!().to_string())?; - - self.run_trigger( - ctx, - trigger_cmds.iter().map(|s| s.as_ref()).collect(), - locked_app, - app_source, - ) - .await - } - - async fn run_trigger( - &self, - ctx: &impl RuntimeContext, - trigger_types: Vec<&str>, - app: LockedApp, - app_source: AppSource, - ) -> Result<()> { - let working_dir = PathBuf::from("/"); - let mut futures_list = Vec::with_capacity(trigger_types.len()); - for trigger_type in trigger_types.iter() { - let f = match trigger_type.to_owned() { - HttpTrigger::TRIGGER_TYPE => { - let http_trigger: HttpTrigger = self - .build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone()) - .await - .context("failed to build spin trigger")?; - - info!(" >>> running spin http trigger"); - let address_str = env::var(SPIN_HTTP_LISTEN_ADDR_ENV) - .unwrap_or_else(|_| SPIN_ADDR_DEFAULT.to_string()); - let address = parse_addr(&address_str)?; - http_trigger.run(spin_trigger_http::CliArgs { - address, - tls_cert: None, - tls_key: None, - }) - } - RedisTrigger::TRIGGER_TYPE => { - let redis_trigger: RedisTrigger = self - .build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone()) - .await - .context("failed to build spin trigger")?; - - info!(" >>> running spin redis trigger"); - redis_trigger.run(spin_trigger::cli::NoArgs) - } - SqsTrigger::TRIGGER_TYPE => { - let sqs_trigger: SqsTrigger = self - .build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone()) - .await - .context("failed to build spin trigger")?; - - info!(" >>> running spin trigger"); - sqs_trigger.run(spin_trigger::cli::NoArgs) - } - CommandTrigger::TRIGGER_TYPE => { - let command_trigger: CommandTrigger = self - .build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone()) - .await - .context("failed to build spin trigger")?; - - info!(" >>> running spin trigger"); - command_trigger.run(trigger_command::CliArgs { - guest_args: ctx.args().to_vec(), - }) - } - MqttTrigger::TRIGGER_TYPE => { - let mqtt_trigger: MqttTrigger = self - .build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone()) - .await - .context("failed to build spin trigger")?; - - info!(" >>> running spin trigger"); - mqtt_trigger.run(trigger_mqtt::CliArgs { test: false }) - } - _ => { - todo!( - "Only Http, Redis, MQTT, SQS and Command triggers are currently supported." - ) - } - }; - - futures_list.push(f) - } - - info!(" >>> notifying main thread we are about to start"); - - // exit as soon as any of the trigger completes/exits - let (result, index, rest) = future::select_all(futures_list).await; - info!( - " >>> trigger type '{trigger_type}' exited", - trigger_type = trigger_types[index] - ); - - drop(rest); - - result - } - - async fn load_resolved_app_source( - &self, - resolved: ResolvedAppSource, - ) -> anyhow::Result { - match resolved { - ResolvedAppSource::File { manifest_path, .. } => { - // TODO: This should be configurable, see https://github.com/deislabs/containerd-wasm-shims/issues/166 - // TODO: ^^ Move aforementioned issue to this repo - let files_mount_strategy = FilesMountStrategy::Direct; - spin_loader::from_file(&manifest_path, files_mount_strategy, None).await - } - ResolvedAppSource::OciRegistry { locked_app } => Ok(locked_app), - } - } - - async fn write_locked_app(&self, locked_app: &LockedApp, working_dir: &Path) -> Result { - let locked_path = working_dir.join("spin.lock"); - let locked_app_contents = - serde_json::to_vec_pretty(&locked_app).context("failed to serialize locked app")?; - tokio::fs::write(&locked_path, locked_app_contents) - .await - .with_context(|| format!("failed to write {:?}", locked_path))?; - let locked_url = Url::from_file_path(&locked_path) - .map_err(|_| anyhow!("cannot convert to file URL: {locked_path:?}"))? - .to_string(); - - Ok(locked_url) - } - - async fn build_spin_trigger( - &self, - working_dir: PathBuf, - app: LockedApp, - app_source: AppSource, - ) -> Result - where - for<'de> ::TriggerConfig: serde::de::Deserialize<'de>, - { - let locked_url = self.write_locked_app(&app, &working_dir).await?; - - // Build trigger config - let mut loader = loader::TriggerLoader::new(working_dir.clone(), true); - match app_source { - AppSource::Oci => unsafe { - // Configure the loader to support loading AOT compiled components.. - // Since all components were compiled by the shim (during `precompile`), - // this operation can be considered safe. - loader.enable_loading_aot_compiled_components(); - }, - // Currently, it is only possible to precompile applications distributed using - // `spin registry push` - AppSource::File(_) => {} - }; - let mut runtime_config = RuntimeConfig::new(PathBuf::from("/").into()); - // Load in runtime config if one exists at expected location - if Path::new(RUNTIME_CONFIG_PATH).exists() { - runtime_config.merge_config_file(RUNTIME_CONFIG_PATH)?; - } - let mut builder = TriggerExecutorBuilder::new(loader); - builder - .hooks(StdioTriggerHook {}) - .config_mut() - .wasmtime_config() - .cranelift_opt_level(spin_core::wasmtime::OptLevel::Speed); - let init_data = Default::default(); - let executor = builder.build(locked_url, runtime_config, init_data).await?; - Ok(executor) - } - - // Returns Some(WasmLayer) if the layer contains wasm, otherwise None - fn is_wasm_content(layer: &WasmLayer) -> Option { - if let MediaType::Other(name) = layer.config.media_type() { - if name == OCI_LAYER_MEDIA_TYPE_WASM { - return Some(layer.clone()); - } - } - None - } - - async fn handle_archive_layer( - &self, - cache: &Cache, - bytes: impl AsRef<[u8]>, - digest: impl AsRef, - ) -> Result<()> { - // spin_oci::client::unpack_archive_layer attempts to create a tempdir via tempfile::tempdir() - // which will fall back to /tmp if TMPDIR is not set. /tmp is either not found or not accessible - // in the shim environment, hence setting to current working directory. - if env::var("TMPDIR").is_err() { - log::debug!( - "<<< TMPDIR is not set; setting to current working directory for unpacking archive layer" - ); - env::set_var("TMPDIR", env::current_dir().unwrap_or(".".into())); - } - - spin_oci::client::unpack_archive_layer(cache, bytes, digest).await - } -} - impl Engine for SpinEngine { fn name() -> &'static str { "spin" @@ -448,7 +83,7 @@ impl Engine for SpinEngine { fn supported_layers_types() -> &'static [&'static str] { &[ - OCI_LAYER_MEDIA_TYPE_WASM, + constants::OCI_LAYER_MEDIA_TYPE_WASM, spin_oci::client::ARCHIVE_MEDIATYPE, spin_oci::client::DATA_MEDIATYPE, spin_oci::client::SPIN_APPLICATION_MEDIA_TYPE, @@ -459,7 +94,7 @@ impl Engine for SpinEngine { // Runwasi expects layers to be returned in the same order, so wrap each layer in an option, setting non Wasm layers to None let precompiled_layers = layers .iter() - .map(|layer| match SpinEngine::is_wasm_content(layer) { + .map(|layer| match is_wasm_content(layer) { Some(wasm_layer) => { log::info!( "Precompile called for wasm layer {:?}", @@ -494,168 +129,105 @@ impl Engine for SpinEngine { } } -fn parse_addr(addr: &str) -> Result { - let addrs: SocketAddr = addr - .to_socket_addrs()? - .next() - .ok_or_else(|| anyhow!("could not parse address: {}", addr))?; - Ok(addrs) -} - -// TODO: we should use spin's ResolvedAppSource -pub enum ResolvedAppSource { - File { - manifest_path: PathBuf, - manifest: AppManifest, - }, - OciRegistry { - locked_app: LockedApp, - }, -} +impl SpinEngine { + async fn wasm_exec_async(&self, ctx: &impl RuntimeContext) -> Result<()> { + let cache = initialize_cache().await?; + let app_source = Source::from_ctx(ctx, &cache).await?; + let locked_app = app_source.to_locked_app(&cache).await?; + configure_application_variables_from_environment_variables(&locked_app)?; + let trigger_cmds = get_supported_triggers(&locked_app) + .with_context(|| format!("Couldn't find trigger executor for {app_source:?}"))?; -impl ResolvedAppSource { - pub fn trigger_types(&self) -> anyhow::Result> { - let types = match self { - ResolvedAppSource::File { manifest, .. } => { - manifest.triggers.keys().collect::>() - } - ResolvedAppSource::OciRegistry { locked_app } => locked_app - .triggers - .iter() - .map(|t| &t.trigger_type) - .collect::>(), - }; + let _telemetry_guard = spin_telemetry::init(version!().to_string())?; - ensure!(!types.is_empty(), "no triggers in app"); - Ok(types.into_iter().map(|t| t.as_str()).collect()) + self.run_trigger( + ctx, + trigger_cmds.iter().map(|s| s.as_ref()).collect(), + locked_app, + app_source, + ) + .await } - pub fn variables(&self) -> Vec<&str> { - match self { - ResolvedAppSource::File { manifest, .. } => manifest - .variables - .keys() - .map(|k| k.as_ref()) - .collect::>(), - ResolvedAppSource::OciRegistry { locked_app } => locked_app - .variables - .keys() - .map(|k| k.as_ref()) - .collect::>(), - } - } -} + async fn run_trigger( + &self, + ctx: &impl RuntimeContext, + trigger_types: Vec<&str>, + app: LockedApp, + app_source: Source, + ) -> Result<()> { + let mut futures_list = Vec::with_capacity(trigger_types.len()); + for trigger_type in trigger_types.iter() { + let f = match trigger_type.to_owned() { + HttpTrigger::TRIGGER_TYPE => { + let http_trigger = + build_trigger::(app.clone(), app_source.clone()).await?; + info!(" >>> running spin http trigger"); + let address_str = env::var(constants::SPIN_HTTP_LISTEN_ADDR_ENV) + .unwrap_or_else(|_| constants::SPIN_ADDR_DEFAULT.to_string()); + let address = parse_addr(&address_str)?; + http_trigger.run(spin_trigger_http::CliArgs { + address, + tls_cert: None, + tls_key: None, + }) + } + RedisTrigger::TRIGGER_TYPE => { + let redis_trigger = + build_trigger::(app.clone(), app_source.clone()).await?; + info!(" >>> running spin redis trigger"); + redis_trigger.run(spin_trigger::cli::NoArgs) + } + SqsTrigger::TRIGGER_TYPE => { + let sqs_trigger = + build_trigger::(app.clone(), app_source.clone()).await?; + info!(" >>> running spin sqs trigger"); + sqs_trigger.run(spin_trigger::cli::NoArgs) + } + CommandTrigger::TRIGGER_TYPE => { + let command_trigger = + build_trigger::(app.clone(), app_source.clone()).await?; + info!(" >>> running spin command trigger"); + command_trigger.run(trigger_command::CliArgs { + guest_args: ctx.args().to_vec(), + }) + } + MqttTrigger::TRIGGER_TYPE => { + let mqtt_trigger = + build_trigger::(app.clone(), app_source.clone()).await?; + info!(" >>> running spin mqtt trigger"); + mqtt_trigger.run(trigger_mqtt::CliArgs { test: false }) + } + _ => { + // This should never happen as we check for supported triggers in get_supported_triggers + unreachable!() + } + }; -fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result> { - let trigger_types = resolved.trigger_types()?; - let mut types = Vec::with_capacity(trigger_types.len()); - for trigger_type in trigger_types.iter() { - match trigger_type.to_owned() { - RedisTrigger::TRIGGER_TYPE - | HttpTrigger::TRIGGER_TYPE - | SqsTrigger::TRIGGER_TYPE - | MqttTrigger::TRIGGER_TYPE - | CommandTrigger::TRIGGER_TYPE => types.push(trigger_type), - _ => { - todo!("Only Http, Redis, MQTT and SQS triggers are currently supported.") - } + futures_list.push(f) } - } - Ok(trigger_types.iter().map(|x| x.to_string()).collect()) -} + info!(" >>> notifying main thread we are about to start"); + + // exit as soon as any of the trigger completes/exits + let (result, index, rest) = future::select_all(futures_list).await; + info!( + " >>> trigger type '{trigger_type}' exited", + trigger_type = trigger_types[index] + ); -// For each Spin app variable, checks if a container environment variable with -// the same name exists and duplicates it in the environment with the -// application variable prefix -fn configure_application_variables_from_environment_variables( - resolved: &ResolvedAppSource, -) -> Result<()> { - resolved - .variables() - .into_iter() - .map(str::to_ascii_uppercase) - .for_each(|variable| { - env::var(&variable) - .map(|val| { - let prefixed = format!("{}_{}", SPIN_APPLICATION_VARIABLE_PREFIX, variable); - env::set_var(prefixed, val); - }) - .ok(); - }); - Ok(()) + drop(rest); + + result + } } #[cfg(test)] mod tests { - use std::env; + use oci_spec::image::MediaType; use super::*; - #[test] - fn test_configure_application_variables_from_environment_variables() { - temp_env::with_vars( - [ - ("SPIN_VARIABLE_DO_NOT_RESET", Some("val1")), - ("SHOULD_BE_PREFIXED", Some("val2")), - ("ignored_if_not_uppercased_env", Some("val3")), - ], - || { - let app_json = r#" - { - "spin_lock_version": 1, - "entrypoint": "test", - "components": [], - "variables": {"should_be_prefixed": { "required": "true"}, "do_not_reset" : { "required": "true"}, "not_set_as_container_env": { "required": "true"}, "ignored_if_not_uppercased_env": { "required": "true"}}, - "triggers": [] - }"#; - let locked_app = LockedApp::from_json(app_json.as_bytes()).unwrap(); - let resolved = ResolvedAppSource::OciRegistry { locked_app }; - - configure_application_variables_from_environment_variables(&resolved).unwrap(); - assert_eq!(env::var("SPIN_VARIABLE_DO_NOT_RESET").unwrap(), "val1"); - assert_eq!( - env::var("SPIN_VARIABLE_SHOULD_BE_PREFIXED").unwrap(), - "val2" - ); - assert!(env::var("SPIN_VARIABLE_NOT_SET_AS_CONTAINER_ENV").is_err()); - assert!(env::var("SPIN_VARIABLE_IGNORED_IF_NOT_UPPERCASED_ENV").is_err()); - // Original env vars are still retained but not set in variable provider - assert!(env::var("SHOULD_BE_PREFIXED").is_ok()); - }, - ); - } - - #[test] - fn can_parse_spin_address() { - let parsed = parse_addr(SPIN_ADDR_DEFAULT).unwrap(); - assert_eq!(parsed.clone().port(), 80); - assert_eq!(parsed.ip().to_string(), "0.0.0.0"); - } - - #[test] - fn is_wasm_content() { - let wasm_content = WasmLayer { - layer: vec![], - config: oci_spec::image::Descriptor::new( - MediaType::Other(OCI_LAYER_MEDIA_TYPE_WASM.to_string()), - 1024, - "sha256:1234", - ), - }; - // Should be ignored - let data_content = WasmLayer { - layer: vec![], - config: oci_spec::image::Descriptor::new( - MediaType::Other(spin_oci::client::DATA_MEDIATYPE.to_string()), - 1024, - "sha256:1234", - ), - }; - assert!(SpinEngine::is_wasm_content(&wasm_content).is_some()); - assert!(SpinEngine::is_wasm_content(&data_content).is_none()); - } - #[test] fn precompile() { let module = wat::parse_str("(module)").unwrap(); @@ -669,7 +241,7 @@ mod tests { WasmLayer { layer: module.clone(), config: oci_spec::image::Descriptor::new( - MediaType::Other(OCI_LAYER_MEDIA_TYPE_WASM.to_string()), + MediaType::Other(constants::OCI_LAYER_MEDIA_TYPE_WASM.to_string()), 1024, "sha256:1234", ), @@ -678,7 +250,7 @@ mod tests { WasmLayer { layer: component.to_owned(), config: oci_spec::image::Descriptor::new( - MediaType::Other(OCI_LAYER_MEDIA_TYPE_WASM.to_string()), + MediaType::Other(constants::OCI_LAYER_MEDIA_TYPE_WASM.to_string()), 1024, "sha256:1234", ), diff --git a/containerd-shim-spin/src/main.rs b/containerd-shim-spin/src/main.rs index cd1ec867..af1afc0f 100644 --- a/containerd-shim-spin/src/main.rs +++ b/containerd-shim-spin/src/main.rs @@ -4,7 +4,12 @@ use containerd_shim_wasm::{ sandbox::cli::{revision, shim_main, version}, }; +mod constants; mod engine; +mod source; +mod stdio_hook; +mod trigger; +mod utils; fn main() { // Configure the shim to have only error level logging for performance improvements. diff --git a/containerd-shim-spin/src/source.rs b/containerd-shim-spin/src/source.rs new file mode 100644 index 00000000..fde89703 --- /dev/null +++ b/containerd-shim-spin/src/source.rs @@ -0,0 +1,115 @@ +use std::{fs::File, io::Write, path::PathBuf}; + +use anyhow::{Context, Result}; +use containerd_shim_wasm::container::RuntimeContext; +use log::info; +use oci_spec::image::MediaType; +use spin_app::locked::LockedApp; +use spin_loader::{cache::Cache, FilesMountStrategy}; + +use crate::{constants, utils::handle_archive_layer}; + +#[derive(Clone)] +pub enum Source { + File(PathBuf), + Oci, +} + +impl std::fmt::Debug for Source { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Source::File(path) => write!(f, "File({})", path.display()), + Source::Oci => write!(f, "Oci"), + } + } +} + +impl Source { + pub(crate) async fn from_ctx(ctx: &impl RuntimeContext, cache: &Cache) -> Result { + match ctx.entrypoint().source { + containerd_shim_wasm::container::Source::File(_) => { + Ok(Source::File(constants::SPIN_MANIFEST_FILE_PATH.into())) + } + containerd_shim_wasm::container::Source::Oci(layers) => { + info!(" >>> configuring spin oci application {}", layers.len()); + + for layer in layers { + log::debug!("<<< layer config: {:?}", layer.config); + } + + for artifact in layers { + match artifact.config.media_type() { + MediaType::Other(name) + if name == spin_oci::client::SPIN_APPLICATION_MEDIA_TYPE => + { + let path = PathBuf::from("/spin.json"); + log::info!("writing spin oci config to {:?}", path); + File::create(&path) + .context("failed to create spin.json")? + .write_all(&artifact.layer) + .context("failed to write spin.json")?; + } + MediaType::Other(name) if name == constants::OCI_LAYER_MEDIA_TYPE_WASM => { + log::info!( + "<<< writing wasm artifact with length {:?} config to cache, near {:?}", + artifact.layer.len(), + cache.manifests_dir() + ); + cache + .write_wasm(&artifact.layer, &artifact.config.digest()) + .await?; + } + MediaType::Other(name) if name == spin_oci::client::DATA_MEDIATYPE => { + log::debug!( + "<<< writing data layer to cache, near {:?}", + cache.manifests_dir() + ); + cache + .write_data(&artifact.layer, &artifact.config.digest()) + .await?; + } + MediaType::Other(name) if name == spin_oci::client::ARCHIVE_MEDIATYPE => { + log::debug!( + "<<< writing archive layer and unpacking contents to cache, near {:?}", + cache.manifests_dir() + ); + handle_archive_layer(cache, &artifact.layer, &artifact.config.digest()) + .await + .context("unable to unpack archive layer")?; + } + _ => { + log::debug!( + "<<< unknown media type {:?}", + artifact.config.media_type() + ); + } + } + } + Ok(Source::Oci) + } + } + } + + pub(crate) async fn to_locked_app(&self, cache: &Cache) -> Result { + let locked_app = match self { + Source::File(source) => { + // TODO: This should be configurable, see https://github.com/deislabs/containerd-wasm-shims/issues/166 + // TODO: ^^ Move aforementioned issue to this repo + let files_mount_strategy = FilesMountStrategy::Direct; + spin_loader::from_file(&source, files_mount_strategy, None).await + } + Source::Oci => { + let working_dir = PathBuf::from("/"); + let loader = spin_oci::OciLoader::new(working_dir); + + // TODO: what is the best way to get this info? It isn't used only saved in the locked file + let reference = "docker.io/library/wasmtest_spin:latest"; + + loader + .load_from_cache(PathBuf::from("/spin.json"), reference, cache) + .await + } + }?; + Ok(locked_app) + } +} diff --git a/containerd-shim-spin/src/stdio_hook.rs b/containerd-shim-spin/src/stdio_hook.rs new file mode 100644 index 00000000..2b1a1daf --- /dev/null +++ b/containerd-shim-spin/src/stdio_hook.rs @@ -0,0 +1,25 @@ +use anyhow::Result; +use spin_trigger::{RuntimeConfig, TriggerHooks}; + +pub(crate) struct StdioHook; + +impl TriggerHooks for StdioHook { + fn app_loaded( + &mut self, + _app: &spin_app::App, + _runtime_config: &RuntimeConfig, + _resolver: &std::sync::Arc, + ) -> Result<()> { + Ok(()) + } + + fn component_store_builder( + &self, + _component: &spin_app::AppComponent, + builder: &mut spin_core::StoreBuilder, + ) -> Result<()> { + builder.inherit_stdout(); + builder.inherit_stderr(); + Ok(()) + } +} diff --git a/containerd-shim-spin/src/trigger.rs b/containerd-shim-spin/src/trigger.rs new file mode 100644 index 00000000..d4b3824a --- /dev/null +++ b/containerd-shim-spin/src/trigger.rs @@ -0,0 +1,116 @@ +use std::{ + collections::HashSet, + path::{Path, PathBuf}, +}; + +use anyhow::{anyhow, Context, Result}; +use spin_app::locked::LockedApp; +use spin_trigger::{loader, RuntimeConfig, TriggerExecutor, TriggerExecutorBuilder}; +use spin_trigger_http::HttpTrigger; +use spin_trigger_redis::RedisTrigger; +use trigger_command::CommandTrigger; +use trigger_mqtt::MqttTrigger; +use trigger_sqs::SqsTrigger; +use url::Url; + +use crate::{constants, source::Source, stdio_hook::StdioHook}; + +pub(crate) async fn build_trigger(app: LockedApp, app_source: Source) -> Result +where + T: spin_trigger::TriggerExecutor, + T::TriggerConfig: serde::de::DeserializeOwned, +{ + let working_dir = PathBuf::from(constants::SPIN_TRIGGER_WORKING_DIR); + let trigger: T = build_trigger_inner(working_dir, app, app_source) + .await + .context("failed to build spin trigger")?; + Ok(trigger) +} + +async fn build_trigger_inner( + working_dir: PathBuf, + app: LockedApp, + app_source: Source, +) -> Result +where + for<'de> ::TriggerConfig: serde::de::Deserialize<'de>, +{ + let locked_url = write_locked_app(&app, &working_dir).await?; + + // Build trigger config + let mut loader = loader::TriggerLoader::new(working_dir.clone(), true); + match app_source { + Source::Oci => unsafe { + // Configure the loader to support loading AOT compiled components.. + // Since all components were compiled by the shim (during `precompile`), + // this operation can be considered safe. + loader.enable_loading_aot_compiled_components(); + }, + // Currently, it is only possible to precompile applications distributed using + // `spin registry push` + Source::File(_) => {} + }; + let mut runtime_config = RuntimeConfig::new(PathBuf::from("/").into()); + // Load in runtime config if one exists at expected location + if Path::new(constants::RUNTIME_CONFIG_PATH).exists() { + runtime_config.merge_config_file(constants::RUNTIME_CONFIG_PATH)?; + } + let mut builder = TriggerExecutorBuilder::new(loader); + builder + .hooks(StdioHook {}) + .config_mut() + .wasmtime_config() + .cranelift_opt_level(spin_core::wasmtime::OptLevel::Speed); + let init_data = Default::default(); + let executor = builder.build(locked_url, runtime_config, init_data).await?; + Ok(executor) +} + +async fn write_locked_app(locked_app: &LockedApp, working_dir: &Path) -> Result { + let locked_path: PathBuf = working_dir.join(constants::SPIN_LOCK_FILE_NAME); + let locked_app_contents = + serde_json::to_vec_pretty(&locked_app).context("failed to serialize locked app")?; + tokio::fs::write(&locked_path, locked_app_contents) + .await + .with_context(|| format!("failed to write {:?}", locked_path))?; + let locked_url = Url::from_file_path(&locked_path) + .map_err(|_| anyhow!("cannot convert to file URL: {locked_path:?}"))? + .to_string(); + + Ok(locked_url) +} + +/// get the supported trigger types from the `LockedApp`. +/// +/// this function filters the trigger types to only return the ones that are currently supported. +/// If an unsupported trigger type is found, it returns an error indicating which trigger type is unsupported. +/// +/// supported trigger types include: +/// - redis +/// - http +/// - sqs +/// - mqtt +/// - command +pub(crate) fn get_supported_triggers(locked_app: &LockedApp) -> anyhow::Result> { + let supported_triggers: HashSet<&str> = HashSet::from([ + RedisTrigger::TRIGGER_TYPE, + HttpTrigger::TRIGGER_TYPE, + SqsTrigger::TRIGGER_TYPE, + MqttTrigger::TRIGGER_TYPE, + CommandTrigger::TRIGGER_TYPE, + ]); + + let mut types: Vec = Vec::with_capacity(locked_app.triggers.len()); + + for trigger in &locked_app.triggers { + let trigger_type = &trigger.trigger_type; + if !supported_triggers.contains(trigger_type.as_str()) { + anyhow::bail!( + "Only Http, Redis, MQTT, SQS, and Command triggers are currently supported. Found unsupported trigger: {:?}", + trigger_type + ); + } + types.push(trigger_type.clone()); + } + Ok(types) +} diff --git a/containerd-shim-spin/src/utils.rs b/containerd-shim-spin/src/utils.rs new file mode 100644 index 00000000..3f53efab --- /dev/null +++ b/containerd-shim-spin/src/utils.rs @@ -0,0 +1,158 @@ +use std::{ + env, + net::{SocketAddr, ToSocketAddrs}, + path::PathBuf, +}; + +use anyhow::{anyhow, Context, Result}; +use containerd_shim_wasm::sandbox::WasmLayer; +use oci_spec::image::MediaType; +use spin_app::locked::LockedApp; +use spin_loader::cache::Cache; + +use crate::constants; + +// create a cache directory at /.cache +// this is needed for the spin LocalLoader to work +// TODO: spin should provide a more flexible `loader::from_file` that +// does not assume the existence of a cache directory +pub(crate) async fn initialize_cache() -> Result { + let cache_dir = PathBuf::from("/.cache"); + let cache = Cache::new(Some(cache_dir.clone())) + .await + .context("failed to create cache")?; + env::set_var("XDG_CACHE_HOME", &cache_dir); + Ok(cache) +} + +pub(crate) async fn handle_archive_layer( + cache: &Cache, + bytes: impl AsRef<[u8]>, + digest: impl AsRef, +) -> Result<()> { + // spin_oci::client::unpack_archive_layer attempts to create a tempdir via tempfile::tempdir() + // which will fall back to /tmp if TMPDIR is not set. /tmp is either not found or not accessible + // in the shim environment, hence setting to current working directory. + if env::var("TMPDIR").is_err() { + log::debug!( + "<<< TMPDIR is not set; setting to current working directory for unpacking archive layer" + ); + env::set_var("TMPDIR", env::current_dir().unwrap_or(".".into())); + } + + spin_oci::client::unpack_archive_layer(cache, bytes, digest).await +} + +// Returns Some(WasmLayer) if the layer contains wasm, otherwise None +pub(crate) fn is_wasm_content(layer: &WasmLayer) -> Option { + if let MediaType::Other(name) = layer.config.media_type() { + if name == constants::OCI_LAYER_MEDIA_TYPE_WASM { + return Some(layer.clone()); + } + } + None +} + +pub(crate) fn parse_addr(addr: &str) -> Result { + let addrs: SocketAddr = addr + .to_socket_addrs()? + .next() + .ok_or_else(|| anyhow!("could not parse address: {}", addr))?; + Ok(addrs) +} + +// For each Spin app variable, checks if a container environment variable with +// the same name exists and duplicates it in the environment with the +// application variable prefix +pub(crate) fn configure_application_variables_from_environment_variables( + resolved: &LockedApp, +) -> Result<()> { + resolved + .variables + .keys() + .map(|k| k.as_ref()) + .map(str::to_ascii_uppercase) + .for_each(|variable| { + env::var(&variable) + .map(|val| { + let prefixed = format!( + "{}_{}", + constants::SPIN_APPLICATION_VARIABLE_PREFIX, + variable + ); + env::set_var(prefixed, val); + }) + .ok(); + }); + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::env; + + use super::*; + + #[test] + fn test_configure_application_variables_from_environment_variables() { + temp_env::with_vars( + [ + ("SPIN_VARIABLE_DO_NOT_RESET", Some("val1")), + ("SHOULD_BE_PREFIXED", Some("val2")), + ("ignored_if_not_uppercased_env", Some("val3")), + ], + || { + let app_json = r#" + { + "spin_lock_version": 1, + "entrypoint": "test", + "components": [], + "variables": {"should_be_prefixed": { "required": "true"}, "do_not_reset" : { "required": "true"}, "not_set_as_container_env": { "required": "true"}, "ignored_if_not_uppercased_env": { "required": "true"}}, + "triggers": [] + }"#; + let locked_app = LockedApp::from_json(app_json.as_bytes()).unwrap(); + + configure_application_variables_from_environment_variables(&locked_app).unwrap(); + assert_eq!(env::var("SPIN_VARIABLE_DO_NOT_RESET").unwrap(), "val1"); + assert_eq!( + env::var("SPIN_VARIABLE_SHOULD_BE_PREFIXED").unwrap(), + "val2" + ); + assert!(env::var("SPIN_VARIABLE_NOT_SET_AS_CONTAINER_ENV").is_err()); + assert!(env::var("SPIN_VARIABLE_IGNORED_IF_NOT_UPPERCASED_ENV").is_err()); + // Original env vars are still retained but not set in variable provider + assert!(env::var("SHOULD_BE_PREFIXED").is_ok()); + }, + ); + } + + #[test] + fn can_parse_spin_address() { + let parsed = parse_addr(constants::SPIN_ADDR_DEFAULT).unwrap(); + assert_eq!(parsed.clone().port(), 80); + assert_eq!(parsed.ip().to_string(), "0.0.0.0"); + } + + #[test] + fn is_wasm_content_test() { + let wasm_content = WasmLayer { + layer: vec![], + config: oci_spec::image::Descriptor::new( + MediaType::Other(constants::OCI_LAYER_MEDIA_TYPE_WASM.to_string()), + 1024, + "sha256:1234", + ), + }; + // Should be ignored + let data_content = WasmLayer { + layer: vec![], + config: oci_spec::image::Descriptor::new( + MediaType::Other(spin_oci::client::DATA_MEDIATYPE.to_string()), + 1024, + "sha256:1234", + ), + }; + assert!(is_wasm_content(&wasm_content).is_some()); + assert!(is_wasm_content(&data_content).is_none()); + } +} From 79d54ba559824df85950ef309c8d25472218a432 Mon Sep 17 00:00:00 2001 From: jiaxiao zhou Date: Thu, 19 Sep 2024 23:27:10 +0000 Subject: [PATCH 2/2] containerd-shim-spin: get_supported_triggers returns a HashSet instead of Vec Signed-off-by: jiaxiao zhou --- containerd-shim-spin/src/engine.rs | 30 +++++++++++++---------------- containerd-shim-spin/src/trigger.rs | 30 +++++++++++++++-------------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/containerd-shim-spin/src/engine.rs b/containerd-shim-spin/src/engine.rs index f35b003f..961037fb 100644 --- a/containerd-shim-spin/src/engine.rs +++ b/containerd-shim-spin/src/engine.rs @@ -1,5 +1,5 @@ use std::{ - collections::hash_map::DefaultHasher, + collections::{hash_map::DefaultHasher, HashSet}, env, hash::{Hash, Hasher}, }; @@ -137,28 +137,24 @@ impl SpinEngine { configure_application_variables_from_environment_variables(&locked_app)?; let trigger_cmds = get_supported_triggers(&locked_app) .with_context(|| format!("Couldn't find trigger executor for {app_source:?}"))?; - let _telemetry_guard = spin_telemetry::init(version!().to_string())?; - self.run_trigger( - ctx, - trigger_cmds.iter().map(|s| s.as_ref()).collect(), - locked_app, - app_source, - ) - .await + self.run_trigger(ctx, &trigger_cmds, locked_app, app_source) + .await } async fn run_trigger( &self, ctx: &impl RuntimeContext, - trigger_types: Vec<&str>, + trigger_types: &HashSet, app: LockedApp, app_source: Source, ) -> Result<()> { - let mut futures_list = Vec::with_capacity(trigger_types.len()); + let mut futures_list = Vec::new(); + let mut trigger_type_map = Vec::new(); + for trigger_type in trigger_types.iter() { - let f = match trigger_type.to_owned() { + let f = match trigger_type.as_str() { HttpTrigger::TRIGGER_TYPE => { let http_trigger = build_trigger::(app.clone(), app_source.clone()).await?; @@ -204,17 +200,17 @@ impl SpinEngine { } }; - futures_list.push(f) + trigger_type_map.push(trigger_type.clone()); + futures_list.push(f); } info!(" >>> notifying main thread we are about to start"); // exit as soon as any of the trigger completes/exits let (result, index, rest) = future::select_all(futures_list).await; - info!( - " >>> trigger type '{trigger_type}' exited", - trigger_type = trigger_types[index] - ); + let trigger_type = &trigger_type_map[index]; + + info!(" >>> trigger type '{trigger_type}' exited"); drop(rest); diff --git a/containerd-shim-spin/src/trigger.rs b/containerd-shim-spin/src/trigger.rs index d4b3824a..efd18ca6 100644 --- a/containerd-shim-spin/src/trigger.rs +++ b/containerd-shim-spin/src/trigger.rs @@ -91,7 +91,9 @@ async fn write_locked_app(locked_app: &LockedApp, working_dir: &Path) -> Result< /// - sqs /// - mqtt /// - command -pub(crate) fn get_supported_triggers(locked_app: &LockedApp) -> anyhow::Result> { +/// +/// Note: this function returns a `HashSet` of supported trigger types. Duplicates are removed. +pub(crate) fn get_supported_triggers(locked_app: &LockedApp) -> anyhow::Result> { let supported_triggers: HashSet<&str> = HashSet::from([ RedisTrigger::TRIGGER_TYPE, HttpTrigger::TRIGGER_TYPE, @@ -100,17 +102,17 @@ pub(crate) fn get_supported_triggers(locked_app: &LockedApp) -> anyhow::Result = Vec::with_capacity(locked_app.triggers.len()); - - for trigger in &locked_app.triggers { - let trigger_type = &trigger.trigger_type; - if !supported_triggers.contains(trigger_type.as_str()) { - anyhow::bail!( - "Only Http, Redis, MQTT, SQS, and Command triggers are currently supported. Found unsupported trigger: {:?}", - trigger_type - ); - } - types.push(trigger_type.clone()); - } - Ok(types) + locked_app.triggers.iter() + .map(|trigger| { + let trigger_type = &trigger.trigger_type; + if !supported_triggers.contains(trigger_type.as_str()) { + Err(anyhow!( + "Only Http, Redis, MQTT, SQS, and Command triggers are currently supported. Found unsupported trigger: {:?}", + trigger_type + )) + } else { + Ok(trigger_type.clone()) + } + }) + .collect::>>() }