Skip to content

Commit

Permalink
Refactor arrow modules, update otel support
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan <[email protected]>
  • Loading branch information
ryan-s-roberts committed Apr 18, 2024
1 parent 748474e commit 2a6ba18
Show file tree
Hide file tree
Showing 24 changed files with 2,143 additions and 1,933 deletions.
439 changes: 268 additions & 171 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mockito = "1.1"
newtype-derive-2018 = "0.2.1"
oauth2 = "4.4"
opa = { git = "https://github.com/chronicleworks/opa-rs", rev = "9fa2fbce" }
opentelemetry = { version = "^0.21" }
opentelemetry = { version = "0.22" }
owo-colors = "3.5.0"
parking_lot = "0.12.0"
percent-encoding = "2.1.0"
Expand Down Expand Up @@ -149,7 +149,7 @@ toml = "0.7.3"
tracing = "^0.1.35"
tracing-elastic-apm = "^3.2.3"
tracing-log = "^0.1.3"
tracing-opentelemetry = "^0.22"
tracing-opentelemetry = "0.23"
tracing-subscriber = { version = "^0.3.15", features = [
"default",
"registry",
Expand Down
6 changes: 0 additions & 6 deletions crates/api/src/chronicle_graphql/cursor_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@ use async_graphql::{
connection::{Edge, EmptyFields},
OutputType,
};
use diesel::{
prelude::*,
r2d2::{ConnectionManager, PooledConnection},
};

type Conn = PooledConnection<ConnectionManager<PgConnection>>;

pub fn project_to_nodes<T, I>(
rx: I,
Expand Down
80 changes: 36 additions & 44 deletions crates/api/src/chronicle_graphql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use async_graphql::{
extensions::OpenTelemetry,
http::{playground_source, GraphQLPlaygroundConfig, ALL_WEBSOCKET_PROTOCOLS},
scalar, Context, Enum, Error, ErrorExtensions, ObjectType, Schema, ServerError, SimpleObject,
Subscription, SubscriptionType,
Expand All @@ -24,7 +23,7 @@ use diesel::{
r2d2::{ConnectionManager, Pool, PooledConnection},
PgConnection, Queryable,
};
use futures::{Future, Stream};
use futures::Stream;
use lazy_static::lazy_static;
use poem::{
get, handler,
Expand Down Expand Up @@ -629,18 +628,16 @@ where
{
type Output = poem::Response;

fn call(&self, req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(req, |api_req| {
if let Some(claims) = checked_claims {
api_req.0.data(claims)
} else {
api_req.0
}
})
.await
}
async fn call(&self, req: poem::Request) -> poem::Result<Self::Output> {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(req, |api_req| {
if let Some(claims) = checked_claims {
api_req.0.data(claims)
} else {
api_req.0
}
})
.await
}
}

Expand Down Expand Up @@ -683,21 +680,19 @@ where
{
type Output = poem::Response;

fn call(&self, req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(
req,
if let Some(claims) = checked_claims {
let mut data = async_graphql::Data::default();
data.insert(claims);
data
} else {
async_graphql::Data::default()
},
)
.await
}
async fn call(&self, req: poem::Request) -> poem::Result<Self::Output> {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(
req,
if let Some(claims) = checked_claims {
let mut data = async_graphql::Data::default();
data.insert(claims);
data
} else {
async_graphql::Data::default()
},
)
.await
}
}

Expand Down Expand Up @@ -858,15 +853,13 @@ impl IriEndpoint {
impl Endpoint for IriEndpoint {
type Output = poem::Response;

fn call(&self, req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let checked_claims = if let Some(secconf) = &self.secconf {
check_claims(secconf, &req).await?
} else {
None
};
self.respond(req, checked_claims.as_ref()).await
}
async fn call(&self, req: poem::Request) -> poem::Result<Self::Output> {
let checked_claims = if let Some(secconf) = &self.secconf {
check_claims(secconf, &req).await?
} else {
None
};
self.respond(req, checked_claims.as_ref()).await
}
}

Expand All @@ -875,11 +868,9 @@ struct LdContextEndpoint;
impl Endpoint for LdContextEndpoint {
type Output = poem::Response;

fn call(&self, _req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let context: &serde_json::Value = &common::context::PROV;
Ok(IntoResponse::into_response(poem::web::Json(context)))
}
async fn call(&self, _req: poem::Request) -> poem::Result<Self::Output> {
let context: &serde_json::Value = &common::context::PROV;
Ok(IntoResponse::into_response(poem::web::Json(context)))
}
}

Expand Down Expand Up @@ -1014,7 +1005,8 @@ where
.id_claims
.map(|id_claims| AuthFromJwt { id_claims, allow_anonymous: sec.allow_anonymous });
let mut schema = Schema::build(self.query, self.mutation, Subscription)
.extension(OpenTelemetry::new(opentelemetry::global::tracer("chronicle-api-gql")))
//TODO: update
// .extension(OpenTelemetry::new(opentelemetry::global::tracer("chronicle-api-gql")))
.extension(OpaCheck { claim_parser: claim_parser.clone() });
if let Some(claim_parser) = &claim_parser {
schema = schema.extension(claim_parser.clone());
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};

use chrono::{DateTime, Utc};
use futures::{AsyncRead, Future};
use futures::AsyncRead;

use serde::{Deserialize, Serialize};

Expand Down
143 changes: 115 additions & 28 deletions crates/chronicle-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use arrow_flight::{
Ticket,
};

use arrow_flight::{IpcMessage, SchemaAsIpc};
use arrow_flight::{FlightEndpoint, IpcMessage, SchemaAsIpc};

use arrow_schema::ArrowError;

Expand All @@ -31,12 +31,13 @@ use futures::{
};

use meta::{DomainTypeMeta, Term};
use operations::create_flight_info_for_type;
use query::EntityAndReferences;
use query::{
activity_count_by_type, agent_count_by_type, entity_count_by_type, EntityAndReferences,
};
use r2d2::Pool;
use serde::Serialize;
use std::net::SocketAddr;
use tokio::sync::Semaphore;
use tokio::task::spawn_blocking;

use std::{sync::Arc, vec::Vec};
use tonic::{transport::Server, Request, Response, Status, Streaming};
Expand Down Expand Up @@ -132,6 +133,101 @@ pub enum ChronicleArrowError {
),
}

#[instrument(skip(pool, term, domaintype))]
pub async fn calculate_count_by_metadata_term(
pool: &Pool<ConnectionManager<PgConnection>>,
term: &Term,
domaintype: Option<String>,
) -> Result<i64, Status> {
let pool = pool.clone();
match term {
Term::Entity =>
spawn_blocking(move || {
entity_count_by_type(
&pool,
domaintype.map(|x| x.to_string()).iter().map(|s| s.as_str()).collect(),
)
})
.await,
Term::Agent =>
spawn_blocking(move || {
agent_count_by_type(
&pool,
domaintype.map(|x| x.to_string()).iter().map(|s| s.as_str()).collect(),
)
})
.await,
Term::Activity =>
spawn_blocking(move || {
activity_count_by_type(
&pool,
domaintype.map(|x| x.to_string()).iter().map(|s| s.as_str()).collect(),
)
})
.await,
_ => Ok(Ok(0)),
}
.map_err(|e| Status::from_error(e.into()))
.and_then(|res| res.map_err(|e| Status::from_error(e.into())))
}

pub async fn create_flight_info_for_type(
pool: Arc<Pool<ConnectionManager<PgConnection>>>,
domain_items: Vec<impl TypeName + Send + Sync + 'static>,
term: Term,
record_batch_size: usize,
) -> BoxStream<'static, Result<FlightInfo, Status>> {
stream::iter(domain_items.into_iter().map(|item| Ok::<_, tonic::Status>(item)))
.then(move |item| {
let pool = pool.clone();
async move {
let item = item?; // Handle the Result from the iterator
let descriptor_path = vec![term.to_string(), item.as_type_name()];
let metadata =
get_domain_type_meta_from_cache(&descriptor_path).ok_or_else(|| {
Status::from_error(Box::new(ChronicleArrowError::MissingSchemaError))
})?;

let count = calculate_count_by_metadata_term(
&pool,
&term,
Some(item.as_type_name().to_string()),
)
.await?;

let tickets = (0..count)
.step_by(record_batch_size as _)
.map(|start| {
let end = std::cmp::min(start as usize + record_batch_size, count as usize);

let ticket_metadata = ChronicleTicket::new(
term,
metadata.typ.as_ref().map(|x| x.as_domain_type_id()),
start as _,
(end - start as usize) as _,
);
Ticket::try_from(ticket_metadata)
.map_err(|e| Status::from_error(Box::new(ChronicleArrowError::from(e))))
})
.collect::<Result<Vec<_>, _>>()?;

let mut flight_info = FlightInfo::new();

for ticket in tickets {
flight_info =
flight_info.with_endpoint(FlightEndpoint::new().with_ticket(ticket));
}

Ok(flight_info
.with_descriptor(FlightDescriptor::new_path(descriptor_path))
.try_with_schema(&metadata.schema)
.map_err(|e| Status::from_error(Box::new(ChronicleArrowError::from(e))))?
.with_total_records(count))
}
})
.boxed()
}

#[derive(Clone)]
pub struct FlightServiceImpl {
domain: common::domain::ChronicleDomainDef,
Expand Down Expand Up @@ -336,12 +432,11 @@ impl FlightService for FlightServiceImpl {
)
.boxed()
},
_ => {
_ =>
return Err(Status::not_found(format!(
"Definition not found for term: {:?}, type_name: {}",
term, type_name
)))
},
))),
}
.await;

