From 8806ddb8b33cf35f5051316930195760a8eda060 Mon Sep 17 00:00:00 2001 From: Ryan Date: Thu, 25 Jul 2024 21:07:53 +0100 Subject: [PATCH] Chronicle data flight / schema describe operations Signed-off-by: Ryan --- crates/chronicle-data/src/cli.rs | 18 ++++++++++-- crates/chronicle-data/src/main.rs | 42 +++++++++++++++++++++------ crates/chronicle/src/bootstrap/mod.rs | 3 +- 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/crates/chronicle-data/src/cli.rs b/crates/chronicle-data/src/cli.rs index 3931f555..48b412d0 100644 --- a/crates/chronicle-data/src/cli.rs +++ b/crates/chronicle-data/src/cli.rs @@ -3,7 +3,7 @@ use clap::{Parser, Subcommand}; #[derive(Parser)] #[clap(name = "chronicle-data", about = "CLI for Chronicle Data operations")] pub struct Cli { - #[arg(long, help = "The Chronicle server URL", global = true, required = true)] + #[arg(long, help = "The Chronicle server URL", global = true, default_value = "http://localhost:9983")] pub chronicle: String, #[arg(long, help = "Authentication token", global = true, required = false)] @@ -23,13 +23,25 @@ pub enum Commands { #[clap(about = "Handles incoming data operations")] In, #[clap(about = "Handles outgoing data operations")] - Out, + Out { + #[arg(long, help = "The path for the data operation", required = true)] + path: String, + + #[arg(long, help = "Optional start date for the data operation", required = false)] + from: Option, + + #[arg(long, help = "Optional end date for the data operation", required = false)] + to: Option, + }, } #[derive(Subcommand)] pub enum DescribeSubcommands { #[clap(about = "Describes the data schema")] - Schema, + Schema { + #[arg(long, help = "The path for the data operation", required = true)] + path: String, + }, #[clap(about = "List the available flights")] Flights, } diff --git a/crates/chronicle-data/src/main.rs b/crates/chronicle-data/src/main.rs index 102d83dd..18152d9b 100644 --- a/crates/chronicle-data/src/main.rs +++ b/crates/chronicle-data/src/main.rs @@ -1,4 +1,4 @@ -use arrow_flight::{flight_service_client::FlightServiceClient, FlightInfo}; +use arrow_flight::{flight_service_client::FlightServiceClient, FlightDescriptor, FlightInfo}; use arrow_schema::Schema; use clap::Parser; use cli::{Cli, Commands, DescribeSubcommands}; @@ -29,12 +29,23 @@ fn format_schema_as_table(schema: &Schema) -> String { table.to_string() } -fn format_flight_info_as_table(_flight_infos: Vec) -> String { +fn format_flight_info_as_table(flight_infos: Vec) -> String { let mut table = Table::new(); table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE); table.set_titles(row!["Descriptor", "Endpoints", "Summary"]); + for flight_info in flight_infos { + let descriptor = format!("{:?}", flight_info.flight_descriptor); + let endpoints = flight_info.endpoint.iter() + .map(|e| format!("{:?}", e.location)) + .collect::>() + .join(", "); + let summary = format!("{:?}", flight_info.total_records); + + table.add_row(row![descriptor, endpoints, summary]); + } + table.to_string() } @@ -53,25 +64,38 @@ async fn list_flights( Ok(flights_info) } +async fn get_schema(path: Vec,client: &mut FlightServiceClient) -> Result> { + let flight_descriptor = FlightDescriptor::new_path(path); + let request = tonic::Request::new(flight_descriptor); + let response = client.get_schema(request).await?; + let schema_result = response.into_inner(); + + Schema::try_from(&schema_result).map_err(|e| e.into()) +} + #[tokio::main] async fn main() { let cli = Cli::parse(); - let _client = init_flight_client(&cli).await.expect("Failed to initialize the Flight client"); + let mut client = init_flight_client(&cli).await.expect("Failed to initialize the Flight client"); - match &cli.command { + match &cli.command { Commands::Describe { subcommand } => match subcommand { - DescribeSubcommands::Schema => { - println!("Describing the data schema..."); + DescribeSubcommands::Schema{path, ..} => { + let schema = get_schema(path.split('/').map(String::from).collect(), &mut client).await.expect("Failed to get schema"); + println!("{}", format_schema_as_table(&schema)); }, DescribeSubcommands::Flights => { - println!("Listing available flights..."); + match list_flights(&mut client).await { + Ok(flights) => println!("{}", format_flight_info_as_table(flights)), + Err(e) => eprintln!("Error listing flights: {}", e), + } }, }, Commands::In => { println!("Handling incoming data operations..."); }, - Commands::Out => { - println!("Handling outgoing data operations..."); + Commands::Out { path, from, to } => { + }, } } diff --git a/crates/chronicle/src/bootstrap/mod.rs b/crates/chronicle/src/bootstrap/mod.rs index 62ed13d6..060e1e48 100644 --- a/crates/chronicle/src/bootstrap/mod.rs +++ b/crates/chronicle/src/bootstrap/mod.rs @@ -305,10 +305,9 @@ pub async fn api( let embedded_tp = in_mem_ledger(options).await?; - Ok(Api::new( + Ok(Api::create_dispatch( pool.clone(), embedded_tp.connect_chronicle::().await?, - UniqueUuid, chronicle_signing(options).await?, namespace_bindings(options), remote_opa,