Skip to content

Commit

Permalink
Save documents in each valid format version directory (#158)
Browse files Browse the repository at this point in the history
* 🎨 Add function to get valid range of format versions

* βœ… add test

* 🎨 use list of file paths

* 🎨 Update related code to new list of paths

* πŸ™ˆ Add macos related files to gitignore

* βœ… quick fixes for tests

* 🎨 Use only the next format version instead of all future ones

* βœ… update unit test

* πŸ”₯ delete out dated line

* 🎨 improve variable names

* βœ… fix unit test for mirror

* 🎨 use List as type hint for python 3.8

* 🩹 fix all list type hints
  • Loading branch information
hf-krechan authored Apr 16, 2024
1 parent 2a12dc4 commit b283a9e
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 52 deletions.
29 changes: 29 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,35 @@ dmypy.json
# Pyre type checker
.pyre/

# General
.DS_Store
.AppleDouble
.LSOverride

# Icon must end with two \r
Icon


# Thumbnails
._*

# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent

# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk


.idea/

# vscode settings
Expand Down
169 changes: 136 additions & 33 deletions src/edi_energy_scraper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from email.message import Message
from pathlib import Path
from random import randint
from typing import Awaitable, Dict, List, Optional, Set, Tuple, Union
from typing import Awaitable, Dict, List, Optional, Set, Union

import aiohttp
import pytz
Expand Down Expand Up @@ -70,7 +70,7 @@ async def _get_soup(self, url: str) -> BeautifulSoup:
EdiEnergyScraper.remove_comments(soup)
return soup

async def _download_and_save_pdf(self, file_basename: str, link: str) -> Path:
async def _download_and_save_pdf(self, file_basename: str, link: str) -> List[Path]:
"""
Downloads a PDF file from a given link and stores it under the file name in a folder that has the same name
as the directory if the pdf does not exist yet or if the metadata has changed since the last download.
Expand Down Expand Up @@ -106,33 +106,34 @@ async def _download_and_save_pdf(self, file_basename: str, link: str) -> Path:
raise
await asyncio.sleep(delay=randint(5, 10)) # cool down...

file_path = self._get_file_path(file_name=file_name)
Path.mkdir(file_path.parent, parents=True, exist_ok=True)

# Save file if it does not exist yet
if not os.path.isfile(file_path):
with open(file_path, "wb+") as outfile: # pdfs are written as binaries
_logger.debug("Saving new PDF %s", file_path)
outfile.write(response_content)
return file_path

# First fix, different file types do just the same as before, only with correct file extension
if not file_name.endswith(".pdf"):
with open(file_path, "wb+") as outfile:
_logger.debug("Saving %s", file_path)
outfile.write(response_content)
return file_path

# Check if metadata has changed
metadata_has_changed = self._have_different_metadata(response_content, file_path)
if metadata_has_changed: # delete old file and replace with new one
_logger.debug("Metadata for PDF %s changed; Replacing it", file_path)
os.remove(file_path)
with open(file_path, "wb+") as outfile: # pdfs are written as binaries
outfile.write(response_content)
else:
_logger.debug("Meta data haven't changed for %s", file_path)
return file_path
file_paths = self._get_file_paths(file_name=file_name)
for file_path in file_paths:
Path.mkdir(file_path.parent, parents=True, exist_ok=True)

# Save file if it does not exist yet
if not os.path.isfile(file_path):
with open(file_path, "wb+") as outfile: # pdfs are written as binaries
_logger.debug("Saving new PDF %s", file_path)
outfile.write(response_content)
continue

# First fix, different file types do just the same as before, only with correct file extension
if not file_name.endswith(".pdf"):
with open(file_path, "wb+") as outfile:
_logger.debug("Saving %s", file_path)
outfile.write(response_content)
continue

# Check if metadata has changed
metadata_has_changed = self._have_different_metadata(response_content, file_path)
if metadata_has_changed: # delete old file and replace with new one
_logger.debug("Metadata for PDF %s changed; Replacing it", file_path)
os.remove(file_path)
with open(file_path, "wb+") as outfile: # pdfs are written as binaries
outfile.write(response_content)
else:
_logger.debug("Meta data haven't changed for %s", file_path)
return file_paths

def _get_file_path(self, file_name: str) -> Path:
if "/" in file_name:
Expand All @@ -144,6 +145,23 @@ def _get_file_path(self, file_name: str) -> Path:

return file_path

def _get_file_paths(self, file_name: str) -> List[Path]:
"""
Returns a list of file paths for all valid format versions of a given file.
"""

file_paths = []

format_version_range = get_edifact_format_version_range_from_filename(path=Path(file_name))

for format_version in format_version_range:
file_path = Path(self._root_dir).joinpath(
f"{format_version}/{file_name}" # e.g "{root_dir}/FV2310/ahbmabis_99991231_20210401.pdf"
)
file_paths.append(file_path)

return file_paths

@staticmethod
def _add_file_extension_to_file_basename(headers: dict, file_basename: str) -> str:
"""Extracts the extension of a file from a response header and add it to the file basename."""
Expand Down Expand Up @@ -294,7 +312,7 @@ def remove_no_longer_online_files(self, online_files: Set[Path]) -> Set[Path]:

async def _download(
self, file_basename: str, link: str, optional_success_msg: Optional[str] = None
) -> Optional[Path]:
) -> Optional[List[Path]]:
try:
file_path = await self._download_and_save_pdf(file_basename=file_basename, link=link)
if optional_success_msg is not None:
Expand Down Expand Up @@ -334,7 +352,7 @@ async def mirror(self):
with open(epoch_path, "w+", encoding="utf8") as outfile:
outfile.write(epoch_soup.prettify())
file_map = EdiEnergyScraper.get_epoch_file_map(epoch_soup)
download_tasks: List[Awaitable[Optional[Path]]] = []
download_tasks: List[Awaitable[Optional[List[Path]]]] = []
file_counter = itertools.count()
for file_basename, link in file_map.items():
download_tasks.append(
Expand All @@ -344,10 +362,10 @@ async def mirror(self):
f"Successfully downloaded {_epoch} file {next(file_counter)}/{len(file_map)}",
)
)
download_results: List[Optional[Path]] = await asyncio.gather(*download_tasks)
download_results: List[Optional[List[Path]]] = await asyncio.gather(*download_tasks)
for download_result in download_results:
if download_result is not None:
new_file_paths.add(download_result)
new_file_paths.update(download_result)
self.remove_no_longer_online_files(new_file_paths)
_logger.info("Finished mirroring")

Expand All @@ -368,3 +386,88 @@ def get_edifact_version_from_filename(path: Path) -> EdifactFormatVersion:
format_version = get_edifact_format_version(berlin_local_time)

return format_version


def get_publication_date_from_filename(path: Path) -> datetime.datetime:
"""
Determines the publication date of a given file.
"""
filename = path.stem
publication_date_string = filename.split("_")[-1] # Assuming date is the "publication date"
date_format = "%Y%m%d"
berlin = pytz.timezone("Europe/Berlin")
publication_date = datetime.datetime.strptime(publication_date_string, date_format).astimezone(berlin)

return publication_date


def get_valid_to_date_from_filename(path: Path) -> datetime.datetime:
"""
Determines the valid to date of a given file.
"""
filename = path.stem
valid_to_date_string = filename.split("_")[-2] # Assuming date is the "valid to" date

if valid_to_date_string == "99991231":
return datetime.datetime(9999, 12, 31, tzinfo=datetime.timezone.utc)

date_format = "%Y%m%d"
berlin = pytz.timezone("Europe/Berlin")
valid_to_date = datetime.datetime.strptime(valid_to_date_string, date_format).astimezone(berlin)

return valid_to_date


def get_current_format_version() -> EdifactFormatVersion:
"""
Determines the current format version.
"""
return get_edifact_format_version(datetime.datetime.now(tz=datetime.timezone.utc))


def get_next_format_version() -> EdifactFormatVersion:
"""
Determines the next format version.
"""
current_format_version = get_edifact_format_version(datetime.datetime.now(tz=datetime.timezone.utc))

format_versions = [efv.value for efv in EdifactFormatVersion]
next_format_version = format_versions[format_versions.index(current_format_version.value) + 1]

return EdifactFormatVersion(next_format_version)


def get_edifact_format_version_range_from_filename(path: Path) -> List[EdifactFormatVersion]:
"""
Determines range of valid format versions of a given file.
A document can be valid for multiple format versions.
Therefore, a list of all valid format versions is returned.
example:
- 'IFTSTAMIG2.0e_20240930_20231001.pdf' -> [FV2310, FV2404]
- 'IFTSTAMIG2.0e_99991231_20231001.pdf' -> [FV2310, FV2404, ...] all future format versions
"""

publication_date: datetime.datetime = get_publication_date_from_filename(path)
valid_to_date: datetime.datetime = get_valid_to_date_from_filename(path)

next_format_version = get_next_format_version()

format_version_start = get_edifact_format_version(publication_date)

format_version_end = get_edifact_format_version(valid_to_date)

is_format_version_end_greater_than_next_format_version = format_version_end > next_format_version
if is_format_version_end_greater_than_next_format_version:
format_version_end = next_format_version

format_versions = [efv.value for efv in EdifactFormatVersion]

format_version_range = [
EdifactFormatVersion(format_version)
for format_version in format_versions[
format_versions.index(format_version_start.value) : format_versions.index(format_version_end.value) + 1
]
]

return format_version_range
Loading

0 comments on commit b283a9e

Please sign in to comment.