From fc949d281c85a940294979630b113b3c14f35b6b Mon Sep 17 00:00:00 2001 From: crocodilestick <105450872+crocodilestick@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:29:53 +0000 Subject: [PATCH] Made major changes to the CWA Cover & Metadata Enforcement service. Like the ingest service, it now also supports multiple formats (currently limited to EPUB & AZW3 due to limitations of the calibre ebook-polish function) and can also now be disabled in the CWA Settings panel. The majority of the necessary work has been done to achieve these goals but these changes are currently untested --- ...ile_old => Dockerfile_calibre_not_included | 0 root/app/calibre-web/cps/cwa_functions.py | 3 +- .../cps/templates/cwa_settings.html | 8 + .../s6-rc.d/metadata-change-detector/run | 2 +- scripts/cover-enforcer.py | 231 -------------- scripts/cover_enforcer.py | 297 ++++++++++++++++++ scripts/cwa_db.py | 83 ++++- scripts/cwa_schema.sql | 5 +- scripts/setup-cwa.sh | 2 +- 9 files changed, 378 insertions(+), 253 deletions(-) rename Dockerfile_old => Dockerfile_calibre_not_included (100%) delete mode 100644 scripts/cover-enforcer.py create mode 100644 scripts/cover_enforcer.py diff --git a/Dockerfile_old b/Dockerfile_calibre_not_included similarity index 100% rename from Dockerfile_old rename to Dockerfile_calibre_not_included diff --git a/root/app/calibre-web/cps/cwa_functions.py b/root/app/calibre-web/cps/cwa_functions.py index 0355ae8..239323d 100644 --- a/root/app/calibre-web/cps/cwa_functions.py +++ b/root/app/calibre-web/cps/cwa_functions.py @@ -87,7 +87,8 @@ def set_cwa_settings(): "auto_backup_conversions", "auto_zip_backups", "cwa_update_notifications", - "auto_convert"] + "auto_convert", + "auto_metadata_enforcement"] string_settings = ["auto_convert_target_format"] for format in ignorable_formats: string_settings.append(f"ignore_ingest_{format}") diff --git a/root/app/calibre-web/cps/templates/cwa_settings.html b/root/app/calibre-web/cps/templates/cwa_settings.html index 
81247b6..a9b6cad 100644 --- a/root/app/calibre-web/cps/templates/cwa_settings.html +++ b/root/app/calibre-web/cps/templates/cwa_settings.html @@ -50,6 +50,14 @@
On by default, when active all ingested books will automatically be converted to the target format specified below (epub by default)
+ {% if cwa_settings['auto_metadata_enforcement'] %} + + {% else %} + + {% endif %} +On by default, when active, whenever the Metadata and/or Cover Image is edited in the Web UI, the CWA Metadata Enforcement service will then apply those changes to the ebook files themselves. Normally in Stock CW or when this setting is disabled, the changes made are only applied to what you see in the Web UI, not the ebook files themselves. This feature currently only supports files in EPUB or AZW3 format.
+When the Auto-Convert feature is active, all ingested books will be automatically converted to the format chosen here (except those formats selected in the ignore list below)
diff --git a/root/etc/s6-overlay/s6-rc.d/metadata-change-detector/run b/root/etc/s6-overlay/s6-rc.d/metadata-change-detector/run index 5515c36..8b1b613 100644 --- a/root/etc/s6-overlay/s6-rc.d/metadata-change-detector/run +++ b/root/etc/s6-overlay/s6-rc.d/metadata-change-detector/run @@ -10,5 +10,5 @@ echo "[metadata-change-detector]: Watching folder: $WATCH_FOLDER" s6-setuidgid abc inotifywait -m -e close_write -e moved_to --exclude '^.*\.(swp)$' "$WATCH_FOLDER" | while read -r directory events filename; do echo "[metadata-change-detector]: New file detected: $filename" - python3 /app/calibre-web-automated/scripts/cover-enforcer.py "--log" "$filename" + python3 /app/calibre-web-automated/scripts/cover_enforcer.py "--log" "$filename" done \ No newline at end of file diff --git a/scripts/cover-enforcer.py b/scripts/cover-enforcer.py deleted file mode 100644 index 85690a2..0000000 --- a/scripts/cover-enforcer.py +++ /dev/null @@ -1,231 +0,0 @@ -import argparse -import json -import os -import re -import sys -import time -from datetime import datetime - -from cwa_db import CWA_DB - - -class Enforcer: - def __init__(self, args): - self.args = args - self.dirs_json = "/app/calibre-web-automated/dirs.json" - self.change_logs_dir = "/app/calibre-web-automated/metadata_change_logs" - self.metadata_temp_dir = "/app/calibre-web-automated/metadata_temp" - self.calibre_library = self.get_calibre_library() - self.db = CWA_DB() - - self.illegal_characters = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"] - - def get_calibre_library(self) -> str: - """Gets Calibre-Library location from dirs_json path""" - with open(self.dirs_json, 'r') as f: - dirs = json.load(f) - return dirs['calibre_library_dir'] # Returns without / on the end - - def read_log(self, auto=True, log_path: str = "None") -> dict: - """Reads pertinent information from the given log file, adds the book_id from the log name and returns the info as a dict""" - if auto: - book_id = 
(self.args.log.split('-')[1]).split('.')[0] - timestamp_raw = self.args.log.split('-')[0] - timestamp = datetime.strptime(timestamp_raw, '%Y%m%d%H%M%S') - - log_info = {} - with open(f'{self.change_logs_dir}/{self.args.log}', 'r') as f: - log_info = json.load(f) - log_info['book_id'] = book_id - log_info['timestamp'] = timestamp.strftime('%Y-%m-%d %H:%M:%S') - else: - log_name = os.path.basename(log_path) - book_id = (log_name.split('-')[1]).split('.')[0] - timestamp_raw = log_name.split('-')[0] - timestamp = datetime.strptime(timestamp_raw, '%Y%m%d%H%M%S') - - log_info = {} - with open(log_path, 'r') as f: - log_info = json.load(f) - log_info['book_id'] = book_id - log_info['timestamp'] = timestamp.strftime('%Y-%m-%d %H:%M:%S') - - return log_info - - def get_book_dir_from_log(self, log_info: dict) -> str: - book_title = log_info['book_title'].replace(':', '_') - author_name = (log_info['author_name'].split(', ')[0]).split(' & ')[0] - book_id = log_info['book_id'] - - for char in book_title: - if char in self.illegal_characters: - book_title = book_title.replace(char, '_') - for char in author_name: - if char in self.illegal_characters: - author_name = author_name.replace(char, '_') - - book_dir = f"{self.calibre_library}/{author_name}/{book_title} ({book_id})/" - log_info['epub_path'] = book_dir - - return book_dir - - def enforce_cover(self, book_dir: str) -> dict: - """Will force the Cover & Metadata to update for the book in the given directory""" - library_files = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(book_dir) for f in filenames] - try: - epub = [f for f in library_files if f.endswith('.epub')][0] - except Exception as e: - print(f"[cover-enforcer]: No epub file found in {book_dir}") - print(f"[cover-enforcer]: {e}") - return {} - - title_author = epub.split('/')[-1].split('.epub')[0] - cover = book_dir + '/cover.jpg' - old_metadata = book_dir + '/metadata.opf' - - book_id: str = (list(re.findall(r'\(\d*\)', 
book_dir))[-1])[1:-1] - new_metadata = self.get_new_metadata(book_id) - self.replace_old_metadata(old_metadata, new_metadata) - - os.system(f'ebook-polish -c "{cover}" -o "{new_metadata}" -U "{epub}" "{epub}"') - self.empty_metadata_temp() - print(f"[cover-enforcer]: DONE: '{title_author}': Cover & metadata updated") - - timestamp = self.get_time() - book_title = title_author.split(f" - {title_author.split(' - ')[-1]}")[0] - author_name = title_author.split(' - ')[-1] - - book_info = {'timestamp':timestamp, 'book_id':book_id, 'book_title':book_title, 'author_name':author_name, 'epub_path':epub} - return book_info - - def enforce_all_covers(self) -> tuple[int, float]: - """Will force the covers and metadata to be re-generated for all books in the library""" - t_start = time.time() - library_files = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(self.calibre_library) for f in filenames] - epubs_in_library = [f for f in library_files if f.endswith('.epub')] - book_dirs = [] - for epub in epubs_in_library: - book_dirs.append(os.path.dirname(epub)) - - print(f"[cover-enforcer]: {len(book_dirs)} books detected in Library") - print(f"[cover-enforcer]: Enforcing covers for {len(epubs_in_library)} epub file(s) in {self.calibre_library} ...") - - for book_dir in book_dirs: - try: - book_info = self.enforce_cover(book_dir) - self.db.enforce_add_entry_from_all(book_info) - except Exception as e: - print(f"[cover-enforcer]: ERROR: {book_dir}") - print(f"[cover-enforcer]: Skipping book due to following error: {e}") - continue - - t_end = time.time() - - return len(epubs_in_library), (t_end - t_start) - - def get_new_metadata(self, book_id) -> str: - """Uses the export function of the calibredb utility to export any new metadata for the given book to metadata_temp, and returns the path to the new metadata.opf""" - os.system(f"calibredb export --with-library '{self.calibre_library}' --to-dir '{self.metadata_temp_dir}' {book_id}") - temp_files = 
[os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(self.metadata_temp_dir) for f in filenames] - return [f for f in temp_files if f.endswith('.opf')][0] - - def replace_old_metadata(self, old_metadata: str, new_metadata: str) -> None: - """Switches the metadata in metadata_temp with the metadata in the Calibre-Library""" - os.system(f'cp "{new_metadata}" "{old_metadata}"') - - def print_library_list(self) -> None: - """Uses the calibredb command line utility to list the books in the library""" - os.system(f'calibredb list --with-library "{self.calibre_library}"') - - def delete_log(self, auto=True, log_path="None"): - """Deletes the log file""" - if auto: - log = os.path.join(self.change_logs_dir, self.args.log) - os.remove(log) - else: - os.remove(log_path) - - def empty_metadata_temp(self): - """Empties the metadata_temp folder""" - os.system(f"rm -r {self.metadata_temp_dir}/*") - - def get_time(self) -> str: - now = datetime.now() - return now.strftime('%Y-%m-%d %H:%M:%S') - - def check_for_other_logs(self): - log_files = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(self.change_logs_dir) for f in filenames] - if len(log_files) > 0: - for log in log_files: - if log.endswith('.json'): - log_info = self.read_log(auto=False, log_path=log) - book_dir = self.get_book_dir_from_log(log_info) - book_info = self.enforce_cover(book_dir) - log_info['epub_path'] = book_info['epub_path'] - self.db.enforce_add_entry_from_log(log_info) - self.delete_log(auto=False, log_path=log) - - -def main(): - parser = argparse.ArgumentParser( - prog='cover-enforcer', - description='Upon receiving a log, valid directory or an "-all" flag, this \ - script will enforce the covers and metadata of the corresponding books, making \ - sure that each are correctly stored in both the epubs themselves and the \ - user\'s Calibre Library. Additionally, if an epub happens to be in EPUB 2 \ - format, it will also be automatically upgraded to EPUB 3.' 
- ) - - parser.add_argument('--log', action='store', dest='log', required=False, help='Will enforce the covers and metadata of the books in the given log file.', default=None) - parser.add_argument('--dir', action='store', dest='dir', required=False, help='Will enforce the covers and metadata of the books in the given directory.', default=None) - parser.add_argument('-all', action='store_true', dest='all', help='Will enforce covers & metadata for ALL books currently in your calibre-library-dir', default=False) - parser.add_argument('-list', '-l', action='store_true', dest='list', help='List all books in your calibre-library-dir', default=False) - parser.add_argument('-history', action='store_true', dest='history', help='Display a history of all enforcements ever carried out on your machine (not yet implemented)', default=False) - parser.add_argument('-paths', '-p', action='store_true', dest='paths', help="Use with '-history' flag to display stored paths of all epubs in enforcement database", default=False) - parser.add_argument('-v', '--verbose', action='store_true', dest='verbose', help="Use with history to display entire enforcement history instead of only the most recent 10 entries", default=False) - args = parser.parse_args() - - enforcer = Enforcer(args) - - if len(sys.argv) == 1: - parser.print_help() - elif args.log is not None and args.dir is not None: - # log and dir provided together - parser.print_usage() - elif args.all and args.log is None and args.dir is None and args.list is False and args.history is False: - # only all flag passed - print('[cover-enforcer]: Enforcing metadata and covers for all books in library...') - n_enforced, completion_time = enforcer.enforce_all_covers() - print(f"\n[cover-enforcer]: SUCCESS: All covers & metadata successfully updated for all {n_enforced} books in the library in {completion_time:.2f} seconds!") - elif args.log is not None and args.dir is None and args.all is False and args.list is False and args.history is 
False: - # log passed: (args.log), no dir - log_info = enforcer.read_log() - book_dir = enforcer.get_book_dir_from_log(log_info) - book_info = enforcer.enforce_cover(book_dir) - if book_info == {}: - print(f"[cover-enforcer] Metadata for '{log_info['book_title']}' could not be successfully enforced") - sys.exit(1) - log_info['epub_path'] = book_info['epub_path'] - enforcer.db.enforce_add_entry_from_log(log_info) - enforcer.delete_log() - enforcer.check_for_other_logs() - elif args.log is None and args.dir is not None and args.all is False and args.list is False and args.history is False: - if args.dir[-1] == '/': - args.dir = args.dir[:-1] - if os.path.isdir(args.dir): - book_info = enforcer.enforce_cover(args.dir) - enforcer.db.enforce_add_entry_from_dir(book_info) - else: - print(f"[cover-enforcer]: ERROR: '{args.dir}' is not a valid directory") - elif args.list and args.log is None and args.dir is None and args.all is False and args.history is False: - # only list flag passed - enforcer.print_library_list() - elif args.history and args.log is None and args.dir is None and args.all is False and args.list is False: - enforcer.db.enforce_show(args.paths, args.verbose) - else: - parser.print_usage() - - sys.exit(1) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/scripts/cover_enforcer.py b/scripts/cover_enforcer.py new file mode 100644 index 0000000..1985950 --- /dev/null +++ b/scripts/cover_enforcer.py @@ -0,0 +1,297 @@ +import argparse +import json +import os +import re +import sys +import time +from datetime import datetime +from pathlib import Path + +from cwa_db import CWA_DB + +# Global Variables +dirs_json = "/app/calibre-web-automated/dirs.json" +change_logs_dir = "/app/calibre-web-automated/metadata_change_logs" +metadata_temp_dir = "/app/calibre-web-automated/metadata_temp" + + +class Book: + def __init__(self, book_dir: str, file_path: str): + self.book_dir: str = book_dir + self.file_path: str = file_path + 
self.calibre_library = self.get_calibre_library() + + self.file_format: str = Path(file_path).suffix.replace('.', '') + self.timestamp: str = self.get_time() + self.book_id: str = (list(re.findall(r'\(\d*\)', book_dir))[-1])[1:-1] + self.book_title, self.author_name = self.get_title_and_author() + + self.cover_path = book_dir + '/cover.jpg' + self.old_metadata_path = book_dir + '/metadata.opf' + self.new_metadata_path = self.get_new_metadata_path() + + self.log_info = None + + + def get_calibre_library(self) -> str: + """Gets Calibre-Library location from dirs_json path""" + with open(dirs_json, 'r') as f: + dirs = json.load(f) + return dirs['calibre_library_dir'] # Returns without / on the end + + + def get_time(self) -> str: + now = datetime.now() + return now.strftime('%Y-%m-%d %H:%M:%S') + + + def get_title_and_author(self) -> tuple[str, str]: + title_author = self.file_path.split('/')[-1].split(f'.{self.file_format}')[0] + book_title = title_author.split(f" - {title_author.split(' - ')[-1]}")[0] + author_name = title_author.split(' - ')[-1] + + return book_title, author_name + + + def get_new_metadata_path(self) -> str: + """Uses the export function of the calibredb utility to export any new metadata for the given book to metadata_temp, and returns the path to the new metadata.opf""" + os.system(f"calibredb export --with-library '{self.calibre_library}' --to-dir '{metadata_temp_dir}' {self.book_id}") + temp_files = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(metadata_temp_dir) for f in filenames] + return [f for f in temp_files if f.endswith('.opf')][0] + + +class Enforcer: + def __init__(self, args): + self.db = CWA_DB() + self.cwa_settings = self.db.cwa_settings + self.enforcer_on = self.cwa_settings["auto_metadata_enforcement"] + self.supported_formats = ["epub", "azw3"] + + self.args = args + self.calibre_library = self.get_calibre_library() + + self.illegal_characters = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"] + + def 
get_calibre_library(self) -> str: + """Gets Calibre-Library location from dirs_json path""" + with open(dirs_json, 'r') as f: + dirs = json.load(f) + return dirs['calibre_library_dir'] # Returns without / on the end + + def read_log(self, auto=True, log_path: str = "None") -> dict: + """Reads pertinent information from the given log file, adds the book_id from the log name and returns the info as a dict""" + if auto: + book_id = (self.args.log.split('-')[1]).split('.')[0] + timestamp_raw = self.args.log.split('-')[0] + timestamp = datetime.strptime(timestamp_raw, '%Y%m%d%H%M%S') + + log_info = {} + with open(f'{change_logs_dir}/{self.args.log}', 'r') as f: + log_info = json.load(f) + log_info['book_id'] = book_id + log_info['timestamp'] = timestamp.strftime('%Y-%m-%d %H:%M:%S') + else: + log_name = os.path.basename(log_path) + book_id = (log_name.split('-')[1]).split('.')[0] + timestamp_raw = log_name.split('-')[0] + timestamp = datetime.strptime(timestamp_raw, '%Y%m%d%H%M%S') + + log_info = {} + with open(log_path, 'r') as f: + log_info = json.load(f) + log_info['book_id'] = book_id + log_info['timestamp'] = timestamp.strftime('%Y-%m-%d %H:%M:%S') + + return log_info + + def get_book_dir_from_log(self, log_info: dict) -> str: + book_title = log_info['book_title'].replace(':', '_') + author_name = (log_info['author_name'].split(', ')[0]).split(' & ')[0] + book_id = log_info['book_id'] + + for char in book_title: + if char in self.illegal_characters: + book_title = book_title.replace(char, '_') + for char in author_name: + if char in self.illegal_characters: + author_name = author_name.replace(char, '_') + + book_dir = f"{self.calibre_library}/{author_name}/{book_title} ({book_id})/" + log_info['file_path'] = book_dir + + return book_dir + + def get_supported_files_from_dir(self, dir: str) -> list[str]: + """ Returns a list if the book dir given contains files of one or more of the supported formats""" + library_files = [os.path.join(dirpath,f) for (dirpath, 
dirnames, filenames) in os.walk(dir) for f in filenames] + + supported_files = [] + for format in self.supported_formats: + supported_files = supported_files + [f for f in library_files if f.endswith(f'.{format}')] + + return supported_files + + def enforce_cover(self, book_dir: str) -> bool | list[Book]: + """Will force the Cover & Metadata to update for the supported book files in the given directory""" + supported_files = self.get_supported_files_from_dir(book_dir) + if supported_files: + book_objects = [] + for file in supported_files: + book = Book(book_dir, file) + self.replace_old_metadata(book.old_metadata_path, book.new_metadata_path) + os.system(f'ebook-polish -c "{book.cover_path}" -o "{book.new_metadata_path}" -U "{file}" "{file}"') + self.empty_metadata_temp() + print(f"[cover-metadata-enforcer]: DONE: '{title_author}': Cover & metadata updated") + book_objects.append(book) + + return book_objects + else: + print(f"[cover-metadata-enforcer]: No supported file formats found in {book_dir}. 
Only EPUB & AZW3 formats are currently supported.") + return False + + def enforce_all_covers(self) -> tuple[int, float, int] | tuple[bool, bool, bool]: + """Will force the covers and metadata to be re-generated for all books in the library""" + t_start = time.time() + + supported_files = self.get_supported_files_from_book_dir(self.calibre_library) + if supported_files: + book_dirs = [] + for file in supported_files: + book_dirs.append(os.path.dirname(file)) + + print(f"[cover-metadata-enforcer]: {len(book_dirs)} books detected in Library") + print(f"[cover-metadata-enforcer]: Enforcing covers for {len(supported_files)} supported file(s) in {self.calibre_library} ...") + + successful_enforcements = len(supported_files) + + for book_dir in book_dirs: + try: + book_objects = self.enforce_cover(book_dir) + self.db.enforce_add_entry_from_all(book_objects) + except Exception as e: + print(f"[cover-metadata-enforcer]: ERROR: {book_dir}") + print(f"[cover-metadata-enforcer]: Skipping book due to following error: {e}") + successful_enforcements = successful_enforcements - 1 + continue + + t_end = time.time() + + return successful_enforcements, (t_end - t_start), len(supported_files) + else: # No supported files found + return False, False, False + + def replace_old_metadata(self, old_metadata: str, new_metadata: str) -> None: + """Switches the metadata in metadata_temp with the metadata in the Calibre-Library""" + os.system(f'cp "{new_metadata}" "{old_metadata}"') + + def print_library_list(self) -> None: + """Uses the calibredb command line utility to list the books in the library""" + os.system(f'calibredb list --with-library "{self.calibre_library}"') + + def delete_log(self, auto=True, log_path="None"): + """Deletes the log file""" + if auto: + log = os.path.join(change_logs_dir, self.args.log) + os.remove(log) + else: + os.remove(log_path) + + def empty_metadata_temp(self): + """Empties the metadata_temp folder""" + os.system(f"rm -r {metadata_temp_dir}/*") + + def 
check_for_other_logs(self): + log_files = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(change_logs_dir) for f in filenames] + if len(log_files) > 0: + for log in log_files: + if log.endswith('.json'): + log_info = self.read_log(auto=False, log_path=log) + book_dir = self.get_book_dir_from_log(log_info) + book_objects = self.enforce_cover(book_dir) + for book in book_objects: + book.log_info = log_info + book.log_info['file_path'] = book.file_path + self.db.enforce_add_entry_from_log(book.log_info) + self.delete_log(auto=False, log_path=log) + + +def main(): + parser = argparse.ArgumentParser( + prog='cover-enforcer', + description='Upon receiving a log, valid directory or an "-all" flag, this \ + script will enforce the covers and metadata of the corresponding books, making \ + sure that each are correctly stored in both the ebook files themselves as well as in the \ + user\'s Calibre Library. Additionally, if an epub file happens to be in EPUB 2 \ + format, it will also be automatically upgraded to EPUB 3.' 
+ ) + + parser.add_argument('--log', action='store', dest='log', required=False, help='Will enforce the covers and metadata of the books in the given log file.', default=None) + parser.add_argument('--dir', action='store', dest='dir', required=False, help='Will enforce the covers and metadata of the books in the given directory.', default=None) + parser.add_argument('-all', action='store_true', dest='all', help='Will enforce covers & metadata for ALL books currently in your calibre-library-dir', default=False) + parser.add_argument('-list', '-l', action='store_true', dest='list', help='List all books in your calibre-library-dir', default=False) + parser.add_argument('-history', action='store_true', dest='history', help='Display a history of all enforcements ever carried out on your machine (not yet implemented)', default=False) + parser.add_argument('-paths', '-p', action='store_true', dest='paths', help="Use with '-history' flag to display stored paths of all files in enforcement database", default=False) + parser.add_argument('-v', '--verbose', action='store_true', dest='verbose', help="Use with history to display entire enforcement history instead of only the most recent 10 entries", default=False) + args = parser.parse_args() + + enforcer = Enforcer(args) + + if len(sys.argv) == 1: + parser.print_help() + ######################### QUERY ARGS ########################### + elif args.log is not None and args.dir is not None: + ### log and dir provided together + parser.print_usage() + elif args.list and args.log is None and args.dir is None and args.all is False and args.history is False: + ### only list flag passed + enforcer.print_library_list() + elif args.history and args.log is None and args.dir is None and args.all is False and args.list is False: + ### only history flag passed + enforcer.db.enforce_show(args.paths, args.verbose) + ######################### ENFORCEMENT ARGS ########################### + elif args.all and args.log is None and args.dir is None 
and args.list is False and args.history is False: + ### only all flag passed + print('[cover-enforcer]: Enforcing metadata and covers for all books in library...') + n_enforced, completion_time, n_supported_files = enforcer.enforce_all_covers() + if n_enforced == False: + print(f"\n[cover-enforcer]: No supported ebook files found in library (only EPUB & AZW3 formats are currently supported)") + elif n_enforced == n_supported_files: + print(f"\n[cover-enforcer]: SUCCESS: All covers & metadata successfully updated for all {n_enforced} supported ebooks in the library in {completion_time:.2f} seconds!") + elif n_enforced == 0: + print("\n[cover-enforcer]: FAILURE: Supported files found but none we're successfully enforced. See the log above for details.") + elif n_enforced < n_supported_files: + print(f"\n[cover-enforcer]: PARTIAL SUCCESS: Out of {n_supported_files} supported files detected, {n_enforced} were successfully enforced. See log above for details") + elif args.log is None and args.dir is not None and args.all is False and args.list is False and args.history is False: + ### dir passed, no log, not all, no flags + if args.dir[-1] == '/': + args.dir = args.dir[:-1] + if os.path.isdir(args.dir): + book_objects = enforcer.enforce_cover(args.dir) + enforcer.db.enforce_add_entry_from_dir(book_objects) + else: + print(f"[cover-metadata-enforcer]: ERROR: '{args.dir}' is not a valid directory") + elif args.log is not None and args.dir is None and args.all is False and args.list is False and args.history is False: + ### log passed: (args.log), no dir + log_info = enforcer.read_log() + book_dir = enforcer.get_book_dir_from_log(log_info) + if enforcer.enforcer_on: + book_objects = enforcer.enforce_cover(book_dir) + if book_objects == False: + print(f"[cover-enforcer] Metadata for '{log_info['book_title']}' not successfully enforced") + sys.exit(1) + for book in book_objects: + book.log_info = log_info + book.log_info['file_path'] = book.file_path + 
enforcer.db.enforce_add_entry_from_log(book.log_info) + enforcer.delete_log() + enforcer.check_for_other_logs() + else: # Enforcer has been disabled in the CWA Settings + print(f"[cover-enforcer] The CWA Automatic Metadata enforcement service is currently disabled in the settings. Therefore the metadata changes for {log_info['book_title'].replace(':', '_')} won't be enforced.") + enforcer.delete_log() + else: + parser.print_usage() + + sys.exit(0) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/cwa_db.py b/scripts/cwa_db.py index ce257f9..bebde20 100644 --- a/scripts/cwa_db.py +++ b/scripts/cwa_db.py @@ -7,6 +7,7 @@ from tabulate import tabulate +from cover_enforcer import Book class CWA_DB: def __init__(self, verbose=False): @@ -16,8 +17,11 @@ def __init__(self, verbose=False): self.db_path = "/config/" self.con, self.cur = self.connect_to_db() + self.schema_path = "/app/calibre-web-automated/scripts/cwa_schema.sql" + + self.stats_tables = ["cwa_enforcement", "cwa_import", "cwa_conversions"] self.stats_tables_headers = {"no_path":["Timestamp", "Book ID", "Book Title", "Book Author", "Trigger Type"], - "with_path":["Timestamp","Book ID", "EPUB Path"]} + "with_path":["Timestamp","Book ID", "File Path"]} self.cwa_default_settings = {"default_settings":1, "auto_backup_imports": 1, @@ -27,10 +31,12 @@ def __init__(self, verbose=False): "auto_convert": 1, "auto_convert_target_format": "epub", "auto_convert_ignored_formats":"", - "auto_ingest_ignored_formats":""} + "auto_ingest_ignored_formats":"", + "auto_metadata_enforcement":1} self.tables, self.schema = self.make_tables() - self.ensure_schema_match() + self.ensure_settings_schema_match() + self.match_stat_table_columns_with_schema() self.set_default_settings() self.temp_disable_split_library() @@ -66,10 +72,10 @@ def connect_to_db(self) -> tuple[sqlite3.Connection, sqlite3.Cursor] | None: return con, cur - def make_tables(self) -> None: + def make_tables(self) -> 
tuple[list[str], list[str]]: """Creates the tables for the CWA DB if they don't already exist""" schema = [] - with open("/app/calibre-web-automated/scripts/cwa_schema.sql", 'r') as f: + with open(self.schema_path, 'r') as f: for line in f: if line != "\n": schema.append(line) @@ -85,7 +91,7 @@ def make_tables(self) -> None: return tables, schema - def ensure_schema_match(self) -> None: + def ensure_settings_schema_match(self) -> None: self.cur.execute("SELECT * FROM cwa_settings") cwa_setting_names = [header[0] for header in self.cur.description] @@ -112,6 +118,47 @@ def ensure_schema_match(self) -> None: print(f"[cwa_db] Deprecated setting found from previous version of CWA, deleting setting '{setting}' from cwa.db...") + def match_stat_table_columns_with_schema(self) -> None: + """ Used to rename columns whose names have been changed in later versions and add columns added in later versions """ + # Produces a dict with all of the column names for each table, from the existing DB + current_column_names = {} + for table in self.stats_tables: + self.cur.execute(f"SELECT * FROM {table}") + setting_names = [header[0] for header in self.cur.description] + current_column_names |= {table:setting_names} + + # Produces a dict with all of the column names for each table, from the schema + column_names_in_schema = {} + for table in self.tables: + column_names = [] + table = table.split('\n') + for line in table: + if line[:27] == "CREATE TABLE IF NOT EXISTS ": + table_name = line[27:].replace('(', '') + elif line[:4] == " ": + column_names.append(line.strip().split(' ')[0]) + column_names_in_schema |= {table_name:column_names} + + for table in self.stats_tables: + if len(current_column_names[table]) < len(column_names_in_schema[table]): # Adds new columns not yet in existing db + num_new_columns = len(column_names_in_schema[table]) - len(current_column_names[table]) + for x in range(1, num_new_columns + 1): + if column_names_in_schema[table][-x] not in 
current_column_names[table]: + for line in self.schema: + matches = re.findall(column_names_in_schema[table][-x], line) + if matches: + new_column = line.strip().replace(',', '') + self.cur.execute(f"ALTER TABLE {table} ADD {new_column};") + self.con.commit() + print(f'[cwa-db] Missing Column detected in cwa.db. Added new column "{column_names_in_schema[table][-x]}" to table "{table}" in cwa.db') + else: # Number of columns in table matches the schema, now checks whether the names are the same + for x in range(len(column_names_in_schema[table])): + if current_column_names[table][x] != column_names_in_schema[table][x]: + self.cur.execute(f"ALTER TABLE {table} RENAME COLUMN {current_column_names[table][x]} TO {column_names_in_schema[table][x]}") + self.con.commit() + print(f'[cwa-db] Fixed column mismatch between versions. Column "{current_column_names[table][x]}" in table "{table}" renamed to "{column_names_in_schema[table][x]}"', flush=True) + + def set_default_settings(self, force=False) -> None: """Sets default settings for new tables and keeps track if the user is using the default settings or not.\n\n If the argument 'force' is set to True, the function instead sets all settings to their default values""" @@ -189,25 +236,27 @@ def update_cwa_settings(self, result) -> None: def enforce_add_entry_from_log(self, log_info: dict): """Adds an entry to the db from a change log file""" - self.cur.execute("INSERT INTO cwa_enforcement(timestamp, book_id, book_title, author, epub_path, trigger_type) VALUES (?, ?, ?, ?, ?, ?);", (log_info['timestamp'], log_info['book_id'], log_info['book_title'], log_info['author_name'], log_info['epub_path'], 'auto -log')) + self.cur.execute("INSERT INTO cwa_enforcement(timestamp, book_id, book_title, author, file_path, trigger_type) VALUES (?, ?, ?, ?, ?, ?);", (log_info['timestamp'], log_info['book_id'], log_info['book_title'], log_info['author_name'], log_info['file_path'], 'auto -log')) self.con.commit() - def 
enforce_add_entry_from_dir(self, book_info: dict): - """Adds an entry to the db when cover-enforcer is ran with a directory""" - self.cur.execute("INSERT INTO cwa_enforcement(timestamp, book_id, book_title, author, epub_path, trigger_type) VALUES (?, ?, ?, ?, ?, ?);", (book_info['timestamp'], book_info['book_id'], book_info['book_title'], book_info['author_name'], book_info['epub_path'], 'manual -dir')) - self.con.commit() + def enforce_add_entry_from_dir(self, book_objects: list[Book]): + """Adds an entry to the db when cover_enforcer is ran with a directory""" + for book in book_objects: + self.cur.execute("INSERT INTO cwa_enforcement(timestamp, book_id, book_title, author, file_path, trigger_type) VALUES (?, ?, ?, ?, ?, ?);", (book.timestamp, book.book_id, book.book_title, book.author_name, book.file_path, 'manual -dir')) + self.con.commit() - def enforce_add_entry_from_all(self, book_info: dict): - """Adds an entry to the db when cover-enforcer is ran with the -all flag""" - self.cur.execute("INSERT INTO cwa_enforcement(timestamp, book_id, book_title, author, epub_path, trigger_type) VALUES (?, ?, ?, ?, ?, ?);", (book_info['timestamp'], book_info['book_id'], book_info['book_title'], book_info['author_name'], book_info['epub_path'], 'manual -all')) - self.con.commit() + def enforce_add_entry_from_all(self, book_objects: list[Book]): + """Adds an entry to the db when cover_enforcer is ran with the -all flag""" + for book in book_objects: + self.cur.execute("INSERT INTO cwa_enforcement(timestamp, book_id, book_title, author, file_path, trigger_type) VALUES (?, ?, ?, ?, ?, ?);", (book.timestamp, book.book_id, book.book_title, book.author_name, book.file_path, 'manual -all')) + self.con.commit() def enforce_show(self, paths: bool, verbose: bool, web_ui=False): results_no_path = self.cur.execute("SELECT timestamp, book_id, book_title, author, trigger_type FROM cwa_enforcement ORDER BY timestamp DESC;").fetchall() - results_with_path = self.cur.execute("SELECT 
timestamp, book_id, epub_path FROM cwa_enforcement ORDER BY timestamp DESC;").fetchall() + results_with_path = self.cur.execute("SELECT timestamp, book_id, file_path FROM cwa_enforcement ORDER BY timestamp DESC;").fetchall() if paths: if verbose: results_with_path.reverse() @@ -294,7 +343,7 @@ def conversion_add_entry(self, filename, original_format, original_backed_up): # def main(): - cwa_db = CWA_DB() + db = CWA_DB() if __name__ == "__main__": diff --git a/scripts/cwa_schema.sql b/scripts/cwa_schema.sql index 2d3c985..e2d69b7 100644 --- a/scripts/cwa_schema.sql +++ b/scripts/cwa_schema.sql @@ -4,7 +4,7 @@ CREATE TABLE IF NOT EXISTS cwa_enforcement( book_id INTEGER NOT NULL, book_title TEXT NOT NULL, author TEXT NOT NULL, - epub_path TEXT NOT NULL, + file_path TEXT NOT NULL, trigger_type TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS cwa_import( @@ -29,5 +29,6 @@ CREATE TABLE IF NOT EXISTS cwa_settings( auto_convert SMALLINT DEFAULT 1 NOT NULL, auto_convert_target_format TEXT DEFAULT "epub" NOT NULL, auto_convert_ignored_formats TEXT DEFAULT "" NOT NULL, - auto_ingest_ignored_formats TEXT DEFAULT "" NOT NULL + auto_ingest_ignored_formats TEXT DEFAULT "" NOT NULL, + auto_metadata_enforcement SMALLINT DEFAULT 1 NOT NULL ); \ No newline at end of file diff --git a/scripts/setup-cwa.sh b/scripts/setup-cwa.sh index 2930768..35215af 100644 --- a/scripts/setup-cwa.sh +++ b/scripts/setup-cwa.sh @@ -35,7 +35,7 @@ add_aliases () { echo "alias cwa-change-dirs='nano /app/calibre-web-automated/dirs.json'" | cat >> ~/.bashrc echo "cover-enforcer () {" | cat >> ~/.bashrc - echo ' python3 /app/calibre-web-automated/scripts/cover-enforcer.py "$@"' | cat >> ~/.bashrc + echo ' python3 /app/calibre-web-automated/scripts/cover_enforcer.py "$@"' | cat >> ~/.bashrc echo "}" | cat >> ~/.bashrc echo "convert-library () {" | cat >> ~/.bashrc