Skip to content

Commit

Permalink
fix: hack together perftest subcommand
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph Livesey <[email protected]>
  • Loading branch information
suchapalaver committed Jul 28, 2023
1 parent 87d58ad commit c8a57a3
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 6 deletions.
54 changes: 50 additions & 4 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use diesel_migrations::MigrationHarness;
use futures::{select, FutureExt, StreamExt};

use common::{
attributes::Attributes,
attributes::{Attribute, Attributes},
commands::*,
identity::{AuthId, IdentityError},
k256::ecdsa::SigningKey,
Expand All @@ -33,8 +33,8 @@ use common::{
},
to_json_ld::ToJson,
ActivityId, AgentId, ChronicleIri, ChronicleTransaction, ChronicleTransactionId,
Contradiction, EntityId, ExternalId, ExternalIdPart, NamespaceId, ProcessorError,
ProvModel, Role, SYSTEM_ID, SYSTEM_UUID,
Contradiction, DomaintypeId, EntityId, ExternalId, ExternalIdPart, NamespaceId,
ProcessorError, ProvModel, Role, SYSTEM_ID, SYSTEM_UUID,
},
signing::{DirectoryStoredKeys, SignerError},
};
Expand All @@ -55,7 +55,10 @@ use std::{
};
use thiserror::Error;
use tokio::{
sync::mpsc::{self, error::SendError, Sender},
sync::{
broadcast::error::RecvError,
mpsc::{self, error::SendError, Sender},
},
task::JoinError,
};

Expand All @@ -67,6 +70,9 @@ use uuid::Uuid;

#[derive(Error, Debug)]
pub enum ApiError {
#[error("Failure in commit notification stream: {0}")]
CommitNoticiationStream(#[from] RecvError),

#[error("Storage: {0:?}")]
Store(#[from] persistence::StoreError),

Expand Down Expand Up @@ -126,6 +132,9 @@ pub enum ApiError {

#[error("Sawtooth communication error: {0}")]
SawtoothCommunicationError(#[from] SawtoothCommunicationError),

#[error("Perftest unable to complete")]
PerftestError,
}

/// Ugly but we need this until ! is stable, see <https://github.com/rust-lang/rust/issues/64715>
Expand Down Expand Up @@ -252,6 +261,43 @@ impl ApiDispatch {
)
.await
}

#[instrument]
pub async fn handle_perftest(
&self,
id: AuthId,
namespace: NamespaceId,
) -> Result<ApiResponse, ApiError> {
self.dispatch_perftest(id, namespace).await
}

#[instrument]
async fn dispatch_perftest(
&self,
identity: AuthId,
namespace: NamespaceId,
) -> Result<ApiResponse, ApiError> {
self.dispatch(
ApiCommand::Activity(ActivityCommand::Create {
external_id: Uuid::new_v4().to_string().into(),
namespace: common::prov::SYSTEM_ID.into(),
attributes: Attributes {
typ: Some(DomaintypeId::from_external_id("perftest")),
attributes: [(
"perftest".to_owned(),
Attribute {
typ: "perftest".to_owned(),
value: serde_json::Value::String("perftest".to_owned()),
},
)]
.into_iter()
.collect(),
},
}),
identity,
)
.await
}
}

fn install_prometheus_metrics_exporter() {
Expand Down
12 changes: 12 additions & 0 deletions crates/chronicle/src/bootstrap/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub enum CliError {
#[error("Bad argument: {0}")]
ArgumentParsing(#[from] clap::Error),

#[error("Blocking thread pool: {0}")]
Join(#[from] tokio::task::JoinError),

#[error("Invalid IRI: {0}")]
InvalidIri(#[from] iref::Error),

Expand Down Expand Up @@ -1011,6 +1014,15 @@ impl SubCommand for CliModel {
.value_parser(StringValueParser::new())
.help("A path or url to data import file"),
)
)
.subcommand(
Command::new("perftest")
.arg(
Arg::new("ops")
.value_name("OPS")
.help("Number of operations")
.required(true)
)
);

for agent in self.agents.iter() {
Expand Down
76 changes: 74 additions & 2 deletions crates/chronicle/src/bootstrap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@ use common::{
import::{load_bytes_from_stdin, load_bytes_from_url},
ledger::SubmissionStage,
opa::ExecutorContext,
prov::{operations::ChronicleOperation, to_json_ld::ToJson, ExpandedJson, NamespaceId},
prov::{
operations::ChronicleOperation, to_json_ld::ToJson, ExpandedJson, NamespaceId, SYSTEM_ID,
SYSTEM_UUID,
},
signing::DirectoryStoredKeys,
};
use std::io::IsTerminal;
use std::{
io::IsTerminal,
time::{Duration, Instant},
};
use tracing::{debug, error, info, instrument, warn};
use user_error::UFE;
use uuid::Uuid;

use config::*;
use diesel::{
Expand Down Expand Up @@ -489,6 +496,67 @@ where
.await?;

Ok((response, ret_api))
} else if let Some(matches) = matches.subcommand_matches("perftest") {
let mut ops = matches.value_of("ops").unwrap().parse::<u64>().unwrap();

debug!("Performing {} operations", ops);

let perftest_ops_api = api.clone();

let mut ops_copy = ops;
let start_time = Instant::now();
tokio::task::spawn(async move {
while ops > 0 {
let identity = AuthId::chronicle();
let namespace = system_namespace();
let _res = perftest_ops_api.handle_perftest(identity, namespace).await;
ops -= 1;
}
info!("Perftest operations sent");
});

let perftest_notify_api = api.clone();

let (elapsed_time, failed_submissions) = tokio::task::spawn(async move {
let mut failed_submissions = 0;
while ops_copy > 0 {
let mut tx_notifications = perftest_notify_api.notify_commit.subscribe();
let stage = tx_notifications.recv().await?;

match stage {
SubmissionStage::Submitted(Ok(id)) => {
debug!("Perftest Transaction submitted: {}", id);
}
SubmissionStage::Submitted(Err(err)) => {
debug!(
"Perftest transaction rejected by Chronicle: {} {}",
err,
err.tx_id()
);
ops_copy -= 1;
failed_submissions += 1;
}
SubmissionStage::Committed(commit, _) => {
debug!("Perftest transaction committed: {}", commit.tx_id);
ops_copy -= 1;
debug!("{} operations remaining", ops_copy);
}
SubmissionStage::NotCommitted((id, contradiction, _)) => {
eprintln!("Perftest transaction rejected: {id} {contradiction}");
return Err(ApiError::PerftestError);
}
}
}
let end_time = Instant::now();
let elapsed_time = end_time - start_time;
Ok::<(Duration, u64), ApiError>((elapsed_time, failed_submissions))
})
.await??;
print!("Perftest complete");
println!();
println!("Perftest took {} seconds", elapsed_time.as_secs());
println!("{} operations failed", failed_submissions);
Ok((ApiResponse::Unit, ret_api))
} else if let Some(cmd) = cli.matches(&matches)? {
let identity = AuthId::chronicle();
Ok((api.dispatch(cmd, identity).await?, ret_api))
Expand All @@ -505,6 +573,10 @@ fn get_namespace(matches: &ArgMatches) -> NamespaceId {
NamespaceId::from_external_id(namespace_id, uuid)
}

fn system_namespace() -> NamespaceId {
NamespaceId::from_external_id(SYSTEM_ID, Uuid::try_from(SYSTEM_UUID).unwrap())
}

async fn config_and_exec<Query, Mutation>(
gql: ChronicleGraphQl<Query, Mutation>,
model: CliModel,
Expand Down

0 comments on commit c8a57a3

Please sign in to comment.