Expand Down Expand Up @@ -369,9 +464,8 @@ impl FlightService for FlightServiceImpl {
let ipc_message_result = SchemaAsIpc::new(&schema.schema, &options).try_into();
match ipc_message_result {
Ok(IpcMessage(schema)) => Ok(Response::new(SchemaResult { schema })),
Err(e) => {
Err(Status::internal(format!("Failed to convert schema to IPC message: {}", e)))
},
Err(e) =>
Err(Status::internal(format!("Failed to convert schema to IPC message: {}", e))),
}
}

Expand Down Expand Up @@ -477,12 +571,8 @@ impl FlightService for FlightServiceImpl {
Some(descriptor) => descriptor,
None => return Err(Status::invalid_argument("Flight data has no descriptor")),
},
Some(Err(e)) => {
return Err(Status::internal(format!(
"Failed to get first item from stream: {}",
e
)))
},
Some(Err(e)) =>
return Err(Status::internal(format!("Failed to get first item from stream: {}", e))),
None => {
return Err(Status::invalid_argument("Stream is empty"));
},
Expand Down Expand Up @@ -613,8 +703,8 @@ mod tests {
use crate::{
meta::{cache_domain_schemas, get_domain_type_meta_from_cache, DomainTypeMeta},
query::{
ActedOnBehalfOfRef, ActivityAndReferences, AgentAndReferences, AgentAttributionRef,
AssociationRef, DerivationRef, EntityAndReferences, EntityAttributionRef,
ActedOnBehalfOfRef, ActivityAndReferences, ActivityAssociationRef, AgentAndReferences,
AgentAttributionRef, DerivationRef, EntityAndReferences, EntityAttributionRef,
},
};

Expand Down Expand Up @@ -693,16 +783,13 @@ roles:
.iter()
.map(|(name, typ)| {
let value = match typ {
PrimitiveType::String => {
serde_json::Value::String(format!("{}-value", name))
},
PrimitiveType::Int => {
serde_json::Value::Number(serde_json::Number::from(42))
},
PrimitiveType::String =>
serde_json::Value::String(format!("{}-value", name)),
PrimitiveType::Int =>
serde_json::Value::Number(serde_json::Number::from(42)),
PrimitiveType::Bool => serde_json::Value::Bool(true),
PrimitiveType::JSON => {
serde_json::Value::String(format!("{{\"{}\": \"example\"}}", name))
},
PrimitiveType::JSON =>
serde_json::Value::String(format!("{{\"{}\": \"example\"}}", name)),
};
Attribute::new(name, value)
})
Expand Down Expand Up @@ -797,7 +884,7 @@ roles:
ended: Some(Utc.with_ymd_and_hms(2022, 1, 2, 0, 0, 0).unwrap()),
generated: vec![format!("entity-{}", i), format!("entity-{}", i + 1)],
was_informed_by: vec![format!("activity-{}", i), format!("activity-{}", i + 1)],
was_associated_with: vec![AssociationRef {
was_associated_with: vec![ActivityAssociationRef {
responsible_agent: format!("agent-{}", i),
responsible_role: Some("CERTIFIER".to_string()),
delegate_agent: Some(format!("agent-{}", i + 1)),
Expand Down
Loading

0 comments on commit 2a6ba18

Please sign in to comment.