Skip to content

Commit

Permalink
Additional relation support:
Browse files Browse the repository at this point in the history
* derivation
* agent attribution

Signed-off-by: Ryan <[email protected]>
  • Loading branch information
ryan-s-roberts committed Apr 27, 2024
1 parent 2a6ba18 commit 5b395f5
Show file tree
Hide file tree
Showing 35 changed files with 1,849 additions and 1,255 deletions.
368 changes: 189 additions & 179 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions crates/api/src/chronicle_graphql/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ use common::prov::Role;
use diesel::prelude::*;
use std::collections::HashMap;

use crate::chronicle_graphql::DatabaseContext;

pub async fn namespace<'a>(
namespaceid: i32,
ctx: &Context<'a>,
) -> async_graphql::Result<Namespace> {
use chronicle_persistence::schema::namespace::{self, dsl};
let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -34,7 +36,7 @@ pub async fn was_associated_with<'a>(
role: String,
}

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;
let mut connection = store.connection()?;

let delegation_entries = delegation::table
Expand Down Expand Up @@ -94,7 +96,7 @@ pub async fn was_associated_with<'a>(
pub async fn used<'a>(id: i32, ctx: &Context<'a>) -> async_graphql::Result<Vec<Entity>> {
use chronicle_persistence::schema::usage::{self, dsl};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -114,7 +116,7 @@ pub async fn was_informed_by<'a>(
) -> async_graphql::Result<Vec<Activity>> {
use chronicle_persistence::schema::wasinformedby::{self, dsl};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -133,7 +135,7 @@ pub async fn was_informed_by<'a>(
pub async fn generated<'a>(id: i32, ctx: &Context<'a>) -> async_graphql::Result<Vec<Entity>> {
use chronicle_persistence::schema::generation::{self, dsl};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -153,7 +155,7 @@ pub async fn load_attribute<'a>(
) -> async_graphql::Result<Option<serde_json::Value>> {
use chronicle_persistence::schema::activity_attribute;

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand Down
10 changes: 6 additions & 4 deletions crates/api/src/chronicle_graphql/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use chronicle_persistence::{
use common::prov::Role;
use diesel::prelude::*;

use crate::chronicle_graphql::DatabaseContext;

pub async fn namespace<'a>(
namespace_id: i32,
ctx: &Context<'a>,
) -> async_graphql::Result<Namespace> {
use chronicle_persistence::schema::namespace::{self, dsl};
let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -29,7 +31,7 @@ pub async fn acted_on_behalf_of<'a>(
delegation::{self, dsl},
};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -55,7 +57,7 @@ pub async fn attribution<'a>(
entity as entity_dsl,
};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -77,7 +79,7 @@ pub async fn load_attribute<'a>(
) -> async_graphql::Result<Option<serde_json::Value>> {
use chronicle_persistence::schema::agent_attribute;

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand Down
16 changes: 10 additions & 6 deletions crates/api/src/chronicle_graphql/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ pub enum Error {
UnexpectedResponse { server: String, status: StatusCode },
}

#[derive(Clone)]
pub struct TokenChecker {
client: reqwest::Client,
verifier: Option<RemoteJwksVerifier>,
verifier: Option<Arc<RemoteJwksVerifier>>,
jwks_uri: Option<JwksUri>,
userinfo_uri: Option<UserInfoUri>,
userinfo_cache: Arc<Mutex<TimedCache<String, Map<String, Value>>>>,
Expand All @@ -60,11 +61,14 @@ impl TokenChecker {
Self {
client: reqwest::Client::new(),
verifier: jwks_uri.map(|uri| {
RemoteJwksVerifier::new(
uri.full_uri(),
None,
Duration::from_secs(cache_expiry_seconds.into()),
)
{
RemoteJwksVerifier::new(
uri.full_uri(),
None,
Duration::from_secs(cache_expiry_seconds.into()),
)
}
.into()
}),
jwks_uri: jwks_uri.cloned(),
userinfo_uri: userinfo_uri.cloned(),
Expand Down
14 changes: 8 additions & 6 deletions crates/api/src/chronicle_graphql/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use chronicle_persistence::{
use common::prov::{operations::DerivationType, Role};
use diesel::prelude::*;

use crate::chronicle_graphql::DatabaseContext;

async fn typed_derivation<'a>(
id: i32,
ctx: &Context<'a>,
Expand All @@ -16,7 +18,7 @@ async fn typed_derivation<'a>(
entity as entitydsl,
};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -35,7 +37,7 @@ pub async fn namespace<'a>(
) -> async_graphql::Result<Namespace> {
use chronicle_persistence::schema::namespace::{self, dsl};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -52,7 +54,7 @@ pub async fn was_attributed_to<'a>(
) -> async_graphql::Result<Vec<(Agent, Option<Role>)>> {
use chronicle_persistence::schema::{agent, attribution};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;
let mut connection = store.connection()?;

let res = attribution::table
Expand All @@ -77,7 +79,7 @@ pub async fn was_generated_by<'a>(
) -> async_graphql::Result<Vec<Activity>> {
use chronicle_persistence::schema::generation::{self, dsl};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand All @@ -99,7 +101,7 @@ pub async fn was_derived_from<'a>(
entity as entitydsl,
};

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand Down Expand Up @@ -133,7 +135,7 @@ pub async fn load_attribute<'a>(
) -> async_graphql::Result<Option<serde_json::Value>> {
use chronicle_persistence::schema::entity_attribute;

let store = ctx.data_unchecked::<Store>();
let store = ctx.data::<DatabaseContext>()?;

let mut connection = store.connection()?;

Expand Down
105 changes: 74 additions & 31 deletions crates/api/src/chronicle_graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use async_graphql_poem::{
GraphQLWebSocket,
};

use crate::Store;
use common::{
identity::{AuthId, IdentityError, JwtClaims, OpaData, SignedIdentity},
ledger::{SubmissionError, SubmissionStage},
Expand Down Expand Up @@ -56,7 +57,7 @@ use crate::{ApiDispatch, ApiError, StoreError};
#[macro_use]
pub mod activity;
pub mod agent;
mod authorization;
pub mod authorization;
mod cursor_project;
pub mod entity;
pub mod mutation;
Expand Down Expand Up @@ -192,19 +193,6 @@ impl ErrorExtensions for GraphQlError {
}
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct Store {
#[derivative(Debug = "ignore")]
pub pool: Pool<ConnectionManager<PgConnection>>,
}

impl Store {
pub fn new(pool: Pool<ConnectionManager<PgConnection>>) -> Self {
Store { pool }
}
}

pub struct Commit {
pub tx_id: String,
}
Expand Down Expand Up @@ -453,6 +441,18 @@ impl SecurityConf {
) -> Self {
Self { jwks_uri, userinfo_uri, id_claims, jwt_must_claim, allow_anonymous, opa }
}

pub fn as_endpoint_conf(&self, cache_expiry_seconds: u32) -> EndpointSecurityConfiguration {
EndpointSecurityConfiguration::new(
TokenChecker::new(
self.jwks_uri.as_ref(),
self.userinfo_uri.as_ref(),
cache_expiry_seconds,
),
self.jwt_must_claim.clone(),
self.allow_anonymous,
)
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -576,14 +576,15 @@ async fn execute_opa_check(
}
}

struct EndpointSecurityConfiguration {
#[derive(Clone)]
pub struct EndpointSecurityConfiguration {
checker: TokenChecker,
must_claim: HashMap<String, String>,
allow_anonymous: bool,
pub must_claim: HashMap<String, String>,
pub allow_anonymous: bool,
}

impl EndpointSecurityConfiguration {
fn new(
pub fn new(
checker: TokenChecker,
must_claim: HashMap<String, String>,
allow_anonymous: bool,
Expand Down Expand Up @@ -984,6 +985,51 @@ async fn await_shutdown() {
let _permit = SHUTDOWN_SIGNAL.acquire().await.unwrap();
}


#[derive(Clone)]
struct DatabaseContext {
pool: Pool<ConnectionManager<PgConnection>>,
}

impl DatabaseContext {
fn new(pool: &Pool<ConnectionManager<PgConnection>>) -> Result<Self, StoreError> {
Ok(DatabaseContext { pool: pool.clone() })
}

fn connection(&self) -> Result<PooledConnection<ConnectionManager<PgConnection>>, r2d2::Error> {
self.pool.get()
}
}

pub fn construct_schema<Query, Mutation, Subscription>(
query: Query,
mutation: Mutation,
subscription: Subscription,
claim_parser: Option<AuthFromJwt>,
pool: &Pool<ConnectionManager<PgConnection>>,
api: &ApiDispatch,
opa: ExecutorContext,
) -> Result<Schema<Query, Mutation, Subscription>, StoreError>
where
Query: ObjectType + Copy + 'static,
Mutation: ObjectType + Copy + 'static,
Subscription: SubscriptionType + 'static,
{
let mut schema = Schema::build(query, mutation, subscription)
.extension(OpaCheck { claim_parser: claim_parser.clone() });

if let Some(claim_parser) = &claim_parser {
schema = schema.extension(claim_parser.clone());
}

Ok(schema
.data(api.clone())
.data(opa.clone())
.data(AuthId::anonymous())
.data(DatabaseContext::new(pool)?)
.finish())
}

#[async_trait::async_trait]
impl<Query, Mutation> ChronicleApiServer for ChronicleGraphQl<Query, Mutation>
where
Expand All @@ -1004,19 +1050,16 @@ where
let claim_parser = sec
.id_claims
.map(|id_claims| AuthFromJwt { id_claims, allow_anonymous: sec.allow_anonymous });
let mut schema = Schema::build(self.query, self.mutation, Subscription)
//TODO: update
// .extension(OpenTelemetry::new(opentelemetry::global::tracer("chronicle-api-gql")))
.extension(OpaCheck { claim_parser: claim_parser.clone() });
if let Some(claim_parser) = &claim_parser {
schema = schema.extension(claim_parser.clone());
}
let schema = schema
.data(Store::new(pool.clone()))
.data(api)
.data(sec.opa.clone())
.data(AuthId::anonymous())
.finish();

let schema = construct_schema(
self.query,
self.mutation,
Subscription,
claim_parser.clone(),
&pool,
&api,
sec.opa.clone(),
)?;

let iri_endpoint = |secconf| IriEndpoint {
secconf,
Expand Down
Loading

0 comments on commit 5b395f5

Please sign in to comment.