diff --git a/icechunk/src/cli/interface.rs b/icechunk/src/cli/interface.rs index f2df1810d..5e56b144e 100644 --- a/icechunk/src/cli/interface.rs +++ b/icechunk/src/cli/interface.rs @@ -1,13 +1,18 @@ -use crate::repository::VersionInfo; +use crate::format::ChunkIndices; +use crate::format::snapshot::{NodeData, DimensionName}; +use crate::repository::{VersionInfo, RepositoryErrorKind}; +use chrono::{Local}; use clap::{Args, Parser, Subcommand}; use dialoguer::{Input, Select}; -use futures::stream::StreamExt; +use futures::stream::{StreamExt}; +use itertools::Itertools; use serde_yaml_ng; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs::{File, create_dir_all}; use std::io::stdout; use std::path::PathBuf; use std::sync::Arc; +use std::iter::zip; use anyhow::{Context, Ok, Result}; @@ -16,6 +21,7 @@ use crate::storage::{ new_tigris_storage, }; use crate::{Repository, RepositoryConfig, Storage, new_s3_storage}; +use crate::format::{SnapshotId, snapshot::SnapshotInfo}; use crate::cli::config::{CliConfig, RepositoryAlias, RepositoryDefinition}; use crate::config::{AzureCredentials, GcsCredentials, S3Credentials, S3Options}; @@ -32,12 +38,20 @@ pub struct IcechunkCLI { #[derive(Debug, Subcommand)] enum Command { - #[command(subcommand, about = "Manage repositories")] - Repo(RepoCommand), - #[command(subcommand, about = "Manage snapshots")] - Snapshot(SnapshotCommand), #[command(subcommand, about = "Manage configuration")] Config(ConfigCommand), + #[command(subcommand, about = "Manage repositories")] + Repo(RepoCommand), + #[clap(name = "ancestry", about = "Show ancestry of a branch, tag or snapshot")] + Ancestry(AncestryArgs), + #[clap(name = "inspect", about = "Show snapshot details")] + Inspect(InspectArgs), + #[command(subcommand, about = "Manage branches")] + Branch(BranchCommand), + #[command(subcommand, about = "Manage tags")] + Tag(TagCommand), + #[clap(name = "diff", about = "Show diff between two refs.")] + Diff(DiffArgs) } #[derive(Debug, Subcommand)] @@ -46,10 +60,113 @@ enum RepoCommand { Create(CreateCommand), } +#[derive(Debug, Args)] +struct AncestryArgs { + #[arg(name = "alias", help = "Alias of the repository in the config")] + repo: RepositoryAlias, + #[arg( + name = "reference", + help = "ID of snapshot to show ancestry for" + )] + reference: String, + #[arg(short = 'n', default_value_t = 10, help = "Number of snapshots to list")] + n: usize, +} + +#[derive(Debug, Args)] +struct InspectArgs { + #[arg(name = "alias", help = "Alias of the repository in the config")] + repo: RepositoryAlias, + #[arg( + name = "reference", + help = "ID of snapshot to inspect" + )] + reference: String, +} + #[derive(Debug, Subcommand)] -enum SnapshotCommand { - #[clap(name = "list", about = "List snapshots in a repository")] - List(ListCommand), +enum BranchCommand { + #[clap(name = "list", about = "List branches")] + List(BranchListArgs), + #[clap(name = "create", about = "Create branch")] + Create(BranchCreateArgs), + #[clap(name = "delete", about = "Delete branch")] + Delete(BranchDeleteArgs), +} + +#[derive(Debug, Args)] +struct BranchListArgs { + #[arg(name = "alias", help = "Alias of the repository in the config")] + repo: RepositoryAlias, +} + +#[derive(Debug, Args)] +struct BranchCreateArgs { + #[arg(name = "alias", help = "Alias of the repository in the config")] + repo: RepositoryAlias, + #[arg(name = "branch", help = "Name of the branch to create")] + branch: String, + #[arg( + name = "from", + help = "ID of snapshot to create the branch from" + )] + from: String, +} + +#[derive(Debug, Args)] +struct BranchDeleteArgs { + #[arg(name = "alias", help = "Alias of the repository in the config")] + repo: RepositoryAlias, + #[arg(name = "branch", help = "Name of the branch to delete")] + branch: String, +} + + +#[derive(Debug, Subcommand)] +enum TagCommand { + #[clap(name = "list", about = "List tags")] + List(TagListArgs), + #[clap(name = "create", about = "Create tag")] + Create(TagCreateArgs), + #[clap(name = "delete", about = "Delete tag")] + Delete(TagDeleteArgs), +} + +#[derive(Debug, Args)] +struct TagListArgs { + #[arg(name = "alias", help = "Alias of the repository in the config")] + repo: RepositoryAlias, +} + +#[derive(Debug, Args)] +struct TagCreateArgs { + #[arg(name = "alias", help = "Alias of the repository in the config")] + repo: RepositoryAlias, + #[arg(name = "tag", help = "Name of the tag to create")] + tag: String, + #[arg( + name = "target", + help = "ID of snapshot to create the tag for" + )] + target: String, +} + +#[derive(Debug, Args)] +struct TagDeleteArgs { + #[arg(name = "alias", help = "Alias of the repository in the config")] + repo: RepositoryAlias, + #[arg(name = "tag", help = "Name of the tag to delete")] + tag: String, +} + +#[derive(Debug, Args)] +struct DiffArgs { + #[arg(name = "alias", help = "Alias of the repository in the config")] + repo: RepositoryAlias, + #[arg(name = "from", help = "Source snapshot ID")] + from: String, + #[arg(name = "to", help = "Target snapshot ID")] + to: String, } #[derive(Debug, Subcommand)] @@ -91,20 +208,6 @@ struct AddCommand { repo: RepositoryAlias, } -#[derive(Debug, Args)] -struct ListCommand { - repo: RepositoryAlias, - #[arg(short = 'n', default_value_t = 10, help = "Number of snapshots to list")] - n: usize, - #[arg( - short = 'b', - long = "branch", - default_value = "main", - help = "Branch to list snapshots from" - )] - branch: String, -} - const CONFIG_DIR: &str = "icechunk"; const CONFIG_NAME: &str = "cli-config.yaml"; @@ -203,6 +306,28 @@ async fn get_storage( } } +async fn open_repository( + repo_alias: &RepositoryAlias, + config: &CliConfig, +) -> Result { + let repo = + config.repos.get(repo_alias).context(format!("Repository {:?} not found in config", repo_alias))?; + let storage = get_storage(repo).await?; + let config = Some(repo.get_config().clone()); + + let repository = Repository::open(config, Arc::clone(&storage), HashMap::new()) + .await + .context(format!("Failed to open repository {:?}", repo_alias))?; + + Ok(repository) +} + +fn parse_snapshot(reference: &String) -> Result { + let snapshot_id = SnapshotId::try_from(reference.as_str()) + .map_err(|_| RepositoryErrorKind::InvalidSnapshotId(reference.clone()))?; + Ok(snapshot_id) +} + async fn repo_create(init_cmd: &CreateCommand, config: &CliConfig) -> Result<()> { let repo = config.repos.get(&init_cmd.repo).context("Repository not found in config")?; @@ -219,32 +344,336 @@ async fn repo_create(init_cmd: &CreateCommand, config: &CliConfig) -> Result<()> Ok(()) } -async fn snapshot_list( - list_cmd: &ListCommand, +async fn list_branches( + args: &BranchListArgs, config: &CliConfig, - mut writer: impl std::io::Write, ) -> Result<()> { - let repo = - config.repos.get(&list_cmd.repo).context("Repository not found in config")?; - let storage = get_storage(repo).await?; - let config = Some(repo.get_config().clone()); + let repository = open_repository(&args.repo, config).await?; + let branches = repository.list_branches().await.context("Failed to list branches")?; + for branch in branches { + let snapshot = repository.lookup_branch(branch.as_str()).await?; + println!("{} {}", snapshot, branch); + } + Ok(()) +} - let repository = Repository::open(config, Arc::clone(&storage), HashMap::new()) +async fn create_branch( + args: &BranchCreateArgs, + config: &CliConfig, +) -> Result<()> { + let repository = open_repository(&args.repo, config).await?; + let from_snapshot = parse_snapshot(&args.from)?; + repository + .create_branch(&args.branch, &from_snapshot) + .await + .context(format!( + "Failed to create branch {:?} from {:?}", + args.branch, args.from + ))?; + + println!( + "✅ Created branch {:?} from {:?} in repository {:?}", + args.branch, args.from, args.repo + ); + + Ok(()) +} + +async fn delete_branch( + args: &BranchDeleteArgs, + config: &CliConfig, +) -> Result<()> { + let repository = open_repository(&args.repo, config).await?; + + repository + .delete_branch(&args.branch) + .await + .context(format!("Failed to delete branch {:?}", args.branch))?; + + println!( + "✅ Deleted branch {:?} in repository {:?}", + args.branch, args.repo + ); + + Ok(()) +} + +async fn list_tags( + args: &TagListArgs, + config: &CliConfig, +) -> Result<()> { + let repository = open_repository(&args.repo, config).await?; + let tags = repository.list_tags().await.context("Failed to list tags")?; + for tag in tags { + let snapshot = repository.lookup_tag(tag.as_str()).await?; + println!("{} {}", snapshot, tag); + } + Ok(()) +} + +async fn create_tag( + args: &TagCreateArgs, + config: &CliConfig, +) -> Result<()> { + let repository = open_repository(&args.repo, config).await?; + let target_snapshot = parse_snapshot(&args.target)?; + + repository + .create_tag(&args.tag, &target_snapshot) + .await + .context(format!( + "Failed to create tag {:?} from {:?}", + args.tag, args.target + ))?; + + println!( + "✅ Created tag {:?} for {:?} in repository {:?}", + args.tag, target_snapshot, args.repo + ); + + Ok(()) +} + +async fn delete_tag( + args: &TagDeleteArgs, + config: &CliConfig, +) -> Result<()> { + let repository = open_repository(&args.repo, config).await?; + + repository + .delete_tag(&args.tag) .await - .context(format!("Failed to open repository {:?}", list_cmd.repo))?; + .context(format!("Failed to delete tag {:?}", args.tag))?; + + println!( + "✅ Deleted tag {:?} in repository {:?}", + args.tag, args.repo + ); - let branch_ref = VersionInfo::BranchTipRef(list_cmd.branch.clone()); - let ancestry = repository.ancestry(&branch_ref).await?; + Ok(()) +} - let snapshots: Vec<_> = ancestry.take(list_cmd.n).collect().await; +fn show_snapshot(mut writer: impl std::io::Write, snapshot: SnapshotInfo, with_meta: bool) -> Result<()> { + writeln!(writer, "Snapshot: {}", snapshot.id)?; + writeln!(writer, "Date: {}", snapshot.flushed_at.with_timezone(&Local).format("%B %d %Y %H:%M:%S"))?; + if with_meta && !snapshot.metadata.is_empty() { + writeln!(writer, "Metadata:")?; + for (key, value) in snapshot.metadata { + writeln!(writer, " {}: {}", key, value)?; + } + } + if !snapshot.message.is_empty() { + writeln!(writer, "\n {}", snapshot.message)?; + } + Ok(()) +} +async fn ancestry( + args: &AncestryArgs, + config: &CliConfig, + mut writer: impl std::io::Write, +) -> Result<()> { + let repository = open_repository(&args.repo, config).await?; + let snapshot = parse_snapshot(&args.reference)?; + let ancestry = repository.ancestry(&VersionInfo::SnapshotId(snapshot)).await?; + let snapshots: Vec<_> = ancestry.take(args.n).collect().await; for snapshot in snapshots { - writeln!(writer, "{:?}", snapshot.context("Failed to get snapshot")?)?; + show_snapshot(&mut writer, snapshot.context("Failed to get snapshot")?, false)?; + writeln!(writer, "")?; + } + Ok(()) +} + + +fn array_shape_string(dimension_name: &Option>, values: Vec) -> String { + match dimension_name { + Some(names) => { + zip(names.iter(), values.iter()) + .map(|(name, v)| + match name { + DimensionName::Name(n) => format!("{}={}", n, v), + DimensionName::NotSpecified => format!("{}", v), + } + ) + .join(", ") + }, + None => { + values.join(", ") + } + } +} + +async fn inspect( + args: &InspectArgs, + config: &CliConfig, + mut writer: impl std::io::Write, +) -> Result<()> { + let repository = open_repository(&args.repo, config).await?; + let snapshot_id = parse_snapshot(&args.reference)?; + let snapshot = repository.asset_manager().fetch_snapshot(&snapshot_id).await?; + + writeln!(writer, "Snapshot: {}", snapshot_id)?; + writeln!(writer, "Date: {}", snapshot.flushed_at()?.with_timezone(&Local).format("%B %d %Y %H:%M:%S"))?; + let metadata = snapshot.metadata()?; + if !metadata.is_empty() { + writeln!(writer, "Metadata:")?; + for (key, value) in metadata { + writeln!(writer, " {}: {}", key, value)?; + } + } + let message = snapshot.message(); + if !message.is_empty() { + writeln!(writer, "Message:\n {}", message)?; + } + + writeln!(writer, "\n-- Nodes --")?; + for node_info_res in snapshot.iter() { + let node_info = node_info_res.context("Failed to get snapshot info")?; + let node_data = node_info.node_data; + match node_data { + NodeData::Array {shape, dimension_names, manifests} => { + let array_size: Vec<_> = shape.iter().map(|s| s.array_length().to_string()).collect(); + let chunk_size: Vec<_> = shape.iter().map(|s| format!( + "{}({})", + s.chunk_length(), + (s.array_length() + s.chunk_length() - 1)/s.chunk_length() + )).collect(); + writeln!(writer, "{}", node_info.path)?; + writeln!(writer, " size: {}", array_shape_string(&dimension_names, array_size))?; + writeln!(writer, " chunk: {}", array_shape_string(&dimension_names, chunk_size))?; + for manifest in manifests { + let extents = manifest.extents.iter().map(|e| + if e.end - 1 == e.start { + e.start.to_string() + } else { + format!("{}-{}", e.start, e.end - 1) + } + ).collect(); + writeln!(writer, " manifest: {} {}", + manifest.object_id, + array_shape_string(&dimension_names, extents) + )?; + } + }, + NodeData::Group => { + writeln!(writer, "{}", node_info.path)?; + }, + } + } + + writeln!(writer, "\n-- Manifests --")?; + for manifest_info in snapshot.manifest_files() { + writeln!(writer, "{}", manifest_info.id)?; + writeln!(writer, " num chunk refs: {}", manifest_info.num_chunk_refs)?; + // TODO: Human-friendly size + writeln!(writer, " size (bytes): {}", manifest_info.size_bytes)?; + } + + + Ok(()) +} + +async fn diff( + args: &DiffArgs, + config: &CliConfig, + mut writer: impl std::io::Write, +) -> Result<()> { + let repository = open_repository(&args.repo, config).await?; + + let from_ref = VersionInfo::SnapshotId(parse_snapshot(&args.from)?); + let to_ref = VersionInfo::SnapshotId(parse_snapshot(&args.to)?); + + let diff = repository + .diff(&from_ref, &to_ref) + .await + .context(format!( + "Failed to compute diff between {:?} and {:?}", + args.from, args.to + ))?; + + let new_arrays_hash: HashSet<_> = diff.new_arrays.iter().cloned().collect(); + let new_groups_hash: HashSet<_> = diff.new_groups.iter().cloned().collect(); + let deleted_arrays_hash: HashSet<_> = diff.deleted_arrays.iter().cloned().collect(); + let deleted_groups_hash: HashSet<_> = diff.deleted_groups.iter().cloned().collect(); + let updated_arrays_hash: HashSet<_> = diff.updated_arrays.iter().cloned().collect(); + let updated_groups_hash: HashSet<_> = diff.updated_groups.iter().cloned().collect(); + let updated_chunks_hash: HashSet<_> = diff.updated_chunks.keys().cloned().collect(); + + let modified_paths = { + let mut modified_paths: Vec<_> = new_arrays_hash + .union(&new_groups_hash) + .cloned() + .collect::>() + .union(&deleted_arrays_hash) + .cloned() + .collect::>() + .union(&deleted_groups_hash) + .cloned() + .collect::>() + .union(&updated_arrays_hash) + .cloned() + .collect::>() + .union(&updated_groups_hash) + .cloned() + .collect::>() + .union(&updated_chunks_hash) + .cloned() + .collect(); + + modified_paths.sort(); + modified_paths + }; + + + for path in modified_paths { + let is_new = new_arrays_hash.contains(&path) || new_groups_hash.contains(&path); + let is_deleted = deleted_arrays_hash.contains(&path) || deleted_groups_hash.contains(&path); + let is_updated = updated_arrays_hash.contains(&path) || updated_groups_hash.contains(&path); + let has_updated_chunks = updated_chunks_hash.contains(&path); + + // Sometimes a path will be new or deleted and also have updated chunks. + // In that case we do not print updated chunks as they are all new or all deleted. + if is_new && is_deleted { + writeln!(writer, "-+ {}", path.to_string())?; + continue; + } + else if is_new { + writeln!(writer, "+ {}", path.to_string())?; + continue; + } else if is_deleted { + writeln!(writer, "- {}", path.to_string())?; + continue; + } else if is_updated { + writeln!(writer, "~ {}", path.to_string())?; + } else if has_updated_chunks { + writeln!(writer, " {}", path.to_string())?; + } + + fn chunk_to_string(c: &ChunkIndices) -> String { + let idxs = c.0.iter().map(|i| i.to_string()).join(", "); + format!("({})", idxs) + } + + let updated_chunks = diff.updated_chunks.get(&path); + match updated_chunks { + Some(chunks) => { + write!(writer, " Modified {} chunk(s): ", chunks.len())?; + let t = chunks.iter().take(10).map(chunk_to_string).join(", "); + write!(writer, " {}", t)?; + if chunks.len() > 10 { + writeln!(writer, ", ...")?; + } else { + writeln!(writer, "")?; + } + } + None => {} + } } Ok(()) } + async fn config_add(add_cmd: &AddCommand, config: &CliConfig) -> Result { if config.repos.contains_key(&add_cmd.repo) { return Err(anyhow::anyhow!("Repository {:?} already exists", add_cmd.repo)); @@ -420,8 +849,41 @@ pub async fn run_cli(args: IcechunkCLI) -> Result<()> { Command::Repo(RepoCommand::Create(init_cmd)) => { repo_create(&init_cmd, &config).await } - Command::Snapshot(SnapshotCommand::List(list_cmd)) => { - snapshot_list(&list_cmd, &config, stdout()).await + Command::Ancestry(ancestry_args) => { + ancestry(&ancestry_args, &config, stdout()).await?; + Ok(()) + } + Command::Inspect(inspect_args) => { + inspect(&inspect_args, &config, stdout()).await?; + Ok(()) + } + Command::Branch(BranchCommand::List(list_args)) => { + list_branches(&list_args, &config).await?; + Ok(()) + } + Command::Branch(BranchCommand::Create(create_args)) => { + create_branch(&create_args, &config).await?; + Ok(()) + } + Command::Branch(BranchCommand::Delete(delete_args)) => { + delete_branch(&delete_args, &config).await?; + Ok(()) + } + Command::Tag(TagCommand::List(list_args)) => { + list_tags(&list_args, &config).await?; + Ok(()) + } + Command::Tag(TagCommand::Create(create_args)) => { + create_tag(&create_args, &config).await?; + Ok(()) + } + Command::Tag(TagCommand::Delete(delete_args)) => { + delete_tag(&delete_args, &config).await?; + Ok(()) + } + Command::Diff(diff_args) => { + diff(&diff_args, &config, stdout()).await?; + Ok(()) } Command::Config(ConfigCommand::Init(init_cmd)) => { let new_config = config_init(&init_cmd, &config).await?; @@ -447,6 +909,8 @@ mod tests { use super::*; + use regex::Regex; + #[tokio_test] async fn test_repo_create() { let temp = assert_fs::TempDir::new().unwrap(); @@ -478,7 +942,7 @@ mod tests { } #[tokio_test] - async fn test_snapshot_list() { + async fn test_config_list() { let temp = assert_fs::TempDir::new().unwrap(); let path = temp.path().to_path_buf(); @@ -493,24 +957,17 @@ mod tests { let config = CliConfig { repos }; - let init_cmd = CreateCommand { repo: repo_alias.clone() }; - - repo_create(&init_cmd, &config).await.unwrap(); - - let list_cmd = - ListCommand { repo: repo_alias.clone(), n: 10, branch: "main".to_string() }; - let mut writer = Vec::new(); - snapshot_list(&list_cmd, &config.clone(), &mut writer).await.unwrap(); + + config_list(&config, &mut writer).await.unwrap(); let output = String::from_utf8(writer).unwrap(); - assert_eq!(output.lines().count(), 1); - assert!(output.contains("SnapshotInfo")); + assert!(output.contains("LocalFileSystem")); } #[tokio_test] - async fn test_config_list() { + async fn test_ancestry() { let temp = assert_fs::TempDir::new().unwrap(); let path = temp.path().to_path_buf(); @@ -525,12 +982,26 @@ mod tests { let config = CliConfig { repos }; - let mut writer = Vec::new(); - - config_list(&config, &mut writer).await.unwrap(); + let init_cmd = CreateCommand { repo: repo_alias.clone() }; + repo_create(&init_cmd, &config).await.unwrap(); + let args = AncestryArgs { + repo: repo_alias.clone(), + reference: "main".to_string(), + n: 10, + }; + let mut writer = Vec::new(); + ancestry(&args, &config, &mut writer).await.unwrap(); let output = String::from_utf8(writer).unwrap(); + println!("{}", output); - assert!(output.contains("LocalFileSystem")); + let re = Regex::new( +r"Snapshot: [0-9A-Z]{20} +Date: \w+ \d{2} \d+ \d{2}:\d{2}:\d{2} + + Repository initialized +" + ).unwrap(); + assert!(re.is_match(output.as_str())); } }