Skip to content

Commit

Permalink
Test stream response
Browse files Browse the repository at this point in the history
  • Loading branch information
jjnicola committed Jul 12, 2023
1 parent 843e333 commit 1beffbc
Show file tree
Hide file tree
Showing 8 changed files with 643 additions and 13 deletions.
357 changes: 350 additions & 7 deletions rust/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ members = [
"models",
"osp",
"openvasd",
"streamer",
]
45 changes: 45 additions & 0 deletions rust/examples/simple_scan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"scan_id": "6c591f83-8f7b-452a-8c78-ba35779e682f",
"target": {
"hosts": [
"127.0.0.1",
"::1",
"10.0.0.1"
],
"ports": [
{
"protocol": "udp",
"range": [
{
"start": 1,
"end": 3000
}
]
},
{
"protocol": "tcp",
"range": [
{
"start": 1,
"end": 3000
}
]
}
],
"credentials": [
{
"service": "ssh",
"port": 22,
"up": {
"username": "user",
"password": "pw"
}
}
]
},
"vts": [
{
"oid": "1.3.6.1.4.1.25623.1.0.50282"
}
]
}
5 changes: 5 additions & 0 deletions rust/openvasd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ rustls-pemfile = "1.0.2"
async-trait = "0.1.68"
clap = { version = "4.3.0", features = ["derive", "env"] }
toml = "0.7.4"
tokio-util = { version = "0.7.8", features = ["full"] }
tokio-serde = "0.8.0"
tokio-stream = "0.1.14"
file-format = "0.17.3"
futures = "0.3.28"

[dev-dependencies]
101 changes: 98 additions & 3 deletions rust/openvasd/src/controller/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@
//!
//! All known paths must be handled in the entrypoint function.

use std::{fmt::Display, sync::Arc};
use std::convert::Infallible;
use std::ffi::c_void;
use std::{fmt::Display, sync::Arc, pin::Pin};
use futures_util::stream::Iter;
use tokio::io::AsyncRead;
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_util::io::{ReaderStream};
use tokio_stream::StreamExt;

use super::{context::Context, quit_on_poison};
use hyper::{Body, Method, Request, Response};
use futures_util::Stream;
use hyper::{Body, Method, Request, Response, body::Bytes};

use crate::scan::{Error, ScanDeleter, ScanStarter, ScanStopper};
/// The supported paths of scannerd
Expand Down Expand Up @@ -294,8 +302,95 @@ where
}
(&Method::GET, Vts) => {
let (_, oids) = ctx.oids.read()?.clone();
Ok(ctx.response.ok(&oids))
let mut flag = false;
let stream = futures::stream::iter(
oids
.into_iter()
.map(|s: String| -> Result<String, Infallible>
{
Ok(format!("{},",serde_json::to_string(&s).unwrap()).to_string())
}
)
);

Ok(ctx.response.ok_stream(stream).await)
}
_ => Ok(ctx.response.not_found("path", req.uri().path())),
}
}




//fn async_read() -> impl Stream<Item = Result<u8, std::io::Error>> {
// let f = File::open("/dev/random").expect("Could not open file");
// let reader = BufReader::new(f);
// stream::iter(reader.bytes())
//}
//
//async fn async_main(oids: Vec<String>) {
// while let Some(b) = async_read().next().await {
// println!("{:?}", b);
// }
//}



//use futures::stream::futures_unordered::FuturesUnordered;
//async fn read_oids(oids: Vec<String>) -> Vec<String> {
// oids.iter()
// .map(|oid| oid)
// .collect::<FuturesUnordered<_>>()
// .collect::<Vec<_>>()
// .await
//}
//
//async fn read_oid(oid: &String) -> String {
// let oidstr = serde_json::to_string(&oid).unwrap();
// println!("{:?}", oid);
// oidstr
//}

//struct Oids(Vec<String>);
//
//impl Oids {
// fn new(oids: Vec<String>) -> Self {
// Self(oids)
// }
//}
//
//impl AsyncRead for Oids {
// fn poll_read(
// mut self: Pin<&mut Self>,
// _cx: &mut Context<'_>,
// buf: &mut ReadBuf<'_>,
// ) -> Poll<io::Result<()>> {
// let amt = std::cmp::min(self.len(), buf.remaining());
// let (a, b) = self.split_at(amt);
// buf.put_slice(a);
// *self = b;
// Poll::Ready(Ok(()))
// }
//}

//use futures_util::stream::iter;
//async fn vec_to_stream<S, O, E, I>(oids: Vec<String>) -> Iter<std::vec::IntoIter<std::string::String>>
//where
// S: Stream<Item = Result<O, E>> + Send + 'static,
// O: Into<Bytes> + 'static,
// E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
// I: IntoIterator,
//{
// //let v: Vec<Result<_, std::io::Error>> = oids.into_iter().map(|x| Ok(x)).collect();
// //let v: Vec<Result<_, std::io::Error>> = oids.into_iter().map(|x| Ok(x)).collect();
//
//
// futures_util::stream::iter(oids)
//
//
// //hyper::body::HttpBody::Data(stream)
//}

