Skip to content

Commit

Permalink
no working
Browse files Browse the repository at this point in the history
  • Loading branch information
jjnicola committed Jul 12, 2023
1 parent 1beffbc commit d8a74c8
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 32 deletions.
92 changes: 73 additions & 19 deletions rust/openvasd/src/controller/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,11 @@
//! All known paths must be handled in the entrypoint function.

use std::convert::Infallible;
use std::ffi::c_void;
use std::{fmt::Display, sync::Arc, pin::Pin};
use futures_util::stream::Iter;
use tokio::io::AsyncRead;
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_util::io::{ReaderStream};
use tokio_stream::StreamExt;

use std::marker::PhantomData;
use std::{fmt::Display, sync::Arc};
use futures::future::Map;
use futures::stream::futures_unordered::IntoIter;
use futures::stream::{self, StreamExt};
use super::{context::Context, quit_on_poison};
use futures_util::Stream;
use hyper::{Body, Method, Request, Response, body::Bytes};
Expand Down Expand Up @@ -113,11 +110,14 @@ where
///
/// First it will be checked if a known path is requested and if the method is supported.
/// Than corresponding functions will be called to handle the request.
pub async fn entrypoint<'a, S>(
pub async fn entrypoint<'a, S, J, O, E>(
req: Request<Body>,
ctx: Arc<Context<S>>,
) -> Result<Response<Body>, Error>
where
J: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
S: ScanStarter
+ ScanStopper
+ ScanDeleter
Expand Down Expand Up @@ -302,26 +302,80 @@ where
}
(&Method::GET, Vts) => {
let (_, oids) = ctx.oids.read()?.clone();
let mut flag = false;
let stream = futures::stream::iter(
oids
.into_iter()
.map(|s: String| -> Result<String, Infallible>
{
Ok(format!("{},",serde_json::to_string(&s).unwrap()).to_string())
}
)
);

let mut v: Vec<Foo<J, O, E>> = Vec::new();
let j1 = Foo::new(vec!['['.to_string()], Box::new(|s: String| -> Result<String, Infallible>
{
Ok(s)
}));
v.push(j1);


let j2 = Foo::new(oids, Box::new(|s: String| -> Result<String, Infallible>
{
Ok(format!("{},",serde_json::to_string(&s).unwrap()).to_string())
}));
v.push(j2);


let j3 = Foo::new(vec![']'.to_string()], Box::new(|s: String| -> Result<String, Infallible>
{
Ok(s)
}));
v.push(j3);

let stream = futures::stream::iter(v.into_iter().map(
|v| v.exec()
));
Ok(ctx.response.ok_stream(stream).await)

}
_ => Ok(ctx.response.not_found("path", req.uri().path())),
}
}

struct Foo<J, O, E>
where
J: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
pub v: Vec<String>,
pub c: Box<dyn Fn(String) -> Result<String, Infallible> + 'static>,
p: PhantomData<J>
}

impl<J, O, E> Foo<J, O, E>
where
J: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
fn new(v: Vec<String>, c: impl Fn(String) -> Result<String, Infallible> + 'static) -> Self {
Self {
v,
c: Box::new(c),
p: PhantomData
}
}

pub fn exec(self) -> Result<String, Infallible> {
let e = self.v.iter().map(
|s| (self.c)(s.to_string())
).collect();
e
}
}

// let stream = futures::stream::iter(
// oids
// .into_iter()
// .map(|s: String| -> Result<String, Infallible>
// {
// Ok(format!("{},",serde_json::to_string(&s).unwrap()).to_string())
// }
// )
// );
//fn async_read() -> impl Stream<Item = Result<u8, std::io::Error>> {
// let f = File::open("/dev/random").expect("Could not open file");
// let reader = BufReader::new(f);
Expand Down
22 changes: 9 additions & 13 deletions rust/openvasd/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@
//
// SPDX-License-Identifier: GPL-2.0-or-later

use std::{error::Error, pin::Pin, convert::Infallible};
use std::{error::Error};

use futures::{stream, Stream};
use hyper::{body::Bytes, Body};
use futures::{Stream};
use hyper::{body::Bytes};
use serde::Serialize;
use futures_util::{StreamExt, Future};
use tokio::io::AsyncRead;
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_serde;
use tokio::io::{self, AsyncReadExt};

use tokio_stream;


type Result = hyper::Response<hyper::Body>;

#[derive(Debug, Default)]
Expand All @@ -23,9 +19,9 @@ pub struct Response {
}

impl Response {
async fn create_stream<S, O, E>(&self, code: hyper::StatusCode, value: S) -> Result
async fn create_stream<J, O, E>(&self, code: hyper::StatusCode, value: J) -> Result
where
S: Stream<Item = std::result::Result<O, E>> + Send + 'static,
J: Stream<Item = std::result::Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
Expand All @@ -47,9 +43,9 @@ impl Response {
}
}

pub async fn ok_stream<S, O, E>(&self, value: S) -> Result
pub async fn ok_stream<J, O, E>(&self, value: J) -> Result
where
S: Stream<Item = std::result::Result<O, E>> + Send + 'static,
J: Stream<Item = std::result::Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
Expand Down

0 comments on commit d8a74c8

Please sign in to comment.