From cc5acf194a425434fc0a16d0dd4e8ccfe4e3ed95 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Fri, 15 Mar 2024 20:16:45 +0100 Subject: [PATCH 01/24] first changes to parallelize download --- examples/example.py | 15 +++++--- src/ibc_api/utils.py | 92 ++++++++++++++++++++++++++++++++++---------- 2 files changed, 81 insertions(+), 26 deletions(-) diff --git a/examples/example.py b/examples/example.py index 4e4b887..b0b2243 100644 --- a/examples/example.py +++ b/examples/example.py @@ -1,13 +1,18 @@ -import ibc_api.utils as ibc - +#%% +import pdb +import sys +sys.path.append('/home/fer/HBP_IBC/api/src/ibc_api') +import utils as ibc +pdb.set_trace() +#%% # Fetch info on all available files # Load as a pandas dataframe and save as ibc_data/available_{data_type}.csv db = ibc.get_info(data_type="volume_maps") # Keep statistic maps for sub-08, for task-Discount -filtered_db = ibc.filter_data(db, subject_list=["08"], task_list=["Discount"]) - +filtered_db = ibc.filter_data(db, subject_list=["04"], task_list=["Lec1"]) +#%% # Download all statistic maps for sub-08, task-Discount # Also creates ibc_data/downloaded_volume_maps.csv # which contains local file paths and time of download -downloaded_db = ibc.download_data(filtered_db) +downloaded_db = ibc.download_data(filtered_db, parallel=True) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index 8255764..fc35952 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -1,6 +1,6 @@ """API to fetch IBC data from EBRAINS via Human Data Gateway using siibra. """ - +# %$ import siibra from siibra.retrieval.repositories import EbrainsHdgConnector from siibra.retrieval.requests import EbrainsRequest, SiibraHttpRequestError @@ -10,9 +10,12 @@ from siibra.retrieval.cache import CACHE import pandas as pd from datetime import datetime -from . import metadata as md +#from . import metadata as md +import metadata as md import json import numpy as np +from joblib import Parallel, delayed +import pdb # clear cache CACHE.clear() @@ -385,7 +388,39 @@ def _download_file(src_file, dst_file, connector): return [] -def download_data(db, save_to=None): +def _download_single_row(data_type, local_db_file, save_to, src_file, + dst_file): + """Download a file from ebrains. + + Parameters + ---------- + src_file : str + path to the file on ebrains + dst_file : str + path to save the file to locally + connector : EbrainsHdgConnector + connector to the IBC dataset on ebrains + + Returns + ------- + str, datetime + path to the downloaded file and time at which it was downloaded + """ + print("Going parallel!", src_file) + connector = _connect_ebrains(data_type) + print("Connected to ebrains!") + dst_file = os.path.join(save_to, dst_file) + file_name = _download_file(src_file, dst_file, connector) + file_time = datetime.now() + local_db = _update_local_db(local_db_file, file_name, file_time) + CACHE.run_maintenance() + print(src_file, "Downloaded") + + return local_db + + + +def download_data(db, save_to=None, parallel=True): """Download the files in a (filtered) dataframe. Parameters @@ -406,7 +441,7 @@ def download_data(db, save_to=None): db_length = len(db) if db_length == 0: raise ValueError( - f"The input dataframe is empty. Please make sure that it atleast has columns 'dataset' and 'path' and a row containing appropriate values corresponding to those columns." + f"The input dataframe is empty. Please make sure that it at least has columns 'dataset' and 'path' and a row containing appropriate values corresponding to those columns." 
) else: print(f"Found {db_length} files to download.") @@ -416,33 +451,48 @@ def download_data(db, save_to=None): raise ValueError( f"The input dataframe should have columns 'dataset' and 'path' and a row containing appropriate values corresponding to those columns." ) - + # get data type from db data_type = db["dataset"].unique()[0] # connect to ebrains dataset - connector = _connect_ebrains(data_type) + #connector = _connect_ebrains(data_type) # get the file names as they are on ebrains src_file_names, dst_file_names = get_file_paths(db) # set the save directory save_to = _create_root_dir(save_to) # track downloaded file names and times local_db_file = os.path.join(save_to, f"downloaded_{data_type}.csv") + + # Initialize a tqdm progress bar + progress_bar = tqdm(total=len(db), desc="Overall Progress", + colour="green") + + if parallel: + # Use all available cores + n_jobs = 4 + # Download the files in parallel + local_db = Parallel(n_jobs=n_jobs)( + delayed(_download_single_row)(data_type, local_db_file, save_to, + src_file, dst_file) + for src_file, dst_file in zip(src_file_names, dst_file_names) + ) + else: # download the files - for src_file, dst_file in tqdm( - zip(src_file_names, dst_file_names), - position=1, - leave=True, - total=db_length, - desc="Overall Progress: ", - colour="green", - ): - # final file path to save the data - dst_file = os.path.join(save_to, dst_file) - file_name = _download_file(src_file, dst_file, connector) - file_time = datetime.now() - local_db = _update_local_db(local_db_file, file_name, file_time) - # keep cache < 2GB, delete oldest files first - CACHE.run_maintenance() + connector = _connect_ebrains(data_type) + for src_file, dst_file in zip(src_file_names, dst_file_names): + print("Going serial", src_file) + # final file path to save the data + dst_file = os.path.join(save_to, dst_file) + file_name = _download_file(src_file, dst_file, connector) + file_time = datetime.now() + local_db = _update_local_db(local_db_file, file_name, file_time) + print(src_file, "Downloaded") + # keep cache < 2GB, delete oldest files first + CACHE.run_maintenance() + + # close the progress bar + progress_bar.close() + print( f"Downloaded requested files from IBC {data_type} dataset. See " f"{local_db_file} for details." From 4bc3a13a66f4813b8d130d20aef458f9d54664f5 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Mon, 15 Jul 2024 18:45:51 +0200 Subject: [PATCH 02/24] going paralleeeeeeel --- src/ibc_api/utils.py | 116 +++++++++++++++---------------------------- 1 file changed, 40 insertions(+), 76 deletions(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index fc35952..93739e1 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -1,5 +1,6 @@ """API to fetch IBC data from EBRAINS via Human Data Gateway using siibra. """ + # %$ import siibra from siibra.retrieval.repositories import EbrainsHdgConnector @@ -10,7 +11,8 @@ from siibra.retrieval.cache import CACHE import pandas as pd from datetime import datetime -#from . import metadata as md + +# from . import metadata as md import metadata as md import json import numpy as np @@ -270,17 +272,15 @@ def get_file_paths(db, metadata=METADATA): return remote_file_names, local_file_names -def _update_local_db(db_file, file_names, file_times): +def _update_local_db(db_file, files_data): """Update the local database of downloaded files. 
Parameters ---------- db_file : str path to the local database file - file_names : str or list - path to the downloaded file(s) - file_times : str or list - time at which the file(s) were downloaded + files_data : list of tuples + list of tuples where each tuple contains (file_name, file_time) Returns ------- @@ -288,9 +288,8 @@ def _update_local_db(db_file, file_names, file_times): updated local database """ - if type(file_names) is str: - file_names = [file_names] - file_times = [file_times] + file_names = [file_data[0] for file_data in files_data] + file_times = [file_data[1] for file_data in files_data] if not os.path.exists(db_file): # create a new database @@ -381,46 +380,14 @@ def _download_file(src_file, dst_file, connector): os.makedirs(dst_file_dir, exist_ok=True) # save the file locally dst_file = _write_file(dst_file, src_data) - return dst_file + download_time = datetime.now() + return dst_file, download_time else: print(f"File {dst_file} already exists, skipping download.") - - return [] + return dst_file, None -def _download_single_row(data_type, local_db_file, save_to, src_file, - dst_file): - """Download a file from ebrains. - - Parameters - ---------- - src_file : str - path to the file on ebrains - dst_file : str - path to save the file to locally - connector : EbrainsHdgConnector - connector to the IBC dataset on ebrains - - Returns - ------- - str, datetime - path to the downloaded file and time at which it was downloaded - """ - print("Going parallel!", src_file) - connector = _connect_ebrains(data_type) - print("Connected to ebrains!") - dst_file = os.path.join(save_to, dst_file) - file_name = _download_file(src_file, dst_file, connector) - file_time = datetime.now() - local_db = _update_local_db(local_db_file, file_name, file_time) - CACHE.run_maintenance() - print(src_file, "Downloaded") - - return local_db - - - -def download_data(db, save_to=None, parallel=True): +def download_data(db, save_to=None): """Download the files in a (filtered) dataframe. Parameters @@ -441,7 +408,7 @@ def download_data(db, save_to=None, parallel=True): db_length = len(db) if db_length == 0: raise ValueError( - f"The input dataframe is empty. Please make sure that it at least has columns 'dataset' and 'path' and a row containing appropriate values corresponding to those columns." + f"The input dataframe is empty. Please make sure that it atleast has columns 'dataset' and 'path' and a row containing appropriate values corresponding to those columns." ) else: print(f"Found {db_length} files to download.") @@ -451,11 +418,11 @@ def download_data(db, save_to=None, parallel=True): raise ValueError( f"The input dataframe should have columns 'dataset' and 'path' and a row containing appropriate values corresponding to those columns." 
) - + # get data type from db data_type = db["dataset"].unique()[0] # connect to ebrains dataset - #connector = _connect_ebrains(data_type) + connector = _connect_ebrains(data_type) # get the file names as they are on ebrains src_file_names, dst_file_names = get_file_paths(db) # set the save directory @@ -463,36 +430,33 @@ def download_data(db, save_to=None, parallel=True): # track downloaded file names and times local_db_file = os.path.join(save_to, f"downloaded_{data_type}.csv") - # Initialize a tqdm progress bar - progress_bar = tqdm(total=len(db), desc="Overall Progress", - colour="green") - - if parallel: - # Use all available cores - n_jobs = 4 - # Download the files in parallel - local_db = Parallel(n_jobs=n_jobs)( - delayed(_download_single_row)(data_type, local_db_file, save_to, - src_file, dst_file) - for src_file, dst_file in zip(src_file_names, dst_file_names) - ) - else: - # download the files - connector = _connect_ebrains(data_type) - for src_file, dst_file in zip(src_file_names, dst_file_names): - print("Going serial", src_file) - # final file path to save the data - dst_file = os.path.join(save_to, dst_file) + """ + def download_and_update(src_file, dst_file, connector): + if not os.path.exists(dst_file): + #dst_file_path = os.path.join(save_to, dst_file) file_name = _download_file(src_file, dst_file, connector) file_time = datetime.now() - local_db = _update_local_db(local_db_file, file_name, file_time) - print(src_file, "Downloaded") - # keep cache < 2GB, delete oldest files first - CACHE.run_maintenance() - - # close the progress bar - progress_bar.close() - + """ + + # download the files + results = Parallel(n_jobs=4, backend="threading")( + delayed(_download_file)(src_file, dst_file, connector) + for src_file, dst_file in tqdm( + zip(src_file_names, dst_file_names), + position=1, + leave=True, + total=db_length, + desc="Overall Progress: ", + colour="green", + ) + ) + + # filter out results with None as download time to save a clean database + results = [result for result in results if result[1] is not None] + local_db = _update_local_db(local_db_file, results) + + CACHE.run_maintenance() + print( f"Downloaded requested files from IBC {data_type} dataset. See " f"{local_db_file} for details." From 251c3137b29e066d9a67422edf5e0a7d279e3715 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Wed, 17 Jul 2024 17:25:31 +0200 Subject: [PATCH 03/24] minor changes --- src/ibc_api/utils.py | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index 93739e1..970b0e2 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -1,23 +1,23 @@ """API to fetch IBC data from EBRAINS via Human Data Gateway using siibra. """ +import json +import os +from datetime import datetime + +import nibabel +import numpy as np +import pandas as pd + # %$ import siibra +from joblib import Parallel, delayed +from siibra.retrieval.cache import CACHE from siibra.retrieval.repositories import EbrainsHdgConnector from siibra.retrieval.requests import EbrainsRequest, SiibraHttpRequestError -import os from tqdm import tqdm -import nibabel -from siibra.retrieval.cache import CACHE -import pandas as pd -from datetime import datetime -# from . import metadata as md -import metadata as md -import json -import numpy as np -from joblib import Parallel, delayed -import pdb +from . 
import metadata as md # clear cache CACHE.clear() @@ -430,14 +430,6 @@ def download_data(db, save_to=None): # track downloaded file names and times local_db_file = os.path.join(save_to, f"downloaded_{data_type}.csv") - """ - def download_and_update(src_file, dst_file, connector): - if not os.path.exists(dst_file): - #dst_file_path = os.path.join(save_to, dst_file) - file_name = _download_file(src_file, dst_file, connector) - file_time = datetime.now() - """ - # download the files results = Parallel(n_jobs=4, backend="threading")( delayed(_download_file)(src_file, dst_file, connector) From 3557bf9681e7192899f53714bf1f5e5218eaad02 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Thu, 18 Jul 2024 16:59:28 +0200 Subject: [PATCH 04/24] going paralleeeeeeeeel (better I hope) --- src/ibc_api/utils.py | 72 ++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 29 deletions(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index 970b0e2..f475fcc 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -1,6 +1,7 @@ """API to fetch IBC data from EBRAINS via Human Data Gateway using siibra. """ +# %$ import json import os from datetime import datetime @@ -8,8 +9,6 @@ import nibabel import numpy as np import pandas as pd - -# %$ import siibra from joblib import Parallel, delayed from siibra.retrieval.cache import CACHE @@ -231,7 +230,7 @@ def filter_data(db, subject_list=SUBJECTS, task_list=False): return filtered_db -def get_file_paths(db, metadata=METADATA): +def get_file_paths(db, metadata=METADATA, save_to_dir=None): """Get the remote and local file paths for each file in a (filtered) dataframe. Parameters @@ -256,7 +255,10 @@ def get_file_paths(db, metadata=METADATA): remote_file_names = [] local_file_names = [] remote_root_dir = md.select_dataset(data_type, metadata)["root"] - local_root_dir = data_type + if save_to_dir == None: + local_root_dir = data_type + else: + local_root_dir = os.path.join(save_to_dir, data_type) for file in file_names: # put the file path together # always use "/" as the separator for remote file paths @@ -272,7 +274,8 @@ def get_file_paths(db, metadata=METADATA): return remote_file_names, local_file_names -def _update_local_db(db_file, files_data): +# def _update_local_db(db_file, files_data): +def _update_local_db(db_file, file_names, file_times): """Update the local database of downloaded files. 
Parameters @@ -288,8 +291,12 @@ def _update_local_db(db_file, files_data): updated local database """ - file_names = [file_data[0] for file_data in files_data] - file_times = [file_data[1] for file_data in files_data] + # file_names = [file_data[0] for file_data in files_data] + # file_times = [file_data[1] for file_data in files_data] + + if type(file_names) is str: + file_names = [file_names] + file_times = [file_times] if not os.path.exists(db_file): # create a new database @@ -372,6 +379,7 @@ def _download_file(src_file, dst_file, connector): str, datetime path to the downloaded file and time at which it was downloaded """ + # CACHE.run_maintenance() if not os.path.exists(dst_file): # load the file from ebrains src_data = connector.get(src_file) @@ -380,11 +388,11 @@ def _download_file(src_file, dst_file, connector): os.makedirs(dst_file_dir, exist_ok=True) # save the file locally dst_file = _write_file(dst_file, src_data) - download_time = datetime.now() - return dst_file, download_time + # download_time = datetime.now() + return dst_file else: print(f"File {dst_file} already exists, skipping download.") - return dst_file, None + return dst_file def download_data(db, save_to=None): @@ -423,35 +431,41 @@ def download_data(db, save_to=None): data_type = db["dataset"].unique()[0] # connect to ebrains dataset connector = _connect_ebrains(data_type) - # get the file names as they are on ebrains - src_file_names, dst_file_names = get_file_paths(db) # set the save directory save_to = _create_root_dir(save_to) # track downloaded file names and times local_db_file = os.path.join(save_to, f"downloaded_{data_type}.csv") + # get the file names as they are on ebrains + src_file_names, dst_file_names = get_file_paths(db, save_to_dir=save_to) # download the files - results = Parallel(n_jobs=4, backend="threading")( - delayed(_download_file)(src_file, dst_file, connector) - for src_file, dst_file in tqdm( - zip(src_file_names, dst_file_names), - position=1, - leave=True, - total=db_length, - desc="Overall Progress: ", - colour="green", + with tqdm( + total=db_length, + position=1, + leave=True, + desc="Overall Progress", + colour="green", + ) as pbar: + + def _download_and_update_progress(src_file, dst_file, connector): + file_name = _download_file(src_file, dst_file, connector) + file_time = datetime.now() + local_db = _update_local_db(local_db_file, file_name, file_time) + CACHE.run_maintenance() # keep cache < 2GB + pbar.update(1) + + return file_name, file_time, local_db + + results = Parallel(n_jobs=8, backend="threading")( + delayed(_download_and_update_progress)( + src_file, dst_file, connector + ) + for src_file, dst_file in zip(src_file_names, dst_file_names) ) - ) - - # filter out results with None as download time to save a clean database - results = [result for result in results if result[1] is not None] - local_db = _update_local_db(local_db_file, results) - - CACHE.run_maintenance() print( f"Downloaded requested files from IBC {data_type} dataset. See " f"{local_db_file} for details." 
) - return local_db + return results From 1362c1272a94926955c975f6d57ec6b9b9118417 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Thu, 18 Jul 2024 17:03:45 +0200 Subject: [PATCH 05/24] untrack the token --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 7761abd..4f6f7e8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ src/ibc_api.egg-info src/ibc_api/__pycache__ src/ibc-api ibc_data -.ipynb_checkpoints \ No newline at end of file +.ipynb_checkpoints +src/ibc_api/data/token \ No newline at end of file From 855a49f044f18edb1e86d84eac9614496ded5554 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Thu, 18 Jul 2024 17:05:55 +0200 Subject: [PATCH 06/24] adding joblib to dependencies --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 9e13a83..e7a0331 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,4 +11,5 @@ dependencies = [ "nibabel", "pandas", "tqdm", + "joblib", ] \ No newline at end of file From dae7251445e561f43d1f3cdd1b5947f1b4f05b66 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Wed, 7 Aug 2024 18:03:36 +0200 Subject: [PATCH 07/24] removing tqdm dependency and changes after testing --- src/ibc_api/utils.py | 56 ++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index f475fcc..bece3dd 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -10,11 +10,10 @@ import numpy as np import pandas as pd import siibra -from joblib import Parallel, delayed +from joblib import Memory, Parallel, delayed from siibra.retrieval.cache import CACHE from siibra.retrieval.repositories import EbrainsHdgConnector from siibra.retrieval.requests import EbrainsRequest, SiibraHttpRequestError -from tqdm import tqdm from . import metadata as md @@ -31,6 +30,11 @@ TOKEN_ROOT = os.path.join(os.path.dirname(__file__), "data") os.makedirs(TOKEN_ROOT, exist_ok=True) +# memory cache +joblib_cache_dir = os.path.join(os.path.dirname(__file__), "cache") +os.makedirs(joblib_cache_dir, exist_ok=True) +memory = Memory(joblib_cache_dir, verbose=0) + def _authenticate(token_dir=TOKEN_ROOT): """This function authenticates you to EBRAINS. It would return a link that @@ -274,7 +278,6 @@ def get_file_paths(db, metadata=METADATA, save_to_dir=None): return remote_file_names, local_file_names -# def _update_local_db(db_file, files_data): def _update_local_db(db_file, file_names, file_times): """Update the local database of downloaded files. @@ -291,9 +294,6 @@ def _update_local_db(db_file, file_names, file_times): updated local database """ - # file_names = [file_data[0] for file_data in files_data] - # file_times = [file_data[1] for file_data in files_data] - if type(file_names) is str: file_names = [file_names] file_times = [file_times] @@ -362,6 +362,7 @@ def _write_file(file, data): return file +@memory.cache def _download_file(src_file, dst_file, connector): """Download a file from ebrains. @@ -395,7 +396,8 @@ def _download_file(src_file, dst_file, connector): return dst_file -def download_data(db, save_to=None): +# download the files +def download_data(db, num_jobs=4, save_to=None): """Download the files in a (filtered) dataframe. Parameters @@ -430,6 +432,7 @@ def download_data(db, save_to=None): # get data type from db data_type = db["dataset"].unique()[0] # connect to ebrains dataset + print("... 
Fetching token and connecting to EBRAINS ...") connector = _connect_ebrains(data_type) # set the save directory save_to = _create_root_dir(save_to) @@ -438,30 +441,21 @@ def download_data(db, save_to=None): # get the file names as they are on ebrains src_file_names, dst_file_names = get_file_paths(db, save_to_dir=save_to) - # download the files - with tqdm( - total=db_length, - position=1, - leave=True, - desc="Overall Progress", - colour="green", - ) as pbar: - - def _download_and_update_progress(src_file, dst_file, connector): - file_name = _download_file(src_file, dst_file, connector) - file_time = datetime.now() - local_db = _update_local_db(local_db_file, file_name, file_time) - CACHE.run_maintenance() # keep cache < 2GB - pbar.update(1) - - return file_name, file_time, local_db - - results = Parallel(n_jobs=8, backend="threading")( - delayed(_download_and_update_progress)( - src_file, dst_file, connector - ) - for src_file, dst_file in zip(src_file_names, dst_file_names) - ) + # helper to process the parallel download + def _download_and_update_progress(src_file, dst_file, connector): + file_name = _download_file(src_file, dst_file, connector) + file_time = datetime.now() + local_db = _update_local_db(local_db_file, file_name, file_time) + CACHE.run_maintenance() # keep cache < 2GB + + return file_name, file_time, local_db + + # download finally + print("...Starting download...") + results = Parallel(n_jobs=num_jobs, backend="threading", verbose=10)( + delayed(_download_and_update_progress)(src_file, dst_file, connector) + for src_file, dst_file in zip(src_file_names, dst_file_names) + ) print( f"Downloaded requested files from IBC {data_type} dataset. See " From 030c22b17a770780604ca9e71fe92cdd05aef90f Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Thu, 8 Aug 2024 15:33:58 +0200 Subject: [PATCH 15/24] managing download dataframe differently --- src/ibc_api/utils.py | 84 +++++++++++++++++++++++++++----------------- 1 file changed, 52 insertions(+), 32 deletions(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index bece3dd..2a5a196 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -246,7 +246,8 @@ def get_file_paths(db, metadata=METADATA, save_to_dir=None): Returns ------- filenames, list - lists of file paths for each file in the input dataframe. First list is the remote file paths and second list is the local file paths + lists of file paths for each file in the input dataframe. First list + is the remote file paths and second list is the local file paths """ # get the data type from the db data_type = db["dataset"].unique() @@ -278,7 +279,7 @@ def get_file_paths(db, metadata=METADATA, save_to_dir=None): return remote_file_names, local_file_names -def _update_local_db(db_file, file_names, file_times): +def _update_local_db(db_file, files_data): """Update the local database of downloaded files. 
Parameters @@ -293,27 +294,29 @@ def _update_local_db(db_file, file_names, file_times): pandas.DataFrame updated local database """ - - if type(file_names) is str: - file_names = [file_names] - file_times = [file_times] - if not os.path.exists(db_file): # create a new database - db = pd.DataFrame( - {"local_path": file_names, "downloaded_on": file_times} - ) + db = pd.DataFrame(columns=["local_path", "downloaded_on"]) else: - # load the database - db = pd.read_csv(db_file, index_col=False) - new_db = pd.DataFrame( - {"local_path": file_names, "downloaded_on": file_times} - ) - # update the database - db = pd.concat([db, new_db]) - db.reset_index(drop=True, inplace=True) + try: + # load the database + db = pd.read_csv(db_file, index_col=False) + except ( + pd.errors.EmptyDataError, + pd.errors.ParserError, + FileNotFoundError, + ): + print("Empty database file. Creating a new one.") + db = pd.DataFrame(columns=["local_path", "downloaded_on"]) + + downloaded_db = pd.DataFrame( + files_data, columns=["local_path", "downloaded_on"] + ) + + new_db = pd.concat([db, downloaded_db], ignore_index=True) + new_db.reset_index(drop=True, inplace=True) # save the database - db.to_csv(db_file, index=False) + new_db.to_csv(db_file, index=False) return db @@ -327,6 +330,11 @@ def _write_file(file, data): path to the file to write to data : data fetched from ebrains data to write to the file + + Returns + ------- + file: str + path to the written """ # check file type and write accordingly if type(data) == nibabel.nifti1.Nifti1Image: @@ -362,7 +370,6 @@ def _write_file(file, data): return file -@memory.cache def _download_file(src_file, dst_file, connector): """Download a file from ebrains. @@ -389,15 +396,14 @@ def _download_file(src_file, dst_file, connector): os.makedirs(dst_file_dir, exist_ok=True) # save the file locally dst_file = _write_file(dst_file, src_data) - # download_time = datetime.now() return dst_file else: print(f"File {dst_file} already exists, skipping download.") return dst_file -# download the files -def download_data(db, num_jobs=4, save_to=None): +@memory.cache +def download_data(db, num_jobs=2, save_to=None): """Download the files in a (filtered) dataframe. Parameters @@ -405,6 +411,8 @@ def download_data(db, num_jobs=4, save_to=None): db : pandas.DataFrame dataframe with information about files in the dataset, ideally a subset of the full dataset + num_jobs : int, optional + number of parallel jobs to run, by default 2 save_to : str, optional where to save the data, by default None, in which case the data is saved in a directory called "ibc_data" in the current working directory @@ -418,7 +426,7 @@ def download_data(db, num_jobs=4, save_to=None): db_length = len(db) if db_length == 0: raise ValueError( - f"The input dataframe is empty. Please make sure that it atleast has columns 'dataset' and 'path' and a row containing appropriate values corresponding to those columns." + f"The input dataframe is empty. Please make sure that it at least has columns 'dataset' and 'path' and a row containing appropriate values corresponding to those columns." 
) else: print(f"Found {db_length} files to download.") @@ -436,19 +444,21 @@ def download_data(db, num_jobs=4, save_to=None): connector = _connect_ebrains(data_type) # set the save directory save_to = _create_root_dir(save_to) - # track downloaded file names and times + # file to track downloaded file names and times local_db_file = os.path.join(save_to, f"downloaded_{data_type}.csv") # get the file names as they are on ebrains src_file_names, dst_file_names = get_file_paths(db, save_to_dir=save_to) # helper to process the parallel download def _download_and_update_progress(src_file, dst_file, connector): - file_name = _download_file(src_file, dst_file, connector) - file_time = datetime.now() - local_db = _update_local_db(local_db_file, file_name, file_time) - CACHE.run_maintenance() # keep cache < 2GB - - return file_name, file_time, local_db + try: + file_name = _download_file(src_file, dst_file, connector) + file_time = datetime.now() + CACHE.run_maintenance() # keep cache < 2GB + return file_name, file_time + except Exception as e: + print(f"Error downloading {src_file}. Error: {e}") + return None, None # download finally print("...Starting download...") @@ -457,9 +467,19 @@ def _download_and_update_progress(src_file, dst_file, connector): for src_file, dst_file in zip(src_file_names, dst_file_names) ) + # update the local database + results = [res for res in results if res[0] is not None] + if len(results) == 0: + raise RuntimeError( + f"No files downloaded! Please check errors and try again." + ) + download_details = _update_local_db(local_db_file, results) print( f"Downloaded requested files from IBC {data_type} dataset. See " f"{local_db_file} for details." ) - return results + # clean up the cache + CACHE.clear() + + return download_details From 6336a63ce5e1405db47c64c3b400673fc4239cec Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Thu, 8 Aug 2024 17:39:01 +0200 Subject: [PATCH 16/24] small format --- src/ibc_api/utils.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index 2a5a196..ca4b685 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -370,6 +370,7 @@ def _write_file(file, data): return file +@memory.cache def _download_file(src_file, dst_file, connector): """Download a file from ebrains. @@ -402,7 +403,6 @@ def _download_file(src_file, dst_file, connector): return dst_file -@memory.cache def download_data(db, num_jobs=2, save_to=None): """Download the files in a (filtered) dataframe. @@ -470,9 +470,7 @@ def _download_and_update_progress(src_file, dst_file, connector): # update the local database results = [res for res in results if res[0] is not None] if len(results) == 0: - raise RuntimeError( - f"No files downloaded! Please check errors and try again." - ) + raise RuntimeError(f"No files downloaded ! Please try again.") download_details = _update_local_db(local_db_file, results) print( f"Downloaded requested files from IBC {data_type} dataset. 
See " @@ -481,5 +479,6 @@ def _download_and_update_progress(src_file, dst_file, connector): # clean up the cache CACHE.clear() + memory.clear() return download_details From a09073ce1157e09a59cbb5e5bfba9d921e9d1cfb Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Fri, 23 Aug 2024 16:14:05 +0200 Subject: [PATCH 17/24] adjustments to example and cmd feedback --- examples/example.py | 17 ++++++----------- src/ibc_api/utils.py | 4 ++-- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/examples/example.py b/examples/example.py index b0b2243..9d5f0f2 100644 --- a/examples/example.py +++ b/examples/example.py @@ -1,18 +1,13 @@ -#%% -import pdb -import sys -sys.path.append('/home/fer/HBP_IBC/api/src/ibc_api') -import utils as ibc -pdb.set_trace() -#%% +from ibc_api import utils as ibc + # Fetch info on all available files # Load as a pandas dataframe and save as ibc_data/available_{data_type}.csv db = ibc.get_info(data_type="volume_maps") # Keep statistic maps for sub-08, for task-Discount -filtered_db = ibc.filter_data(db, subject_list=["04"], task_list=["Lec1"]) -#%% -# Download all statistic maps for sub-08, task-Discount +filtered_db = ibc.filter_data(db, subject_list=["08"], task_list=["Lec1"]) + +# Download all statistic maps for sub-08, task-Lec1 # Also creates ibc_data/downloaded_volume_maps.csv # which contains local file paths and time of download -downloaded_db = ibc.download_data(filtered_db, parallel=True) +downloaded_db = ibc.download_data(filtered_db, num_jobs=2) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index ca4b685..a30616c 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -461,7 +461,7 @@ def _download_and_update_progress(src_file, dst_file, connector): return None, None # download finally - print("...Starting download...") + print(f"\n...Starting download of {len(src_file_names)} files...") results = Parallel(n_jobs=num_jobs, backend="threading", verbose=10)( delayed(_download_and_update_progress)(src_file, dst_file, connector) for src_file, dst_file in zip(src_file_names, dst_file_names) @@ -474,7 +474,7 @@ def _download_and_update_progress(src_file, dst_file, connector): download_details = _update_local_db(local_db_file, results) print( f"Downloaded requested files from IBC {data_type} dataset. See " - f"{local_db_file} for details." 
+ f"{local_db_file} for details.\n" ) # clean up the cache From b5aecee51c948bdbfe8b3f5eff8a84de7255b950 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Fri, 23 Aug 2024 16:17:36 +0200 Subject: [PATCH 18/24] removing unused imports --- src/ibc_api/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index a30616c..2ef19f7 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -7,7 +7,6 @@ from datetime import datetime import nibabel -import numpy as np import pandas as pd import siibra from joblib import Memory, Parallel, delayed From f64bd29976a0399cc273f8500a0ad7cbc7dafa42 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Fri, 23 Aug 2024 16:25:12 +0200 Subject: [PATCH 19/24] raising error instead of just printing Co-authored-by: Himanshu Aggarwal --- src/ibc_api/utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index 2ef19f7..02183f7 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -456,8 +456,7 @@ def _download_and_update_progress(src_file, dst_file, connector): CACHE.run_maintenance() # keep cache < 2GB return file_name, file_time except Exception as e: - print(f"Error downloading {src_file}. Error: {e}") - return None, None + raise(f"Error downloading {src_file}. Error: {e}") # download finally print(f"\n...Starting download of {len(src_file_names)} files...") From 5f270f78085a6b9413c093739585db1b8fce5b42 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Fri, 23 Aug 2024 16:27:59 +0200 Subject: [PATCH 20/24] num_jobs -> n_jobs Co-authored-by: Himanshu Aggarwal --- src/ibc_api/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index 02183f7..2980068 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -402,7 +402,7 @@ def _download_file(src_file, dst_file, connector): return dst_file -def download_data(db, num_jobs=2, save_to=None): +def download_data(db, n_jobs=2, save_to=None): """Download the files in a (filtered) dataframe. 
Parameters From f1cdf1da10aced940665e31d99994fc52d505538 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Fri, 23 Aug 2024 16:28:12 +0200 Subject: [PATCH 21/24] num_jobs -> n_jobs 2 Co-authored-by: Himanshu Aggarwal --- src/ibc_api/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index 2980068..656006d 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -410,7 +410,7 @@ def download_data(db, n_jobs=2, save_to=None): db : pandas.DataFrame dataframe with information about files in the dataset, ideally a subset of the full dataset - num_jobs : int, optional + n_jobs : int, optional number of parallel jobs to run, by default 2 save_to : str, optional where to save the data, by default None, in which case the data is From 1406ea00fb07ba248bae5d91601d7f9a60c73e4e Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Fri, 23 Aug 2024 16:28:28 +0200 Subject: [PATCH 22/24] num_jobs -> n_jobs 3 Co-authored-by: Himanshu Aggarwal --- src/ibc_api/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index 656006d..9e02b83 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -460,7 +460,7 @@ def _download_and_update_progress(src_file, dst_file, connector): # download finally print(f"\n...Starting download of {len(src_file_names)} files...") - results = Parallel(n_jobs=num_jobs, backend="threading", verbose=10)( + results = Parallel(n_jobs=n_jobs, backend="threading", verbose=10)( delayed(_download_and_update_progress)(src_file, dst_file, connector) for src_file, dst_file in zip(src_file_names, dst_file_names) ) From 42664863fcd2c404249f33ba172ce75130294674 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Fri, 23 Aug 2024 16:29:50 +0200 Subject: [PATCH 23/24] expand n_jobs description Co-authored-by: Himanshu Aggarwal --- src/ibc_api/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ibc_api/utils.py b/src/ibc_api/utils.py index 9e02b83..915e7b2 100644 --- a/src/ibc_api/utils.py +++ b/src/ibc_api/utils.py @@ -411,7 +411,8 @@ def download_data(db, n_jobs=2, save_to=None): dataframe with information about files in the dataset, ideally a subset of the full dataset n_jobs : int, optional - number of parallel jobs to run, by default 2 + number of parallel jobs to run, by default 2. -1 would use all the CPUs. + See: https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html save_to : str, optional where to save the data, by default None, in which case the data is saved in a directory called "ibc_data" in the current working directory From 23030120984bab390e4cc2250b35761c63f617e4 Mon Sep 17 00:00:00 2001 From: Fernanda Ponce Date: Fri, 23 Aug 2024 16:42:41 +0200 Subject: [PATCH 24/24] num_jobs -> n_jobs in example --- examples/example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/example.py b/examples/example.py index 9d5f0f2..cc064f4 100644 --- a/examples/example.py +++ b/examples/example.py @@ -10,4 +10,4 @@ # Download all statistic maps for sub-08, task-Lec1 # Also creates ibc_data/downloaded_volume_maps.csv # which contains local file paths and time of download -downloaded_db = ibc.download_data(filtered_db, num_jobs=2) +downloaded_db = ibc.download_data(filtered_db, n_jobs=2)
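What the series converges on, stripped of the ibc_api specifics, is a thread-based download loop: joblib's Parallel with the "threading" backend fans the per-file download out over the (remote, local) path pairs, each call returns a (local_path, timestamp) tuple, and the collected tuples are appended to a per-dataset CSV. Below is a minimal sketch of that pattern, assuming only joblib and pandas; fetch_one, download_all and the example paths are hypothetical stand-ins, not functions from ibc_api.

    # Minimal sketch of the thread-parallel download-and-log pattern.
    # fetch_one, download_all and the paths are hypothetical stand-ins.
    import os
    from datetime import datetime

    import pandas as pd
    from joblib import Parallel, delayed


    def fetch_one(src, dst):
        """Stand-in for _download_file: fetch one file, return (path, timestamp)."""
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        if not os.path.exists(dst):
            with open(dst, "wb") as f:
                f.write(b"")  # a real implementation would stream `src` here
        return dst, datetime.now()


    def download_all(pairs, db_file, n_jobs=2):
        """Fetch all (src, dst) pairs in threads and log them to a CSV database."""
        # I/O-bound work, so the threading backend is enough and avoids
        # pickling a shared connector object for worker processes
        results = Parallel(n_jobs=n_jobs, backend="threading", verbose=10)(
            delayed(fetch_one)(src, dst) for src, dst in pairs
        )
        new_rows = pd.DataFrame(results, columns=["local_path", "downloaded_on"])
        if os.path.exists(db_file):
            new_rows = pd.concat([pd.read_csv(db_file), new_rows], ignore_index=True)
        new_rows.to_csv(db_file, index=False)
        return new_rows


    if __name__ == "__main__":
        pairs = [
            ("remote/sub-08_task-Lec1_map1.nii.gz", "ibc_data/sub-08_task-Lec1_map1.nii.gz"),
            ("remote/sub-08_task-Lec1_map2.nii.gz", "ibc_data/sub-08_task-Lec1_map2.nii.gz"),
        ]
        print(download_all(pairs, "ibc_data/downloaded_volume_maps.csv"))

Writing the CSV once on the collected results, as the later patches do, also keeps several threads from appending to the same file concurrently.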
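The later patches additionally wrap the low-level download in a joblib.Memory disk cache and clear it (along with siibra's CACHE) once download_data returns, so repeated calls with identical arguments inside one run are replayed from disk instead of re-fetched. A toy illustration of that mechanism, with a made-up slow_fetch function and cache directory rather than the real ibc_api internals:

    import time
    from joblib import Memory

    # hypothetical cache location, only for this illustration
    memory = Memory("./joblib_cache_demo", verbose=0)


    @memory.cache
    def slow_fetch(name):
        """Pretend download: the body runs once per distinct argument."""
        time.sleep(1)
        return f"contents of {name}"


    if __name__ == "__main__":
        t0 = time.time()
        slow_fetch("sub-08_task-Lec1.nii.gz")  # computed (about 1 s)
        slow_fetch("sub-08_task-Lec1.nii.gz")  # replayed from the on-disk cache
        print(f"two calls took {time.time() - t0:.2f}s")
        memory.clear(warn=False)  # drop the cache, as download_data does at the end

Since memory.clear() runs at the end, the disk cache only speeds up repeated fetches within a single download_data call.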