Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: clean up parts of the codebase #981

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
File renamed without changes.
File renamed without changes.
167 changes: 32 additions & 135 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
use super::ingest::ingester_logstream;
use super::ingest::ingester_rbac;
use super::ingest::ingester_role;
use super::server::Server;
use super::IngestorMetadata;
use super::OpenIdClient;
use super::ParseableServer;
use crate::analytics;
use crate::banner;
use crate::handlers::airplane;
use crate::handlers::http::health_check;
use crate::handlers::http::ingest;
use crate::handlers::http::logstream;
use crate::handlers::http::middleware::DisAllowRootUser;
Expand All @@ -38,138 +44,44 @@ use crate::storage::ObjectStorageError;
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::sync;

use std::sync::Arc;

use super::ingest::ingester_logstream;
use super::ingest::ingester_rbac;
use super::ingest::ingester_role;
use super::server::Server;
use super::ssl_acceptor::get_ssl_acceptor;
use super::IngestorMetadata;
use super::OpenIdClient;
use super::ParseableServer;

use crate::{
handlers::http::{base_path, cross_origin_config},
option::CONFIG,
};
use crate::{handlers::http::base_path, option::CONFIG};
use actix_web::body::MessageBody;
use actix_web::middleware::from_fn;
use actix_web::web;
use actix_web::web::resource;
use actix_web::Scope;
use actix_web::{web, App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;
use anyhow::anyhow;
use async_trait::async_trait;
use base64::Engine;
use bytes::Bytes;
use once_cell::sync::Lazy;
use relative_path::RelativePathBuf;
use serde_json::Value;
use tokio::sync::{oneshot, Mutex};

/// ! have to use a guard before using it
pub static INGESTOR_META: Lazy<IngestorMetadata> =
Lazy::new(|| staging::get_ingestor_info().expect("Should Be valid Json"));

#[derive(Default)]
pub struct IngestServer;

#[async_trait(?Send)]
#[async_trait]
impl ParseableServer for IngestServer {
// we dont need oidc client here its just here to satisfy the trait
async fn start(
&self,
prometheus: PrometheusMetrics,
_oidc_client: Option<crate::oidc::OpenidConfig>,
) -> anyhow::Result<()> {
// set the ingestor metadata
self.set_ingestor_metadata().await?;

// get the ssl stuff
let ssl = get_ssl_acceptor(
&CONFIG.parseable.tls_cert_path,
&CONFIG.parseable.tls_key_path,
&CONFIG.parseable.trusted_ca_certs_path,
)?;

// fn that creates the app
let create_app_fn = move || {
App::new()
.wrap(prometheus.clone())
.configure(|config| IngestServer::configure_routes(config, None))
.wrap(from_fn(health_check::check_shutdown_middleware))
.wrap(actix_web::middleware::Logger::default())
.wrap(actix_web::middleware::Compress::default())
.wrap(cross_origin_config())
};

// Create a channel to trigger server shutdown
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));

// Clone the shutdown signal for the signal handler
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
let signal_task = tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
log::info!("Received shutdown signal, notifying server to shut down...");
});

// Create the HTTP server
let http_server = HttpServer::new(create_app_fn)
.workers(num_cpus::get())
.shutdown_timeout(60);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

60 here


// Start the server with or without TLS
let srv = if let Some(config) = ssl {
http_server
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
.run()
} else {
http_server.bind(&CONFIG.parseable.address)?.run()
};

// Graceful shutdown handling
let srv_handle = srv.handle();

let sync_task = tokio::spawn(async move {
// Wait for the shutdown signal
let _ = shutdown_rx.await;

// Perform S3 sync and wait for completion
log::info!("Starting data sync to S3...");
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
log::warn!("Failed to sync local data with object store. {:?}", e);
} else {
log::info!("Successfully synced all data to S3.");
}

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the HTTP server to run
let server_result = srv.await;

// Await the signal handler to ensure proper cleanup
if let Err(e) = signal_task.await {
log::error!("Error in signal handler: {:?}", e);
}

// Wait for the sync task to complete before exiting
if let Err(e) = sync_task.await {
log::error!("Error in sync task: {:?}", e);
} else {
log::info!("Sync task completed successfully.");
}

// Return the result of the server
server_result?;

Ok(())
// configure the api routes
fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option<OpenIdClient>) {
config
.service(
// Base path "{url}/api/v1"
web::scope(&base_path())
.service(Server::get_ingest_factory())
.service(Self::logstream_api())
.service(Server::get_about_factory())
.service(Self::analytics_factory())
.service(Server::get_liveness_factory())
.service(Self::get_user_webscope())
.service(Self::get_user_role_webscope())
.service(Server::get_metrics_webscope())
.service(Server::get_readiness_factory()),
)
.service(Server::get_ingest_otel_factory());
}

/// implement the init method will just invoke the initialize method
Expand Down Expand Up @@ -202,25 +114,6 @@ impl ParseableServer for IngestServer {
}

