diff --git a/server/src/catalog.rs b/server/src/catalog/mod.rs
similarity index 99%
rename from server/src/catalog.rs
rename to server/src/catalog/mod.rs
index e93f6cdd4..330f932cf 100644
--- a/server/src/catalog.rs
+++ b/server/src/catalog/mod.rs
@@ -318,7 +318,7 @@ async fn create_manifest(
 }
 
 pub async fn remove_manifest_from_snapshot(
-    storage: Arc<dyn ObjectStorage + Send>,
+    storage: Arc<dyn ObjectStorage>,
     stream_name: &str,
     dates: Vec<String>,
 ) -> Result<Option<String>, ObjectStorageError> {
@@ -343,7 +343,7 @@ pub async fn remove_manifest_from_snapshot(
 }
 
 pub async fn get_first_event(
-    storage: Arc<dyn ObjectStorage + Send>,
+    storage: Arc<dyn ObjectStorage>,
     stream_name: &str,
     dates: Vec<String>,
 ) -> Result<Option<String>, ObjectStorageError> {
diff --git a/server/src/handlers/http.rs b/server/src/handlers/http/mod.rs
similarity index 100%
rename from server/src/handlers/http.rs
rename to server/src/handlers/http/mod.rs
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index f6e54ce8e..6fec54b56 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -15,10 +15,15 @@
  * along with this program. If not, see <https://www.gnu.org/licenses/>.
  *
  */
+use super::ingest::ingester_logstream;
+use super::ingest::ingester_rbac;
+use super::ingest::ingester_role;
+use super::server::Server;
+use super::IngestorMetadata;
+use super::OpenIdClient;
+use super::ParseableServer;
 use crate::analytics;
-use crate::banner;
 use crate::handlers::airplane;
-use crate::handlers::http::health_check;
 use crate::handlers::http::ingest;
 use crate::handlers::http::logstream;
 use crate::handlers::http::middleware::DisAllowRootUser;
@@ -28,9 +33,7 @@ use crate::localcache::LocalCacheManager;
 use crate::metrics;
 use crate::migration;
 use crate::migration::metadata_migration::migrate_ingester_metadata;
-use crate::rbac;
 use crate::rbac::role::Action;
-use crate::storage;
 use crate::storage::object_storage::ingestor_metadata_path;
 use crate::storage::object_storage::parseable_json_path;
 use crate::storage::staging;
@@ -38,27 +41,11 @@ use crate::storage::ObjectStorageError;
 use crate::storage::PARSEABLE_ROOT_DIRECTORY;
 use crate::sync;
 
-use std::sync::Arc;
-
-use super::ingest::ingester_logstream;
-use super::ingest::ingester_rbac;
-use super::ingest::ingester_role;
-use super::server::Server;
-use super::ssl_acceptor::get_ssl_acceptor;
-use super::IngestorMetadata;
-use super::OpenIdClient;
-use super::ParseableServer;
-
-use crate::{
-    handlers::http::{base_path, cross_origin_config},
-    option::CONFIG,
-};
+use crate::{handlers::http::base_path, option::CONFIG};
 
 use actix_web::body::MessageBody;
-use actix_web::middleware::from_fn;
+use actix_web::web;
 use actix_web::web::resource;
 use actix_web::Scope;
-use actix_web::{web, App, HttpServer};
-use actix_web_prometheus::PrometheusMetrics;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use base64::Engine;
@@ -66,142 +53,15 @@ use bytes::Bytes;
 use once_cell::sync::Lazy;
 use relative_path::RelativePathBuf;
 use serde_json::Value;
-use tokio::sync::{oneshot, Mutex};
 
 /// ! have to use a guard before using it
 pub static INGESTOR_META: Lazy<IngestorMetadata> =
     Lazy::new(|| staging::get_ingestor_info().expect("Should Be valid Json"));
 
-#[derive(Default)]
 pub struct IngestServer;
 
-#[async_trait(?Send)]
+#[async_trait]
 impl ParseableServer for IngestServer {
-    // we dont need oidc client here its just here to satisfy the trait
-    async fn start(
-        &self,
-        prometheus: PrometheusMetrics,
-        _oidc_client: Option<crate::oidc::OpenidConfig>,
-    ) -> anyhow::Result<()> {
-        // set the ingestor metadata
-        self.set_ingestor_metadata().await?;
-
-        // get the ssl stuff
-        let ssl = get_ssl_acceptor(
-            &CONFIG.parseable.tls_cert_path,
-            &CONFIG.parseable.tls_key_path,
-            &CONFIG.parseable.trusted_ca_certs_path,
-        )?;
-
-        // fn that creates the app
-        let create_app_fn = move || {
-            App::new()
-                .wrap(prometheus.clone())
-                .configure(|config| IngestServer::configure_routes(config, None))
-                .wrap(from_fn(health_check::check_shutdown_middleware))
-                .wrap(actix_web::middleware::Logger::default())
-                .wrap(actix_web::middleware::Compress::default())
-                .wrap(cross_origin_config())
-        };
-
-        // Create a channel to trigger server shutdown
-        let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
-        let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));
-
-        // Clone the shutdown signal for the signal handler
-        let shutdown_signal = server_shutdown_signal.clone();
-
-        // Spawn the signal handler task
-        let signal_task = tokio::spawn(async move {
-            health_check::handle_signals(shutdown_signal).await;
-            log::info!("Received shutdown signal, notifying server to shut down...");
-        });
-
-        // Create the HTTP server
-        let http_server = HttpServer::new(create_app_fn)
-            .workers(num_cpus::get())
-            .shutdown_timeout(60);
-
-        // Start the server with or without TLS
-        let srv = if let Some(config) = ssl {
-            http_server
-                .bind_rustls_0_22(&CONFIG.parseable.address, config)?
-                .run()
-        } else {
-            http_server.bind(&CONFIG.parseable.address)?.run()
-        };
-
-        // Graceful shutdown handling
-        let srv_handle = srv.handle();
-
-        let sync_task = tokio::spawn(async move {
-            // Wait for the shutdown signal
-            let _ = shutdown_rx.await;
-
-            // Perform S3 sync and wait for completion
-            log::info!("Starting data sync to S3...");
-            if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
-                log::warn!("Failed to sync local data with object store. {:?}", e);
-            } else {
-                log::info!("Successfully synced all data to S3.");
-            }
-
-            // Initiate graceful shutdown
-            log::info!("Graceful shutdown of HTTP server triggered");
-            srv_handle.stop(true).await;
-        });
-
-        // Await the HTTP server to run
-        let server_result = srv.await;
-
-        // Await the signal handler to ensure proper cleanup
-        if let Err(e) = signal_task.await {
-            log::error!("Error in signal handler: {:?}", e);
-        }
-
-        // Wait for the sync task to complete before exiting
-        if let Err(e) = sync_task.await {
-            log::error!("Error in sync task: {:?}", e);
-        } else {
-            log::info!("Sync task completed successfully.");
-        }
-
-        // Return the result of the server
-        server_result?;
-
-        Ok(())
-    }
-
-    /// implement the init method will just invoke the initialize method
-    async fn init(&self) -> anyhow::Result<()> {
-        self.validate()?;
-
-        // check for querier state. Is it there, or was it there in the past
-        let parseable_json = self.check_querier_state().await?;
-        // to get the .parseable.json file in staging
-        self.validate_credentials().await?;
-        let metadata = storage::resolve_parseable_metadata(&parseable_json).await?;
-
-        banner::print(&CONFIG, &metadata).await;
-        rbac::map::init(&metadata);
-        // set the info in the global metadata
-        metadata.set_global();
-        self.initialize().await
-    }
-
-    fn validate(&self) -> anyhow::Result<()> {
-        if CONFIG.get_storage_mode_string() == "Local drive" {
-            return Err(anyhow::Error::msg(
-                // Error Message can be better
-                "Ingest Server cannot be started in local storage mode. Please start the server in a supported storage mode.",
-            ));
-        }
-
-        Ok(())
-    }
-}
-
-impl IngestServer {
     // configure the api routes
    fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option<OpenIdClient>) {
         config
@@ -221,6 +81,83 @@ impl IngestServer {
             .service(Server::get_ingest_otel_factory());
     }
 
+    async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
+        // parseable can't use local storage for persistence when running a distributed setup
+        if CONFIG.get_storage_mode_string() == "Local drive" {
+            return Err(anyhow::Error::msg(
+                "This instance of the Parseable server has been configured to run in a distributed setup; it doesn't support local storage.",
+            ));
+        }
+
+        // check for querier state. Is it there, or was it there in the past
+        let parseable_json = self.check_querier_state().await?;
+        // to get the .parseable.json file in staging
+        self.validate_credentials().await?;
+
+        Ok(parseable_json)
+    }
+
+    /// configure the server and start an instance to ingest data
+    async fn init(&self) -> anyhow::Result<()> {
+        // ! Undefined and Untested behaviour
+        if let Some(cache_manager) = LocalCacheManager::global() {
+            cache_manager
+                .validate(CONFIG.parseable.local_cache_size)
+                .await?;
+        };
+
+        let prometheus = metrics::build_metrics_handler();
+        CONFIG.storage().register_store_metrics(&prometheus);
+
+        migration::run_migration(&CONFIG).await?;
+
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
+            sync::run_local_sync().await;
+        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
+            sync::object_store_sync().await;
+
+        tokio::spawn(airplane::server());
+
+        // set the ingestor metadata
+        self.set_ingestor_metadata().await?;
+
+        // Ingestors shouldn't have to deal with OpenId auth flow
+        let app = self.start(prometheus, None);
+
+        tokio::pin!(app);
+        loop {
+            tokio::select! {
+                e = &mut app => {
+                    // actix server finished .. stop other threads and stop the server
+                    remote_sync_inbox.send(()).unwrap_or(());
+                    localsync_inbox.send(()).unwrap_or(());
+                    if let Err(e) = localsync_handler.await {
+                        log::error!("Error joining localsync_handler: {:?}", e);
+                    }
+                    if let Err(e) = remote_sync_handler.await {
+                        log::error!("Error joining remote_sync_handler: {:?}", e);
+                    }
+                    return e
+                },
+                _ = &mut localsync_outbox => {
+                    // crash the server if localsync fails for any reason
+                    // panic!("Local Sync thread died. Server will fail now!")
+                    return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
+                },
+                _ = &mut remote_sync_outbox => {
+                    // remote_sync failed, this is recoverable by just starting remote_sync thread again
+                    if let Err(e) = remote_sync_handler.await {
+                        log::error!("Error joining remote_sync_handler: {:?}", e);
+                    }
+                    (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
+                }
+
+            };
+        }
+    }
+}
+
+impl IngestServer {
     fn analytics_factory() -> Scope {
         web::scope("/analytics").service(
             // GET "/analytics" ==> Get analytics data
@@ -459,58 +396,4 @@ impl IngestServer {
 
         Ok(())
     }
-
-    async fn initialize(&self) -> anyhow::Result<()> {
-        // ! Undefined and Untested behaviour
-        if let Some(cache_manager) = LocalCacheManager::global() {
-            cache_manager
-                .validate(CONFIG.parseable.local_cache_size)
-                .await?;
-        };
-
-        let prometheus = metrics::build_metrics_handler();
-        CONFIG.storage().register_store_metrics(&prometheus);
-
-        migration::run_migration(&CONFIG).await?;
-
-        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
-            sync::run_local_sync().await;
-        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
-            sync::object_store_sync().await;
-
-        tokio::spawn(airplane::server());
-
-        let app = self.start(prometheus, CONFIG.parseable.openid.clone());
-
-        tokio::pin!(app);
-        loop {
-            tokio::select! {
-                e = &mut app => {
-                    // actix server finished .. stop other threads and stop the server
-                    remote_sync_inbox.send(()).unwrap_or(());
-                    localsync_inbox.send(()).unwrap_or(());
-                    if let Err(e) = localsync_handler.await {
-                        log::error!("Error joining remote_sync_handler: {:?}", e);
-                    }
-                    if let Err(e) = remote_sync_handler.await {
-                        log::error!("Error joining remote_sync_handler: {:?}", e);
-                    }
-                    return e
-                },
-                _ = &mut localsync_outbox => {
-                    // crash the server if localsync fails for any reason
-                    // panic!("Local Sync thread died. Server will fail now!")
-                    return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
-                },
-                _ = &mut remote_sync_outbox => {
-                    // remote_sync failed, this is recoverable by just starting remote_sync thread again
-                    if let Err(e) = remote_sync_handler.await {
-                        log::error!("Error joining remote_sync_handler: {:?}", e);
-                    }
-                    (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
-                }
-
-            };
-        }
-    }
 }
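Note: every `init` in this PR ends in the same supervision loop: pin the server future, `select!` over it and the two sync channels, restart the remote-sync task when only it dies, and bail out when local sync dies. Below is a minimal, self-contained sketch of that shape, assuming only the `tokio` crate with its full feature set; `spawn_remote_sync` and its channel wiring are illustrative stand-ins, not Parseable APIs.

```rust
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

// Hypothetical stand-in for sync::object_store_sync(): the returned Receiver
// ("outbox") resolves if the worker dies, and dropping the Sender ("inbox")
// is how the caller asks the worker to stop.
fn spawn_remote_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
    let (outbox_tx, outbox_rx) = oneshot::channel();
    let (inbox_tx, mut inbox_rx) = oneshot::channel::<()>();
    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut inbox_rx => break, // caller requested shutdown (or dropped us)
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    // one sync pass; on a fatal error the real worker exits here
                }
            }
        }
        drop(outbox_tx); // closing the outbox wakes the supervisor
    });
    (handle, outbox_rx, inbox_tx)
}

#[tokio::main]
async fn main() {
    let (mut handle, mut outbox, mut inbox) = spawn_remote_sync();
    loop {
        tokio::select! {
            _ = &mut outbox => {
                // worker died: recoverable, so join the old task and re-spawn
                let _ = (&mut handle).await;
                (handle, outbox, inbox) = spawn_remote_sync();
            }
            // stand-in for "the HTTP server future completed"
            _ = tokio::signal::ctrl_c() => {
                let _ = inbox.send(());
                let _ = handle.await;
                break;
            }
        }
    }
}
```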
diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs
index 6f6d2bfd7..57d5313f2 100644
--- a/server/src/handlers/http/modal/mod.rs
+++ b/server/src/handlers/http/modal/mod.rs
@@ -26,14 +26,27 @@ pub mod utils;
 
 use std::sync::Arc;
 
+use actix_web::middleware::from_fn;
+use actix_web::web::ServiceConfig;
+use actix_web::App;
+use actix_web::HttpServer;
 use actix_web_prometheus::PrometheusMetrics;
 use async_trait::async_trait;
-use openid::Discovered;
-
-use crate::oidc;
 use base64::Engine;
+use bytes::Bytes;
+use openid::Discovered;
 use serde::Deserialize;
 use serde::Serialize;
+use ssl_acceptor::get_ssl_acceptor;
+use tokio::sync::{oneshot, Mutex};
+
+use super::cross_origin_config;
+use super::API_BASE_PATH;
+use super::API_VERSION;
+use crate::handlers::http::health_check;
+use crate::oidc;
+use crate::option::CONFIG;
+
 pub type OpenIdClient = Arc<openid::Client<Discovered, oidc::Claims>>;
 
 // to be decided on what the Default version should be
@@ -41,20 +54,124 @@ pub const DEFAULT_VERSION: &str = "v3";
 
 include!(concat!(env!("OUT_DIR"), "/generated.rs"));
 
-#[async_trait(?Send)]
+#[async_trait]
 pub trait ParseableServer {
-    // async fn validate(&self) -> Result<(), ObjectStorageError>;
+    /// configure the router
+    fn configure_routes(config: &mut ServiceConfig, oidc_client: Option<OpenIdClient>)
+    where
+        Self: Sized;
+
+    /// load metadata/configuration from persistence for previous sessions of parseable
+    async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>>;
+
+    /// code that describes starting and setup procedures for each type of server
+    async fn init(&self) -> anyhow::Result<()>;
 
     /// configure the server
     async fn start(
         &self,
         prometheus: PrometheusMetrics,
         oidc_client: Option<crate::oidc::OpenidConfig>,
-    ) -> anyhow::Result<()>;
+    ) -> anyhow::Result<()>
+    where
+        Self: Sized,
+    {
+        let oidc_client = match oidc_client {
+            Some(config) => {
+                let client = config
+                    .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
+                    .await?;
+                Some(Arc::new(client))
+            }
 
-    async fn init(&self) -> anyhow::Result<()>;
+            None => None,
+        };
+
+        // get the ssl stuff
+        let ssl = get_ssl_acceptor(
+            &CONFIG.parseable.tls_cert_path,
+            &CONFIG.parseable.tls_key_path,
+            &CONFIG.parseable.trusted_ca_certs_path,
+        )?;
+
+        // fn that creates the app
+        let create_app_fn = move || {
+            App::new()
+                .wrap(prometheus.clone())
+                .configure(|config| Self::configure_routes(config, oidc_client.clone()))
+                .wrap(from_fn(health_check::check_shutdown_middleware))
+                .wrap(actix_web::middleware::Logger::default())
+                .wrap(actix_web::middleware::Compress::default())
+                .wrap(cross_origin_config())
+        };
+
+        // Create a channel to trigger server shutdown
+        let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
+        let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));
+
+        // Clone the shutdown signal for the signal handler
+        let shutdown_signal = server_shutdown_signal.clone();
 
-    fn validate(&self) -> anyhow::Result<()>;
+        // Spawn the signal handler task
+        let signal_task = tokio::spawn(async move {
+            health_check::handle_signals(shutdown_signal).await;
+            log::info!("Received shutdown signal, notifying server to shut down...");
+        });
+
+        // Create the HTTP server
+        let http_server = HttpServer::new(create_app_fn)
+            .workers(num_cpus::get())
+            .shutdown_timeout(60);
+
+        // Start the server with or without TLS
+        let srv = if let Some(config) = ssl {
+            http_server
+                .bind_rustls_0_22(&CONFIG.parseable.address, config)?
+                .run()
+        } else {
+            http_server.bind(&CONFIG.parseable.address)?.run()
+        };
+
+        // Graceful shutdown handling
+        let srv_handle = srv.handle();
+
+        let sync_task = tokio::spawn(async move {
+            // Wait for the shutdown signal
+            let _ = shutdown_rx.await;
+
+            // Perform S3 sync and wait for completion
+            log::info!("Starting data sync to S3...");
+            if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
+                log::warn!("Failed to sync local data with object store. {:?}", e);
+            } else {
+                log::info!("Successfully synced all data to S3.");
+            }
+
+            // Initiate graceful shutdown
+            log::info!("Graceful shutdown of HTTP server triggered");
+            srv_handle.stop(true).await;
+        });
+
+        // Await the HTTP server to run
+        let server_result = srv.await;
+
+        // Await the signal handler to ensure proper cleanup
+        if let Err(e) = signal_task.await {
+            log::error!("Error in signal handler: {:?}", e);
+        }
+
+        // Wait for the sync task to complete before exiting
+        if let Err(e) = sync_task.await {
+            log::error!("Error in sync task: {:?}", e);
+        } else {
+            log::info!("Sync task completed successfully.");
+        }
+
+        // Return the result of the server
+        server_result?;
+
+        Ok(())
+    }
 }
 
 #[derive(Serialize, Debug, Deserialize, Default, Clone, Eq, PartialEq)]
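Note: the heart of the refactor is visible above. `start` moves from three near-identical per-server implementations into one default method on `ParseableServer`, and each server now supplies only the pieces that differ (`configure_routes`, `load_metadata`, `init`). A toy sketch of that design, assuming the `async-trait`, `anyhow`, and `tokio` crates; `AppServer` and its methods are hypothetical names, not the real trait:

```rust
use async_trait::async_trait;

#[async_trait]
trait AppServer: Send + Sync {
    /// per-mode piece, like configure_routes in the PR
    fn name(&self) -> &'static str;

    /// per-mode persistence rules, like load_metadata in the PR
    async fn load_metadata(&self) -> anyhow::Result<Option<Vec<u8>>>;

    /// shared default: every mode boots the same way, like the new start()
    async fn start(&self) -> anyhow::Result<()> {
        println!("starting the {} server", self.name());
        Ok(())
    }
}

struct Ingest;

#[async_trait]
impl AppServer for Ingest {
    fn name(&self) -> &'static str {
        "ingest"
    }

    async fn load_metadata(&self) -> anyhow::Result<Option<Vec<u8>>> {
        // the real ingest server refuses local-drive storage at this point
        Ok(None)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // dynamic dispatch works because the trait stays object-safe
    let server: Box<dyn AppServer> = Box::new(Ingest);
    server.load_metadata().await?;
    server.start().await
}
```

In the real trait, `configure_routes` and the default `start` carry `where Self: Sized` bounds, which keeps `ParseableServer` object-safe for the `Box<dyn ParseableServer>` in `main.rs` while still letting the default body call `Self::configure_routes` statically. Dropping `#[async_trait(?Send)]` in favour of plain `#[async_trait]` is what makes the returned futures `Send`.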
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 302bd977e..caaef973e 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -17,171 +17,38 @@
  */
 
 use crate::handlers::airplane;
+use crate::handlers::http::base_path;
 use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
 use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
 use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
 use crate::handlers::http::{self, role};
-use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
-use crate::handlers::http::{health_check, logstream, MAX_EVENT_PAYLOAD_SIZE};
+use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
 use crate::hottier::HotTierManager;
 use crate::rbac::role::Action;
 use crate::sync;
 use crate::users::dashboards::DASHBOARDS;
 use crate::users::filters::FILTERS;
-use crate::{analytics, banner, metrics, migration, rbac, storage};
-use actix_web::middleware::from_fn;
+use crate::{analytics, metrics, migration, storage};
 use actix_web::web::{resource, ServiceConfig};
 use actix_web::{web, Scope};
-use actix_web::{App, HttpServer};
 use async_trait::async_trait;
-use std::sync::Arc;
-use tokio::sync::{oneshot, Mutex};
+use bytes::Bytes;
 
 use crate::option::CONFIG;
 
 use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role};
 use super::server::Server;
-use super::ssl_acceptor::get_ssl_acceptor;
 use super::{OpenIdClient, ParseableServer};
 
-#[derive(Default, Debug)]
 pub struct QueryServer;
 
-#[async_trait(?Send)]
+#[async_trait]
 impl ParseableServer for QueryServer {
-    async fn start(
-        &self,
-        prometheus: actix_web_prometheus::PrometheusMetrics,
-        oidc_client: Option<crate::oidc::OpenidConfig>,
-    ) -> anyhow::Result<()> {
-        let oidc_client = match oidc_client {
-            Some(config) => {
-                let client = config
-                    .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
-                    .await?;
-                Some(Arc::new(client))
-            }
-
-            None => None,
-        };
-
-        let ssl = get_ssl_acceptor(
-            &CONFIG.parseable.tls_cert_path,
-            &CONFIG.parseable.tls_key_path,
-            &CONFIG.parseable.trusted_ca_certs_path,
-        )?;
-
-        let create_app_fn = move || {
-            App::new()
-                .wrap(prometheus.clone())
-                .configure(|config| QueryServer::configure_routes(config, oidc_client.clone()))
-                .wrap(from_fn(health_check::check_shutdown_middleware))
-                .wrap(actix_web::middleware::Logger::default())
-                .wrap(actix_web::middleware::Compress::default())
-                .wrap(cross_origin_config())
-        };
-
-        // Create a channel to trigger server shutdown
-        let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
-        let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));
-
-        // Clone the shutdown signal for the signal handler
-        let shutdown_signal = server_shutdown_signal.clone();
-
-        // Spawn the signal handler task
-        let signal_task = tokio::spawn(async move {
-            health_check::handle_signals(shutdown_signal).await;
-            log::info!("Received shutdown signal, notifying server to shut down...");
-        });
-
-        // Create the HTTP server
-        let http_server = HttpServer::new(create_app_fn)
-            .workers(num_cpus::get())
-            .shutdown_timeout(120);
-
-        // Start the server with or without TLS
-        let srv = if let Some(config) = ssl {
-            http_server
-                .bind_rustls_0_22(&CONFIG.parseable.address, config)?
-                .run()
-        } else {
-            http_server.bind(&CONFIG.parseable.address)?.run()
-        };
-
-        // Graceful shutdown handling
-        let srv_handle = srv.handle();
-
-        let sync_task = tokio::spawn(async move {
-            // Wait for the shutdown signal
-            let _ = shutdown_rx.await;
-
-            // Perform S3 sync and wait for completion
-            log::info!("Starting data sync to S3...");
-            if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
-                log::warn!("Failed to sync local data with object store. {:?}", e);
-            } else {
-                log::info!("Successfully synced all data to S3.");
-            }
-
-            // Initiate graceful shutdown
-            log::info!("Graceful shutdown of HTTP server triggered");
-            srv_handle.stop(true).await;
-        });
-
-        // Await the HTTP server to run
-        let server_result = srv.await;
-
-        // Await the signal handler to ensure proper cleanup
-        if let Err(e) = signal_task.await {
-            log::error!("Error in signal handler: {:?}", e);
-        }
-
-        // Wait for the sync task to complete before exiting
-        if let Err(e) = sync_task.await {
-            log::error!("Error in sync task: {:?}", e);
-        } else {
-            log::info!("Sync task completed successfully.");
-        }
-
-        // Return the result of the server
-        server_result?;
-
-        Ok(())
-    }
-
-    /// implementation of init should just invoke a call to initialize
-    async fn init(&self) -> anyhow::Result<()> {
-        self.validate()?;
-        migration::run_file_migration(&CONFIG).await?;
-        let parseable_json = CONFIG.validate_storage().await?;
-        migration::run_metadata_migration(&CONFIG, &parseable_json).await?;
-        let metadata = storage::resolve_parseable_metadata(&parseable_json).await?;
-        banner::print(&CONFIG, &metadata).await;
-        // initialize the rbac map
-        rbac::map::init(&metadata);
-        // keep metadata info in mem
-        metadata.set_global();
-        self.initialize().await
-    }
-
-    fn validate(&self) -> anyhow::Result<()> {
-        if CONFIG.get_storage_mode_string() == "Local drive" {
-            return Err(anyhow::anyhow!(
-                "Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.",
-            ));
-        }
-
-        Ok(())
-    }
-}
-
-impl QueryServer {
     // configure the api routes
     fn configure_routes(config: &mut ServiceConfig, oidc_client: Option<OpenIdClient>) {
         config
             .service(
                 web::scope(&base_path())
-                    // POST "/query" ==> Get results of the SQL query passed in request body
                     .service(Server::get_query_factory())
                     .service(Server::get_trino_factory())
                     .service(Server::get_cache_webscope())
@@ -201,6 +68,91 @@ impl QueryServer {
             .service(Server::get_generated());
     }
 
+    async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
+        // parseable can't use local storage for persistence when running a distributed setup
+        if CONFIG.get_storage_mode_string() == "Local drive" {
+            return Err(anyhow::anyhow!(
+                "This instance of the Parseable server has been configured to run in a distributed setup; it doesn't support local storage.",
+            ));
+        }
+
+        migration::run_file_migration(&CONFIG).await?;
+        let parseable_json = CONFIG.validate_storage().await?;
+        migration::run_metadata_migration(&CONFIG, &parseable_json).await?;
+
+        Ok(parseable_json)
+    }
+
+    /// initialize the server, run migrations as needed and start an instance
+    async fn init(&self) -> anyhow::Result<()> {
+        let prometheus = metrics::build_metrics_handler();
+        CONFIG.storage().register_store_metrics(&prometheus);
+
+        migration::run_migration(&CONFIG).await?;
+
+        //create internal stream at server start
+        create_internal_stream_if_not_exists().await?;
+
+        FILTERS.load().await?;
+        DASHBOARDS.load().await?;
+        // track all parquet files already in the data directory
+        storage::retention::load_retention_from_global();
+
+        // all internal data structures populated now.
+        // start the analytics scheduler if enabled
+        if CONFIG.parseable.send_analytics {
+            analytics::init_analytics_scheduler()?;
+        }
+
+        if matches!(init_cluster_metrics_schedular(), Ok(())) {
+            log::info!("Cluster metrics scheduler started successfully");
+        }
+        if let Some(hot_tier_manager) = HotTierManager::global() {
+            hot_tier_manager.put_internal_stream_hot_tier().await?;
+            hot_tier_manager.download_from_s3()?;
+        };
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
+            sync::run_local_sync().await;
+        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
+            sync::object_store_sync().await;
+
+        tokio::spawn(airplane::server());
+        let app = self.start(prometheus, CONFIG.parseable.openid.clone());
+
+        tokio::pin!(app);
+        loop {
+            tokio::select! {
+                e = &mut app => {
+                    // actix server finished .. stop other threads and stop the server
+                    remote_sync_inbox.send(()).unwrap_or(());
+                    localsync_inbox.send(()).unwrap_or(());
+                    if let Err(e) = localsync_handler.await {
+                        log::error!("Error joining localsync_handler: {:?}", e);
+                    }
+                    if let Err(e) = remote_sync_handler.await {
+                        log::error!("Error joining remote_sync_handler: {:?}", e);
+                    }
+                    return e
+                },
+                _ = &mut localsync_outbox => {
+                    // crash the server if localsync fails for any reason
+                    // panic!("Local Sync thread died. Server will fail now!")
+                    return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
+                },
+                _ = &mut remote_sync_outbox => {
+                    // remote_sync failed, this is recoverable by just starting remote_sync thread again
+                    if let Err(e) = remote_sync_handler.await {
+                        log::error!("Error joining remote_sync_handler: {:?}", e);
+                    }
+                    (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
+                }
+
+            };
+        }
+    }
+}
+
+impl QueryServer {
     // get the role webscope
     fn get_user_role_webscope() -> Scope {
         web::scope("/role")
@@ -439,72 +391,4 @@ impl QueryServer {
             ),
         )
     }
-
-    /// initialize the server, run migrations as needed and start the server
-    async fn initialize(&self) -> anyhow::Result<()> {
-        let prometheus = metrics::build_metrics_handler();
-        CONFIG.storage().register_store_metrics(&prometheus);
-
-        migration::run_migration(&CONFIG).await?;
-
-        //create internal stream at server start
-        create_internal_stream_if_not_exists().await?;
-
-        FILTERS.load().await?;
-        DASHBOARDS.load().await?;
-        // track all parquet files already in the data directory
-        storage::retention::load_retention_from_global();
-
-        // all internal data structures populated now.
-        // start the analytics scheduler if enabled
-        if CONFIG.parseable.send_analytics {
-            analytics::init_analytics_scheduler()?;
-        }
-
-        if matches!(init_cluster_metrics_schedular(), Ok(())) {
-            log::info!("Cluster metrics scheduler started successfully");
-        }
-        if let Some(hot_tier_manager) = HotTierManager::global() {
-            hot_tier_manager.put_internal_stream_hot_tier().await?;
-            hot_tier_manager.download_from_s3()?;
-        };
-        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
-            sync::run_local_sync().await;
-        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
-            sync::object_store_sync().await;
-
-        tokio::spawn(airplane::server());
-        let app = self.start(prometheus, CONFIG.parseable.openid.clone());
-
-        tokio::pin!(app);
-        loop {
-            tokio::select! {
-                e = &mut app => {
-                    // actix server finished .. stop other threads and stop the server
-                    remote_sync_inbox.send(()).unwrap_or(());
-                    localsync_inbox.send(()).unwrap_or(());
-                    if let Err(e) = localsync_handler.await {
-                        log::error!("Error joining localsync_handler: {:?}", e);
-                    }
-                    if let Err(e) = remote_sync_handler.await {
-                        log::error!("Error joining remote_sync_handler: {:?}", e);
-                    }
-                    return e
-                },
-                _ = &mut localsync_outbox => {
-                    // crash the server if localsync fails for any reason
-                    // panic!("Local Sync thread died. Server will fail now!")
-                    return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
-                },
-                _ = &mut remote_sync_outbox => {
-                    // remote_sync failed, this is recoverable by just starting remote_sync thread again
-                    if let Err(e) = remote_sync_handler.await {
-                        log::error!("Error joining remote_sync_handler: {:?}", e);
-                    }
-                    (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
-                }
-
-            };
-        }
-    }
 }
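Note: the graceful-shutdown wiring that now lives once in the trait's default `start` hands a `oneshot::Sender` to the signal handler through an `Arc<Mutex<Option<...>>>`, so whichever path fires first can `take()` the sender and trigger exactly one shutdown, and the final object-store sync runs before the server stops. A reduced sketch, assuming `tokio` with its `signal` feature; `do_final_sync` and `handle_signals` here are stand-ins for the Parseable functions of similar shape:

```rust
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};

// Stand-in for "sync staging data to object storage".
async fn do_final_sync() {
    println!("syncing staged data before exit...");
}

// Roughly the shape of health_check::handle_signals: fire the trigger once.
async fn handle_signals(trigger: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
    let _ = tokio::signal::ctrl_c().await;
    // take() guarantees the shutdown can only be triggered a single time
    if let Some(tx) = trigger.lock().await.take() {
        let _ = tx.send(());
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
    let trigger = Arc::new(Mutex::new(Some(shutdown_trigger)));

    let signal_task = tokio::spawn(handle_signals(trigger.clone()));

    let sync_task = tokio::spawn(async move {
        // block until a signal arrives, then sync *before* the server stops
        let _ = shutdown_rx.await;
        do_final_sync().await;
        // the real code calls srv_handle.stop(true) at this point
    });

    // ... the HTTP server would run here ...

    let _ = signal_task.await;
    let _ = sync_task.await;
}
```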
diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index ba1ab055c..48e931619 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -17,7 +17,6 @@
  */
 
 use crate::analytics;
-use crate::banner;
 use crate::handlers;
 use crate::handlers::http::about;
 use crate::handlers::http::base_path;
@@ -27,32 +26,26 @@ use crate::handlers::http::query;
 use crate::handlers::http::trino;
 use crate::handlers::http::users::dashboards;
 use crate::handlers::http::users::filters;
-use crate::handlers::http::API_BASE_PATH;
-use crate::handlers::http::API_VERSION;
 use crate::hottier::HotTierManager;
 use crate::localcache::LocalCacheManager;
 use crate::metrics;
 use crate::migration;
-use crate::rbac;
 use crate::storage;
 use crate::sync;
 use crate::users::dashboards::DASHBOARDS;
 use crate::users::filters::FILTERS;
-use actix_web::middleware::from_fn;
-use std::sync::Arc;
-use tokio::sync::{oneshot, Mutex};
+use actix_web::web;
 use actix_web::web::resource;
 use actix_web::Resource;
 use actix_web::Scope;
-use actix_web::{web, App, HttpServer};
-use actix_web_prometheus::PrometheusMetrics;
 use actix_web_static_files::ResourceFiles;
 use async_trait::async_trait;
+use bytes::Bytes;
 
 use crate::{
     handlers::http::{
-        self, cross_origin_config, ingest, llm, logstream,
+        self, ingest, llm, logstream,
         middleware::{DisAllowRootUser, RouteExt},
         oidc, role, MAX_EVENT_PAYLOAD_SIZE,
     },
@@ -62,139 +55,18 @@ use crate::{
 // use super::generate;
 use super::generate;
-use super::ssl_acceptor::get_ssl_acceptor;
 use super::OpenIdClient;
 use super::ParseableServer;
 
-#[derive(Default)]
+
 pub struct Server;
 
-#[async_trait(?Send)]
+#[async_trait]
 impl ParseableServer for Server {
-    async fn start(
-        &self,
-        prometheus: PrometheusMetrics,
-        oidc_client: Option<crate::oidc::OpenidConfig>,
-    ) -> anyhow::Result<()> {
-        let oidc_client = match oidc_client {
-            Some(config) => {
-                let client = config
-                    .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
-                    .await?;
-                Some(Arc::new(client))
-            }
-            None => None,
-        };
-
-        let create_app_fn = move || {
-            App::new()
-                .wrap(prometheus.clone())
-                .configure(|cfg| Server::configure_routes(cfg, oidc_client.clone()))
-                .wrap(from_fn(health_check::check_shutdown_middleware))
-                .wrap(actix_web::middleware::Logger::default())
-                .wrap(actix_web::middleware::Compress::default())
-                .wrap(cross_origin_config())
-        };
-
-        let ssl = get_ssl_acceptor(
-            &CONFIG.parseable.tls_cert_path,
-            &CONFIG.parseable.tls_key_path,
-            &CONFIG.parseable.trusted_ca_certs_path,
-        )?;
-
-        // Create a channel to trigger server shutdown
-        let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
-        let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));
-
-        // Clone the shutdown signal for the signal handler
-        let shutdown_signal = server_shutdown_signal.clone();
-
-        // Spawn the signal handler task
-        let signal_task = tokio::spawn(async move {
-            health_check::handle_signals(shutdown_signal).await;
-            log::info!("Received shutdown signal, notifying server to shut down...");
-        });
-
-        // Create the HTTP server
-        let http_server = HttpServer::new(create_app_fn)
-            .workers(num_cpus::get())
-            .shutdown_timeout(60);
-
-        // Start the server with or without TLS
-        let srv = if let Some(config) = ssl {
-            http_server
-                .bind_rustls_0_22(&CONFIG.parseable.address, config)?
-                .run()
-        } else {
-            http_server.bind(&CONFIG.parseable.address)?.run()
-        };
-
-        // Graceful shutdown handling
-        let srv_handle = srv.handle();
-
-        let sync_task = tokio::spawn(async move {
-            // Wait for the shutdown signal
-            let _ = shutdown_rx.await;
-
-            // Perform S3 sync and wait for completion
-            log::info!("Starting data sync to S3...");
-            if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
-                log::warn!("Failed to sync local data with object store. {:?}", e);
-            } else {
-                log::info!("Successfully synced all data to S3.");
-            }
-
-            // Initiate graceful shutdown
-            log::info!("Graceful shutdown of HTTP server triggered");
-            srv_handle.stop(true).await;
-        });
-
-        // Await the HTTP server to run
-        let server_result = srv.await;
-
-        // Await the signal handler to ensure proper cleanup
-        if let Err(e) = signal_task.await {
-            log::error!("Error in signal handler: {:?}", e);
-        }
-
-        // Wait for the sync task to complete before exiting
-        if let Err(e) = sync_task.await {
-            log::error!("Error in sync task: {:?}", e);
-        } else {
-            log::info!("Sync task completed successfully.");
-        }
-
-        // Return the result of the server
-        server_result?;
-
-        Ok(())
-    }
-
-    /// implementation of init should just invoke a call to initialize
-    async fn init(&self) -> anyhow::Result<()> {
-        self.validate()?;
-        migration::run_file_migration(&CONFIG).await?;
-        let parseable_json = CONFIG.validate_storage().await?;
-        migration::run_metadata_migration(&CONFIG, &parseable_json).await?;
-        let metadata = storage::resolve_parseable_metadata(&parseable_json).await?;
-        banner::print(&CONFIG, &metadata).await;
-        rbac::map::init(&metadata);
-        metadata.set_global();
-        self.initialize().await?;
-        Ok(())
-    }
-
-    fn validate(&self) -> anyhow::Result<()> {
-        Ok(())
-    }
-}
-
-impl Server {
     fn configure_routes(config: &mut web::ServiceConfig, oidc_client: Option<OpenIdClient>) {
         // there might be a bug in the configure routes method
         config
             .service(
                 web::scope(&base_path())
-                    // POST "/query" ==> Get results of the SQL query passed in request body
                     .service(Self::get_query_factory())
                     .service(Self::get_trino_factory())
                     .service(Self::get_cache_webscope())
@@ -215,6 +87,85 @@ impl Server {
             .service(Self::get_generated());
     }
 
+    async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
+        migration::run_file_migration(&CONFIG).await?;
+        let parseable_json = CONFIG.validate_storage().await?;
+        migration::run_metadata_migration(&CONFIG, &parseable_json).await?;
+
+        Ok(parseable_json)
+    }
+
+    // configure the server and start an instance of the single server setup
+    async fn init(&self) -> anyhow::Result<()> {
+        if let Some(cache_manager) = LocalCacheManager::global() {
+            cache_manager
+                .validate(CONFIG.parseable.local_cache_size)
+                .await?;
+        };
+
+        let prometheus = metrics::build_metrics_handler();
+        CONFIG.storage().register_store_metrics(&prometheus);
+
+        migration::run_migration(&CONFIG).await?;
+
+        FILTERS.load().await?;
+        DASHBOARDS.load().await?;
+
+        storage::retention::load_retention_from_global();
+
+        if let Some(hot_tier_manager) = HotTierManager::global() {
+            hot_tier_manager.download_from_s3()?;
+        };
+
+        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
+            sync::run_local_sync().await;
+        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
+            sync::object_store_sync().await;
+
+        if CONFIG.parseable.send_analytics {
+            analytics::init_analytics_scheduler()?;
+        }
+
+        tokio::spawn(handlers::livetail::server());
+        tokio::spawn(handlers::airplane::server());
+
+        let app = self.start(prometheus, CONFIG.parseable.openid.clone());
+
+        tokio::pin!(app);
+
+        loop {
+            tokio::select! {
+                e = &mut app => {
+                    // actix server finished .. stop other threads and stop the server
+                    remote_sync_inbox.send(()).unwrap_or(());
+                    localsync_inbox.send(()).unwrap_or(());
+                    if let Err(e) = localsync_handler.await {
+                        log::error!("Error joining localsync_handler: {:?}", e);
+                    }
+                    if let Err(e) = remote_sync_handler.await {
+                        log::error!("Error joining remote_sync_handler: {:?}", e);
+                    }
+                    return e
+                },
+                _ = &mut localsync_outbox => {
+                    // crash the server if localsync fails for any reason
+                    // panic!("Local Sync thread died. Server will fail now!")
+                    return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
+                },
+                _ = &mut remote_sync_outbox => {
+                    // remote_sync failed, this is recoverable by just starting remote_sync thread again
+                    if let Err(e) = remote_sync_handler.await {
+                        log::error!("Error joining remote_sync_handler: {:?}", e);
+                    }
+                    (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
+                }
+
+            };
+        }
+    }
+}
+
+impl Server {
     // get the trino factory
     pub fn get_trino_factory() -> Resource {
         web::resource("/trinoquery")
@@ -292,6 +243,7 @@ impl Server {
     }
 
     // get the query factory
+    // POST "/query" ==> Get results of the SQL query passed in request body
     pub fn get_query_factory() -> Resource {
         web::resource("/query").route(web::post().to(query::query).authorize(Action::Query))
     }
@@ -591,72 +543,4 @@ impl Server {
     pub fn get_generated() -> ResourceFiles {
         ResourceFiles::new("/", generate()).resolve_not_found_to_root()
     }
-
-    async fn initialize(&self) -> anyhow::Result<()> {
-        if let Some(cache_manager) = LocalCacheManager::global() {
-            cache_manager
-                .validate(CONFIG.parseable.local_cache_size)
-                .await?;
-        };
-
-        let prometheus = metrics::build_metrics_handler();
-        CONFIG.storage().register_store_metrics(&prometheus);
-
-        migration::run_migration(&CONFIG).await?;
-
-        FILTERS.load().await?;
-        DASHBOARDS.load().await?;
-
-        storage::retention::load_retention_from_global();
-
-        if let Some(hot_tier_manager) = HotTierManager::global() {
-            hot_tier_manager.download_from_s3()?;
-        };
-
-        let (localsync_handler, mut localsync_outbox, localsync_inbox) =
-            sync::run_local_sync().await;
-        let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
-            sync::object_store_sync().await;
-
-        if CONFIG.parseable.send_analytics {
-            analytics::init_analytics_scheduler()?;
-        }
-
-        tokio::spawn(handlers::livetail::server());
-        tokio::spawn(handlers::airplane::server());
-
-        let app = self.start(prometheus, CONFIG.parseable.openid.clone());
-
-        tokio::pin!(app);
-
-        loop {
-            tokio::select! {
-                e = &mut app => {
-                    // actix server finished .. stop other threads and stop the server
-                    remote_sync_inbox.send(()).unwrap_or(());
-                    localsync_inbox.send(()).unwrap_or(());
-                    if let Err(e) = localsync_handler.await {
-                        log::error!("Error joining remote_sync_handler: {:?}", e);
-                    }
-                    if let Err(e) = remote_sync_handler.await {
-                        log::error!("Error joining remote_sync_handler: {:?}", e);
-                    }
-                    return e
-                },
-                _ = &mut localsync_outbox => {
-                    // crash the server if localsync fails for any reason
-                    // panic!("Local Sync thread died. Server will fail now!")
-                    return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
-                },
-                _ = &mut remote_sync_outbox => {
-                    // remote_sync failed, this is recoverable by just starting remote_sync thread again
-                    if let Err(e) = remote_sync_handler.await {
-                        log::error!("Error joining remote_sync_handler: {:?}", e);
-                    }
-                    (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
-                }
-
-            };
-        }
-    }
 }
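Note: all three servers configure routes through a plain function over `web::ServiceConfig`, which is what lets the shared `start` build the `App` identically for every mode. A minimal actix-web sketch of that shape; the scope path and handler are placeholders, not Parseable routes:

```rust
use actix_web::{web, App, HttpResponse, HttpServer, Responder};

async fn liveness() -> impl Responder {
    HttpResponse::Ok().finish()
}

// Free-standing route configuration, mirroring the configure_routes shape:
// the same App::new().configure(...) call then works for every server mode.
fn configure_routes(cfg: &mut web::ServiceConfig) {
    cfg.service(
        web::scope("/api/v1") // the real code derives this from base_path()
            .service(web::resource("/liveness").route(web::get().to(liveness))),
    );
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().configure(configure_routes))
        .bind(("127.0.0.1", 8000))?
        .run()
        .await
}
```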
diff --git a/server/src/handlers.rs b/server/src/handlers/mod.rs
similarity index 100%
rename from server/src/handlers.rs
rename to server/src/handlers/mod.rs
diff --git a/server/src/hottier.rs b/server/src/hottier.rs
index 0528a1228..b6f29f609 100644
--- a/server/src/hottier.rs
+++ b/server/src/hottier.rs
@@ -289,7 +289,7 @@ impl HotTierManager {
         stream: &str,
         manifest_files_to_download: &mut BTreeMap<String, Vec<File>>,
         parquet_file_size: &mut u64,
-        object_store: Arc<dyn ObjectStorage + Send>,
+        object_store: Arc<dyn ObjectStorage>,
     ) -> Result<(), HotTierError> {
         if manifest_files_to_download.is_empty() {
             return Ok(());
         }
diff --git a/server/src/main.rs b/server/src/main.rs
index fca2ca307..f4d170bd9 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -44,8 +44,6 @@ mod users;
 mod utils;
 mod validator;
 
-use std::sync::Arc;
-
 use handlers::http::modal::ParseableServer;
 use option::{Mode, CONFIG};
 
@@ -59,15 +57,20 @@ async fn main() -> anyhow::Result<()> {
     env_logger::init();
 
     // these are empty ptrs so mem footprint should be minimal
-    let server: Arc<dyn ParseableServer> = match CONFIG.parseable.mode {
-        Mode::Query => Arc::new(QueryServer),
-
-        Mode::Ingest => Arc::new(IngestServer),
-
-        Mode::All => Arc::new(Server),
+    let server: Box<dyn ParseableServer> = match CONFIG.parseable.mode {
+        Mode::Query => Box::new(QueryServer),
+        Mode::Ingest => Box::new(IngestServer),
+        Mode::All => Box::new(Server),
     };
 
-    server.init().await?;
+    // load metadata from persistence
+    let parseable_json = server.load_metadata().await?;
+    let metadata = storage::resolve_parseable_metadata(&parseable_json).await?;
+    banner::print(&CONFIG, &metadata).await;
+    // initialize the rbac map
+    rbac::map::init(&metadata);
+    // keep metadata info in mem
+    metadata.set_global();
 
-    Ok(())
+    server.init().await
 }
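Note: `main.rs` also swaps `Arc<dyn ParseableServer>` for `Box<dyn ParseableServer>`. The server object has exactly one owner, so shared-ownership reference counting bought nothing; a `Box` expresses that directly. A tiny illustration of the same selection pattern (trait and structs hypothetical):

```rust
trait ModeServer {
    fn run(&self);
}

struct QueryMode;
struct IngestMode;

impl ModeServer for QueryMode {
    fn run(&self) {
        println!("query mode");
    }
}

impl ModeServer for IngestMode {
    fn run(&self) {
        println!("ingest mode");
    }
}

fn main() {
    let distributed = true; // stand-in for matching on CONFIG.parseable.mode
    // single owner, dynamic dispatch: Box is the right fit, Arc would be overkill
    let server: Box<dyn ModeServer> = if distributed {
        Box::new(QueryMode)
    } else {
        Box::new(IngestMode)
    };
    server.run();
}
```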
diff --git a/server/src/migration.rs b/server/src/migration/mod.rs
similarity index 98%
rename from server/src/migration.rs
rename to server/src/migration/mod.rs
index c0c483b2b..52336c654 100644
--- a/server/src/migration.rs
+++ b/server/src/migration/mod.rs
@@ -297,7 +297,7 @@ pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> {
 }
 
 async fn run_meta_file_migration(
-    object_store: &Arc<dyn ObjectStorage + Send>,
+    object_store: &Arc<dyn ObjectStorage>,
     old_meta_file_path: RelativePathBuf,
 ) -> anyhow::Result<()> {
     // get the list of all meta files
@@ -328,9 +328,7 @@ async fn run_meta_file_migration(
     Ok(())
 }
 
-async fn run_stream_files_migration(
-    object_store: &Arc<dyn ObjectStorage + Send>,
-) -> anyhow::Result<()> {
+async fn run_stream_files_migration(object_store: &Arc<dyn ObjectStorage>) -> anyhow::Result<()> {
     let streams = object_store
         .list_old_streams()
         .await?
diff --git a/server/src/option.rs b/server/src/option.rs
index 73b701c6e..be4a8cd08 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -39,7 +39,7 @@ pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));
 #[derive(Debug)]
 pub struct Config {
     pub parseable: Cli,
-    storage: Arc<dyn ObjectStorageProvider + Send + Sync>,
+    storage: Arc<dyn ObjectStorageProvider>,
     pub storage_name: &'static str,
 }
 
diff --git a/server/src/query.rs b/server/src/query/mod.rs
similarity index 100%
rename from server/src/query.rs
rename to server/src/query/mod.rs
diff --git a/server/src/rbac.rs b/server/src/rbac/mod.rs
similarity index 100%
rename from server/src/rbac.rs
rename to server/src/rbac/mod.rs
diff --git a/server/src/storage/azure_blob.rs b/server/src/storage/azure_blob.rs
index 6475b8fdc..e639dae5f 100644
--- a/server/src/storage/azure_blob.rs
+++ b/server/src/storage/azure_blob.rs
@@ -163,7 +163,7 @@ impl ObjectStorageProvider for AzureBlobConfig {
         RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
     }
 
-    fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {
+    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
         let azure = self.get_default_builder().build().unwrap();
         // limit objectstore to a concurrent request limit
         let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index a84247f0b..b3d3e09cd 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -67,7 +67,7 @@ impl ObjectStorageProvider for FSConfig {
         RuntimeConfig::new()
     }
 
-    fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {
+    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
         Arc::new(LocalFS::new(self.root.clone()))
     }
 
diff --git a/server/src/storage.rs b/server/src/storage/mod.rs
similarity index 100%
rename from server/src/storage.rs
rename to server/src/storage/mod.rs
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 7e2f7f609..dcd1dff37 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -58,15 +58,15 @@ use std::{
     time::{Duration, Instant},
 };
 
-pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug {
+pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
     fn get_datafusion_runtime(&self) -> RuntimeConfig;
-    fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send>;
+    fn get_object_store(&self) -> Arc<dyn ObjectStorage>;
     fn get_endpoint(&self) -> String;
     fn register_store_metrics(&self, handler: &PrometheusMetrics);
 }
 
 #[async_trait]
-pub trait ObjectStorage: Sync + 'static {
+pub trait ObjectStorage: Send + Sync + 'static {
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>;
     // TODO: make the filter function optional as we may want to get all objects
     async fn get_objects(
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index 0d6513437..7dae3ca72 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -289,7 +289,7 @@ impl ObjectStorageProvider for S3Config {
         RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry))
     }
 
-    fn get_object_store(&self) -> Arc<dyn ObjectStorage + Send> {
+    fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
         let s3 = self.get_default_builder().build().unwrap();
 
         // limit objectstore to a concurrent request limit
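Note: the storage changes are one coherent move. `Send` and `Sync` migrate from the use sites onto the traits themselves (`ObjectStorage: Send + Sync + 'static`, `ObjectStorageProvider: ... + Send + Sync`), so every implementor is thread-safe by construction and plain `Arc<dyn ObjectStorage>` can cross task boundaries. A sketch of why the supertrait bound is exactly what `tokio::spawn` needs; the `Storage` trait and `S3` type here are hypothetical:

```rust
use std::sync::Arc;

// With Send + Sync as supertraits, `dyn Storage` itself is Send + Sync,
// so no `+ Send` is needed at any use site.
trait Storage: Send + Sync {
    fn name(&self) -> &'static str;
}

struct S3;

impl Storage for S3 {
    fn name(&self) -> &'static str {
        "s3"
    }
}

#[tokio::main]
async fn main() {
    let store: Arc<dyn Storage> = Arc::new(S3);
    // tokio::spawn requires a Send future; the captured Arc<dyn Storage>
    // satisfies that purely because of the supertrait bounds.
    tokio::spawn(async move {
        println!("flushing to {}", store.name());
    })
    .await
    .unwrap();
}
```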
diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs
index acdbeb8e4..931e092f6 100644
--- a/server/src/storage/store_metadata.rs
+++ b/server/src/storage/store_metadata.rs
@@ -104,13 +104,10 @@ pub async fn resolve_parseable_metadata(
     parseable_metadata: &Option<Bytes>,
 ) -> Result<StorageMetadata, ObjectStorageError> {
     let staging_metadata = get_staging_metadata()?;
-    let mut remote_metadata: Option<StorageMetadata> = None;
-    if parseable_metadata.is_some() {
-        remote_metadata = Some(
-            serde_json::from_slice(parseable_metadata.as_ref().unwrap())
-                .expect("parseable config is valid json"),
-        );
-    }
+    let remote_metadata = parseable_metadata
+        .as_ref()
+        .map(|meta| serde_json::from_slice(meta).expect("parseable config is valid json"));
+
     // Env Change needs to be updated
     let check = determine_environment(staging_metadata, remote_metadata);
     // flags for if metadata needs to be synced
diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow/mod.rs
similarity index 100%
rename from server/src/utils/arrow.rs
rename to server/src/utils/arrow/mod.rs
diff --git a/server/src/utils/json.rs b/server/src/utils/json/mod.rs
similarity index 100%
rename from server/src/utils/json.rs
rename to server/src/utils/json/mod.rs
diff --git a/server/src/utils.rs b/server/src/utils/mod.rs
similarity index 100%
rename from server/src/utils.rs
rename to server/src/utils/mod.rs
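Note: the `store_metadata.rs` change is the standard `Option::map` idiom, replacing a `mut` binding plus `is_some()`/`unwrap()` with a single expression. A self-contained before/after, with `serde_json::from_slice` swapped for `str::parse` to keep the sketch dependency-free:

```rust
fn main() {
    let raw: Option<String> = Some("42".to_owned());

    // before: a mutable binding, an is_some() test, and an unwrap()
    let mut parsed: Option<i32> = None;
    if raw.is_some() {
        parsed = Some(raw.as_ref().unwrap().parse().expect("valid number"));
    }

    // after: same behaviour in one expression, no mut, no unwrap
    let parsed_new: Option<i32> = raw
        .as_ref()
        .map(|s| s.parse().expect("valid number"));

    assert_eq!(parsed, parsed_new);
}
```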