Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

format revisions based on errors from nox lint #44

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@

python_versions = ["3.11", "3.10"]


@nox.session(python=python_versions)
def test(session):
    """Install the package and pytest, then run the test suite."""
    session.install(".")
    session.install("pytest")
    session.run("pytest")


@nox.session(python=python_versions)
def lint(session):
    """Run flake8 over the sources, the tests and this noxfile."""
    session.install("flake8")
    session.run("flake8", "src", "tests", "noxfile.py")


@nox.session(python=python_versions[0])
def format(session):
    """Reformat the sources and tests with black, then sort imports with isort."""
    # The function name doubles as the nox session name, so it is kept even
    # though it shadows the ``format`` builtin inside this module.
    session.install("black", "isort")
    session.run("black", "src", "tests")
    session.run("isort", "src", "tests")


@nox.session(python=python_versions[0])
def types(session):
session.install(".")
Expand Down
82 changes: 48 additions & 34 deletions src/digarch_scripts/lint/lint_ft.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

LOGGER = logging.getLogger(__name__)


def _configure_logging(log_folder: Path):
log_fn = datetime.now().strftime("lint_%Y_%m_%d_%H_%M.log")
log_fpath = log_folder / log_fn
Expand All @@ -21,15 +22,14 @@ def _configure_logging(log_folder: Path):
encoding="utf-8",
)


def parse_args() -> argparse.Namespace:
"""Validate and return command-line args"""

def extant_dir(p):
path = Path(p)
if not path.is_dir():
raise argparse.ArgumentTypeError(
f'{path} does not exist'
)
raise argparse.ArgumentTypeError(f"{path} does not exist")
return path

def list_of_paths(p):
Expand All @@ -43,28 +43,21 @@ def list_of_paths(p):
parser = argparse.ArgumentParser()

parser.add_argument(
'--package',
type=extant_dir,
nargs='+',
dest='packages',
action='extend'
"--package", type=extant_dir, nargs="+", dest="packages", action="extend"
)
parser.add_argument(
'--directory',
type=list_of_paths,
dest='packages',
action='extend'
"--directory", type=list_of_paths, dest="packages", action="extend"
)
parser.add_argument(
'--log_folder',
help='''Optional. Designate where to save the log file,
or it will be saved in current directory''',
default='.'
"--log_folder",
help="""Optional. Designate where to save the log file,
or it will be saved in current directory""",
default=".",
)


return parser.parse_args()


def package_has_valid_name(package: Path) -> bool:
"""Top level folder name has to conform to ACQ_####_######"""
folder_name = package.name
Expand All @@ -76,15 +69,17 @@ def package_has_valid_name(package: Path) -> bool:
LOGGER.error(f"{folder_name} does not conform to ACQ_####_######")
return False


def package_has_two_subfolders(package: Path) -> bool:
    """Check that the package contains exactly two subfolders.

    Only the immediate child directories of ``package`` are counted; plain
    files at the top level are ignored.

    Returns:
        True when exactly two subfolders exist; otherwise logs an error and
        returns False.
    """
    pkg_folders = [x for x in package.iterdir() if x.is_dir()]
    if len(pkg_folders) == 2:
        return True

    LOGGER.error(f"{package} does not have exactly two subfolders")
    return False


def package_has_valid_subfolder_names(package: Path) -> bool:
"""Second level folders must be objects and metadata folder"""
expected = set(["objects", "metadata"])
Expand All @@ -98,6 +93,7 @@ def package_has_valid_subfolder_names(package: Path) -> bool:
)
return False


