Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2c6f9ed
rebase commit on new master
Vdeub-cloudinary Jul 3, 2025
5fbc26a
add tests
Vdeub-cloudinary Jul 3, 2025
d791f85
fix tests
Vdeub-cloudinary Jul 3, 2025
c6019bf
fix tests due to wrong mock data
Vdeub-cloudinary Jul 3, 2025
f25738a
fix tests due to wrong mock data + api missing method
Vdeub-cloudinary Jul 3, 2025
4c068ea
fix tests due to wrong definition
Vdeub-cloudinary Jul 3, 2025
d0f67ea
fix list first
Vdeub-cloudinary Jul 3, 2025
e77f822
fix patch
Vdeub-cloudinary Jul 3, 2025
c0a1d64
fix assertion error for list
Vdeub-cloudinary Jul 3, 2025
0e51e47
fix compare create
Vdeub-cloudinary Jul 3, 2025
03d053d
add listing after creation test
Vdeub-cloudinary Jul 3, 2025
36e19c3
fix test list after creation
Vdeub-cloudinary Jul 3, 2025
b7486b1
fix tests
Vdeub-cloudinary Jul 10, 2025
820c889
Merge branch 'master' into devx-16946-smd-for-clone
Vdeub-cloudinary Jul 16, 2025
3e3d67c
fix declarations due to conflict resolution mistake
Vdeub-cloudinary Jul 16, 2025
a2cf5a1
fix fields declaration for smd
Vdeub-cloudinary Jul 16, 2025
2b6fd28
improve scripts based on discussions
Vdeub-cloudinary Jul 21, 2025
23febe2
tentatively add user input mock
Vdeub-cloudinary Jul 21, 2025
b074d0e
tentatively add user input mock - 2
Vdeub-cloudinary Jul 21, 2025
17981a8
tentatively add user input mock - 3
Vdeub-cloudinary Jul 21, 2025
fe0233e
tentatively add user input mock - 4
Vdeub-cloudinary Jul 21, 2025
a626d18
tentatively add user input mock - 5
Vdeub-cloudinary Jul 21, 2025
30c8f6b
revamp code to make it more reusable and improve tests
Vdeub-cloudinary Dec 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 96 additions & 7 deletions cloudinary_cli/modules/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
from cloudinary_cli.utils.utils import normalize_list_params, print_help_and_exit
import cloudinary
from cloudinary_cli.utils.utils import run_tasks_concurrently
from cloudinary_cli.utils.api_utils import upload_file
from cloudinary_cli.utils.config_utils import load_config, get_cloudinary_config, config_to_dict
from cloudinary_cli.utils.api_utils import upload_file, handle_api_command
from cloudinary_cli.utils.json_utils import print_json
from cloudinary_cli.utils.config_utils import load_config, get_cloudinary_config, config_to_dict, config_to_tuple_list
from cloudinary_cli.defaults import logger
from cloudinary_cli.core.search import execute_single_request, handle_auto_pagination

