diff --git a/Cargo.lock b/Cargo.lock index 6f6ca6a..364d04d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,7 +2085,6 @@ dependencies = [ "env_logger", "http 0.0.0", "log", - "redis", "regex", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 70afca4..b3c84d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,12 +23,6 @@ syn = "2.0" quote = "1.0" proc-macro2 = "1.0" tokio = { version = "1.44.1", features = ["rt-multi-thread"] } -redis = { version = "0.30.0", features = [ - "aio", - "tokio-comp", - "async-std-comp", - "json", -] } uuid = { version = "1.16.0", features = ["v4"] } tonic = "0.13.0" diff --git a/adapter/rest/Cargo.toml b/adapter/rest/Cargo.toml index 93da227..eac8078 100644 --- a/adapter/rest/Cargo.toml +++ b/adapter/rest/Cargo.toml @@ -9,7 +9,6 @@ config = { workspace = true } validator = { workspace = true } code0-flow = { workspace = true } tokio = { workspace = true } -redis = { workspace = true } tucana = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } diff --git a/adapter/rest/src/.env b/adapter/rest/src/.env index 82ebd9a..38fe9d9 100644 --- a/adapter/rest/src/.env +++ b/adapter/rest/src/.env @@ -2,3 +2,4 @@ PORT=8081 REDIS_URL=redis://localhost:6379 RABBITMQ_URL=amqp://localhost:5672 AQUILA_URL=http://localhost:8080 +IS_STATIC=true diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 152b84a..3e5b9db 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -3,7 +3,11 @@ pub mod store; mod types; use code0_flow::{ - flow_queue::service::RabbitmqClient, flow_store::connection::create_flow_store_connection, + flow_queue::service::RabbitmqClient, + flow_store::{ + connection::create_flow_store_connection, + service::{FlowStoreService, FlowStoreServiceBase}, + }, }; use config::FromEnv; use http::{ @@ -13,19 +17,22 @@ use http::{ }; use queue::queue::handle_connection; use std::{future::Future, pin::Pin, sync::Arc}; +use tokio::sync::Mutex; use types::{get_data_types, get_flow_types}; pub struct FlowConnectionHandler { - flow_store: code0_flow::flow_store::connection::FlowStore, + flow_store: Arc>, rabbitmq_client: Arc, } impl FlowConnectionHandler { pub async fn new(config: &Config) -> Self { let flow_store = create_flow_store_connection(config.redis_url.clone()).await; + let flow_store_service = Arc::new(Mutex::new(FlowStoreServiceBase::new(flow_store).await)); + let rabbitmq_client = Arc::new(RabbitmqClient::new(config.rabbitmq_url.as_str()).await); FlowConnectionHandler { - flow_store, + flow_store: flow_store_service, rabbitmq_client, } } diff --git a/adapter/rest/src/queue/mod.rs b/adapter/rest/src/queue/mod.rs index 640841b..c20772a 100644 --- a/adapter/rest/src/queue/mod.rs +++ b/adapter/rest/src/queue/mod.rs @@ -2,10 +2,11 @@ pub mod queue { use code0_flow::{ flow_queue::service::{Message, RabbitmqClient}, - flow_store::connection::FlowStore, + flow_store::service::FlowStoreService, }; use http::{request::HttpRequest, response::HttpResponse}; use std::{collections::HashMap, sync::Arc, time::Duration}; + use tokio::sync::Mutex; use tucana::shared::{Struct, Value}; use validator::{resolver::flow_resolver::resolve_flow, verify_flow}; @@ -30,11 +31,11 @@ pub mod queue { pub async fn handle_connection( mut request: HttpRequest, - flow_store: FlowStore, + flow_store: Arc>, rabbitmq_client: Arc, ) -> Option { // Check if a flow exists for the given settings, return none if not exsist for http handler - let flow_exists = check_flow_exists(&flow_store, &request).await; + let flow_exists = check_flow_exists(flow_store, &request).await; let flow_result = match flow_exists { Some(flow) => flow, diff --git a/adapter/rest/src/store/mod.rs b/adapter/rest/src/store/mod.rs index 83cac55..51bd938 100644 --- a/adapter/rest/src/store/mod.rs +++ b/adapter/rest/src/store/mod.rs @@ -1,9 +1,11 @@ pub mod store { - use code0_flow::flow_store::connection::FlowStore; + use std::sync::Arc; + + use code0_flow::flow_store::service::{FlowStoreService, FlowStoreServiceBase}; use http::request::HttpRequest; - use redis::{AsyncCommands, JsonAsyncCommands}; use regex::Regex; - use tucana::shared::{value::Kind, Flow, Struct}; + use tokio::sync::Mutex; + use tucana::shared::{value::Kind, Flow, FlowSetting}; //The regex is required for later purposes --> resolve the parameter of the url pub struct FlowExistResult { @@ -11,100 +13,63 @@ pub mod store { pub regex_pattern: Regex, } - pub async fn check_flow_exists( - flow_store: &FlowStore, - request: &HttpRequest, - ) -> Option { - let mut store = flow_store.lock().await; - - // Get all keys from Redis - let keys: Vec = store.keys("*").await.unwrap_or_default(); - let mut result: Vec = Vec::new(); - - // Retrieve JSON values for each key - for key in keys { - if let Ok(json_value) = store.json_get::<&String, &str, String>(&key, "$").await { - let flow = match serde_json::from_str::>(json_value.as_str()) { - Ok(flow) => flow[0].clone(), - Err(_) => continue, - }; - - result.push(flow); + fn extract_field(settings: &[FlowSetting], def_key: &str, field_name: &str) -> Option { + settings.iter().find_map(|setting| { + let def = setting.definition.as_ref()?; + if def.key != def_key { + return None; } - } - - for flow in result { - let mut correct_url = false; - let mut correct_method = false; - let mut flow_regex: Option = None; - - for setting in flow.settings.clone() { - let definition = match setting.definition { - Some(definition) => definition, - None => continue, - }; - - if definition.key == "HTTP_METHOD" { - let object: Struct = match setting.object { - Some(object) => object, - None => continue, - }; - for field in object.fields { - if field.0 == "method" { - if let Some(Kind::StringValue(method)) = field.1.kind { - if method == request.method.to_string() { - correct_method = true; - } - } - } + let obj = setting.object.as_ref()?; + obj.fields.iter().find_map(|(k, v)| { + if k == field_name { + if let Some(Kind::StringValue(s)) = &v.kind { + return Some(s.clone()); } - - continue; } + None + }) + }) + } - if definition.key == "URL" { - let object: Struct = match setting.object { - Some(object) => object, - None => continue, - }; + pub async fn check_flow_exists( + flow_store: Arc>, + request: &HttpRequest, + ) -> Option { + let flows = { + let mut store = flow_store.lock().await; + let pattern = format!("*::*::{}::{}", request.host, request.method.to_string()); + let result = store.query_flows(pattern).await; + + match result { + Ok(flows) => flows.flows, + Err(_) => return None, + } + }; - for field in object.fields { - if field.0 == "url" { - if let Some(Kind::StringValue(regex_str)) = field.1.kind { - let regex = match regex::Regex::new(®ex_str) { - Ok(regex) => regex, - Err(err) => { - log::error!("Failed to compile regex: {}", err); - continue; - } - }; + for flow in flows { + let url = extract_field(&flow.settings, "HTTP_URL", "url"); - if regex.is_match(&request.path) { - correct_url = true; - flow_regex = Some(regex); - } - } - } - } + let regex_str = match url { + Some(string) => string, + None => continue, + }; + let regex = match regex::Regex::new(®ex_str) { + Ok(regex) => regex, + Err(err) => { + log::error!("Failed to compile regex: {}", err); continue; } - } - - if correct_method && correct_url { - let regex_pattern = match flow_regex { - Some(regex) => regex.clone(), - None => continue, - }; + }; + if regex.is_match(&request.path) { return Some(FlowExistResult { flow, - regex_pattern, + regex_pattern: regex, }); } } - None } } diff --git a/crates/http/src/request.rs b/crates/http/src/request.rs index dbd15f6..f20151e 100644 --- a/crates/http/src/request.rs +++ b/crates/http/src/request.rs @@ -106,6 +106,7 @@ pub struct HttpRequest { pub method: HttpOption, pub path: String, pub version: String, + pub host: String, pub headers: HeaderMap, /// The body of the request. @@ -278,10 +279,23 @@ fn parse_request( None }; + let host = { + match header_map.get("host") { + Some(host) => host.clone(), + None => { + return Err(HttpResponse::bad_request( + "Missing Host in Headers!".to_string(), + HashMap::new(), + )); + } + } + }; + Ok(HttpRequest { method, path: path.to_string(), version: version.to_string(), + host, headers: header_map, body, }) diff --git a/helper/flow/src/main.rs b/helper/flow/src/main.rs index 08c4aa1..5068632 100644 --- a/helper/flow/src/main.rs +++ b/helper/flow/src/main.rs @@ -6,7 +6,7 @@ use code0_flow::flow_store::{ }; use tokio::sync::Mutex; use tucana::shared::Flows; -use typed_flows::add_flow::get_add_rest_flow; +use typed_flows::{add_flow::get_add_rest_flow, mutiply_flow::get_multiply_rest_flow}; pub mod typed_data_types; pub mod typed_flows; @@ -24,7 +24,7 @@ async fn main() { let mut client = flow_store_client.lock().await; let _ = client .insert_flows(Flows { - flows: vec![get_add_rest_flow()], + flows: vec![get_add_rest_flow(), get_multiply_rest_flow()], }) .await; } diff --git a/helper/flow/src/typed_flows/add_flow.rs b/helper/flow/src/typed_flows/add_flow.rs index 2437469..db9d6b6 100644 --- a/helper/flow/src/typed_flows/add_flow.rs +++ b/helper/flow/src/typed_flows/add_flow.rs @@ -32,6 +32,19 @@ pub fn get_add_rest_flow() -> Flow { input_type_identifier: Some(String::from("HTTP_REQUEST")), return_type_identifier: Some(String::from("HTTP_RESPONSE")), settings: vec![ + FlowSetting { + definition: Some(FlowSettingDefinition { + id: String::from("14234234"), + key: String::from("HTTP_HOST"), + }), + object: Some(Struct { + fields: { + let mut map = HashMap::new(); + map.insert(String::from("host"), get_string_value("localhost:8081")); + map + }, + }), + }, FlowSetting { definition: Some(FlowSettingDefinition { id: String::from("1424525"), diff --git a/helper/flow/src/typed_flows/mod.rs b/helper/flow/src/typed_flows/mod.rs index ca52dcc..3f3cda7 100644 --- a/helper/flow/src/typed_flows/mod.rs +++ b/helper/flow/src/typed_flows/mod.rs @@ -1 +1,2 @@ pub mod add_flow; +pub mod mutiply_flow; diff --git a/helper/flow/src/typed_flows/mutiply_flow.rs b/helper/flow/src/typed_flows/mutiply_flow.rs new file mode 100644 index 0000000..5e0666e --- /dev/null +++ b/helper/flow/src/typed_flows/mutiply_flow.rs @@ -0,0 +1,99 @@ +use std::collections::HashMap; + +use tucana::shared::{ + node_parameter::Value, Flow, FlowSetting, FlowSettingDefinition, NodeFunction, + NodeFunctionDefinition, NodeParameter, NodeParameterDefinition, Struct, +}; + +use crate::typed_data_types::{ + get_http_method_data_type, get_http_request_data_type, get_http_response_data_type, + get_http_url_data_type, +}; + +pub fn get_multiply_rest_flow() -> Flow { + fn get_string_value(value: &str) -> tucana::shared::Value { + tucana::shared::Value { + kind: Some(tucana::shared::value::Kind::StringValue(String::from( + value, + ))), + } + } + + Flow { + flow_id: 2, + project_id: 1, + r#type: "REST".to_string(), + data_types: vec![ + get_http_url_data_type(), + get_http_method_data_type(), + get_http_request_data_type(), + get_http_response_data_type(), + ], + input_type_identifier: Some(String::from("HTTP_REQUEST")), + return_type_identifier: Some(String::from("HTTP_RESPONSE")), + settings: vec![ + FlowSetting { + definition: Some(FlowSettingDefinition { + id: String::from("14234234"), + key: String::from("HTTP_HOST"), + }), + object: Some(Struct { + fields: { + let mut map = HashMap::new(); + map.insert(String::from("host"), get_string_value("localhost:8081")); + map + }, + }), + }, + FlowSetting { + definition: Some(FlowSettingDefinition { + id: String::from("1424525"), + key: String::from("HTTP_URL"), + }), + object: Some(Struct { + fields: { + let mut map = HashMap::new(); + map.insert(String::from("url"), get_string_value("/multiply")); + map + }, + }), + }, + FlowSetting { + definition: Some(FlowSettingDefinition { + id: String::from("14245252352"), + key: String::from("HTTP_METHOD"), + }), + object: Some(Struct { + fields: { + let mut map = HashMap::new(); + map.insert(String::from("method"), get_string_value("GET")); + map + }, + }), + }, + ], + starting_node: Some(NodeFunction { + definition: Some(NodeFunctionDefinition { + function_id: String::from("234567890"), + runtime_function_id: String::from("std::math::mutiply"), + }), + parameters: vec![ + NodeParameter { + definition: Some(NodeParameterDefinition { + parameter_id: String::from("12345678"), + runtime_parameter_id: String::from("first"), + }), + value: Some(Value::LiteralValue(get_string_value("body.first"))), + }, + NodeParameter { + definition: Some(NodeParameterDefinition { + parameter_id: String::from("25346346"), + runtime_parameter_id: String::from("second"), + }), + value: Some(Value::LiteralValue(get_string_value("body.second"))), + }, + ], + next_node: None, + }), + } +}