Skip to content

Commit

Permalink
Refactor arrow modules, update otel support
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan <[email protected]>
  • Loading branch information
ryan-s-roberts committed Apr 18, 2024
1 parent 748474e commit 303e91a
Show file tree
Hide file tree
Showing 18 changed files with 935 additions and 740 deletions.
439 changes: 268 additions & 171 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mockito = "1.1"
newtype-derive-2018 = "0.2.1"
oauth2 = "4.4"
opa = { git = "https://github.com/chronicleworks/opa-rs", rev = "9fa2fbce" }
opentelemetry = { version = "^0.21" }
opentelemetry = { version = "0.22" }
owo-colors = "3.5.0"
parking_lot = "0.12.0"
percent-encoding = "2.1.0"
Expand Down Expand Up @@ -149,7 +149,7 @@ toml = "0.7.3"
tracing = "^0.1.35"
tracing-elastic-apm = "^3.2.3"
tracing-log = "^0.1.3"
tracing-opentelemetry = "^0.22"
tracing-opentelemetry = "0.23"
tracing-subscriber = { version = "^0.3.15", features = [
"default",
"registry",
Expand Down
103 changes: 49 additions & 54 deletions crates/api/src/chronicle_graphql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use async_graphql::{
extensions::OpenTelemetry,
http::{playground_source, GraphQLPlaygroundConfig, ALL_WEBSOCKET_PROTOCOLS},
scalar, Context, Enum, Error, ErrorExtensions, ObjectType, Schema, ServerError, SimpleObject,
Subscription, SubscriptionType,
Expand All @@ -24,7 +23,7 @@ use diesel::{
r2d2::{ConnectionManager, Pool, PooledConnection},
PgConnection, Queryable,
};
use futures::{Future, Stream};
use futures::Stream;
use lazy_static::lazy_static;
use poem::{
get, handler,
Expand Down Expand Up @@ -629,18 +628,16 @@ where
{
type Output = poem::Response;

fn call(&self, req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(req, |api_req| {
if let Some(claims) = checked_claims {
api_req.0.data(claims)
} else {
api_req.0
}
})
.await
}
async fn call(&self, req: poem::Request) -> poem::Result<Self::Output> {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(req, |api_req| {
if let Some(claims) = checked_claims {
api_req.0.data(claims)
} else {
api_req.0
}
})
.await
}
}

Expand Down Expand Up @@ -683,21 +680,19 @@ where
{
type Output = poem::Response;

fn call(&self, req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(
req,
if let Some(claims) = checked_claims {
let mut data = async_graphql::Data::default();
data.insert(claims);
data
} else {
async_graphql::Data::default()
},
)
.await
}
async fn call(&self, req: poem::Request) -> poem::Result<Self::Output> {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(
req,
if let Some(claims) = checked_claims {
let mut data = async_graphql::Data::default();
data.insert(claims);
data
} else {
async_graphql::Data::default()
},
)
.await
}
}

Expand Down Expand Up @@ -755,8 +750,8 @@ impl IriEndpoint {
.body("failed to compact JSON response"))
},
},
Err(StoreError::Db(diesel::result::Error::NotFound)) |
Err(StoreError::RecordNotFound) => {
Err(StoreError::Db(diesel::result::Error::NotFound))
| Err(StoreError::RecordNotFound) => {
tracing::debug!("not found: {prov_type} {} in {ns}", id.external_id_part());
Ok(poem::Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -820,8 +815,9 @@ impl IriEndpoint {

match ChronicleIri::from_str(&ns_iri.iri) {
Ok(iri) => Ok(Ok((ns_iri.ns.into(), iri))),
Err(error) =>
Ok(Err(Response::builder().status(StatusCode::NOT_FOUND).body(error.to_string()))),
Err(error) => {
Ok(Err(Response::builder().status(StatusCode::NOT_FOUND).body(error.to_string())))
},
}
}

Expand All @@ -832,21 +828,24 @@ impl IriEndpoint {
claims: Option<&JwtClaims>,
) -> poem::Result<poem::Response> {
match self.parse_ns_iri_from_uri_path(req).await? {
Ok((ns, ChronicleIri::Activity(id))) =>
Ok((ns, ChronicleIri::Activity(id))) => {
self.response_for_query(claims, "activity", &id, &ns, |mut conn, id, ns| {
self.store.prov_model_for_activity_id(&mut conn, id, ns)
})
.await,
Ok((ns, ChronicleIri::Agent(id))) =>
.await
},
Ok((ns, ChronicleIri::Agent(id))) => {
self.response_for_query(claims, "agent", &id, &ns, |mut conn, id, ns| {
self.store.prov_model_for_agent_id(&mut conn, id, ns)
})
.await,
Ok((ns, ChronicleIri::Entity(id))) =>
.await
},
Ok((ns, ChronicleIri::Entity(id))) => {
self.response_for_query(claims, "entity", &id, &ns, |mut conn, id, ns| {
self.store.prov_model_for_entity_id(&mut conn, id, ns)
})
.await,
.await
},
Ok(_) => Ok(poem::Response::builder()
.status(StatusCode::NOT_FOUND)
.body("may query only: activity, agent, entity")),
Expand All @@ -858,15 +857,13 @@ impl IriEndpoint {
impl Endpoint for IriEndpoint {
type Output = poem::Response;

fn call(&self, req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let checked_claims = if let Some(secconf) = &self.secconf {
check_claims(secconf, &req).await?
} else {
None
};
self.respond(req, checked_claims.as_ref()).await
}
async fn call(&self, req: poem::Request) -> poem::Result<Self::Output> {
let checked_claims = if let Some(secconf) = &self.secconf {
check_claims(secconf, &req).await?
} else {
None
};
self.respond(req, checked_claims.as_ref()).await
}
}

Expand All @@ -875,11 +872,9 @@ struct LdContextEndpoint;
impl Endpoint for LdContextEndpoint {
type Output = poem::Response;

fn call(&self, _req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let context: &serde_json::Value = &common::context::PROV;
Ok(IntoResponse::into_response(poem::web::Json(context)))
}
async fn call(&self, _req: poem::Request) -> poem::Result<Self::Output> {
let context: &serde_json::Value = &common::context::PROV;
Ok(IntoResponse::into_response(poem::web::Json(context)))
}
}

Expand Down Expand Up @@ -1014,7 +1009,7 @@ where
.id_claims
.map(|id_claims| AuthFromJwt { id_claims, allow_anonymous: sec.allow_anonymous });
let mut schema = Schema::build(self.query, self.mutation, Subscription)
.extension(OpenTelemetry::new(opentelemetry::global::tracer("chronicle-api-gql")))
//TODO: update .extension(OpenTelemetry::new(opentelemetry::global::tracer("chronicle-api-gql")))
.extension(OpaCheck { claim_parser: claim_parser.clone() });
if let Some(claim_parser) = &claim_parser {
schema = schema.extension(claim_parser.clone());
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};

use chrono::{DateTime, Utc};
use futures::{AsyncRead, Future};
use futures::{AsyncRead};

use serde::{Deserialize, Serialize};

Expand Down
107 changes: 103 additions & 4 deletions crates/chronicle-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use arrow_flight::{
Ticket,
};

use arrow_flight::{IpcMessage, SchemaAsIpc};
use arrow_flight::{FlightEndpoint, IpcMessage, SchemaAsIpc};

use arrow_schema::ArrowError;

Expand All @@ -31,12 +31,13 @@ use futures::{
};

use meta::{DomainTypeMeta, Term};
use operations::create_flight_info_for_type;
use query::EntityAndReferences;
use query::{
activity_count_by_type, agent_count_by_type, entity_count_by_type, EntityAndReferences,
};
use r2d2::Pool;
use serde::Serialize;
use std::net::SocketAddr;
use tokio::sync::Semaphore;
use tokio::task::spawn_blocking;

use std::{sync::Arc, vec::Vec};
use tonic::{transport::Server, Request, Response, Status, Streaming};
Expand Down Expand Up @@ -132,6 +133,104 @@ pub enum ChronicleArrowError {
),
}

/// Counts the records of kind `term`, optionally restricted to a single domain type.
///
/// The matching `*_count_by_type` helper is called synchronously, so it is executed through
/// [`spawn_blocking`] with its own clone of `pool`. `domaintype` is handed to the helper as a
/// collection holding zero or one type names, borrowed from the owned `String`.
///
/// Terms other than `Entity`, `Agent` and `Activity` report a count of `0`.
///
/// # Errors
///
/// Returns a [`Status`] built with [`Status::from_error`] when the blocking task cannot be
/// joined, or when the count helper itself returns an error.
#[instrument(skip(pool, term, domaintype))]
pub async fn calculate_count_by_metadata_term(
    pool: &Pool<ConnectionManager<PgConnection>>,
    term: &Term,
    domaintype: Option<String>,
) -> Result<i64, Status> {
    // The closure handed to `spawn_blocking` must own everything it touches.
    let pool = pool.clone();

    // Outer `Result`: did the blocking task join? Inner `Result`: did the count succeed?
    let joined = match term {
        Term::Entity => {
            spawn_blocking(move || {
                entity_count_by_type(&pool, domaintype.iter().map(|name| name.as_str()).collect())
            })
            .await
        },
        Term::Agent => {
            spawn_blocking(move || {
                agent_count_by_type(&pool, domaintype.iter().map(|name| name.as_str()).collect())
            })
            .await
        },
        Term::Activity => {
            spawn_blocking(move || {
                activity_count_by_type(
                    &pool,
                    domaintype.iter().map(|name| name.as_str()).collect(),
                )
            })
            .await
        },
        _ => Ok(Ok(0)),
    };

    joined
        .map_err(|join_error| Status::from_error(join_error.into()))?
        .map_err(|count_error| Status::from_error(count_error.into()))
}

/// Builds one [`FlightInfo`] per entry of `domain_items` and returns them as a boxed stream.
///
/// For every item:
/// 1. the descriptor path is `[term.to_string(), item.as_type_name()]`;
/// 2. the cached domain type metadata for that path supplies the schema;
/// 3. the record count is obtained from [`calculate_count_by_metadata_term`];
/// 4. the range `0..count` is walked in steps of `record_batch_size`, and each step adds one
///    [`FlightEndpoint`] whose ticket is built from the term, the domain type id, the start
///    offset and the window length (the last window is shortened to fit `count`).
///
/// The resulting `FlightInfo` carries the descriptor, the schema and `count` as its total
/// record number.
///
/// # Errors
///
/// Each stream item is an `Err(Status)` when:
/// - `record_batch_size` is zero (stepping by zero would otherwise panic),
/// - no metadata is cached for the descriptor path,
/// - counting the records fails,
/// - a ticket cannot be encoded, or the schema cannot be attached.
pub async fn create_flight_info_for_type(
    pool: Arc<Pool<ConnectionManager<PgConnection>>>,
    domain_items: Vec<impl TypeName + Send + Sync + 'static>,
    term: Term,
    record_batch_size: usize,
) -> BoxStream<'static, Result<FlightInfo, Status>> {
    stream::iter(domain_items)
        .then(move |item| {
            let pool = pool.clone();
            async move {
                // `Iterator::step_by` panics on a zero step; surface it as a request error.
                if record_batch_size == 0 {
                    return Err(Status::invalid_argument(
                        "record_batch_size must be greater than zero",
                    ));
                }

                let type_name = item.as_type_name();
                let descriptor_path = vec![term.to_string(), type_name.clone()];
                let metadata =
                    get_domain_type_meta_from_cache(&descriptor_path).ok_or_else(|| {
                        Status::from_error(Box::new(ChronicleArrowError::MissingSchemaError))
                    })?;

                let count =
                    calculate_count_by_metadata_term(&pool, &term, Some(type_name)).await?;

                // One endpoint per window of at most `record_batch_size` records.
                let mut flight_info = FlightInfo::new();
                for start in (0..count).step_by(record_batch_size) {
                    let offset = start as usize;
                    // Saturate so an enormous batch size cannot overflow the addition.
                    let end =
                        std::cmp::min(offset.saturating_add(record_batch_size), count as usize);

                    let ticket_metadata = ChronicleTicket::new(
                        term,
                        metadata.typ.as_ref().map(|x| x.as_domain_type_id()),
                        start as _,
                        (end - offset) as _,
                    );
                    let ticket = Ticket::try_from(ticket_metadata)
                        .map_err(|e| Status::from_error(Box::new(ChronicleArrowError::from(e))))?;

                    flight_info =
                        flight_info.with_endpoint(FlightEndpoint::new().with_ticket(ticket));
                }

                let flight_info = flight_info
                    .with_descriptor(FlightDescriptor::new_path(descriptor_path))
                    .try_with_schema(&metadata.schema)
                    .map_err(|e| Status::from_error(Box::new(ChronicleArrowError::from(e))))?
                    .with_total_records(count);

                Ok(flight_info)
            }
        })
        .boxed()
}

#[derive(Clone)]
pub struct FlightServiceImpl {
domain: common::domain::ChronicleDomainDef,
Expand Down
Loading

0 comments on commit 303e91a

Please sign in to comment.