Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Parquet format output not working in CLI for show commands #25997

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions influxdb3/src/commands/show.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,33 @@
use clap::Parser;
use secrecy::{ExposeSecret, Secret};
use std::error::Error;
use std::io;
use std::str::Utf8Error;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use url::Url;

use crate::commands::common::Format;
use system::Error as SystemCommandError;
/// Errors that the `show` command can return to its caller.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
/// An error from `influxdb3_client`, displayed transparently (the client
/// error's own message is shown as-is).
#[error(transparent)]
Client(#[from] influxdb3_client::Error),

/// The `parquet` format was requested without an `--output` file path.
#[error(
"must specify an output file path with `--output` parameter when formatting \
the output as `parquet`"
)]
NoOutputFileForParquet,

/// The response bytes were not valid UTF-8 when converted for printing.
#[error("invalid UTF8 received from server: {0}")]
Utf8(#[from] Utf8Error),

/// An I/O failure, e.g. while opening or writing the output file.
#[error("io error: {0}")]
Io(#[from] io::Error),

/// An error from the `show system` subcommand, displayed transparently.
#[error(transparent)]
SystemCommand(#[from] SystemCommandError),
}

mod system;
use system::SystemConfig;
Expand Down Expand Up @@ -45,30 +69,48 @@ pub struct DatabaseConfig {
/// The format in which to output the list of databases
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the list of databases into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
pub(crate) async fn command(config: Config) -> Result<(), Error> {
match config.cmd {
SubCommand::Databases(DatabaseConfig {
host_url,
auth_token,
show_deleted,
output_format,
output_file_path,
}) => {
let mut client = influxdb3_client::Client::new(host_url)?;

if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}

let resp_bytes = client
let mut resp_bytes = client
.api_v3_configure_db_show()
.with_format(output_format.into())
.with_format(output_format.clone().into())
.with_show_deleted(show_deleted)
.send()
.await?;

println!("{}", std::str::from_utf8(&resp_bytes)?);
if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(&mut resp_bytes).await?;
} else {
if output_format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}
println!("{}", std::str::from_utf8(&resp_bytes)?);
}
}
SubCommand::System(cfg) => system::command(cfg).await?,
}
Expand Down
118 changes: 101 additions & 17 deletions influxdb3/src/commands/show/system.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::super::common::{Format, InfluxDb3Config};
use clap::Parser;
use influxdb3_client::Client;
use secrecy::ExposeSecret;
use serde::Deserialize;

use super::super::common::{Format, InfluxDb3Config};
use std::io;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
Expand All @@ -15,6 +17,15 @@ pub(crate) enum Error {

#[error("system table '{0}' not found: {1}")]
SystemTableNotFound(String, SystemTableNotFound),

#[error(
"must specify an output file path with `--output` parameter when formatting \
the output as `parquet`"
)]
NoOutputFileForParquet,

