Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ code0-flow = { version = "0.0.14" }
serde_json = "1.0.140"
async-nats = "0.42.0"
tonic-health = "0.14.1"
tokio-stream = "0.1.17"
uuid = { version = "1.18.0", features = ["v4"] }
6 changes: 6 additions & 0 deletions src/configuration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ pub struct Config {
///
/// Behavior:
/// Searches for the env. file at root level. Filename: `.env`
impl Default for Config {
fn default() -> Self {
Self::new()
}
}

impl Config {
pub fn new() -> Self {
Config {
Expand Down
8 changes: 4 additions & 4 deletions src/flow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ fn extract_field(settings: &[FlowSetting], def_key: &str, field_name: &str) -> O

let obj = setting.object.as_ref()?;
obj.fields.iter().find_map(|(k, v)| {
if k == field_name {
if let Some(Kind::StringValue(s)) = &v.kind {
return Some(s.clone());
}
if k == field_name
&& let Some(Kind::StringValue(s)) = &v.kind
{
return Some(s.clone());
}
None
})
Expand Down
107 changes: 25 additions & 82 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use crate::sagittarius::test_execution_client_impl::SagittariusTestExecutionServiceClient;
use crate::{configuration::Config as AquilaConfig, flow::get_flow_identifier};
use async_nats::jetstream::kv::Config;
use code0_flow::flow_config::load_env_file;
use prost::Message;
use sagittarius::flow_service_client_impl::SagittariusFlowClient;
use serde_json::from_str;
use server::AquilaGRPCServer;
use std::{collections::HashMap, fs::File, io::Read, sync::Arc};
use tucana::shared::{
FlowSetting, Flows, NodeFunction, NodeParameter, NodeValue, Struct, ValidationFlow, Value,
node_value, value::Kind,
};
use std::{fs::File, io::Read, sync::Arc};
use tucana::shared::Flows;

pub mod authorization;
pub mod configuration;
Expand All @@ -20,77 +18,6 @@ pub mod stream;

#[tokio::main]
async fn main() {
let flow = ValidationFlow {
flow_id: 1,
project_id: 1,
r#type: String::from("REST"),
data_types: vec![],
input_type_identifier: Some(String::from("HTTP_REQUEST")),
return_type_identifier: Some(String::from("HTTP_RESPONSE")),
settings: vec![
FlowSetting {
database_id: 1,
flow_setting_id: String::from("HTTP_METHOD"),
object: Some(Struct {
fields: HashMap::from([(
String::from("method"),
Value {
kind: Some(Kind::StringValue(String::from("GET"))),
},
)]),
}),
},
FlowSetting {
database_id: 1,
flow_setting_id: String::from("HTTP_URL"),
object: Some(Struct {
fields: HashMap::from([(
String::from("url"),
Value {
kind: Some(Kind::StringValue(String::from("/hello-world"))),
},
)]),
}),
},
FlowSetting {
database_id: 1,
flow_setting_id: String::from("HTTP_HOST"),
object: Some(Struct {
fields: HashMap::from([(
String::from("host"),
Value {
kind: Some(Kind::StringValue(String::from("localhost"))),
},
)]),
}),
},
],
starting_node: Some(NodeFunction {
database_id: 1,
runtime_function_id: String::from("std::control::break"),
next_node: None,
parameters: vec![NodeParameter {
database_id: 1,
runtime_parameter_id: String::from("value"),
value: Some(NodeValue {
value: Some(node_value::Value::LiteralValue(Value {
kind: Some(Kind::StructValue(Struct {
fields: HashMap::from([(
String::from("hallo"),
Value {
kind: Some(Kind::StringValue(String::from("welt"))),
},
)]),
})),
})),
}),
}],
}),
};

let s = serde_json::to_string_pretty(&flow).unwrap();
println!("{}", s);

log::info!("Starting Aquila...");

// Configure Logging
Expand All @@ -108,16 +35,16 @@ async fn main() {
Err(err) => panic!("Failed to connect to NATS server: {}", err),
};

let jetstream = async_nats::jetstream::new(client.clone());
let jet_stream = async_nats::jetstream::new(client.clone());

let _ = jetstream
let _ = jet_stream
.create_key_value(Config {
bucket: config.nats_bucket.clone(),
..Default::default()
})
.await;

let kv_store = match jetstream.get_key_value(config.nats_bucket.clone()).await {
let kv_store = match jet_stream.get_key_value(config.nats_bucket.clone()).await {
Ok(kv) => Arc::new(kv),
Err(err) => panic!("Failed to get key-value store: {}", err),
};
Expand All @@ -136,10 +63,26 @@ async fn main() {
}
};

let mut sagittarius_client =
SagittariusFlowClient::new(config.backend_url, kv_store, config.runtime_token).await;
// Connect to Sagittarius Flow Endpoint
SagittariusFlowClient::new(
config.backend_url.clone(),
kv_store.clone(),
config.runtime_token.clone(),
)
.await
.init_flow_stream()
.await;

sagittarius_client.init_flow_stream().await;
// Connect to Sagittarius Execution Endpoint
SagittariusTestExecutionServiceClient::new(
client,
kv_store,
config.backend_url,
config.runtime_token,
)
.await
.logon()
.await;
} else {
init_flows_from_json(config.flow_fallback_path, kv_store).await
}
Expand Down
6 changes: 3 additions & 3 deletions src/sagittarius/action_service_client_impl.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use tonic::{transport::Channel, Extensions, Request, Response};
use tonic::{Extensions, Request, Response, transport::Channel};
use tucana::{
sagittarius::{
action_service_client::ActionServiceClient, ActionLogoffRequest, ActionLogoffResponse,
ActionLogonRequest, ActionLogonResponse,
ActionLogoffRequest, ActionLogoffResponse, ActionLogonRequest, ActionLogonResponse,
action_service_client::ActionServiceClient,
},
shared::RuntimeFunctionDefinition,
};
Expand Down
4 changes: 2 additions & 2 deletions src/sagittarius/data_type_service_client_impl.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::authorization::authorization::get_authorization_metadata;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::{transport::Channel, Extensions, Request};
use tonic::{Extensions, Request, transport::Channel};
use tucana::sagittarius::{
data_type_service_client::DataTypeServiceClient,
DataTypeUpdateRequest as SagittariusDataTypeUpdateRequest,
data_type_service_client::DataTypeServiceClient,
};
use tucana::{
aquila::DataTypeUpdateRequest as AquilaDataTypeUpdateRequest,
Expand Down
4 changes: 2 additions & 2 deletions src/sagittarius/flow_type_service_client_impl.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::authorization::authorization::get_authorization_metadata;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tonic::Extensions;
use tonic::Request;
use tonic::transport::Channel;
use tucana::aquila::FlowTypeUpdateRequest as AquilaFlowTypeUpdateRequest;
use tucana::aquila::FlowTypeUpdateResponse as AquilaFlowTypeUpdateResponse;
use tucana::sagittarius::flow_type_service_client::FlowTypeServiceClient;
use tucana::sagittarius::FlowTypeUpdateRequest as SagittariusFlowTypeUpdateRequest;
use tucana::sagittarius::flow_type_service_client::FlowTypeServiceClient;

pub struct SagittariusFlowTypeServiceClient {
client: FlowTypeServiceClient<Channel>,
Expand Down
1 change: 1 addition & 0 deletions src/sagittarius/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod data_type_service_client_impl;
pub mod flow_service_client_impl;
pub mod flow_type_service_client_impl;
pub mod runtime_function_service_client_impl;
pub mod test_execution_client_impl;
4 changes: 2 additions & 2 deletions src/sagittarius/runtime_function_service_client_impl.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::authorization::authorization::get_authorization_metadata;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tonic::Extensions;
use tonic::Request;
use tonic::transport::Channel;
use tucana::aquila::RuntimeFunctionDefinitionUpdateRequest as AquilaRuntimeFunctionUpdateRequest;
use tucana::aquila::RuntimeFunctionDefinitionUpdateResponse as AquilaRuntimeFunctionUpdateResponse;
use tucana::sagittarius::runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient;
use tucana::sagittarius::RuntimeFunctionDefinitionUpdateRequest as SagittariusRuntimeFunctionUpdateRequest;
use tucana::sagittarius::runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient;

pub struct SagittariusRuntimeFunctionServiceClient {
client: RuntimeFunctionDefinitionServiceClient<Channel>,
Expand Down
Loading