|
| 1 | +import os |
| 2 | + |
| 3 | +import click |
| 4 | +from cpg_utils.hail_batch import get_batch, get_config, copy_common_env |
| 5 | +from google.cloud import storage |
| 6 | + |
| 7 | + |
def create_md5s_for_files_in_directory(skip_filetypes: tuple[str, ...], force_recreate: bool, gs_dir):
    """
    Queue one batch job per file under ``gs_dir`` that needs an md5 checksum.

    Every object below the directory is listed. Objects that are themselves
    ``.md5`` files, that end with one of ``skip_filetypes``, or that are
    directory placeholders (name ending in ``/``) are ignored. Unless
    ``force_recreate`` is set, objects that already have a sibling
    ``<object>.md5`` are ignored as well. The batch is submitted without
    waiting for it to finish.

    Args:
        skip_filetypes: filename suffixes (e.g. ``'.crai'``) to leave without
            a checksum. A single string or any iterable of strings is accepted.
        force_recreate: recompute the checksum even if ``<object>.md5`` exists.
        gs_dir: directory to scan, in the form ``gs://bucket[/path]``.

    Raises:
        ValueError: if ``gs_dir`` is not a ``gs://`` path with a bucket name.
    """
    # Validate first so that bad input fails before a batch is created.
    if not gs_dir.startswith('gs://'):
        raise ValueError(f'Expected GS directory, got: {gs_dir}')

    bucket_name, _, raw_prefix = gs_dir[len('gs://'):].partition('/')
    if not bucket_name:
        raise ValueError(f'Expected GS directory, got: {gs_dir}')

    # Terminate the prefix with '/' so that scanning 'gs://b/dir' does not
    # also pick up objects under sibling paths such as 'gs://b/dir2/'.
    prefix = raw_prefix.rstrip('/')
    if prefix:
        prefix += '/'

    # str.endswith only accepts a str or a tuple of str.
    if isinstance(skip_filetypes, str):
        skip_filetypes = (skip_filetypes,)
    skip_suffixes = ('.md5', *skip_filetypes)

    b = get_batch(f'Create md5 checksums for files in {gs_dir}')

    billing_project = get_config()['hail']['billing_project']
    driver_image = get_config()['workflow']['driver_image']

    client = storage.Client()
    blobs = client.list_blobs(bucket_name, prefix=prefix)
    files: set[str] = {f'gs://{bucket_name}/{blob.name}' for blob in blobs}

    # Sorted so that log output and job order are reproducible between runs.
    for obj in sorted(files):
        if obj.endswith('/'):
            # Zero-byte "folder" placeholder object, nothing to checksum.
            continue
        if obj.endswith(skip_suffixes):
            continue
        if f'{obj}.md5' in files and not force_recreate:
            print(f'{obj}.md5 already exists, skipping')
            continue

        print('Creating md5 for', obj)
        job = b.new_job(f'Create {os.path.basename(obj)}.md5')
        create_md5(job, obj, billing_project, driver_image)

    b.run(wait=False)
| 35 | + |
| 36 | + |
def create_md5(job, file, billing_project, driver_image):
    """
    Configure ``job`` to compute and upload the md5 checksum of ``file``.

    The job streams the object with ``gsutil cat`` through ``md5sum`` and
    uploads the resulting hex digest next to the object as ``<file>.md5``.

    Args:
        job: batch job to configure; it receives the common environment,
            the image and the shell command.
        file: ``gs://`` path of the object to checksum.
        billing_project: project passed to ``gsutil -u`` for the upload.
        driver_image: container image the job runs in.

    Returns:
        The same ``job``, after configuration.
    """
    # Local import keeps this block self-contained.
    import shlex

    copy_common_env(job)
    job.image(driver_image)

    # Object names may contain spaces or shell metacharacters; quote every
    # interpolated value so the script cannot be broken or injected into.
    source = shlex.quote(file)
    target = shlex.quote(f'{file}.md5')
    project = shlex.quote(str(billing_project))

    job.command(
        f"""\
    set -euxo pipefail
    gcloud -q auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS
    gsutil cat {source} | md5sum | cut -d " " -f1 > /tmp/uploaded.md5
    gsutil -u {project} cp /tmp/uploaded.md5 {target}
    """
    )

    return job
| 55 | + |
| 56 | + |
@click.command()
@click.option(
    '--skip-filetypes',
    '-s',
    default=('.crai', '.tbi'),
    multiple=True,
    help='Filename suffix to leave without a checksum. Repeat the option to give several. Defaults to .crai and .tbi.',
)
@click.option(
    '--force-recreate',
    '-f',
    is_flag=True,
    default=False,
    help='Recreate the checksum even if a .md5 file already exists.',
)
@click.argument('gs_dir')
def main(skip_filetypes: tuple[str, ...], force_recreate: bool, gs_dir: str):
    """Scans the directory for files and creates md5 checksums for them."""
    # skip_filetypes arrives as a variable-length tuple because the option
    # is declared with multiple=True.
    create_md5s_for_files_in_directory(skip_filetypes, force_recreate, gs_dir=gs_dir)
| 64 | + |
| 65 | + |
# Script entry point: click fills in main()'s parameters from the command line.
if __name__ == '__main__':
    main()  # pylint: disable=no-value-for-parameter
0 commit comments