diff --git a/Cargo.lock b/Cargo.lock index 1d9a05c6..7b10d886 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -993,6 +993,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "async-signals" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7764064e8871fca00fb487328c3c9cfc71c0871b46d7b61f0089fd8a2d69cc30" +dependencies = [ + "futures-core", + "nix 0.23.2", + "once_cell", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -1707,6 +1718,7 @@ dependencies = [ "api", "assert_fs", "async-graphql", + "async-signals", "async-trait", "cfg-if", "chronicle-arrow", @@ -1729,6 +1741,7 @@ dependencies = [ "iri-string", "is-terminal", "jsonschema", + "libc", "mockito", "opa", "opentelemetry 0.21.0", @@ -6666,6 +6679,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.8.0" @@ -7112,6 +7134,19 @@ dependencies = [ "generics", ] +[[package]] +name = "nix" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f3790c00a0150112de0f4cd161e3d7fc4b2d8a5542ffc35f099a2562aecb35c" +dependencies = [ + "bitflags 1.3.2", + "cc", + "cfg-if", + "libc", + "memoffset 0.6.5", +] + [[package]] name = "nix" version = "0.24.3" @@ -14658,7 +14693,7 @@ dependencies = [ "log", "mach", "memfd", - "memoffset", + "memoffset 0.8.0", "paste", "rand 0.8.5", "rustix 0.36.17", diff --git a/Cargo.toml b/Cargo.toml index 102fa8b4..0ff66a31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ anyhow = { version = "^1", features = ["backtrace"] } assert_fs = "1.0" async-graphql = "^7" async-graphql-poem = "^7" +async-signals = "^0.4" async-stream = "^0.3.3" async-trait = "^0.1.61" backoff = { version = "0.4.0", features = ["futures", "tokio"] } diff --git a/crates/api/src/chronicle_graphql/mod.rs b/crates/api/src/chronicle_graphql/mod.rs index 934350b3..c99d3fd9 100644 --- a/crates/api/src/chronicle_graphql/mod.rs +++ b/crates/api/src/chronicle_graphql/mod.rs @@ -755,8 +755,8 @@ impl IriEndpoint { .body("failed to compact JSON response")) }, }, - Err(StoreError::Db(diesel::result::Error::NotFound)) - | Err(StoreError::RecordNotFound) => { + Err(StoreError::Db(diesel::result::Error::NotFound)) | + Err(StoreError::RecordNotFound) => { tracing::debug!("not found: {prov_type} {} in {ns}", id.external_id_part()); Ok(poem::Response::builder() .status(StatusCode::NOT_FOUND) @@ -820,9 +820,8 @@ impl IriEndpoint { match ChronicleIri::from_str(&ns_iri.iri) { Ok(iri) => Ok(Ok((ns_iri.ns.into(), iri))), - Err(error) => { - Ok(Err(Response::builder().status(StatusCode::NOT_FOUND).body(error.to_string()))) - }, + Err(error) => + Ok(Err(Response::builder().status(StatusCode::NOT_FOUND).body(error.to_string()))), } } @@ -833,24 +832,21 @@ impl IriEndpoint { claims: Option<&JwtClaims>, ) -> poem::Result { match self.parse_ns_iri_from_uri_path(req).await? { - Ok((ns, ChronicleIri::Activity(id))) => { + Ok((ns, ChronicleIri::Activity(id))) => self.response_for_query(claims, "activity", &id, &ns, |mut conn, id, ns| { self.store.prov_model_for_activity_id(&mut conn, id, ns) }) - .await - }, - Ok((ns, ChronicleIri::Agent(id))) => { + .await, + Ok((ns, ChronicleIri::Agent(id))) => self.response_for_query(claims, "agent", &id, &ns, |mut conn, id, ns| { self.store.prov_model_for_agent_id(&mut conn, id, ns) }) - .await - }, - Ok((ns, ChronicleIri::Entity(id))) => { + .await, + Ok((ns, ChronicleIri::Entity(id))) => self.response_for_query(claims, "entity", &id, &ns, |mut conn, id, ns| { self.store.prov_model_for_entity_id(&mut conn, id, ns) }) - .await - }, + .await, Ok(_) => Ok(poem::Response::builder() .status(StatusCode::NOT_FOUND) .body("may query only: activity, agent, entity")), @@ -989,7 +985,7 @@ lazy_static! { static ref SHUTDOWN_SIGNAL: Arc = Arc::new(Semaphore::new(0)); } -fn trigger_shutdown() { +pub fn trigger_shutdown() { SHUTDOWN_SIGNAL.add_permits(1); } diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 6111aed5..1e742674 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -511,12 +511,11 @@ where }; match stage { - SubmissionStage::Submitted(Ok(id)) => { + SubmissionStage::Submitted(Ok(id)) => if id == tx_id { debug!("Depth charge operation submitted: {id}"); continue; - } - }, + }, SubmissionStage::Submitted(Err(err)) => { if err.tx_id() == &tx_id { error!("Depth charge transaction rejected by Chronicle: {} {}", @@ -620,6 +619,7 @@ where connection: &mut PgConnection, to_apply: &Vec, ) -> Result>, ApiError> { + debug!(checking_for_effects = to_apply.len()); let mut model = ProvModel::default(); let mut transactions = Vec::::with_capacity(to_apply.len()); for op in to_apply { @@ -630,30 +630,27 @@ where model.namespace_context(&namespace); model }, - ChronicleOperation::AgentExists(AgentExists { ref namespace, ref id }) => { + ChronicleOperation::AgentExists(AgentExists { ref namespace, ref id }) => self.store.apply_prov_model_for_agent_id( connection, model, id, namespace.external_id_part(), - )? - }, - ChronicleOperation::ActivityExists(ActivityExists { ref namespace, ref id }) => { + )?, + ChronicleOperation::ActivityExists(ActivityExists { ref namespace, ref id }) => self.store.apply_prov_model_for_activity_id( connection, model, id, namespace.external_id_part(), - )? - }, - ChronicleOperation::EntityExists(EntityExists { ref namespace, ref id }) => { + )?, + ChronicleOperation::EntityExists(EntityExists { ref namespace, ref id }) => self.store.apply_prov_model_for_entity_id( connection, model, id, namespace.external_id_part(), - )? - }, + )?, ChronicleOperation::ActivityUses(ActivityUses { ref namespace, ref id, @@ -666,47 +663,42 @@ where namespace.external_id_part(), )?, ChronicleOperation::SetAttributes(ref o) => match o { - SetAttributes::Activity { namespace, id, .. } => { + SetAttributes::Activity { namespace, id, .. } => self.store.apply_prov_model_for_activity_id( connection, model, id, namespace.external_id_part(), - )? - }, - SetAttributes::Agent { namespace, id, .. } => { + )?, + SetAttributes::Agent { namespace, id, .. } => self.store.apply_prov_model_for_agent_id( connection, model, id, namespace.external_id_part(), - )? - }, - SetAttributes::Entity { namespace, id, .. } => { + )?, + SetAttributes::Entity { namespace, id, .. } => self.store.apply_prov_model_for_entity_id( connection, model, id, namespace.external_id_part(), - )? - }, + )?, }, - ChronicleOperation::StartActivity(StartActivity { namespace, id, .. }) => { + ChronicleOperation::StartActivity(StartActivity { namespace, id, .. }) => self.store.apply_prov_model_for_activity_id( connection, model, id, namespace.external_id_part(), - )? - }, - ChronicleOperation::EndActivity(EndActivity { namespace, id, .. }) => { + )?, + ChronicleOperation::EndActivity(EndActivity { namespace, id, .. }) => self.store.apply_prov_model_for_activity_id( connection, model, id, namespace.external_id_part(), - )? - }, + )?, ChronicleOperation::WasInformedBy(WasInformedBy { namespace, activity, @@ -870,6 +862,7 @@ where if applying_new_namespace { self.submit(id, identity, to_apply) } else if let Some(to_apply) = self.check_for_effects(connection, &to_apply)? { + info!(sending_operations = to_apply.len()); self.submit(id, identity, to_apply) } else { info!("API call will not result in any data changes"); @@ -1243,21 +1236,16 @@ where #[instrument(skip(self))] async fn dispatch(&mut self, command: (ApiCommand, AuthId)) -> Result { match command { - (ApiCommand::DepthCharge(DepthChargeCommand { namespace }), identity) => { - self.depth_charge(namespace, identity).await - }, - (ApiCommand::Import(ImportCommand { operations }), identity) => { - self.submit_import_operations(identity, operations).await - }, - (ApiCommand::NameSpace(NamespaceCommand::Create { id }), identity) => { - self.create_namespace(&id, identity).await - }, - (ApiCommand::Agent(AgentCommand::Create { id, namespace, attributes }), identity) => { - self.create_agent(id, namespace, attributes, identity).await - }, - (ApiCommand::Agent(AgentCommand::UseInContext { id, namespace }), _identity) => { - self.use_agent_in_cli_context(id, namespace).await - }, + (ApiCommand::DepthCharge(DepthChargeCommand { namespace }), identity) => + self.depth_charge(namespace, identity).await, + (ApiCommand::Import(ImportCommand { operations }), identity) => + self.submit_import_operations(identity, operations).await, + (ApiCommand::NameSpace(NamespaceCommand::Create { id }), identity) => + self.create_namespace(&id, identity).await, + (ApiCommand::Agent(AgentCommand::Create { id, namespace, attributes }), identity) => + self.create_agent(id, namespace, attributes, identity).await, + (ApiCommand::Agent(AgentCommand::UseInContext { id, namespace }), _identity) => + self.use_agent_in_cli_context(id, namespace).await, ( ApiCommand::Agent(AgentCommand::Delegate { id, @@ -1284,9 +1272,8 @@ where ApiCommand::Activity(ActivityCommand::End { id, namespace, time, agent }), identity, ) => self.end_activity(id, namespace, time, agent, identity).await, - (ApiCommand::Activity(ActivityCommand::Use { id, namespace, activity }), identity) => { - self.activity_use(id, namespace, activity, identity).await - }, + (ApiCommand::Activity(ActivityCommand::Use { id, namespace, activity }), identity) => + self.activity_use(id, namespace, activity, identity).await, ( ApiCommand::Activity(ActivityCommand::WasInformedBy { id, @@ -1308,10 +1295,9 @@ where ApiCommand::Entity(EntityCommand::Attribute { id, namespace, responsible, role }), identity, ) => self.attribute(namespace, responsible, id, role, identity).await, - (ApiCommand::Entity(EntityCommand::Create { id, namespace, attributes }), identity) => { + (ApiCommand::Entity(EntityCommand::Create { id, namespace, attributes }), identity) => self.create_entity(EntityId::from_external_id(&id), namespace, attributes, identity) - .await - }, + .await, ( ApiCommand::Activity(ActivityCommand::Generate { id, namespace, activity }), identity, @@ -1325,10 +1311,9 @@ where derivation, }), identity, - ) => { + ) => self.entity_derive(id, namespace, activity, used_entity, derivation, identity) - .await - }, + .await, (ApiCommand::Query(query), _identity) => self.query(query).await, } } diff --git a/crates/chronicle-arrow/src/lib.rs b/crates/chronicle-arrow/src/lib.rs index 4debaaa8..621d2b09 100644 --- a/crates/chronicle-arrow/src/lib.rs +++ b/crates/chronicle-arrow/src/lib.rs @@ -2,10 +2,11 @@ mod meta; mod operations; mod peekablestream; mod query; +use lazy_static::lazy_static; +use tokio::sync::broadcast; use api::{ApiDispatch, ApiError}; -use arrow_array::Array; use arrow_flight::decode::FlightRecordBatchStream; use arrow_flight::{ @@ -35,6 +36,7 @@ use query::EntityAndReferences; use r2d2::Pool; use serde::Serialize; use std::net::SocketAddr; +use tokio::sync::Semaphore; use std::{sync::Arc, vec::Vec}; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -226,6 +228,7 @@ impl FlightService for FlightServiceImpl { ) -> Result, Status> { Ok(Response::new(Box::pin(futures::stream::empty()) as Self::HandshakeStream)) } + #[instrument(skip(self, _request))] async fn list_flights( &self, @@ -485,13 +488,6 @@ impl FlightService for FlightServiceImpl { }, }; - tracing::debug!( - descriptor_type = %flight_descriptor.r#type, - descriptor_cmd = %String::from_utf8_lossy(&flight_descriptor.cmd), - descriptor_path = ?flight_descriptor.path, - "Flight descriptor" - ); - let filtered_stream = stream.filter_map(|item| async move { match item { Ok(flight_data) => { @@ -542,12 +538,27 @@ impl FlightService for FlightServiceImpl { } } +lazy_static! { + static ref SHUTDOWN_CHANNEL: (broadcast::Sender<()>, broadcast::Receiver<()>) = + broadcast::channel(1); +} + +/// Triggers a shutdown signal across the application. +pub fn trigger_shutdown() { + let _ = SHUTDOWN_CHANNEL.0.send(()); +} + +/// Returns a receiver for the shutdown signal. +pub async fn await_shutdown() { + SHUTDOWN_CHANNEL.0.subscribe().recv().await.ok(); +} + #[instrument(skip(pool, api))] pub async fn run_flight_service( domain: &common::domain::ChronicleDomainDef, pool: &Pool>, api: &ApiDispatch, - addrs: Vec, + addrs: &Vec, record_batch_size: usize, ) -> Result<(), tonic::transport::Error> { meta::cache_domain_schemas(domain); @@ -561,7 +572,7 @@ pub async fn run_flight_service( .add_service(arrow_flight::flight_service_server::FlightServiceServer::new( flight_service, )) - .serve(addr); + .serve_with_shutdown(*addr, await_shutdown()); services.push(server); } @@ -621,7 +632,7 @@ mod tests { let dispatch = api.api_dispatch().clone(); let domain = domain.clone(); tokio::spawn(async move { - super::run_flight_service(&domain, &pool, &dispatch, vec![addr], 10) + super::run_flight_service(&domain, &pool, &dispatch, &vec![addr], 10) .await .unwrap(); }); @@ -782,8 +793,8 @@ roles: namespace_name: "default".to_string(), namespace_uuid: Uuid::default().into_bytes(), attributes: create_attributes(meta.typ.as_deref(), &attributes), - started: Some(Utc.ymd(2022, 1, 1).and_hms(0, 0, 0)), - ended: Some(Utc.ymd(2022, 1, 2).and_hms(0, 0, 0)), + started: Some(Utc.with_ymd_and_hms(2022, 1, 1, 0, 0, 0).unwrap()), + ended: Some(Utc.with_ymd_and_hms(2022, 1, 2, 0, 0, 0).unwrap()), generated: vec![format!("entity-{}", i), format!("entity-{}", i + 1)], was_informed_by: vec![format!("activity-{}", i), format!("activity-{}", i + 1)], was_associated_with: vec![AssociationRef { diff --git a/crates/chronicle-arrow/src/meta.rs b/crates/chronicle-arrow/src/meta.rs index 25702ba4..1f7a2469 100644 --- a/crates/chronicle-arrow/src/meta.rs +++ b/crates/chronicle-arrow/src/meta.rs @@ -227,6 +227,7 @@ use std::str::FromStr; impl FromStr for Term { type Err = (); + fn from_str(s: &str) -> Result { match s { "Namespace" => Ok(Term::Namespace), diff --git a/crates/chronicle-arrow/src/operations.rs b/crates/chronicle-arrow/src/operations.rs index 0549cc3d..edf9349d 100644 --- a/crates/chronicle-arrow/src/operations.rs +++ b/crates/chronicle-arrow/src/operations.rs @@ -38,7 +38,7 @@ use crate::{ ChronicleArrowError, ChronicleTicket, }; -#[tracing::instrument(skip(record_batch))] +#[tracing::instrument(skip(record_batch, api))] pub async fn process_record_batch( descriptor_path: &Vec, record_batch: RecordBatch, @@ -61,18 +61,15 @@ pub async fn process_record_batch( .collect::>(); match domain_type_meta.term { - Term::Entity => { + Term::Entity => create_chronicle_entity(&domain_type_meta.typ, &record_batch, &attribute_columns, api) - .await? - }, - Term::Activity => { + .await?, + Term::Activity => create_chronicle_activity(&domain_type_meta.typ, &record_batch, &attribute_columns, api) - .await? - }, - Term::Agent => { + .await?, + Term::Agent => create_chronicle_agent(&domain_type_meta.typ, &record_batch, &attribute_columns, api) - .await? - }, + .await?, Term::Namespace => create_chronicle_namespace(&record_batch, api).await?, } Ok(()) @@ -173,7 +170,7 @@ pub async fn create_chronicle_terms( .filter_map(|(column_name, array_ref)| array_ref.map(|array_ref| (column_name, array_ref))) .collect::>(); - tracing::debug!(?attribute_columns, "Processing attribute columns"); + tracing::trace!(?attribute_columns, "Processing attribute columns"); let mut operations = Vec::new(); for row_index in 0..record_batch.num_rows() { @@ -742,33 +739,30 @@ pub async fn calculate_count_by_metadata_term( ) -> Result { let pool = pool.clone(); match term { - Term::Entity => { + Term::Entity => spawn_blocking(move || { entity_count_by_type( &pool, domaintype.map(|x| x.to_string()).iter().map(|s| s.as_str()).collect(), ) }) - .await - }, - Term::Agent => { + .await, + Term::Agent => spawn_blocking(move || { agent_count_by_type( &pool, domaintype.map(|x| x.to_string()).iter().map(|s| s.as_str()).collect(), ) }) - .await - }, - Term::Activity => { + .await, + Term::Activity => spawn_blocking(move || { activity_count_by_type( &pool, domaintype.map(|x| x.to_string()).iter().map(|s| s.as_str()).collect(), ) }) - .await - }, + .await, _ => Ok(Ok(0)), } .map_err(|e| Status::from_error(e.into())) diff --git a/crates/chronicle-arrow/src/query.rs b/crates/chronicle-arrow/src/query.rs index 16ed4526..1b03df4e 100644 --- a/crates/chronicle-arrow/src/query.rs +++ b/crates/chronicle-arrow/src/query.rs @@ -9,8 +9,7 @@ use crate::{ }; use arrow::array::{ArrayBuilder, StringBuilder, StructBuilder}; use arrow_array::{ - Array, BooleanArray, Int64Array, ListArray, RecordBatch, StringArray, - TimestampNanosecondArray, + Array, BooleanArray, Int64Array, ListArray, RecordBatch, StringArray, TimestampNanosecondArray, }; use arrow_buffer::{Buffer, ToByteSlice}; use arrow_data::ArrayData; @@ -189,9 +188,8 @@ impl EntityAndReferences { let field_name = field.name(); match hashed_fields.get(field_name) { Some(array) => columns.push(array.clone()), - None => { - return Err(ChronicleArrowError::SchemaFieldNotFound(field_name.to_string())) - }, + None => + return Err(ChronicleArrowError::SchemaFieldNotFound(field_name.to_string())), } } @@ -353,9 +351,8 @@ impl ActivityAndReferences { let field_name = field.name(); match hashed_fields.get(field_name) { Some(array) => columns.push(array.clone()), - None => { - return Err(ChronicleArrowError::SchemaFieldNotFound(field_name.to_string())) - }, + None => + return Err(ChronicleArrowError::SchemaFieldNotFound(field_name.to_string())), } } @@ -485,9 +482,8 @@ impl AgentAndReferences { let field_name = field.name(); match hashed_fields.get(field_name) { Some(array) => columns.push(array.clone()), - None => { - return Err(ChronicleArrowError::SchemaFieldNotFound(field_name.to_string())) - }, + None => + return Err(ChronicleArrowError::SchemaFieldNotFound(field_name.to_string())), } } diff --git a/crates/chronicle-test-infrastructure/src/api_test.rs b/crates/chronicle-test-infrastructure/src/api_test.rs index c154ecda..d44a5b9a 100644 --- a/crates/chronicle-test-infrastructure/src/api_test.rs +++ b/crates/chronicle-test-infrastructure/src/api_test.rs @@ -272,13 +272,12 @@ async fn create_system_activity() { namespace: common::prov::SYSTEM_ID.into(), attributes: Attributes::new( Some(DomaintypeId::from_external_id("test")), - [( - "test".to_owned(), + [ Attribute { typ: "test".to_owned(), value: serde_json::Value::String("test".to_owned()).into(), }, - )] + ] .into_iter() .collect(), ), @@ -483,13 +482,12 @@ async fn contradict_attributes() { namespace: "testns".into(), attributes: Attributes::new( Some(DomaintypeId::from_external_id("test")), - [( - "test".to_owned(), + [ Attribute { typ: "test".to_owned(), value: serde_json::Value::String("test".to_owned()).into(), }, - )] + ] .into_iter() .collect(), ), @@ -533,13 +531,10 @@ async fn contradict_attributes() { namespace: "testns".into(), attributes: Attributes::new( Some(DomaintypeId::from_external_id("test")), - [( - "test".to_owned(), - Attribute { - typ: "test".to_owned(), - value: serde_json::Value::String("test2".to_owned()).into(), - }, - )] + [Attribute { + typ: "test".to_owned(), + value: serde_json::Value::String("test2".to_owned()).into(), + }] .into_iter() .collect(), ), @@ -563,13 +558,12 @@ async fn contradict_start_time() { namespace: "testns".into(), attributes: Attributes::new( Some(DomaintypeId::from_external_id("test")), - [( - "test".to_owned(), + [ Attribute { typ: "test".to_owned(), value: serde_json::Value::String("test".to_owned()).into(), }, - )] + ] .into_iter() .collect(), ), @@ -689,13 +683,12 @@ async fn contradict_end_time() { namespace: "testns".into(), attributes: Attributes::new( Some(DomaintypeId::from_external_id("test")), - [( - "test".to_owned(), + [ Attribute { typ: "test".to_owned(), value: serde_json::Value::String("test".to_owned()).into(), }, - )] + ] .into_iter() .collect(), ), @@ -815,13 +808,12 @@ async fn end_activity() { namespace: "testns".into(), attributes: Attributes::new( Some(DomaintypeId::from_external_id("test")), - [( - "test".to_owned(), + [ Attribute { typ: "test".to_owned(), value: serde_json::Value::String("test".to_owned()).into(), }, - )] + ] .into_iter() .collect(), ), @@ -973,13 +965,12 @@ async fn activity_use() { namespace: "testns".into(), attributes: Attributes::new( Some(DomaintypeId::from_external_id("test")), - [( - "test".to_owned(), + [ Attribute { typ: "test".to_owned(), value: serde_json::Value::String("test".to_owned()).into(), }, - )] + ] .into_iter() .collect(), ), @@ -1032,13 +1023,12 @@ async fn activity_use() { namespace: "testns".into(), attributes: Attributes::new( Some(DomaintypeId::from_external_id("test")), - [( - "test".to_owned(), + [ Attribute { typ: "test".to_owned(), value: serde_json::Value::String("test".to_owned()).into(), }, - )] + ] .into_iter() .collect(), ), @@ -1175,13 +1165,12 @@ async fn activity_generate() { namespace: "testns".into(), attributes: Attributes::new( Some(DomaintypeId::from_external_id("test")), - [( - "test".to_owned(), + [ Attribute { typ: "test".to_owned(), value: serde_json::Value::String("test".to_owned()).into(), }, - )] + ] .into_iter() .collect(), ), diff --git a/crates/chronicle/Cargo.toml b/crates/chronicle/Cargo.toml index 6a82594a..fe3c1bb6 100644 --- a/crates/chronicle/Cargo.toml +++ b/crates/chronicle/Cargo.toml @@ -9,6 +9,7 @@ version = "0.7.5" [dependencies] Inflector = { workspace = true } async-graphql = { workspace = true } +async-signals = { workspace = true } async-trait = { workspace = true } cfg-if = { workspace = true } chronicle-signing = { workspace = true } @@ -27,6 +28,7 @@ iri-string = { version = "^0.7", default-features = false, features = [ ] } is-terminal = { workspace = true } jsonschema = { workspace = true } +libc = { version = "^0.2" } opa = { workspace = true } opentelemetry = { workspace = true } percent-encoding = { workspace = true } diff --git a/crates/chronicle/src/bootstrap/mod.rs b/crates/chronicle/src/bootstrap/mod.rs index 63b4ee36..132777a4 100644 --- a/crates/chronicle/src/bootstrap/mod.rs +++ b/crates/chronicle/src/bootstrap/mod.rs @@ -16,7 +16,7 @@ use common::{ }; #[cfg(feature = "devmode")] use embedded_substrate::EmbeddedSubstrate; -use futures::{future::join, Future, FutureExt}; +use futures::{future::join, Future, FutureExt, StreamExt}; #[cfg(not(feature = "devmode"))] use protocol_substrate_chronicle::ChronicleSubstrateClient; @@ -151,7 +151,7 @@ pub async fn arrow_api_server( match addresses { Some(addresses) => { - chronicle_arrow::run_flight_service(domain, pool, api, addresses, record_batch_size) + chronicle_arrow::run_flight_service(domain, pool, api, &addresses, record_batch_size) .await .map_err(|e| ApiError::ArrowService(e.into())) .map(|_| Some(futures::future::ready(Ok(())))) @@ -575,6 +575,16 @@ where serve_data, ); + tokio::task::spawn(async move { + use async_signals::Signals; + + let mut signals = Signals::new(vec![libc::SIGHUP, libc::SIGINT]).unwrap(); + + signals.next().await; + chronicle_arrow::trigger_shutdown(); + api::chronicle_graphql::trigger_shutdown(); + }); + let (gql_result, arrow_result) = tokio::join!(gql, arrow); if let Err(e) = gql_result { diff --git a/crates/common/src/prov/model/mod.rs b/crates/common/src/prov/model/mod.rs index 5361bc38..d25dfdd3 100644 --- a/crates/common/src/prov/model/mod.rs +++ b/crates/common/src/prov/model/mod.rs @@ -679,6 +679,7 @@ impl ProvModel { .sum(), } } + /// Merge the supplied ProvModel into this one pub fn combine(&mut self, other: &ProvModel) { self.namespaces.extend(other.namespaces.clone()); @@ -1006,22 +1007,20 @@ impl ProvModel { activity.and_then(|activity| activity.started), activity.and_then(|activity| activity.ended), ) { - (Some(TimeWrapper(started)), _) if started != time.0 => { + (Some(TimeWrapper(started)), _) if started != time.0 => return Err(Contradiction::start_date_alteration( id.into(), namespace, started, time.0, - )) - }, - (_, Some(TimeWrapper(ended))) if ended < time.0 => { + )), + (_, Some(TimeWrapper(ended))) if ended < time.0 => return Err(Contradiction::invalid_range( id.into(), namespace, time.0, ended, - )) - }, + )), _ => {}, }; @@ -1041,22 +1040,20 @@ impl ProvModel { activity.and_then(|activity| activity.started), activity.and_then(|activity| activity.ended), ) { - (_, Some(TimeWrapper(ended))) if ended != time.0 => { + (_, Some(TimeWrapper(ended))) if ended != time.0 => return Err(Contradiction::end_date_alteration( id.into(), namespace, ended, time.0, - )) - }, - (Some(TimeWrapper(started)), _) if started > time.0 => { + )), + (Some(TimeWrapper(started)), _) if started > time.0 => return Err(Contradiction::invalid_range( id.into(), namespace, started, time.0, - )) - }, + )), _ => {}, }; diff --git a/crates/common/src/prov/operations.rs b/crates/common/src/prov/operations.rs index 3569ce97..edcf3db2 100644 --- a/crates/common/src/prov/operations.rs +++ b/crates/common/src/prov/operations.rs @@ -763,7 +763,8 @@ impl ChronicleOperation { } } - // Chronicle is open world, so the use of an id implies that it exists. Match an operation and return the implied existential operations. + // Chronicle is open world, so the use of an id implies that it exists. Match an operation and + // return the implied existential operations. pub fn implied_by(&self) -> Vec { match self { ChronicleOperation::AgentActsOnBehalfOf(o) => vec![