def package_has_no_hidden_file(package: Path) -> bool:
"""The package should not have any hidden file"""
hidden_ls = [
Expand All @@ -111,17 +107,19 @@ def package_has_no_hidden_file(package: Path) -> bool:
else:
return True


def package_has_no_zero_bytes_file(package: Path) -> bool:
    """Check that no file anywhere under the package is zero bytes long.

    The package is searched recursively.

    Returns:
        True when every file has a non-zero size; otherwise logs a warning
        listing the empty files and returns False.
    """
    zero_bytes_ls = [
        f for f in package.rglob("*") if f.is_file() and f.stat().st_size == 0
    ]

    if zero_bytes_ls:
        LOGGER.warning(f"{package.name} has zero bytes file {zero_bytes_ls}")
        return False
    return True


def metadata_folder_is_flat(package: Path) -> bool:
"""The metadata folder should not have folder structure"""
metadata_path = package / "metadata"
Expand All @@ -132,40 +130,49 @@ def metadata_folder_is_flat(package: Path) -> bool:
else:
return True


def metadata_folder_has_files(package: Path) -> bool:
    """Check that the package's metadata folder holds at least one file.

    The ``metadata`` folder is searched recursively, so a file inside a
    nested folder also counts.

    Returns:
        True when a file is found; otherwise logs a warning and returns
        False.
    """
    metadata_path = package / "metadata"
    md_files_ls = [x for x in metadata_path.rglob("*") if x.is_file()]
    if md_files_ls:
        return True

    LOGGER.warning(f"{package.name} metadata folder does not have any files")
    return False


def metadata_has_correct_naming_convention(package: Path) -> bool:
    """Check that every metadata file has an accepted file name.

    All files under the package's ``metadata`` folder (searched recursively)
    are compared by name against the accepted list, which currently contains
    only ``rclone.log``.

    Returns:
        True when every file name is accepted; otherwise logs an error
        listing the nonconforming files and returns False.
    """
    metadata_path = package / "metadata"
    accepted_fn = ["rclone.log"]

    nonconforming = [
        x
        for x in metadata_path.rglob("*")
        if x.is_file() and x.name not in accepted_fn
    ]

    if nonconforming:
        LOGGER.error(
            f"{package.name} has nonconforming metadata file(s):\n{nonconforming}"
        )
        return False
    return True


def objects_folder_correct_structure(package: Path) -> bool:
"""objects folder should have a data folder, which includes four files:
bag-info.txt, bagit.txt, manifest-md5.txt and tagmanifest-md5.txt"""
expected_paths = []
expected_files = ["bag-info.txt", "bagit.txt",
"manifest-md5.txt", "tagmanifest-md5.txt"]
expected_files = [
"bag-info.txt",
"bagit.txt",
"manifest-md5.txt",
"tagmanifest-md5.txt",
]
missing = []

data_folder = package / "objects" / "data"
Expand All @@ -180,16 +187,19 @@ def objects_folder_correct_structure(package: Path) -> bool:
missing.append(fp.name)

if missing:
LOGGER.error(f"""{package.name} has incorrect structure.
missing {missing}""")
LOGGER.error(
f"""{package.name} has incorrect structure.
missing {missing}"""
)
return False
else:
return True


def objects_folder_has_no_empty_folder(package: Path) -> bool:
"""The objects folder should not have any empty folders"""
objects_path = package / "objects"
folder_in_obj = [ x for x in objects_path.rglob("*") if x.is_dir() ]
folder_in_obj = [x for x in objects_path.rglob("*") if x.is_dir()]
empty = []

for folder in folder_in_obj:
Expand All @@ -202,14 +212,15 @@ def objects_folder_has_no_empty_folder(package: Path) -> bool:
else:
return True


def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"]:
"""Run all linting tests against a package"""
result = "valid"

less_strict_tests = [
package_has_no_hidden_file,
package_has_no_zero_bytes_file,
metadata_folder_has_files
metadata_folder_has_files,
]

for test in less_strict_tests:
Expand All @@ -223,7 +234,7 @@ def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"]
metadata_folder_is_flat,
metadata_has_correct_naming_convention,
objects_folder_correct_structure,
objects_folder_has_no_empty_folder
objects_folder_has_no_empty_folder,
]

for test in strict_tests:
Expand All @@ -232,6 +243,7 @@ def lint_package(package: Path) -> Literal["valide", "invalide", "needs review"]

return result