impl IngestServer {
// configure the api routes
fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option<OpenIdClient>) {
config
.service(
// Base path "{url}/api/v1"
web::scope(&base_path())
.service(Server::get_ingest_factory())
.service(Self::logstream_api())
.service(Server::get_about_factory())
.service(Self::analytics_factory())
.service(Server::get_liveness_factory())
.service(Self::get_user_webscope())
.service(Self::get_user_role_webscope())
.service(Server::get_metrics_webscope())
.service(Server::get_readiness_factory()),
)
.service(Server::get_ingest_otel_factory());
}

fn analytics_factory() -> Scope {
web::scope("/analytics").service(
// GET "/analytics" ==> Get analytics data
Expand Down Expand Up @@ -480,7 +373,11 @@ impl IngestServer {

tokio::spawn(airplane::server());

let app = self.start(prometheus, CONFIG.parseable.openid.clone());
// set the ingestor metadata
self.set_ingestor_metadata().await?;

// Ingestors shouldn't have to deal with OpenId auth flow
let app = self.start(prometheus, None);

tokio::pin!(app);
loop {
Expand Down
126 changes: 120 additions & 6 deletions server/src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,145 @@ pub mod utils;

use std::sync::Arc;

use actix_web::middleware::from_fn;
use actix_web::web::ServiceConfig;
use actix_web::App;
use actix_web::HttpServer;
use actix_web_prometheus::PrometheusMetrics;
use async_trait::async_trait;
use openid::Discovered;

use crate::oidc;
use base64::Engine;
use openid::Discovered;
use serde::Deserialize;
use serde::Serialize;
use ssl_acceptor::get_ssl_acceptor;
use tokio::sync::{oneshot, Mutex};

use super::cross_origin_config;
use super::API_BASE_PATH;
use super::API_VERSION;
use crate::handlers::http::health_check;
use crate::oidc;
use crate::option::CONFIG;

pub type OpenIdClient = Arc<openid::Client<Discovered, oidc::Claims>>;

// to be decided on what the Default version should be
pub const DEFAULT_VERSION: &str = "v3";

include!(concat!(env!("OUT_DIR"), "/generated.rs"));

#[async_trait(?Send)]
#[async_trait]
pub trait ParseableServer {
// async fn validate(&self) -> Result<(), ObjectStorageError>;
/// configure the router
fn configure_routes(config: &mut ServiceConfig, oidc_client: Option<OpenIdClient>)
where
Self: Sized;

/// configure the server
async fn start(
&self,
prometheus: PrometheusMetrics,
oidc_client: Option<crate::oidc::OpenidConfig>,
) -> anyhow::Result<()>;
) -> anyhow::Result<()>
where
Self: Sized,
{
let oidc_client = match oidc_client {
Some(config) => {
let client = config
.connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
.await?;
Some(Arc::new(client))
}

None => None,
};

// get the ssl stuff
let ssl = get_ssl_acceptor(
&CONFIG.parseable.tls_cert_path,
&CONFIG.parseable.tls_key_path,
&CONFIG.parseable.trusted_ca_certs_path,
)?;

// fn that creates the app
let create_app_fn = move || {
App::new()
.wrap(prometheus.clone())
.configure(|config| Self::configure_routes(config, oidc_client.clone()))
.wrap(from_fn(health_check::check_shutdown_middleware))
.wrap(actix_web::middleware::Logger::default())
.wrap(actix_web::middleware::Compress::default())
.wrap(cross_origin_config())
};

// Create a channel to trigger server shutdown
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));

// Clone the shutdown signal for the signal handler
let shutdown_signal = server_shutdown_signal.clone();

// Spawn the signal handler task
let signal_task = tokio::spawn(async move {
health_check::handle_signals(shutdown_signal).await;
println!("Received shutdown signal, notifying server to shut down...");
});

// Create the HTTP server
let http_server = HttpServer::new(create_app_fn)
.workers(num_cpus::get())
.shutdown_timeout(60);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what should be shutdown timeout?


// Start the server with or without TLS
let srv = if let Some(config) = ssl {
http_server
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
.run()
} else {
http_server.bind(&CONFIG.parseable.address)?.run()
};

// Graceful shutdown handling
let srv_handle = srv.handle();

let sync_task = tokio::spawn(async move {
// Wait for the shutdown signal
let _ = shutdown_rx.await;

// Perform S3 sync and wait for completion
log::info!("Starting data sync to S3...");
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
log::warn!("Failed to sync local data with object store. {:?}", e);
} else {
log::info!("Successfully synced all data to S3.");
}

// Initiate graceful shutdown
log::info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
});

// Await the HTTP server to run
let server_result = srv.await;

// Await the signal handler to ensure proper cleanup
if let Err(e) = signal_task.await {
log::error!("Error in signal handler: {:?}", e);
}

// Wait for the sync task to complete before exiting
if let Err(e) = sync_task.await {
log::error!("Error in sync task: {:?}", e);
} else {
log::info!("Sync task completed successfully.");
}

// Return the result of the server
server_result?;

Ok(())
}

async fn init(&self) -> anyhow::Result<()>;

Expand Down
Loading
Loading