Skip to content

Commit c568a29

Browse files
committed
feat: finished reimplementation of rest adapter
1 parent 72e6a36 commit c568a29

5 files changed

Lines changed: 301 additions & 78 deletions

File tree

adapter/rest/src/main.rs

Lines changed: 84 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
use std::collections::HashMap;
2-
use std::sync::Arc;
3-
use tokio::sync::Mutex;
4-
51
use base::{
6-
runner::ServerContext,
7-
runner::ServerRunner,
8-
traits::{LoadConfig, Server as ServerTrait},
2+
extract_flow_setting_field,
3+
runner::{ServerContext, ServerRunner},
4+
store::FlowIdenfiyResult,
5+
traits::{IdentifiableFlow, LoadConfig, Server as ServerTrait},
96
};
107
use http::{request::HttpRequest, response::HttpResponse, server::Server};
8+
use std::collections::HashMap;
9+
use std::sync::Arc;
1110
use tonic::async_trait;
11+
use tucana::shared::ValidationFlow;
1212

1313
#[tokio::main]
1414
async fn main() {
@@ -27,6 +27,31 @@ struct HttpServer {
2727
http_server: Option<Server>,
2828
}
2929

30+
struct RequestRoute {
31+
url: String,
32+
}
33+
34+
impl IdentifiableFlow for RequestRoute {
35+
fn identify(&self, flow: &ValidationFlow) -> bool {
36+
let url = extract_flow_setting_field(&flow.settings, "HTTP_URL", "url");
37+
38+
let regex_str = match url.as_deref() {
39+
Some(s) => s,
40+
None => return false,
41+
};
42+
43+
let regex = match regex::Regex::new(regex_str) {
44+
Ok(regex) => regex,
45+
Err(err) => {
46+
log::error!("Failed to compile regex: {}", err);
47+
return false;
48+
}
49+
};
50+
51+
regex.is_match(&self.url)
52+
}
53+
}
54+
3055
#[async_trait]
3156
impl ServerTrait<HttpServerConfig> for HttpServer {
3257
async fn init(&mut self, ctx: &ServerContext<HttpServerConfig>) -> anyhow::Result<()> {
@@ -35,28 +60,63 @@ impl ServerTrait<HttpServerConfig> for HttpServer {
3560
}
3661

3762
/// The "serve forever" loop.
38-
async fn run(&mut self, _ctx: &ServerContext<HttpServerConfig>) -> anyhow::Result<()> {
63+
async fn run(&mut self, ctx: &ServerContext<HttpServerConfig>) -> anyhow::Result<()> {
3964
if let Some(server) = &mut self.http_server {
40-
let counter = Arc::new(Mutex::new(0));
65+
println!("Registering async closure handler...");
4166

4267
server.register_async_closure({
43-
let counter = Arc::clone(&counter);
68+
let store = Arc::clone(&ctx.adapter_store);
4469
move |request: HttpRequest| {
45-
let counter = Arc::clone(&counter);
70+
let store = Arc::clone(&store);
4671
async move {
47-
let mut number = counter.lock().await;
48-
*number += 1;
72+
println!("Handler called with request: {:?}", &request);
4973

50-
println!("Received request: {:?}", request);
74+
let pattern =
75+
format!("*::*::{}::{}", request.host, request.method.to_string());
76+
println!("Pattern created: {}", pattern);
5177

52-
let headers = HashMap::new();
53-
Some(HttpResponse::ok(
54-
format!("Hello from REST server! {}", number).into_bytes(),
55-
headers,
56-
))
78+
let route = RequestRoute {
79+
url: request.path.clone(),
80+
};
81+
82+
println!("About to call get_possible_flow_match...");
83+
let identification_result =
84+
store.get_possible_flow_match(pattern, route).await;
85+
println!("Flow identification completed");
86+
87+
match identification_result {
88+
FlowIdenfiyResult::Single(_flow) => {
89+
println!("Single flow found, returning success response");
90+
//TODO: Implement flow execution logic
91+
//let execution_result = ctx
92+
// .adapter_store
93+
// .validate_and_execute_flow(flow, None)
94+
// .await;
95+
96+
let headers = HashMap::new();
97+
let response = Some(HttpResponse::ok(
98+
String::from("Flow executed successfully!").into_bytes(),
99+
headers,
100+
));
101+
println!("Returning response: {:?}", response.is_some());
102+
return response;
103+
}
104+
_ => {
105+
println!("No single flow found, returning default response");
106+
let headers = HashMap::new();
107+
let response = Some(HttpResponse::internal_server_error(
108+
format!("No Flow found for path: {}", request.path),
109+
headers,
110+
));
111+
println!("Returning response: {:?}", response.is_some());
112+
return response;
113+
}
114+
}
57115
}
58116
}
59117
});
118+
119+
println!("Starting HTTP server...");
60120
server.start().await;
61121
};
62122

@@ -65,7 +125,10 @@ impl ServerTrait<HttpServerConfig> for HttpServer {
65125

66126
/// Called on shutdown signal.
67127
async fn shutdown(&mut self, _ctx: &ServerContext<HttpServerConfig>) -> anyhow::Result<()> {
68-
todo!("shutdown http server");
128+
if let Some(server) = &self.http_server {
129+
server.shutdown();
130+
}
131+
Ok(())
69132
}
70133
}
71134

@@ -76,6 +139,6 @@ struct HttpServerConfig {
76139

77140
impl LoadConfig for HttpServerConfig {
78141
fn load() -> Self {
79-
HttpServerConfig { port: 8080 }
142+
HttpServerConfig { port: 8081 }
80143
}
81144
}

crates/base/src/lib.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,28 @@
1+
use tucana::shared::{FlowSetting, value::Kind};
2+
13
pub mod config;
24
pub mod runner;
35
pub mod store;
46
pub mod traits;
7+
8+
pub fn extract_flow_setting_field(
9+
settings: &Vec<FlowSetting>,
10+
def_key: &str,
11+
field_name: &str,
12+
) -> Option<String> {
13+
settings.iter().find_map(|setting| {
14+
if setting.flow_setting_id != def_key {
15+
return None;
16+
}
17+
18+
let obj = setting.object.as_ref()?;
19+
obj.fields.iter().find_map(|(k, v)| {
20+
if k == field_name {
21+
if let Some(Kind::StringValue(s)) = &v.kind {
22+
return Some(s.clone());
23+
}
24+
}
25+
None
26+
})
27+
})
28+
}

crates/base/src/store.rs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,28 +68,25 @@ impl AdapterStore {
6868
key: String,
6969
id: I,
7070
) -> FlowIdenfiyResult {
71-
let mut stream = match self.kv.watch(key).await {
72-
Ok(stream) => stream,
71+
let mut collector = Vec::new();
72+
let mut keys = match self.kv.keys().await {
73+
Ok(keys) => keys.boxed(),
7374
Err(err) => {
74-
eprintln!("Failed to watch key: {}", err);
75+
eprintln!("Failed to get keys: {}", err);
7576
return FlowIdenfiyResult::None;
7677
}
7778
};
7879

79-
let mut collector = Vec::new();
80-
while let Some(entry) = stream.next().await {
81-
match entry {
82-
Ok(entry) => {
83-
let decoded_flow = ValidationFlow::decode(entry.value);
84-
if let Ok(flow) = decoded_flow {
85-
if id.identify(&flow) {
86-
collector.push(flow);
87-
}
88-
};
89-
}
90-
Err(err) => {
91-
eprintln!("Failed to get flow: {}", err);
92-
}
80+
while let Ok(Some(key)) = keys.try_next().await {
81+
println!("key: {:?}", key);
82+
83+
if let Ok(Some(bytes)) = self.kv.get(key).await {
84+
let decoded_flow = ValidationFlow::decode(bytes);
85+
if let Ok(flow) = decoded_flow {
86+
if id.identify(&flow) {
87+
collector.push(flow);
88+
}
89+
};
9390
}
9491
}
9592

crates/http/src/request.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ pub fn convert_to_http_request(stream: &TcpStream) -> Result<HttpRequest, HttpRe
161161
line.clear();
162162
}
163163

164+
println!("{:?}", &raw_http_request);
164165
// Parse headers
165166
let http_request = parse_request(raw_http_request, buf_reader)?;
166167

0 commit comments

Comments
 (0)