Skip to content

Commit

Permalink
Merge pull request #53 from rajatjindal/support-multi-trigger-app
Browse files Browse the repository at this point in the history
Support multi trigger apps
  • Loading branch information
devigned authored Apr 15, 2024
2 parents 175e241 + 9229593 commit 0892076
Show file tree
Hide file tree
Showing 15 changed files with 1,520 additions and 70 deletions.
155 changes: 90 additions & 65 deletions containerd-shim-spin/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use containerd_shim_wasm::{
container::{Engine, RuntimeContext, Stdio},
sandbox::WasmLayer,
};
use futures::future;
use log::info;
use oci_spec::image::MediaType;
use spin_app::locked::LockedApp;
Expand Down Expand Up @@ -189,70 +190,91 @@ impl SpinEngine {
env::set_var("XDG_CACHE_HOME", &cache_dir);
let app_source = self.app_source(ctx, &cache).await?;
let resolved_app_source = self.resolve_app_source(app_source.clone(), &cache).await?;
let trigger_cmd = trigger_command_for_resolved_app_source(&resolved_app_source)
let trigger_cmds = trigger_command_for_resolved_app_source(&resolved_app_source)
.with_context(|| format!("Couldn't find trigger executor for {app_source:?}"))?;
let locked_app = self.load_resolved_app_source(resolved_app_source).await?;
self.run_trigger(ctx, &trigger_cmd, locked_app, app_source)
.await
self.run_trigger(
ctx,
trigger_cmds.iter().map(|s| s.as_ref()).collect(),
locked_app,
app_source,
)
.await
}

async fn run_trigger(
&self,
ctx: &impl RuntimeContext,
trigger_type: &str,
trigger_types: Vec<&str>,
app: LockedApp,
app_source: AppSource,
) -> Result<()> {
let working_dir = PathBuf::from("/");
let f = match trigger_type {
HttpTrigger::TRIGGER_TYPE => {
let http_trigger: HttpTrigger = self
.build_spin_trigger(working_dir, app, app_source)
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
http_trigger.run(spin_trigger_http::CliArgs {
address: parse_addr(SPIN_ADDR).unwrap(),
tls_cert: None,
tls_key: None,
})
}
RedisTrigger::TRIGGER_TYPE => {
let redis_trigger: RedisTrigger = self
.build_spin_trigger(working_dir, app, app_source)
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
redis_trigger.run(spin_trigger::cli::NoArgs)
}
SqsTrigger::TRIGGER_TYPE => {
let sqs_trigger: SqsTrigger = self
.build_spin_trigger(working_dir, app, app_source)
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
sqs_trigger.run(spin_trigger::cli::NoArgs)
}
CommandTrigger::TRIGGER_TYPE => {
let command_trigger: CommandTrigger = self
.build_spin_trigger(working_dir, app, app_source)
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
command_trigger.run(trigger_command::CliArgs {
guest_args: ctx.args().to_vec(),
})
}
_ => {
todo!("Only Http, Redis and SQS triggers are currently supported.")
}
};
let mut futures_list = Vec::with_capacity(trigger_types.len());
for trigger_type in trigger_types.iter() {
let f = match trigger_type.to_owned() {
HttpTrigger::TRIGGER_TYPE => {
let http_trigger: HttpTrigger = self
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin http trigger");
http_trigger.run(spin_trigger_http::CliArgs {
address: parse_addr(SPIN_ADDR).unwrap(),
tls_cert: None,
tls_key: None,
})
}
RedisTrigger::TRIGGER_TYPE => {
let redis_trigger: RedisTrigger = self
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin redis trigger");
redis_trigger.run(spin_trigger::cli::NoArgs)
}
SqsTrigger::TRIGGER_TYPE => {
let sqs_trigger: SqsTrigger = self
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
sqs_trigger.run(spin_trigger::cli::NoArgs)
}
CommandTrigger::TRIGGER_TYPE => {
let command_trigger: CommandTrigger = self
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
command_trigger.run(trigger_command::CliArgs {
guest_args: ctx.args().to_vec(),
})
}
_ => {
todo!("Only Http, Redis and SQS triggers are currently supported.")
}
};

futures_list.push(f)
}

info!(" >>> notifying main thread we are about to start");
f.await

// exit as soon as any of the trigger completes/exits
let (result, index, rest) = future::select_all(futures_list).await;
info!(
" >>> trigger type '{trigger_type}' exited",
trigger_type = trigger_types[index]
);

drop(rest);

result
}

async fn load_resolved_app_source(
Expand Down Expand Up @@ -435,7 +457,7 @@ pub enum ResolvedAppSource {
}

impl ResolvedAppSource {
pub fn trigger_type(&self) -> anyhow::Result<&str> {
pub fn trigger_types(&self) -> anyhow::Result<Vec<&str>> {
let types = match self {
ResolvedAppSource::File { manifest, .. } => {
manifest.triggers.keys().collect::<HashSet<_>>()
Expand All @@ -448,23 +470,26 @@ impl ResolvedAppSource {
};

ensure!(!types.is_empty(), "no triggers in app");
ensure!(types.len() == 1, "multiple trigger types not yet supported");
Ok(types.into_iter().next().unwrap())
Ok(types.into_iter().map(|t| t.as_str()).collect())
}
}

fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result<String> {
let trigger_type = resolved.trigger_type()?;

match trigger_type {
RedisTrigger::TRIGGER_TYPE
| HttpTrigger::TRIGGER_TYPE
| SqsTrigger::TRIGGER_TYPE
| CommandTrigger::TRIGGER_TYPE => Ok(trigger_type.to_owned()),
_ => {
todo!("Only Http, Redis, SQS, and command triggers are currently supported.")
fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result<Vec<String>> {
let trigger_types = resolved.trigger_types()?;
let mut types = Vec::with_capacity(trigger_types.len());
for trigger_type in trigger_types.iter() {
match trigger_type.to_owned() {
RedisTrigger::TRIGGER_TYPE
| HttpTrigger::TRIGGER_TYPE
| SqsTrigger::TRIGGER_TYPE
| CommandTrigger::TRIGGER_TYPE => types.push(trigger_type),
_ => {
todo!("Only Http, Redis and SQS triggers are currently supported.")
}
}
}

Ok(trigger_types.iter().map(|x| x.to_string()).collect())
}

#[cfg(test)]
Expand Down
15 changes: 15 additions & 0 deletions images/spin-multi-trigger-app/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM --platform=${BUILDPLATFORM} rust:1.73 AS build
WORKDIR /opt/build
COPY . .
RUN rustup target add wasm32-wasi

WORKDIR /opt/build/spin-http-trigger
RUN cargo build --target wasm32-wasi --release

WORKDIR /opt/build/spin-redis-trigger
RUN cargo build --target wasm32-wasi --release

FROM scratch
COPY --from=build /opt/build/spin-http-trigger/target/wasm32-wasi/release/spin_http_trigger.wasm .
COPY --from=build /opt/build/spin-redis-trigger/target/wasm32-wasi/release/spin_redis_trigger.wasm .
COPY --from=build /opt/build/spin.toml .
2 changes: 2 additions & 0 deletions images/spin-multi-trigger-app/spin-http-trigger/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
target/
.spin/
Loading

0 comments on commit 0892076

Please sign in to comment.