Skip to content
This repository has been archived by the owner on Mar 24, 2023. It is now read-only.

Commit

Permalink
Call workflow for each event (#9)
Browse files Browse the repository at this point in the history
octox now accepts a function that is called for every event that the
server receives. The function takes the event, and returns a result. If
the workflow succeeded, the result contains a serialize value that is
used as the body of the response. If the workflow failed, an error is
returned to the caller.
  • Loading branch information
jdno authored May 26, 2022
1 parent 3e52290 commit c398aa5
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 43 deletions.
22 changes: 20 additions & 2 deletions examples/hello-world.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
use octox::{Error, Octox};
use std::sync::Arc;

use github_parts::event::Event;
use serde_json::Value;

use octox::{Error, Octox, Workflow, WorkflowError};

#[derive(Debug)]
struct HelloWorld;

impl Workflow for HelloWorld {
fn process(&self, event: Event) -> Result<Value, WorkflowError> {
let body = format!("received {}", event).into();

println!("{}", &body);

Ok(body)
}
}

#[tokio::main]
async fn main() -> Result<(), Error> {
dotenv::dotenv().ok();

let octox = Octox::new();
let octox = Octox::new().workflow(Arc::new(HelloWorld))?;
octox.serve().await
}
44 changes: 18 additions & 26 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,43 @@ use axum::Json;
use hmac::{Hmac, Mac};
use serde_json::json;
use sha2::Sha256;
use thiserror::Error;

use crate::WebhookSecret;

type HmacSha256 = Hmac<Sha256>;

#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Error)]
pub enum AuthError {
#[error("missing {0} header")]
MissingHeader(String),
#[error("failed to initialize cryptographic key")]
FailedHmacInitialization,
#[error("X-Hub-Signature-256 header has the wrong format")]
WrongSignatureFormat,
#[error("failed to decode the X-Hub-Signature-256 header")]
FailedDecodingSignature,
#[error("X-Hub-Signature-256 header is invalid")]
InvalidSignature,
#[error("failed to deserialize the body based on the X-GitHub-Event header")]
UnexpectedPayload,
}

impl IntoResponse for AuthError {
fn into_response(self) -> Response {
let (status, error_message) = match self {
AuthError::MissingHeader(header) => (
StatusCode::BAD_REQUEST,
format!("missing {} header", header),
),
AuthError::FailedHmacInitialization => (
StatusCode::INTERNAL_SERVER_ERROR,
"failed to initialize cryptographic key".into(),
),
AuthError::WrongSignatureFormat => (
StatusCode::BAD_REQUEST,
"X-Hub-Signature-256 header must start with sha256=".into(),
),
AuthError::FailedDecodingSignature => (
StatusCode::INTERNAL_SERVER_ERROR,
"failed to decode the X-Hub-Signature-256 header".into(),
),
AuthError::InvalidSignature => (
StatusCode::UNAUTHORIZED,
"X-Hub-Signature-256 header is invalid".into(),
),
AuthError::UnexpectedPayload => (
StatusCode::BAD_REQUEST,
"failed to deserialize the body based of the X-GitHub-Event header".into(),
),
let message = self.to_string();

let status = match self {
AuthError::MissingHeader(_) => StatusCode::BAD_REQUEST,
AuthError::FailedHmacInitialization => StatusCode::INTERNAL_SERVER_ERROR,
AuthError::WrongSignatureFormat => StatusCode::BAD_REQUEST,
AuthError::FailedDecodingSignature => StatusCode::INTERNAL_SERVER_ERROR,
AuthError::InvalidSignature => StatusCode::UNAUTHORIZED,
AuthError::UnexpectedPayload => StatusCode::BAD_REQUEST,
};

let body = Json(json!({
"error": error_message,
"error": message,
}));

(status, body).into_response()
Expand Down
7 changes: 7 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use thiserror::Error;

use crate::auth::AuthError;
use crate::workflow::WorkflowError;

#[derive(Debug, Error)]
pub enum Error {
#[error("failed to call external API")]
Api(#[from] reqwest::Error),
#[error(transparent)]
Auth(#[from] AuthError),
#[error("failed to initialize the web framework")]
Axum(#[from] hyper::Error),
#[error("{0}")]
Expand All @@ -14,6 +19,8 @@ pub enum Error {
Io(#[from] std::io::Error),
#[error("failed to create JWT")]
Jwt(#[from] jsonwebtoken::errors::Error),
#[error(transparent)]
Workflow(#[from] WorkflowError),
#[error("{0}")]
Unknown(String),
}
Expand Down
19 changes: 19 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fmt::{Display, Formatter};
use std::fs::File;
use std::io::Read;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};
use std::sync::Arc;

use axum::routing::{get, post};
use axum::{Extension, Router, Server};
Expand All @@ -10,11 +11,13 @@ use crate::routes::{health, webhook};

pub use self::error::Error;
pub use self::github::{AppId, GitHubHost, PrivateKey, WebhookSecret};
pub use self::workflow::{Workflow, WorkflowError};

mod auth;
mod error;
mod github;
mod routes;
mod workflow;

#[derive(Debug)]
pub struct Octox {
Expand All @@ -24,13 +27,19 @@ pub struct Octox {
webhook_secret: Option<WebhookSecret>,
socket_address: SocketAddr,
tcp_listener: Option<TcpListener>,
workflow: Option<Arc<dyn Workflow>>,
}

impl Octox {
pub fn new() -> Self {
Self::default()
}

pub fn workflow(mut self, workflow: Arc<dyn Workflow>) -> Result<Self, Error> {
self.workflow = Some(workflow);
Ok(self)
}

pub fn github_host(mut self, github_host: String) -> Result<Self, Error> {
self.github_host = GitHubHost::new(github_host);
Ok(self)
Expand Down Expand Up @@ -67,6 +76,7 @@ impl Octox {
let app = Router::new()
.route("/", post(webhook))
.route("/health", get(health))
.layer(self.workflow_extension()?)
.layer(self.github_host_extension()?)
.layer(self.app_id_extension()?)
.layer(self.private_key_extension()?)
Expand All @@ -84,6 +94,14 @@ impl Octox {
Ok(())
}

fn workflow_extension(&self) -> Result<Extension<Arc<dyn Workflow>>, Error> {
if let Some(workflow) = &self.workflow {
return Ok(Extension(workflow.clone()));
}

Err(Error::Configuration("workflow must be set".into()))
}

fn github_host_extension(&self) -> Result<Extension<GitHubHost>, Error> {
Ok(Extension(self.github_host.clone()))
}
Expand Down Expand Up @@ -166,6 +184,7 @@ impl Default for Octox {
webhook_secret: None,
socket_address,
tcp_listener: None,
workflow: None,
}
}
}
Expand Down
20 changes: 14 additions & 6 deletions src/routes/webhook.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
use std::sync::Arc;

use axum::body::Bytes;
use axum::http::{HeaderMap, StatusCode};
use axum::Extension;
use axum::http::HeaderMap;
use axum::{Extension, Json};
use github_parts::event::Event;
use serde_json::Value;

use crate::auth::{verify_signature, AuthError};
use crate::WebhookSecret;
use crate::error::Error;
use crate::github::WebhookSecret;
use crate::workflow::Workflow;

pub async fn webhook(
headers: HeaderMap,
body: Bytes,
Extension(webhook_secret): Extension<WebhookSecret>,
) -> Result<StatusCode, AuthError> {
Extension(workflow): Extension<Arc<dyn Workflow>>,
) -> Result<Json<Value>, Error> {
let signature = get_signature(&headers)?;
verify_signature(&body, &signature, &webhook_secret)?;

let event_type = get_event(&headers)?;
let _event = deserialize_event(&event_type, &body)?;
let event = deserialize_event(&event_type, &body)?;

let body = workflow.process(event)?;

Ok(StatusCode::OK)
Ok(Json(body))
}

fn get_signature(headers: &HeaderMap) -> Result<String, AuthError> {
Expand Down
26 changes: 26 additions & 0 deletions src/workflow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use std::fmt::Debug;

use github_parts::event::Event;
use thiserror::Error;

pub trait Workflow: Debug + Sync + Send {
fn process(&self, event: Event) -> Result<serde_json::Value, WorkflowError>;
}

#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Error)]
pub enum WorkflowError {
#[error("{0}")]
Unknown(String),
}

impl IntoResponse for WorkflowError {
fn into_response(self) -> Response {
match self {
WorkflowError::Unknown(error) => {
(StatusCode::INTERNAL_SERVER_ERROR, error).into_response()
}
}
}
}
15 changes: 11 additions & 4 deletions tests/health.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use mockito::mock;
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;

use mockito::mock;
use reqwest::Client;

use octox::{Error, Octox};

use self::workflow::HelloWorld;

mod workflow;

#[tokio::test]
async fn health_returns_ok() -> Result<(), Error> {
dotenv::dotenv().ok();
Expand All @@ -16,9 +21,10 @@ async fn health_returns_ok() -> Result<(), Error> {

let octox = Octox::new()
.tcp_listener(listener)?
.github_host(mockito::server_url())?
.app_id(1)?
.private_key(include_str!("fixtures/private-key.pem"))?;
.github_host(mockito::server_url())?
.private_key(include_str!("fixtures/private-key.pem"))?
.workflow(Arc::new(HelloWorld))?;

tokio::spawn(async move {
octox.serve().await.unwrap();
Expand Down Expand Up @@ -46,7 +52,8 @@ async fn health_returns_error() -> Result<(), Error> {

let octox = Octox::new()
.tcp_listener(listener)?
.github_host(mockito::server_url())?;
.github_host(mockito::server_url())?
.workflow(Arc::new(HelloWorld))?;

tokio::spawn(async move {
octox.serve().await.unwrap();
Expand Down
22 changes: 17 additions & 5 deletions tests/webhook.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use reqwest::Client;
use std::fs::read;
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;

use reqwest::Client;

use octox::{Error, Octox};

use self::workflow::HelloWorld;

mod workflow;

#[tokio::test]
async fn webhook_accepts_valid_signature() -> Result<(), Error> {
dotenv::dotenv().ok();
Expand All @@ -14,7 +20,8 @@ async fn webhook_accepts_valid_signature() -> Result<(), Error> {
let octox = Octox::new()
.tcp_listener(listener)?
.github_host(mockito::server_url())?
.webhook_secret("secret")?;
.webhook_secret("secret")?
.workflow(Arc::new(HelloWorld))?;

tokio::spawn(async move {
octox.serve().await.unwrap();
Expand All @@ -37,7 +44,10 @@ async fn webhook_accepts_valid_signature() -> Result<(), Error> {
.send()
.await?;

assert_eq!("", response.text().await.unwrap());
assert_eq!(
"\"received unsupported event\"",
response.text().await.unwrap()
);
Ok(())
}

Expand All @@ -51,7 +61,8 @@ async fn webhook_requires_signature() -> Result<(), Error> {
let octox = Octox::new()
.tcp_listener(listener)?
.github_host(mockito::server_url())?
.webhook_secret("secret")?;
.webhook_secret("secret")?
.workflow(Arc::new(HelloWorld))?;

tokio::spawn(async move {
octox.serve().await.unwrap();
Expand Down Expand Up @@ -87,7 +98,8 @@ async fn webhook_rejects_invalid_signature() -> Result<(), Error> {
let octox = Octox::new()
.tcp_listener(listener)?
.github_host(mockito::server_url())?
.webhook_secret("secret")?;
.webhook_secret("secret")?
.workflow(Arc::new(HelloWorld))?;

tokio::spawn(async move {
octox.serve().await.unwrap();
Expand Down
13 changes: 13 additions & 0 deletions tests/workflow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use github_parts::event::Event;
use serde_json::Value;

use octox::{Workflow, WorkflowError};

#[derive(Debug)]
pub struct HelloWorld;

impl Workflow for HelloWorld {
fn process(&self, event: Event) -> Result<Value, WorkflowError> {
Ok(format!("received {}", event).into())
}
}

0 comments on commit c398aa5

Please sign in to comment.