Skip to content

Commit

Permalink
* Add operationhash signed extrinsic
Browse files Browse the repository at this point in the history
* Remove ImOnline, as it's rather more ImNotOnline

Signed-off-by: Ryan <[email protected]>
  • Loading branch information
ryan-s-roberts committed Jun 24, 2024
1 parent aae68da commit 716b1c1
Show file tree
Hide file tree
Showing 43 changed files with 595 additions and 2,228 deletions.
532 changes: 149 additions & 383 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions charts/chronicle/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ spec:
{{ required "If 'auth.required' you need to provide at least 'auth.jwks.url' or 'auth.userinfo.url', or 'devIdProvider.enabled' must be 'true'!" .Values.auth.jwks.url }}
{{ end }}
{{ end }}
sleep 10
chronicle \
--chronicle-key-from-path /vault/secrets \
--batcher-key-from-path /vault/secrets \
Expand All @@ -95,10 +95,6 @@ spec:
env: {{ include "lib.safeToYaml" .Values.env | nindent 12 }}
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "grpc://signoz-otel-collector.observability.svc.cluster.local:4317"
- name: OTEL_TRACES_EXPORTER
value: otlp
- name: OTEL_METRICS_EXPORTER
value: otlp
- name: POD_NAME
valueFrom:
fieldRef:
Expand Down
7 changes: 4 additions & 3 deletions crates/api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
+ Sync
+ 'static,
{
#[instrument(skip(ledger))]
#[instrument(skip(ledger), name = "api_new")]
pub async fn new(
pool: Pool<ConnectionManager<PgConnection>>,
ledger: LEDGER,
Expand Down Expand Up @@ -282,7 +282,7 @@ where
trace!(delta = %serde_json::to_string_pretty(&diff.to_json().compact().await.unwrap()).unwrap());

api.sync( diff.clone().into(), &block_id,tx )
.instrument(info_span!("Incoming confirmation", offset = ?block_id, tx = %tx))
.instrument(info_span!("incoming_confirmation", offset = ?block_id, tx = %tx))
.await
.map_err(|e| {
error!(?e, "Api sync to confirmed commit");
Expand Down Expand Up @@ -1280,13 +1280,14 @@ where
.await?
}

#[instrument(level = "trace", skip(self), ret(Debug))]
#[instrument(skip(self, prov), ret(Debug))]
async fn sync(
&self,
prov: Box<ProvModel>,
block_id: &BlockId,
tx_id: ChronicleTransactionId,
) -> Result<ApiResponse, ApiError> {
trace!(prov = ?prov);
let api = self.clone();
let block_id = *block_id;
tokio::task::spawn_blocking(move || {
Expand Down
6 changes: 3 additions & 3 deletions crates/api/src/chronicle_graphql/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct TokenChecker {
}

impl TokenChecker {
#[instrument(level = "debug")]
#[instrument(name = "api_token_checker_new")]
pub fn new(
jwks_uri: Option<&JwksUri>,
userinfo_uri: Option<&UserInfoUri>,
Expand Down Expand Up @@ -99,7 +99,7 @@ impl TokenChecker {
Ok(())
}

#[instrument(level = "trace", skip_all, err)]
#[instrument(name = "api_token_checker_attempt_jwt", skip_all, err)]
async fn attempt_jwt(&self, token: &str) -> Result<Map<String, Value>, Error> {
use base64::engine::general_purpose::{GeneralPurpose, URL_SAFE_NO_PAD};
const BASE64_ENGINE: GeneralPurpose = URL_SAFE_NO_PAD;
Expand Down Expand Up @@ -128,7 +128,7 @@ impl TokenChecker {
}
}

#[instrument(level = "debug", skip_all, err)]
#[instrument(name = "api_token_checker_verify_token", skip_all, err)]
pub async fn verify_token(&self, token: &str) -> Result<Map<String, Value>, Error> {
let mut claims = Map::new();
let mut error = None;
Expand Down
8 changes: 4 additions & 4 deletions crates/api/src/chronicle_graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use common::{
},
};

use crate::{dispatch::ApiDispatch, error::ApiError, Store, StoreError};
use crate::{dispatch::ApiDispatch, error::ApiError, StoreError};

use self::authorization::TokenChecker;

Expand Down Expand Up @@ -501,7 +501,7 @@ fn check_required_claim(must_value: &str, actual_value: &serde_json::Value) -> b
}
}

#[instrument(level = "trace", ret(Debug))]
#[instrument(name = "api_check_required_claims", ret(Debug))]
fn check_required_claims(
must_claim: &HashMap<String, String>,
actual_claims: &serde_json::Map<String, serde_json::Value>,
Expand Down Expand Up @@ -611,7 +611,7 @@ where
M: ObjectType + 'static,
S: SubscriptionType + 'static,
{
#[instrument(level = "debug", skip_all, ret(Debug))]
#[instrument(name = "api_query_endpoint_respond", skip_all, ret(Debug))]
async fn respond(
&self,
req: poem::Request,
Expand Down Expand Up @@ -885,7 +885,7 @@ pub struct AuthFromJwt {
}

impl AuthFromJwt {
#[instrument(level = "debug", ret(Debug))]
#[instrument(name = "api_auth_from_jwt_identity", ret(Debug))]
fn identity(&self, claims: &JwtClaims) -> Result<AuthId, IdentityError> {
AuthId::from_jwt_claims(claims, &self.id_claims)
}
Expand Down
10 changes: 5 additions & 5 deletions crates/api/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct ApiDispatch {
}

impl ApiDispatch {
#[instrument]
#[instrument(name = "api_dispatch", skip(self, command, identity))]
pub async fn dispatch(
&self,
command: ApiCommand,
Expand All @@ -42,7 +42,7 @@ impl ApiDispatch {
reply.ok_or(ApiError::ApiShutdownRx {})?
}

#[instrument]
#[instrument(name = "api_handle_import_command", skip(self, identity, operations))]
pub async fn handle_import_command(
&self,
identity: AuthId,
Expand All @@ -51,7 +51,7 @@ impl ApiDispatch {
self.import_operations(identity, operations).await
}

#[instrument]
#[instrument(name = "api_import_operations", skip(self, identity, operations))]
async fn import_operations(
&self,
identity: AuthId,
Expand All @@ -61,7 +61,7 @@ impl ApiDispatch {
.await
}

#[instrument]
#[instrument(name = "api_handle_depth_charge", skip(self, namespace, uuid))]
pub async fn handle_depth_charge(
&self,
namespace: &str,
Expand All @@ -74,7 +74,7 @@ impl ApiDispatch {
.await
}

#[instrument]
#[instrument(name = "api_dispatch_depth_charge", skip(self, identity, namespace))]
async fn dispatch_depth_charge(
&self,
identity: AuthId,
Expand Down
6 changes: 3 additions & 3 deletions crates/chronicle-arrow/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub fn schema_for_entity(entity: &EntityDef) -> Schema {
for attribute in &entity.attributes {
if let Some(data_type) = field_for_domain_primitive(&attribute.primitive_type) {
builder.push(arrow_schema::Field::new(
&attribute.preserve_inflection(),
attribute.preserve_inflection(),
data_type,
true,
));
Expand Down Expand Up @@ -163,7 +163,7 @@ pub fn schema_for_activity(activity: &ActivityDef) -> Schema {

for attribute in &activity.attributes {
if let Some(typ) = field_for_domain_primitive(&attribute.primitive_type) {
builder.push(arrow_schema::Field::new(&attribute.preserve_inflection(), typ, true));
builder.push(arrow_schema::Field::new(attribute.preserve_inflection(), typ, true));
}
}

Expand Down Expand Up @@ -214,7 +214,7 @@ pub fn schema_for_agent(agent: &AgentDef) -> Schema {
builder.push(arrow_schema::Field::new("id", arrow_schema::DataType::Utf8, false));
for attribute in &agent.attributes {
if let Some(typ) = field_for_domain_primitive(&attribute.primitive_type) {
builder.push(arrow_schema::Field::new(&attribute.preserve_inflection(), typ, true));
builder.push(arrow_schema::Field::new(attribute.preserve_inflection(), typ, true));
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/chronicle-persistence/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration};
use std::{str::FromStr, time::Duration};

use chrono::{TimeZone, Utc};
use common::{
Expand Down Expand Up @@ -124,7 +124,7 @@ type WasInformedBys = Vec<String>;
type Associations = Vec<(String, String)>;

impl Store {
#[instrument(name = "Bind namespace", skip(self))]
#[instrument(skip(self))]
pub fn namespace_binding(&self, external_id: &str, uuid: Uuid) -> Result<(), StoreError> {
use schema::namespace::dsl;

Expand Down
41 changes: 31 additions & 10 deletions crates/chronicle-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,26 @@ impl WithSecret for ChronicleSigning {

let signing_result = secret.value.exposed_in_as_str(|secret| {
(
// Convert hex encoded seed to SigningKey
hex::decode(secret.trim_start_matches("0x"))
.map_err(|_| SecretError::DecodingFailure)
.and_then(|secret| {
SigningKey::from_bytes(&secret).map_err(|_| SecretError::InvalidPrivateKey)
})
.map(&f),
{
// Convert hex encoded seed to SigningKey
hex::decode(secret.trim_start_matches("0x").trim_end())
.map_err(|e| {
tracing::error!(
"error decoding secret: {} {} {} {:?}",
secret,
&secret[2..],
&secret[2..].len(),
e
);

SecretError::DecodingFailure
})
.and_then(|secret| {
SigningKey::from_bytes(&secret)
.map_err(|_| SecretError::InvalidPrivateKey)
})
.map(&f)
},
secret,
)
});
Expand All @@ -268,8 +281,16 @@ impl WithSecret for ChronicleSigning {
let signing_result = secret.value.exposed_in_as_str(|secret| {
(
// Convert hex encoded seed to SigningKey
hex::decode(secret.trim_start_matches("0x"))
.map_err(|_| SecretError::DecodingFailure)
hex::decode(secret.trim_start_matches("0x").trim_end())
.map_err(|e| {
tracing::error!(
"error decoding secret: {} {} {:?}",
secret,
&secret[2..],
e
);
SecretError::DecodingFailure
})
.and_then(|secret| {
SigningKey::from_bytes(&secret).map_err(|_| SecretError::InvalidPrivateKey)
})
Expand All @@ -293,7 +314,7 @@ impl WithSecret for ChronicleSigning {
let key = secret.value.exposed_in_as_str(|secret| {
(
// Convert hex encoded seed to SigningKey
hex::decode(secret.trim_start_matches("0x"))
hex::decode(secret.trim_start_matches("0x").trim_end())
.map_err(|_| SecretError::DecodingFailure)
.and_then(|decoded_secret| {
SigningKey::from_bytes(&decoded_secret)
Expand Down
19 changes: 5 additions & 14 deletions crates/chronicle-telemetry/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
use opentelemetry::{
global,
logs::LogError,
metrics::MetricsError,
trace::{TraceContextExt, TraceError, Tracer, TracerProvider},
Key, KeyValue,
};
use opentelemetry::{global, logs::LogError, metrics::MetricsError, trace::TraceError};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{ExportConfig, WithExportConfig};
use opentelemetry_sdk::{
logs::Config,
runtime,
trace::{self as sdktrace, Config, RandomIdGenerator},
Resource,
trace::{self as sdktrace, RandomIdGenerator},
};
use tracing::{span, Level};
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{fmt::format::FmtSpan, prelude::*, EnvFilter};

fn init_tracer_provider() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.with_trace_config(Config::default().with_id_generator(RandomIdGenerator::default()))
.install_batch(runtime::Tokio)
}

Expand Down Expand Up @@ -67,13 +58,13 @@ pub fn telemetry(console_logging: ConsoleLogging) {
match console_logging {
ConsoleLogging::Json => Some(Box::new(
tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::ACTIVE)
.with_span_events(FmtSpan::NONE)
.compact()
.json(),
)),
ConsoleLogging::Pretty => Some(Box::new(
tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::ACTIVE)
.with_span_events(FmtSpan::NONE)
.compact()
.pretty(),
)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Stubstrate {
impl LedgerReader for Stubstrate {
type Error = SubxtClientError;
type Event = ChronicleEvent;
type EventCodec = ChronicleEventCodec<PolkadotConfig>;
type EventCodec = ChronicleEventCodec;

async fn block_height(&self) -> Result<(Position, BlockId), Self::Error> {
unimplemented!();
Expand Down
24 changes: 10 additions & 14 deletions crates/chronicle/src/bootstrap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,25 @@ mod cli;
pub mod opa;

#[cfg(not(feature = "devmode"))]
fn validator_address(options: &ArgMatches) -> Result<Vec<SocketAddr>, CliError> {
fn validator_address(options: &ArgMatches) -> Result<Url, CliError> {
Ok(options
.value_of("validator")
.map(str::to_string)
.ok_or(CliError::MissingArgument { arg: "validator".to_owned() })
.and_then(|s| Url::parse(&s).map_err(CliError::from))
.map(|u| u.socket_addrs(|| Some(4004)))
.map_err(CliError::from)??)
.map_err(CliError::from)?)
}

#[allow(dead_code)]
#[cfg(not(feature = "devmode"))]
#[tracing::instrument(level = "debug", skip(options))]
async fn ledger(
options: &ArgMatches,
) -> Result<ChronicleSubstrateClient<protocol_substrate::PolkadotConfig>, CliError> {
async fn ledger(options: &ArgMatches) -> Result<ChronicleSubstrateClient, CliError> {
let url = options
.value_of("validator")
.map(str::to_string)
.ok_or_else(|| CliError::MissingArgument { arg: "validator".to_owned() })?;

let client =
ChronicleSubstrateClient::<protocol_substrate::PolkadotConfig>::connect(url).await?;
let client = ChronicleSubstrateClient::connect(url).await?;

Ok(client)
}
Expand Down Expand Up @@ -292,13 +288,13 @@ pub async fn api(
remote_opa: Option<PolicyAddress>,
liveness_check_interval: Option<u64>,
) -> Result<ApiDispatch, CliError> {
use protocol_substrate::PolkadotConfig;
use protocol_substrate::ChronicleConfig;

let embedded_tp = in_mem_ledger(options).await?;

Ok(Api::new(
pool.clone(),
embedded_tp.connect_chronicle::<PolkadotConfig>().await?,
embedded_tp.connect_chronicle::<ChronicleConfig>().await?,
UniqueUuid,
chronicle_signing(options).await?,
vec![],
Expand Down Expand Up @@ -416,9 +412,9 @@ async fn configure_opa(options: &ArgMatches) -> Result<ConfiguredOpa, CliError>
Ok(ConfiguredOpa::Url(opa))
} else {
let (opa, settings) = self::opa::opa_executor_from_substrate_state(
&ChronicleSubstrateClient::connect_socket_addr(validator_address(options)?[0]).await?,
&protocol_substrate_opa::OpaSubstrateClient::connect_socket_addr(
validator_address(options)?[0],
&ChronicleSubstrateClient::connect(validator_address(options)?).await?,
&protocol_substrate_opa::OpaSubstrateClient::connect(
validator_address(options)?,
)
.await?,
)
Expand Down Expand Up @@ -463,7 +459,7 @@ fn configure_depth_charge(matches: &ArgMatches) -> Option<u64> {
None
}

#[instrument(skip(gql, cli))]
#[instrument(skip(gql, cli, domain))]
async fn execute_subcommand<Query, Mutation>(
gql: ChronicleGraphQl<Query, Mutation>,
domain: &ChronicleDomainDef,
Expand Down
Loading

0 comments on commit 716b1c1

Please sign in to comment.