From e04ad9bde7823477b1378e888c7c5bc45a914013 Mon Sep 17 00:00:00 2001
From: Thomas Sibley
Date: Fri, 3 Dec 2021 12:59:02 -0800
Subject: [PATCH 1/3] Compare against relevant remote file for new record
 notification

The GenBank download was being compared against a non-existent file,
s3://nextstrain-data/files/ncov/open/gisaid.ndjson.xz, which caused the
new record notification to always be skipped.
---
 Snakefile | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/Snakefile b/Snakefile
index 3b8a7d13..ef594212 100644
--- a/Snakefile
+++ b/Snakefile
@@ -51,7 +51,6 @@ rule download_main_ndjson:
     message:
         """Fetching data using the database API"""
     params:
-        s3_src_bucket = config["s3_src"],
         file_on_s3_dst= f"{config['s3_dst']}/{database}.ndjson.xz",
         file_on_s3_src= f"{config['s3_src']}/{database}.ndjson.xz"
     output:
@@ -67,7 +66,7 @@
             cleanup_failed_cmd = f"rm {output.ndjson}"
             run_shell_command_n_times(cmd, msg, cleanup_failed_cmd)
             if send_notifications:
-                shell("./bin/notify-on-record-change {output.ndjson} {params.s3_src_bucket}/gisaid.ndjson.xz {database}")
+                shell("./bin/notify-on-record-change {output.ndjson} {params.file_on_s3_src} {database}")
         else:
             shell("""
                 ./bin/download-from-s3 {params.file_on_s3_dst} {output.ndjson} || \

From fb814d46be22fbd2efb7a466fb8b57ac04f5b02a Mon Sep 17 00:00:00 2001
From: Thomas Sibley
Date: Fri, 3 Dec 2021 15:53:30 -0800
Subject: [PATCH 2/3] Reorganize how download_main_ndjson rule is conditionally
 defined

As the message, params, and run blocks are all distinct between the
fetch_from_database True and False cases, it seems more readable to me
to define two parallel rule definitions inside a top-level conditional.
---
 Snakefile | 61 ++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 38 insertions(+), 23 deletions(-)

diff --git a/Snakefile b/Snakefile
index ef594212..ab7e0e30 100644
--- a/Snakefile
+++ b/Snakefile
@@ -23,6 +23,8 @@ if send_notifications:
     all_targets.append(f"data/{database}/notify.done")
 if config.get("fetch_from_database", False):
     all_targets.append(f"data/{database}/raw.upload.done")
+    if send_notifications:
+        all_targets.append(f"data/{database}/notify-on-database-change.done")
 
 rule all:
     input: all_targets
@@ -47,31 +49,44 @@ def run_shell_command_n_times(cmd, msg, cleanup_failed_cmd, retry_num=5):
     print(msg + f" has FAILED {retry_num} times. Exiting.")
     raise Exception("function run_shell_command_n_times has failed")
 
-rule download_main_ndjson:
-    message:
-        """Fetching data using the database API"""
+if config.get("fetch_from_database", False):
+    rule download_main_ndjson:
+        message:
+            """Fetching data using the database API"""
+        output:
+            ndjson = temp(f"data/{database}.ndjson")
+        run:
+            run_shell_command_n_times(
+                msg = f"Fetching from {database}",
+                cmd = f"./bin/fetch-from-{database} > {output.ndjson}",
+                cleanup_failed_cmd = f"rm {output.ndjson}",
+            )
+else:
+    rule download_main_ndjson:
+        message:
+            """Fetching data from our S3 bucket"""
+        params:
+            file_on_s3_dst= f"{config['s3_dst']}/{database}.ndjson.xz",
+            file_on_s3_src= f"{config['s3_src']}/{database}.ndjson.xz"
+        output:
+            ndjson = temp(f"data/{database}.ndjson")
+        shell: """
+            ./bin/download-from-s3 {params.file_on_s3_dst} {output.ndjson} || \
+            ./bin/download-from-s3 {params.file_on_s3_src} {output.ndjson}
+        """
+
+
+rule notify_on_database_change:
+    message: "Notifying on database NDJSON change"
+    input:
+        local_file = f"data/{database}.ndjson"
     params:
-        file_on_s3_dst= f"{config['s3_dst']}/{database}.ndjson.xz",
-        file_on_s3_src= f"{config['s3_src']}/{database}.ndjson.xz"
+        remote_file = f"{config['s3_src']}/{database}.ndjson.xz"
     output:
-        ndjson = temp(f"data/{database}.ndjson")
-    run:
-        if config.get("fetch_from_database", False):
-            if database=="gisaid":
-                msg = "Fetching from GISAID API"
-                cmd = f"./bin/fetch-from-gisaid > {output.ndjson}"
-            else:
-                msg = "Fetching from GenBank API"
-                cmd = f"./bin/fetch-from-genbank > {output.ndjson}"
-            cleanup_failed_cmd = f"rm {output.ndjson}"
-            run_shell_command_n_times(cmd, msg, cleanup_failed_cmd)
-            if send_notifications:
-                shell("./bin/notify-on-record-change {output.ndjson} {params.file_on_s3_src} {database}")
-        else:
-            shell("""
-                ./bin/download-from-s3 {params.file_on_s3_dst} {output.ndjson} || \
-                ./bin/download-from-s3 {params.file_on_s3_src} {output.ndjson}
-            """)
+        touch(f"data/{database}/notify-on-database-change.done")
+    shell: """
+        ./bin/notify-on-record-change {input.local_file:q} {params.remote_file:q} {database}
+    """
 
 
 rule download_biosample:

From 1ff5c2ed263e98a193096389210c0c28c76de1ea Mon Sep 17 00:00:00 2001
From: Thomas Sibley
Date: Fri, 3 Dec 2021 12:13:55 -0800
Subject: [PATCH 3/3] Decompress gisaid.ndjson.bz2 after download instead of
 during
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Streaming decompression during the network fetch leads to decreased
network transfer rates¹ and thus a longer network connection lifetime.
As we sometimes see transient network disruptions that cause the
transfer to fail, my hypothesis is that reducing connection lifetime by
decompressing after download will meaningfully reduce our exposure to
transient disruptions.

This may come at the cost of potentially increasing overall runtime by
some unknown amount, but we could try to address that in other ways,
such as switching to xz (with GISAID's cooperation) for faster
decompression and/or performing decompression concurrently with
downloading, with a disk spool in between (assuming disk I/O isn't a
bottleneck).

¹ About 75% slower in a single test case I ran on my laptop over the
  Hutch's wired network.
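
To make the tradeoff concrete, the before and after fetch pipelines
amount to roughly the following sketch (curl flags elided; the real
invocation lives in bin/fetch-from-gisaid):

    # before: decompress during the transfer, keeping the (slower)
    # network connection open for the whole decompressing download
    curl "$GISAID_API_ENDPOINT" ... | bunzip2 > data/gisaid.ndjson

    # after: download the compressed stream as-is to keep the
    # connection short, then decompress locally
    curl "$GISAID_API_ENDPOINT" ... > data/gisaid.ndjson.bz2
    bunzip2 data/gisaid.ndjson.bz2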
---
 README.md             | 2 +-
 Snakefile             | 9 ++++++++-
 bin/fetch-from-gisaid | 3 +--
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index fc5c0f61..09da74e7 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@ Relies on data from https://simplemaps.com/data/us-cities.
 ## Running locally
 If you're using Pipenv (see below), then run commands from `./bin/…` inside a `pipenv shell` or wrapped with `pipenv run ./bin/…`.
 
-1. Run `./bin/fetch-from-gisaid > data/gisaid.ndjson`
+1. Run `./bin/fetch-from-gisaid | bunzip2 > data/gisaid.ndjson`
 2. Run `./bin/transform-gisaid data/gisaid.ndjson`
 3. Look at `data/gisaid/sequences.fasta` and `data/gisaid/metadata.tsv`
 
diff --git a/Snakefile b/Snakefile
index ab7e0e30..e4362ebf 100644
--- a/Snakefile
+++ b/Snakefile
@@ -54,7 +54,7 @@ if config.get("fetch_from_database", False):
         message:
             """Fetching data using the database API"""
         output:
-            ndjson = temp(f"data/{database}.ndjson")
+            ndjson = temp(f"data/{database}.ndjson" + (".bz2" if database == "gisaid" else ""))
         run:
             run_shell_command_n_times(
                 msg = f"Fetching from {database}",
@@ -76,6 +76,13 @@ else:
         """
 
 
+rule bunzip2:
+    message: "Decompressing {input}"
+    input: "{stem}.bz2"
+    output: "{stem}"
+    shell: "bunzip2 {input:q}"
+
+
 rule notify_on_database_change:
     message: "Notifying on database NDJSON change"
     input:
diff --git a/bin/fetch-from-gisaid b/bin/fetch-from-gisaid
index d649fb6e..243f6288 100755
--- a/bin/fetch-from-gisaid
+++ b/bin/fetch-from-gisaid
@@ -6,5 +6,4 @@ set -euo pipefail
 
 curl "$GISAID_API_ENDPOINT" \
     --user "$GISAID_USERNAME_AND_PASSWORD" \
-    --fail --silent --show-error --location-trusted --http1.1 \
-    | bunzip2
+    --fail --silent --show-error --location-trusted --http1.1
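
A note on how the pieces fit together after this series: when database
== "gisaid", download_main_ndjson now writes data/gisaid.ndjson.bz2 and
the generic bunzip2 rule ("{stem}.bz2" -> "{stem}") supplies the final
decompression step, so Snakemake derives the uncompressed NDJSON on
demand. A rough sketch of the resolution (the snakemake invocation is
illustrative, not part of these patches):

    # Snakemake resolves data/gisaid.ndjson via:
    #   rule download_main_ndjson  ->  data/gisaid.ndjson.bz2
    #   rule bunzip2               ->  data/gisaid.ndjson
    snakemake --cores 1 data/gisaid.ndjson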