Skip to content

Commit

Permalink
[#8] adding option to generate metadata json file when extracting a specific file type
Browse files Browse the repository at this point in the history
  • Loading branch information
pkdash committed Jun 20, 2024
1 parent 811c273 commit 8598244
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 19 deletions.
47 changes: 28 additions & 19 deletions hsextract/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,63 @@

from asyncio import run as aiorun

from hsextract.utils import list_and_extract, extract_metadata
from hsextract.utils import list_and_extract, extract_metadata, is_file_path, is_dir_path, save_metadata


app = typer.Typer()


def _extract_metadata(path: str, file_type: str, generate_metadata_file: bool) -> None:
    """Extract metadata of one file and either save it or print it.

    Args:
        path: Path of the input file; nothing is done unless ``is_file_path``
            accepts it.
        file_type: Extractor type name passed through to ``extract_metadata``.
        generate_metadata_file: When true, hand the metadata to
            ``save_metadata``; otherwise print it to stdout as indented JSON.
    """
    if not is_file_path(path):
        return

    metadata_dict = extract_metadata(file_type, path)
    if metadata_dict is None:
        # Nothing was extracted, so there is nothing to save or print.
        return

    if generate_metadata_file:
        save_metadata(path=path, metadata_dict=metadata_dict)
    else:
        print(json.dumps(metadata_dict, indent=2))


@app.command()
def feature(path: str, generate_metadata_file: bool = False):
    """Extract feature metadata from the file at PATH.

    The metadata is printed as JSON unless generate_metadata_file is set,
    in which case it is saved to a metadata file instead.
    """
    _extract_metadata(path, "feature", generate_metadata_file)


@app.command()
def raster(path: str, generate_metadata_file: bool = False):
    """Extract raster metadata from the file at PATH.

    The metadata is printed as JSON unless generate_metadata_file is set,
    in which case it is saved to a metadata file instead.
    """
    _extract_metadata(path, "raster", generate_metadata_file)


@app.command()
def reftimeseries(path: str, generate_metadata_file: bool = False):
    """Extract reftimeseries metadata from the file at PATH.

    The metadata is printed as JSON unless generate_metadata_file is set,
    in which case it is saved to a metadata file instead.
    """
    _extract_metadata(path, "reftimeseries", generate_metadata_file)


@app.command()
def timeseries(path: str, generate_metadata_file: bool = False):
    """Extract timeseries metadata from the file at PATH.

    The metadata is printed as JSON unless generate_metadata_file is set,
    in which case it is saved to a metadata file instead.
    """
    _extract_metadata(path, "timeseries", generate_metadata_file)


@app.command()
def timeseriescsv(path: str, generate_metadata_file: bool = False):
    """Extract timeseries metadata from the CSV file at PATH.

    The metadata is printed as JSON unless generate_metadata_file is set,
    in which case it is saved to a metadata file instead.
    """
    # NOTE(review): this passes the "timeseries" extractor type, the same one
    # the `timeseries` command uses -- confirm that is intended for CSV input.
    _extract_metadata(path, "timeseries", generate_metadata_file)


@app.command()
def netcdf(path: str, generate_metadata_file: bool = False):
    """Extract netcdf metadata from the file at PATH.

    The metadata is printed as JSON unless generate_metadata_file is set,
    in which case it is saved to a metadata file instead.
    """
    _extract_metadata(path, "netcdf", generate_metadata_file)


async def _extract(path: str, user_metadata_filename: str, base_url: str):
    """Generate metadata json files for all file types in the given path.

    Thin coroutine wrapper that awaits ``list_and_extract``; note that the
    argument order passed on is (path, user_metadata_filename, base_url).
    """
    await list_and_extract(path, user_metadata_filename, base_url)


@app.command()
def extract(path: str, base_url: str, user_metadata_filename: str = "hs_user_meta.json"):
    """Generate metadata json files for all file types found under PATH.

    Nothing is done when PATH is not an existing directory.
    """
    if not is_dir_path(path):
        return
    # Drive the async extraction to completion from this synchronous command.
    aiorun(_extract(path, user_metadata_filename, base_url))


Expand Down
30 changes: 30 additions & 0 deletions hsextract/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,33 @@
from hsextract.file_utils import file_metadata


def is_file_path(filepath: str) -> bool:
    """Return True when ``filepath`` is an existing regular file.

    Logs an error and returns False otherwise (missing path, directory, ...).
    """
    if not os.path.isfile(filepath):
        logging.error(f"{filepath} is not a file or doesn't exist.")
        return False
    return True


def is_dir_path(dirpath: str) -> bool:
    """Return True when ``dirpath`` is an existing directory.

    Logs an error and returns False otherwise (missing path, regular file, ...).
    """
    if not os.path.isdir(dirpath):
        logging.error(f"{dirpath} is not a directory or doesn't exist.")
        return False
    return True


def save_metadata(path: str, metadata_dict: dict):
    """Write ``metadata_dict`` as JSON next to the input file.

    The output is ``<parent of path>/.hs/<file name>.json``; the ``.hs``
    directory is created if needed and an existing file is overwritten.

    Args:
        path: Path of the input file the metadata was extracted from.
        metadata_dict: JSON-serializable metadata to save.
    """
    source_path = Path(path)
    # save the metadata file in '.hs' folder relative to the directory of the input file
    metadata_file_path = source_path.parent / ".hs" / (source_path.name + ".json")
    # create the '.hs' directory
    metadata_file_path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding so the output does not depend on the platform default.
    with open(metadata_file_path, "w", encoding="utf-8") as f:
        json.dump(metadata_dict, f, indent=2)

    print(f"Metadata was saved to file path: {metadata_file_path}")


def _to_metadata_path(filepath: str, user_metadata_filename: str):
if not filepath.endswith(user_metadata_filename):
return os.path.join(".hs", filepath + ".json")
Expand All @@ -43,6 +70,9 @@ def extract_metadata(type: str, filepath, use_adapter=True):
# use_adapter is a flag to determine if the metadata should be converted to a catalog record
# it is set to False in tests when testing for the raw extracted metadata

print(f">> Extracting {type} metadata from {filepath}", flush=True)
logging.info(f"Extracting {type} metadata from {filepath}")

extension = os.path.splitext(filepath)[1]
try:
extracted_metadata = _extract_metadata(type, filepath)
Expand Down

0 comments on commit 8598244

Please sign in to comment.