//Iter<std::vec::IntoIter<std::result::Result<std::string::String, std::io::Error>>>


47 changes: 44 additions & 3 deletions rust/openvasd/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@
//
// SPDX-License-Identifier: GPL-2.0-or-later

use std::error::Error;
use std::{error::Error, pin::Pin, convert::Infallible};

use futures::{stream, Stream};
use hyper::{body::Bytes, Body};
use serde::Serialize;
use futures_util::{StreamExt, Future};
use tokio::io::AsyncRead;
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_serde;
use tokio::io::{self, AsyncReadExt};

use tokio_stream;
type Result = hyper::Response<hyper::Body>;

#[derive(Debug, Default)]
Expand All @@ -15,7 +23,40 @@ pub struct Response {
}

impl Response {
#[tracing::instrument]
async fn create_stream<S, O, E>(&self, code: hyper::StatusCode, value: S) -> Result
where
S: Stream<Item = std::result::Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
match hyper::Response::builder()
.status(code)
.header("Content-Type", "application/json")
.header("authentication", &self.authentication)
.header("version", &self.version)
.body(hyper::Body::wrap_stream(value))
{
Ok(resp) => resp,
Err(e) => {
tracing::error!("Error creating response: {}", e);
hyper::Response::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.body(hyper::Body::empty())
.unwrap()
}
}
}

pub async fn ok_stream<S, O, E>(&self, value: S) -> Result
where
S: Stream<Item = std::result::Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
self.create_stream(hyper::StatusCode::OK, value).await
}


fn create<T>(&self, code: hyper::StatusCode, value: &T) -> Result
where
T: ?Sized + Serialize + std::fmt::Debug,
Expand Down Expand Up @@ -49,7 +90,7 @@ impl Response {
}
}
}

pub fn ok<T>(&self, value: &T) -> Result
where
T: ?Sized + Serialize + std::fmt::Debug,
Expand Down
32 changes: 32 additions & 0 deletions rust/streamer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[package]
name = "streamer"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
models = {path = "../models"}
osp = {path = "../osp"}
nasl-interpreter = { path = "../nasl-interpreter" }
feed = {path = "../feed"}
storage = { path = "../storage" }
hyper = { version = "0.14.26", features = ["full", "stream"] }
tokio = { version = "1.28.1", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
serde_json = "1.0.96"
serde = { version = "1.0.163", features = ["derive"] }
uuid = {version = "1", features = ["v4", "fast-rng", "serde"]}
hyper-rustls = "0.24.0"
rustls = "0.21.1"
tokio-rustls = "0.24.0"
futures-util = "0.3.28"
rustls-pemfile = "1.0.2"
async-trait = "0.1.68"
clap = { version = "4.3.0", features = ["derive", "env"] }
toml = "0.7.4"
http-body-util = "0.1.0-rc.2"
tokio-utils = "0.1.2"
tokio-util = "0.7.8"
pretty_env_logger = "0.5.0"
68 changes: 68 additions & 0 deletions rust/streamer/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#![deny(warnings)]

use tokio::fs::File;

use tokio_util::codec::{BytesCodec, FramedRead};

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Result, Server, StatusCode};

static INDEX: &str = "/tmp/feed.json";
static NOTFOUND: &[u8] = b"Not Found";

#[tokio::main]
async fn main() {
pretty_env_logger::init();

let addr = "127.0.0.1:1337".parse().unwrap();

let make_service =
make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(response_examples)) });

let server = Server::bind(&addr).serve(make_service);

println!("Listening on http://{}", addr);

if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}

async fn response_examples(req: Request<Body>) -> Result<Response<Body>> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/index.html") => simple_file_send(INDEX).await,
(&Method::GET, "/no_file.html") => {
// Test what happens when file cannot be be found
simple_file_send("this_file_should_not_exist.html").await
}
_ => Ok(not_found()),
}
}

/// HTTP status code 404
fn not_found() -> Response<Body> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(NOTFOUND.into())
.unwrap()
}

async fn simple_file_send(filename: &str) -> Result<Response<Body>> {
// Serve a file by asynchronously reading it by chunks using tokio-util crate.

if let Ok(file) = File::open(filename).await {
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
//return Ok(Response::new(body));
match hyper::Response::builder()
.status(hyper::StatusCode::OK)
.header(hyper::header::CONTENT_TYPE, "application/json")
.header(hyper::header::CONTENT_ENCODING, "chuncked")
.body(body) {
Ok(resp) => return Ok(resp),
_ => return Ok(not_found())
}
};
Ok(not_found())
}

0 comments on commit 1beffbc

Please sign in to comment.