Expand Down Expand Up @@ -32,7 +33,7 @@
@option("-w", "--concurrent_workers", type=int, default=30,
help="Specify the number of concurrent network threads.")
@option("-fi", "--fields", multiple=True,
help="Specify whether to copy tags and/or context. Valid options: `tags,context`.")
help="Specify whether to copy tags and/or context. Valid options: `tags,context,metadata`.")
@option("-se", "--search_exp", default="",
help="Define a search expression to filter the assets to clone.")
@option("--async", "async_", is_flag=True, default=False,
Expand All @@ -54,19 +55,31 @@ def clone(target, force, overwrite, concurrent_workers, fields, search_exp, asyn
return False

source_assets = search_assets(force, search_exp)
if 'metadata' in fields:
source_metadata = list_metadata_items("metadata_fields")
if source_metadata.get('metadata_fields'):
target_metadata = list_metadata_items("metadata_fields", config_to_tuple_list(target_config))
fields_compare = compare_create_metadata_items(source_metadata, target_metadata, config_to_tuple_list(target_config), key="metadata_fields")
source_metadata_rules = list_metadata_items("metadata_rules")
if source_metadata_rules.get('metadata_rules'):
target_metadata_rules = list_metadata_items("metadata_rules", config_to_tuple_list(target_config))
rules_compare = compare_create_metadata_items(source_metadata_rules,target_metadata_rules, config_to_tuple_list(target_config), key="metadata_rules", id_field="name")
else:
logger.info(style(f"No metadata rules found in {cloudinary.config().cloud_name}", fg="yellow"))
else:
logger.info(style(f"No metadata found in {cloudinary.config().cloud_name}", fg="yellow"))

upload_list = []
for r in source_assets.get('resources'):
updated_options, asset_url = process_metadata(r, overwrite, async_, notification_url,
normalize_list_params(fields))
updated_options.update(config_to_dict(target_config))
upload_list.append((asset_url, {**updated_options}))

if not upload_list:
logger.error(style(f'No assets found in {cloudinary.config().cloud_name}', fg="red"))
logger.error(style(f"No assets found in {cloudinary.config().cloud_name}", fg="red"))
return False

logger.info(style(f'Copying {len(upload_list)} asset(s) from {cloudinary.config().cloud_name} to {target_config.cloud_name}', fg="blue"))
logger.info(style(f"Copying {len(upload_list)} asset(s) from {cloudinary.config().cloud_name} to {target_config.cloud_name}", fg="blue"))

run_tasks_concurrently(upload_file, upload_list, concurrent_workers)

Expand All @@ -75,7 +88,7 @@ def clone(target, force, overwrite, concurrent_workers, fields, search_exp, asyn

def search_assets(force, search_exp):
search = cloudinary.search.Search().expression(search_exp)
search.fields(['tags', 'context', 'access_control', 'secure_url', 'display_name'])
search.fields(['tags', 'context', 'access_control', 'secure_url', 'display_name','metadata'])
search.max_results(DEFAULT_MAX_RESULTS)

res = execute_single_request(search, fields_to_keep="")
Expand All @@ -84,6 +97,80 @@ def search_assets(force, search_exp):
return res


def list_metadata_items(method_key, *options):
api_method_name = 'list_' + method_key
params = [api_method_name]
if options:
options = options[0]
res = handle_api_command(params, (), options, None, None, None,
doc_url="", api_instance=cloudinary.api,
api_name="admin",
auto_paginate=True,
force=True, return_data=True)
res.get(method_key, []).sort(key=lambda x: x["external_id"])

return res


def create_metadata_item(api_method_name, item, *options):
params = (api_method_name, item)
if options:
options = options[0]
res = handle_api_command(params, (), options, None, None, None,
doc_url="", api_instance=cloudinary.api,
api_name="admin",
return_data=True)

return res


def deep_diff(obj_source, obj_target):
diffs = {}
for k in set(obj_source.keys()).union(obj_target.keys()):
if obj_source.get(k) != obj_target.get(k):
diffs[k] = {"json_source": obj_source.get(k), "json_target": obj_target.get(k)}

return diffs


def compare_create_metadata_items(json_source, json_target, target_config, key, id_field = "external_id"):
list_source = {item[id_field]: item for item in json_source.get(key, [])}
list_target = {item[id_field]: item for item in json_target.get(key, [])}

only_in_source = list(list_source.keys() - list_target.keys())
common = list_source.keys() & list_target.keys()

if not len(only_in_source):
logger.info(style(f"{(' '.join(key.split('_')))} in {dict(target_config)['cloud_name']} and in {cloudinary.config().cloud_name} are identical. No {(' '.join(key.split('_')))} will be cloned", fg="yellow"))
else:
logger.info(style(f"Copying {len(only_in_source)} {(' '.join(key.split('_')))} from {cloudinary.config().cloud_name} to {dict(target_config)['cloud_name']}", fg="blue"))

for key_field in only_in_source:
if key == 'metadata_fields':
try:
res = create_metadata_item('add_metadata_field', list_source[key_field],target_config)
logger.info(style(f"Successfully created {(' '.join(key.split('_')))} {key_field} to {dict(target_config)['cloud_name']}", fg="green"))
except Exception as e:
logger.error(style(f"Error when creating {(' '.join(key.split('_')))} {key_field} to {dict(target_config)['cloud_name']}", fg="red"))
else:
try:
res = create_metadata_item('add_metadata_rule', list_source[key_field],target_config)
logger.info(style(f"Successfully created {(' '.join(key.split('_')))} {key_field} to {dict(target_config)['cloud_name']}", fg="green"))
except Exception as e:
logger.error(style(f"Error when creating {(' '.join(key.split('_')))} {key_field} to {dict(target_config)['cloud_name']}", fg="red"))


diffs = {}
for id_ in common:
if list_source[id_] != list_target[id_]:
diffs[id_] = deep_diff(list_source[id_], list_target[id_])

return {
"only_in_json_source": only_in_source,
"differences": diffs
}


def process_metadata(res, overwrite, async_, notification_url, copy_fields=""):
cloned_options = {}
asset_url = res.get('secure_url')
Expand All @@ -96,6 +183,8 @@ def process_metadata(res, overwrite, async_, notification_url, copy_fields=""):
cloned_options['tags'] = res.get('tags')
if "context" in copy_fields:
cloned_options['context'] = res.get('context')
if "metadata" in copy_fields:
cloned_options['metadata'] = res.get('metadata')
if res.get('folder'):
# This is required to put the asset in the correct asset_folder
# when copying from a fixed to DF (dynamic folder) cloud as if
Expand Down
3 changes: 3 additions & 0 deletions cloudinary_cli/utils/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def get_cloudinary_config(target):
def config_to_dict(config):
return {k: v for k, v in config.__dict__.items() if not k.startswith("_")}

def config_to_tuple_list(config):
return [(k, v) for k, v in config.__dict__.items() if not k.startswith("_")]

def show_cloudinary_config(cloudinary_config):
obfuscated_config = config_to_dict(cloudinary_config)

Expand Down
Loading