#[error("io error: {0}")]
Io(#[from] io::Error),
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -77,6 +88,10 @@ pub struct TableListConfig {
/// The format in which to output the query
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the table lists output into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

const SYS_TABLES_QUERY: &str = "WITH cols (table_name, column_name) AS (SELECT table_name, column_name FROM information_schema.columns WHERE table_schema = 'system' ORDER BY (table_name, column_name)) SELECT table_name, array_agg(column_name) AS column_names FROM cols GROUP BY table_name ORDER BY table_name";
Expand All @@ -100,14 +115,32 @@ impl std::fmt::Display for SystemTableNotFound {

impl SystemCommandRunner {
async fn list(&self, config: TableListConfig) -> Result<()> {
let bs = self
let TableListConfig {
output_format,
output_file_path,
} = &config;

let mut bs = self
.client
.api_v3_query_sql(self.db.as_str(), SYS_TABLES_QUERY)
.format(config.output_format.into())
.format(output_format.clone().into())
.send()
.await?;

println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(&mut bs).await?;
} else {
if output_format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
}

Ok(())
}
Expand All @@ -124,7 +157,7 @@ pub struct TableConfig {
limit: u16,

/// Order by the specified fields.
#[clap(long = "order-by", short = 'o', num_args = 1, value_delimiter = ',')]
#[clap(long = "order-by", num_args = 1, value_delimiter = ',')]
order_by: Vec<String>,

/// Select specified fields from table.
Expand All @@ -134,6 +167,10 @@ pub struct TableConfig {
/// The format in which to output the query
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the table output into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

impl SystemCommandRunner {
Expand All @@ -157,6 +194,7 @@ impl SystemCommandRunner {
select,
order_by,
output_format,
output_file_path,
} = &config;

let select_expr = if !select.is_empty() {
Expand Down Expand Up @@ -185,7 +223,7 @@ impl SystemCommandRunner {

let query = clauses.join("\n");

let bs = match client
let mut bs = match client
.api_v3_query_sql(db, query)
.format(output_format.clone().into())
.send()
Expand All @@ -205,8 +243,20 @@ impl SystemCommandRunner {
}
};

println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());

if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(&mut bs).await?;
} else {
if output_format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
}
Ok(())
}
}
Expand All @@ -221,25 +271,44 @@ pub struct SummaryConfig {
/// The format in which to output the query
#[clap(value_enum, long = "format", default_value = "pretty")]
output_format: Format,

/// Put the summary into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,
}

impl SystemCommandRunner {
/// Produces a summary of every system table.
///
/// Forwards the row limit, output format and optional output file path from
/// `config` to [`Self::summarize_all_tables`] and returns its result.
///
/// # Errors
///
/// Propagates any error returned by [`Self::summarize_all_tables`].
async fn summary(&self, config: SummaryConfig) -> Result<()> {
    // A single call with the three-argument signature; the result is returned
    // directly instead of being unwrapped with `?` and re-wrapped in `Ok`.
    self.summarize_all_tables(
        config.limit,
        &config.output_format,
        &config.output_file_path,
    )
    .await
}

async fn summarize_all_tables(&self, limit: u16, format: &Format) -> Result<()> {
async fn summarize_all_tables(
&self,
limit: u16,
format: &Format,
output_file_path: &Option<String>,
) -> Result<()> {
let system_tables = self.get_system_tables().await?;
for table in system_tables {
self.summarize_table(table.table_name.as_str(), limit, format)
self.summarize_table(table.table_name.as_str(), limit, format, output_file_path)
.await?;
}
Ok(())
}

async fn summarize_table(&self, table_name: &str, limit: u16, format: &Format) -> Result<()> {
async fn summarize_table(
&self,
table_name: &str,
limit: u16,
format: &Format,
output_file_path: &Option<String>,
) -> Result<()> {
let Self { db, client } = self;
let mut clauses = vec![format!("SELECT * FROM system.{table_name}")];

Expand All @@ -257,14 +326,29 @@ impl SystemCommandRunner {

let query = clauses.join("\n");

let bs = client
let mut bs = client
.api_v3_query_sql(db, query)
.format(format.clone().into())
.send()
.await?;

println!("{table_name} summary:");
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
if let Some(path) = output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
f.write_all_buf(&mut bs).await?;
} else {
if format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}

println!("{table_name} summary:");
println!("{}", String::from_utf8(bs.as_ref().to_vec()).unwrap());
}

Ok(())
}
}
Expand Down
12 changes: 12 additions & 0 deletions influxdb3/tests/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,18 @@ async fn test_show_system() {
name: "iox schema table name exists, but should error because we're concerned here with system tables",
args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "table", "cpu"],
},
FailTestCase {
name: "fail without output-file when format is parquet for table",
args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "table", "--format", "parquet","distinct_caches"]
},
FailTestCase {
name: "fail without output-file when format is parquet for table-list",
args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "table-list", "--format", "parquet"]
},
FailTestCase {
name: "fail without output-file when format is parquet for summary",
args: vec!["show", "system", "--host", server_addr.as_str(), "--database", db_name, "summary", "--format", "parquet"]
},
];

for case in cases {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
source: influxdb3/tests/cli/mod.rs
expression: output
---
Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet`
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
source: influxdb3/tests/cli/mod.rs
expression: output
---
Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet`
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
source: influxdb3/tests/cli/mod.rs
expression: output
---
Show command failed: must specify an output file path with `--output` parameter when formatting the output as `parquet`