Skip to content

Commit

Permalink
Add graceful termination on SIGINT/SIGHUP
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan <[email protected]>
  • Loading branch information
ryan-s-roberts committed Apr 16, 2024
1 parent fb57d4e commit 748474e
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 157 deletions.
37 changes: 36 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ anyhow = { version = "^1", features = ["backtrace"] }
assert_fs = "1.0"
async-graphql = "^7"
async-graphql-poem = "^7"
async-signals = "^0.4"
async-stream = "^0.3.3"
async-trait = "^0.1.61"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
Expand Down
26 changes: 11 additions & 15 deletions crates/api/src/chronicle_graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,8 +755,8 @@ impl IriEndpoint {
.body("failed to compact JSON response"))
},
},
Err(StoreError::Db(diesel::result::Error::NotFound))
| Err(StoreError::RecordNotFound) => {
Err(StoreError::Db(diesel::result::Error::NotFound)) |
Err(StoreError::RecordNotFound) => {
tracing::debug!("not found: {prov_type} {} in {ns}", id.external_id_part());
Ok(poem::Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -820,9 +820,8 @@ impl IriEndpoint {

match ChronicleIri::from_str(&ns_iri.iri) {
Ok(iri) => Ok(Ok((ns_iri.ns.into(), iri))),
Err(error) => {
Ok(Err(Response::builder().status(StatusCode::NOT_FOUND).body(error.to_string())))
},
Err(error) =>
Ok(Err(Response::builder().status(StatusCode::NOT_FOUND).body(error.to_string()))),
}
}

Expand All @@ -833,24 +832,21 @@ impl IriEndpoint {
claims: Option<&JwtClaims>,
) -> poem::Result<poem::Response> {
match self.parse_ns_iri_from_uri_path(req).await? {
Ok((ns, ChronicleIri::Activity(id))) => {
Ok((ns, ChronicleIri::Activity(id))) =>
self.response_for_query(claims, "activity", &id, &ns, |mut conn, id, ns| {
self.store.prov_model_for_activity_id(&mut conn, id, ns)
})
.await
},
Ok((ns, ChronicleIri::Agent(id))) => {
.await,
Ok((ns, ChronicleIri::Agent(id))) =>
self.response_for_query(claims, "agent", &id, &ns, |mut conn, id, ns| {
self.store.prov_model_for_agent_id(&mut conn, id, ns)
})
.await
},
Ok((ns, ChronicleIri::Entity(id))) => {
.await,
Ok((ns, ChronicleIri::Entity(id))) =>
self.response_for_query(claims, "entity", &id, &ns, |mut conn, id, ns| {
self.store.prov_model_for_entity_id(&mut conn, id, ns)
})
.await
},
.await,
Ok(_) => Ok(poem::Response::builder()
.status(StatusCode::NOT_FOUND)
.body("may query only: activity, agent, entity")),
Expand Down Expand Up @@ -989,7 +985,7 @@ lazy_static! {
static ref SHUTDOWN_SIGNAL: Arc<Semaphore> = Arc::new(Semaphore::new(0));
}

fn trigger_shutdown() {
pub fn trigger_shutdown() {
SHUTDOWN_SIGNAL.add_permits(1);
}

Expand Down
87 changes: 36 additions & 51 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,12 +511,11 @@ where
};

match stage {
SubmissionStage::Submitted(Ok(id)) => {
SubmissionStage::Submitted(Ok(id)) =>
if id == tx_id {
debug!("Depth charge operation submitted: {id}");
continue;
}
},
},
SubmissionStage::Submitted(Err(err)) => {
if err.tx_id() == &tx_id {
error!("Depth charge transaction rejected by Chronicle: {} {}",
Expand Down Expand Up @@ -620,6 +619,7 @@ where
connection: &mut PgConnection,
to_apply: &Vec<ChronicleOperation>,
) -> Result<Option<Vec<ChronicleOperation>>, ApiError> {
debug!(checking_for_effects = to_apply.len());
let mut model = ProvModel::default();
let mut transactions = Vec::<ChronicleOperation>::with_capacity(to_apply.len());
for op in to_apply {
Expand All @@ -630,30 +630,27 @@ where
model.namespace_context(&namespace);
model
},
ChronicleOperation::AgentExists(AgentExists { ref namespace, ref id }) => {
ChronicleOperation::AgentExists(AgentExists { ref namespace, ref id }) =>
self.store.apply_prov_model_for_agent_id(
connection,
model,
id,
namespace.external_id_part(),
)?
},
ChronicleOperation::ActivityExists(ActivityExists { ref namespace, ref id }) => {
)?,
ChronicleOperation::ActivityExists(ActivityExists { ref namespace, ref id }) =>
self.store.apply_prov_model_for_activity_id(
connection,
model,
id,
namespace.external_id_part(),
)?
},
ChronicleOperation::EntityExists(EntityExists { ref namespace, ref id }) => {
)?,
ChronicleOperation::EntityExists(EntityExists { ref namespace, ref id }) =>
self.store.apply_prov_model_for_entity_id(
connection,
model,
id,
namespace.external_id_part(),
)?
},
)?,
ChronicleOperation::ActivityUses(ActivityUses {
ref namespace,
ref id,
Expand All @@ -666,47 +663,42 @@ where
namespace.external_id_part(),
)?,
ChronicleOperation::SetAttributes(ref o) => match o {
SetAttributes::Activity { namespace, id, .. } => {
SetAttributes::Activity { namespace, id, .. } =>
self.store.apply_prov_model_for_activity_id(
connection,
model,
id,
namespace.external_id_part(),
)?
},
SetAttributes::Agent { namespace, id, .. } => {
)?,
SetAttributes::Agent { namespace, id, .. } =>
self.store.apply_prov_model_for_agent_id(
connection,
model,
id,
namespace.external_id_part(),
)?
},
SetAttributes::Entity { namespace, id, .. } => {
)?,
SetAttributes::Entity { namespace, id, .. } =>
self.store.apply_prov_model_for_entity_id(
connection,
model,
id,
namespace.external_id_part(),
)?
},
)?,
},
ChronicleOperation::StartActivity(StartActivity { namespace, id, .. }) => {
ChronicleOperation::StartActivity(StartActivity { namespace, id, .. }) =>
self.store.apply_prov_model_for_activity_id(
connection,
model,
id,
namespace.external_id_part(),
)?
},
ChronicleOperation::EndActivity(EndActivity { namespace, id, .. }) => {
)?,
ChronicleOperation::EndActivity(EndActivity { namespace, id, .. }) =>
self.store.apply_prov_model_for_activity_id(
connection,
model,
id,
namespace.external_id_part(),
)?
},
)?,
ChronicleOperation::WasInformedBy(WasInformedBy {
namespace,
activity,
Expand Down Expand Up @@ -870,6 +862,7 @@ where
if applying_new_namespace {
self.submit(id, identity, to_apply)
} else if let Some(to_apply) = self.check_for_effects(connection, &to_apply)? {
info!(sending_operations = to_apply.len());
self.submit(id, identity, to_apply)
} else {
info!("API call will not result in any data changes");
Expand Down Expand Up @@ -1243,21 +1236,16 @@ where
#[instrument(skip(self))]
async fn dispatch(&mut self, command: (ApiCommand, AuthId)) -> Result<ApiResponse, ApiError> {
match command {
(ApiCommand::DepthCharge(DepthChargeCommand { namespace }), identity) => {
self.depth_charge(namespace, identity).await
},
(ApiCommand::Import(ImportCommand { operations }), identity) => {
self.submit_import_operations(identity, operations).await
},
(ApiCommand::NameSpace(NamespaceCommand::Create { id }), identity) => {
self.create_namespace(&id, identity).await
},
(ApiCommand::Agent(AgentCommand::Create { id, namespace, attributes }), identity) => {
self.create_agent(id, namespace, attributes, identity).await
},
(ApiCommand::Agent(AgentCommand::UseInContext { id, namespace }), _identity) => {
self.use_agent_in_cli_context(id, namespace).await
},
(ApiCommand::DepthCharge(DepthChargeCommand { namespace }), identity) =>
self.depth_charge(namespace, identity).await,
(ApiCommand::Import(ImportCommand { operations }), identity) =>
self.submit_import_operations(identity, operations).await,
(ApiCommand::NameSpace(NamespaceCommand::Create { id }), identity) =>
self.create_namespace(&id, identity).await,
(ApiCommand::Agent(AgentCommand::Create { id, namespace, attributes }), identity) =>
self.create_agent(id, namespace, attributes, identity).await,
(ApiCommand::Agent(AgentCommand::UseInContext { id, namespace }), _identity) =>
self.use_agent_in_cli_context(id, namespace).await,
(
ApiCommand::Agent(AgentCommand::Delegate {
id,
Expand All @@ -1284,9 +1272,8 @@ where
ApiCommand::Activity(ActivityCommand::End { id, namespace, time, agent }),
identity,
) => self.end_activity(id, namespace, time, agent, identity).await,
(ApiCommand::Activity(ActivityCommand::Use { id, namespace, activity }), identity) => {
self.activity_use(id, namespace, activity, identity).await
},
(ApiCommand::Activity(ActivityCommand::Use { id, namespace, activity }), identity) =>
self.activity_use(id, namespace, activity, identity).await,
(
ApiCommand::Activity(ActivityCommand::WasInformedBy {
id,
Expand All @@ -1308,10 +1295,9 @@ where
ApiCommand::Entity(EntityCommand::Attribute { id, namespace, responsible, role }),
identity,
) => self.attribute(namespace, responsible, id, role, identity).await,
(ApiCommand::Entity(EntityCommand::Create { id, namespace, attributes }), identity) => {
(ApiCommand::Entity(EntityCommand::Create { id, namespace, attributes }), identity) =>
self.create_entity(EntityId::from_external_id(&id), namespace, attributes, identity)
.await
},
.await,
(
ApiCommand::Activity(ActivityCommand::Generate { id, namespace, activity }),
identity,
Expand All @@ -1325,10 +1311,9 @@ where
derivation,
}),
identity,
) => {
) =>
self.entity_derive(id, namespace, activity, used_entity, derivation, identity)
.await
},
.await,
(ApiCommand::Query(query), _identity) => self.query(query).await,
}
}
Expand Down
Loading

0 comments on commit 748474e

Please sign in to comment.