def main():
args = parse_args()
_configure_logging(args.log_folder)
Expand Down Expand Up @@ -266,7 +278,9 @@ def main():
print(
f"""
The following {len(needs_review)} packages need review.
They may be passed without change after review: {needs_review}""")
They may be passed without change after review: {needs_review}"""
)


# Run the linter once when the module is executed as a script.
if __name__ == "__main__":
    main()
31 changes: 18 additions & 13 deletions src/digarch_scripts/package/package_cloud.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import argparse
from datetime import date
import logging
import os
from pathlib import Path
import re
from datetime import date
from pathlib import Path

import bagit

Expand Down Expand Up @@ -35,6 +35,7 @@ def digital_carrier_label(id: str) -> Path:

return parser.parse_args()


def create_base_dir(dest: Path, id: str) -> Path:
acq_id = id.rsplit("_", 1)[0]
package_base = dest / acq_id / id
Expand All @@ -49,6 +50,7 @@ def create_base_dir(dest: Path, id: str) -> Path:
raise PermissionError(f"{dest} is not writable")
return package_base


def move_metadata_file(md_path: Path, pkg_dir: Path) -> None:
md_dir = pkg_dir / "metadata"
if not md_dir.exists():
Expand All @@ -61,6 +63,7 @@ def move_metadata_file(md_path: Path, pkg_dir: Path) -> None:
md_path.rename(new_md_path)
return None


def create_bag_in_objects(payload_path: Path, md5_path: Path, pkg_dir: Path) -> None:
bag_dir = pkg_dir / "objects"
bag_dir.mkdir()
Expand All @@ -70,44 +73,45 @@ def create_bag_in_objects(payload_path: Path, md5_path: Path, pkg_dir: Path) ->
create_bag_tag_files(bag_dir)
return None


def move_payload(payload_path: Path, bag_dir: Path) -> None:
    """Move every entry of ``payload_path`` into a new ``data`` folder in the bag.

    Args:
        payload_path: Folder whose immediate children (files and folders)
            are moved.
        bag_dir: Bag folder in which ``data`` is created.

    Raises:
        FileExistsError: If ``bag_dir / "data"`` already exists, or if an
            entry with the same name is already present in it.
    """
    payload_dir = bag_dir / "data"
    # Refuse to merge into an existing payload folder.
    if payload_dir.exists():
        raise FileExistsError(f"{payload_dir} already exists. Not moving files.")
    payload_dir.mkdir(parents=True)

    for a_file in payload_path.iterdir():
        new_ob_path = payload_dir / a_file.name
        # Never overwrite something already sitting in the payload folder.
        if new_ob_path.exists():
            raise FileExistsError(f"{new_ob_path} already exists. Not moving.")

        a_file.rename(new_ob_path)
    return None


def convert_to_bagit_manifest(md5_path: Path, bag_dir: Path) -> None:
    """Turn an md5 listing into the bag's ``manifest-md5.txt``.

    Each ``<checksum><spaces><path>`` line gets ``data/`` inserted in front
    of the path. The file is rewritten in place and then moved to
    ``bag_dir / "manifest-md5.txt"``.

    Args:
        md5_path: Existing md5 listing; it no longer exists at this location
            after a successful call.
        bag_dir: Bag folder that receives the manifest.

    Raises:
        FileExistsError: If the bag already has a ``manifest-md5.txt``; the
            md5 listing is left untouched in that case.
    """
    # Check for an existing manifest before modifying anything.
    new_md5_path = bag_dir / "manifest-md5.txt"
    if new_md5_path.exists():
        raise FileExistsError("manifest-md5.txt already exists, review package")

    with open(md5_path, "r") as f:
        manifest_data = f.readlines()

    # Prefix only the path field: the pattern matches the checksum and the
    # run of spaces after it, so spaces inside a file name are not rewritten
    # and the original separator width is preserved.
    updated_manifest = [
        re.sub(r"^(\S+ +)", r"\1data/", line, count=1) for line in manifest_data
    ]
    # Rewrite the manifest lines.
    with open(md5_path, "w") as f:
        f.writelines(updated_manifest)
    # Move the md5 file to manifest-md5.txt in the bag.
    md5_path.rename(new_md5_path)

    return None


def create_bag_tag_files(bag_dir: Path):
txt = """BagIt-Version: 0.97\nTag-File-Character-Encoding: UTF-8\n"""
with open(bag_dir / "bagit.txt", "w") as bagit_file:
Expand All @@ -125,7 +129,7 @@ def get_oxum(payload_dir: Path) -> (int, int):
total_bytes = 0
total_files = 0

for payload_file in payload_dir.rglob('*'):
for payload_file in payload_dir.rglob("*"):
if payload_file.is_file():
total_files += 1
total_bytes += os.stat(payload_file).st_size
Expand All @@ -152,5 +156,6 @@ def main():
create_bag_in_objects(args.payload, args.md5, base_dir)
validate_bag_in_payload(base_dir)


if __name__ == "__main__":
main()
Loading