Skip to content

Commit

Permalink
Chronicle data flight / schema describe operations
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan <[email protected]>
  • Loading branch information
ryan-s-roberts committed Jul 25, 2024
1 parent 6505c83 commit 8806ddb
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 14 deletions.
18 changes: 15 additions & 3 deletions crates/chronicle-data/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clap::{Parser, Subcommand};
#[derive(Parser)]
#[clap(name = "chronicle-data", about = "CLI for Chronicle Data operations")]
pub struct Cli {
#[arg(long, help = "The Chronicle server URL", global = true, required = true)]
#[arg(long, help = "The Chronicle server URL", global = true, default_value = "http://localhost:9983")]
pub chronicle: String,

#[arg(long, help = "Authentication token", global = true, required = false)]
Expand All @@ -23,13 +23,25 @@ pub enum Commands {
#[clap(about = "Handles incoming data operations")]
In,
#[clap(about = "Handles outgoing data operations")]
Out,
Out {
#[arg(long, help = "The path for the data operation", required = true)]
path: String,

#[arg(long, help = "Optional start date for the data operation", required = false)]
from: Option<String>,

#[arg(long, help = "Optional end date for the data operation", required = false)]
to: Option<String>,
},
}

#[derive(Subcommand)]
pub enum DescribeSubcommands {
#[clap(about = "Describes the data schema")]
Schema,
Schema {
#[arg(long, help = "The path for the data operation", required = true)]
path: String,
},
#[clap(about = "List the available flights")]
Flights,
}
42 changes: 33 additions & 9 deletions crates/chronicle-data/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow_flight::{flight_service_client::FlightServiceClient, FlightInfo};
use arrow_flight::{flight_service_client::FlightServiceClient, FlightDescriptor, FlightInfo};
use arrow_schema::Schema;
use clap::Parser;
use cli::{Cli, Commands, DescribeSubcommands};
Expand Down Expand Up @@ -29,12 +29,23 @@ fn format_schema_as_table(schema: &Schema) -> String {
table.to_string()
}

fn format_flight_info_as_table(_flight_infos: Vec<FlightInfo>) -> String {
fn format_flight_info_as_table(flight_infos: Vec<FlightInfo>) -> String {
let mut table = Table::new();
table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);

table.set_titles(row!["Descriptor", "Endpoints", "Summary"]);

for flight_info in flight_infos {
let descriptor = format!("{:?}", flight_info.flight_descriptor);
let endpoints = flight_info.endpoint.iter()
.map(|e| format!("{:?}", e.location))
.collect::<Vec<_>>()
.join(", ");
let summary = format!("{:?}", flight_info.total_records);

table.add_row(row![descriptor, endpoints, summary]);
}

table.to_string()
}

Expand All @@ -53,25 +64,38 @@ async fn list_flights(
Ok(flights_info)
}

async fn get_schema(path: Vec<String>,client: &mut FlightServiceClient<Channel>) -> Result<Schema, Box<dyn std::error::Error>> {
let flight_descriptor = FlightDescriptor::new_path(path);
let request = tonic::Request::new(flight_descriptor);
let response = client.get_schema(request).await?;
let schema_result = response.into_inner();

Schema::try_from(&schema_result).map_err(|e| e.into())
}

#[tokio::main]
async fn main() {
let cli = Cli::parse();
let _client = init_flight_client(&cli).await.expect("Failed to initialize the Flight client");
let mut client = init_flight_client(&cli).await.expect("Failed to initialize the Flight client");

match &cli.command {
match &cli.command {
Commands::Describe { subcommand } => match subcommand {
DescribeSubcommands::Schema => {
println!("Describing the data schema...");
DescribeSubcommands::Schema{path, ..} => {
let schema = get_schema(path.split('/').map(String::from).collect(), &mut client).await.expect("Failed to get schema");
println!("{}", format_schema_as_table(&schema));
},
DescribeSubcommands::Flights => {
println!("Listing available flights...");
match list_flights(&mut client).await {
Ok(flights) => println!("{}", format_flight_info_as_table(flights)),
Err(e) => eprintln!("Error listing flights: {}", e),
}
},
},
Commands::In => {
println!("Handling incoming data operations...");
},
Commands::Out => {
println!("Handling outgoing data operations...");
Commands::Out { path, from, to } => {

},
}
}
3 changes: 1 addition & 2 deletions crates/chronicle/src/bootstrap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,9 @@ pub async fn api(

let embedded_tp = in_mem_ledger(options).await?;

Ok(Api::new(
Ok(Api::create_dispatch(
pool.clone(),
embedded_tp.connect_chronicle::<ChronicleConfig>().await?,
UniqueUuid,
chronicle_signing(options).await?,
namespace_bindings(options),
remote_opa,
Expand Down

0 comments on commit 8806ddb

Please sign in to comment.