Skip to content

Commit

Permalink
fix: now we correctly manage the extension
Browse files Browse the repository at this point in the history
  • Loading branch information
dag7dev committed Dec 17, 2024
1 parent 898b000 commit 96fae00
Showing 1 changed file with 98 additions and 130 deletions.
228 changes: 98 additions & 130 deletions scrapers/py_importers/py_common/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import re
import json
import shutil
Expand All @@ -15,7 +14,6 @@
from unidecode import unidecode

from py_common.Logger import Logger
from py_common.Production import Production
import py7zr

###########################
Expand Down Expand Up @@ -109,154 +107,124 @@ def fetch_prod_name(prod, suffix, filepath):
return path


def build(prod: Production, entrypath: str, desired_extentions: list):

def build(prod, entrypath: str, desired_extensions: list):
'''
given a prod "Production" object containing
all production's data, create a proper named folder, fetches all files (screenshot + rom)
and properly organize everything
Given a prod "Production" object containing
all production's data, create a properly named folder, fetch all files (screenshot + ROM),
and organize everything.
'''
if not os.path.exists(entrypath + prod.slug):
#############
# PROD FILE #
#############
# make its own folder
os.mkdir(entrypath + prod.slug, 0o777)

# figuring out the suffix
suffix = str.lower(prod.url.split(".")[-1])
# Create folder if not already present
target_folder = os.path.join(entrypath, prod.slug)
if not os.path.exists(target_folder):
os.mkdir(target_folder, 0o777)

# Extract file extension
suffix = prod.url.split(".")[-1].lower()

if suffix not in desired_extentions:
print("ERROR: " + prod.slug + " extension is not a " +
str(desired_extentions) + " file!")
suffix = "gb"
if suffix not in desired_extensions and suffix not in ["zip", "7z", "mp4"]:
print(f"ERROR: {prod.slug} extension is not in {desired_extensions}")
suffix = "gb" # Fallback extension

# building the filepath
filepath = entrypath + prod.slug + "/"
# Build the file path
filepath = os.path.join(target_folder, f"{prod.slug}.{suffix}")

# download the file
# in case of http
if prod.url.startswith("http"):
try:
r = requests.get(prod.url, allow_redirects=True,
timeout=None, verify=False, headers=headers)
# Download the file
try:
if prod.url.startswith("http"):
r = requests.get(prod.url, allow_redirects=True, timeout=None, verify=False)
if r.status_code != 200:
logger.write("[ERR]:", str(r.status_code) +
": " + prod.slug + " - " + prod.url)

# cleaning in case of error
shutil.rmtree(entrypath + prod.slug)
return 1
except ConnectionError as e:
logger.write("[ERR]:", str(r.status_code) +
": " + prod.slug + " - " + prod.url)
logger.write("[ERR]:", "REASON: " + e)

# cleaning in case of error
shutil.rmtree(entrypath + prod.slug)
return 1
open(filepath + prod.slug + "." + suffix, 'wb').write(r.content)
else:
with contextlib.closing(urllib.request.urlopen(prod.url)) as r:
with open(filepath + prod.slug + "." + suffix, 'wb') as f:
shutil.copyfileobj(r, f)

# unzip in case of zip
raise Exception(f"HTTP Error {r.status_code}")
with open(filepath, 'wb') as f:
f.write(r.content)
else:
with contextlib.closing(urllib.request.urlopen(prod.url)) as r:
with open(filepath, 'wb') as f:
shutil.copyfileobj(r, f)
except Exception as e:
logger.write("[ERR]:", f"Error downloading {prod.slug}: {e}")
shutil.rmtree(target_folder)
return 1

# Unzip and handle files
if suffix in ["zip", "7z"]:
# manage zip file
if prod.url == "zip":
try:
with zipfile.ZipFile(filepath + prod.slug + "." + suffix, "r") as zip_ref:
zip_ref.extractall(filepath + "unzippedfolder")
except zipfile.BadZipFile as e:
logger.write("[ERR] ", str(e) + " bad zip file")
shutil.rmtree(entrypath + prod.slug)
return 1
# manage 7z file
elif suffix == "7z":
try:
with py7zr.SevenZipFile(filepath + prod.slug + "." + suffix, mode='r') as z:
z.extractall(filepath + "unzippedfolder")
except py7zr.exceptions.Bad7zFile as e:
logger.write("[ERR] ", str(e) + " bad 7z file")
shutil.rmtree(entrypath + prod.slug)
return 1

# manage all extensions, and it doesn't matter if they have uppercase or lowercase
path = [] # eventually the file
unzipped_path = os.path.join(target_folder, "unzippedfolder")
os.makedirs(unzipped_path, exist_ok=True)

extentions = fix_extentions(desired_extentions)
for extension in extentions:
path = fetch_prod_name(prod, extension, filepath)
if path != []:
break
try:
if suffix == "zip":
with zipfile.ZipFile(filepath, "r") as zip_ref:
zip_ref.extractall(unzipped_path)
elif suffix == "7z":
with py7zr.SevenZipFile(filepath, mode='r') as z:
z.extractall(unzipped_path)
except Exception as e:
logger.write("[ERR]:", f"Failed to extract {suffix} file: {e}")
shutil.rmtree(target_folder)
return 1

# proper renaming and moving the file
if path != []:
os.rename(path[0], filepath + prod.slug +
"." + extension.lower())
# Search for desired extensions in the extracted folder
valid_file_found = False

# Recursively search all files under the unzipped path
for root, _, files in os.walk(unzipped_path):
for file in files:
ext = file.split(".")[-1].lower()
if ext in desired_extensions:
extracted_file = os.path.join(root, file)
final_file = os.path.join(target_folder, f"{prod.slug}.{ext}")

# Move the valid file to the target folder
shutil.move(extracted_file, final_file)
prod.files.append(f"{prod.slug}.{ext}")

valid_file_found = True
break

if valid_file_found:
break

# update production object file
prod.files.append(prod.slug + "." + extension.lower())
else:
logger.write(
"[WARN]", prod.title + " extension is not a " + prod.platform + " file.")
shutil.rmtree(entrypath + prod.slug)
if not valid_file_found:
logger.write("[WARN]:", f"No valid files with extensions {desired_extensions} found.")
shutil.rmtree(target_folder)
return 1

# cleaning up unneeded files
shutil.rmtree(filepath + "unzippedfolder")

for ext in ["zip", "7z"]:
if CLEANZIP and os.path.exists(filepath + prod.slug + "." + ext):
os.remove(filepath + prod.slug + "." + ext)
# Clean up unzipped files and original archive
shutil.rmtree(unzipped_path)
if CLEANZIP:
os.remove(filepath)
else:
# it is a proper gb file -> just write the filename in its own structure field
pass

# download the screenshot
if prod.screenshots is not None and prod.screenshots != [] and prod.screenshots[0] != "None":
r = requests.get(
prod.screenshots[0], allow_redirects=True, timeout=None)

# figuring out what kind of screenshots I am dealing with
screen_file_path = filepath + prod.slug + "."

# screenshot fileext
screen_ext = prod.screenshots[0].split(".")[-1]
logger.write("[INFO]", " The screenshot is in " +
screen_ext + " format")

if screen_ext.lower() == "png":
screen_file_path += "png"
else:
screen_file_path += screen_ext
prod.files.append(f"{prod.slug}.{suffix}")

open(screen_file_path, 'wb').write(r.content)

if screen_ext != "png":
im = Image.open(screen_file_path).convert("RGB")
im.save(filepath + prod.slug + ".png", "png")

logger.write(
"[INFO]", " Screenshot has been converted into a PNG file.")
logger.write("[INFO]", " Removing screenshot " +
screen_ext + " file...")

os.remove(screen_file_path)
# Handle screenshots
if prod.screenshots and prod.screenshots[0] != "None":
try:
r = requests.get(prod.screenshots[0], allow_redirects=True, timeout=None)
screen_ext = prod.screenshots[0].split(".")[-1].lower()
screen_file = os.path.join(target_folder, f"{prod.slug}.{screen_ext}")
with open(screen_file, 'wb') as f:
f.write(r.content)

# Convert to PNG if necessary
if screen_ext != "png":
img = Image.open(screen_file).convert("RGB")
png_file = os.path.join(target_folder, f"{prod.slug}.png")
img.save(png_file, "PNG")
os.remove(screen_file)
prod.screenshots[0] = f"{prod.slug}.png"
else:
prod.screenshots[0] = f"{prod.slug}.png"
except Exception as e:
logger.write("[ERR]:", f"Failed to download screenshot for {prod.slug}: {e}")
prod.screenshots = []

open(filepath + prod.slug + "." + "png", 'wb').write(r.content)
prod.screenshots[0] = prod.slug + "." + "png"
else:
prod.screenshots = []
logger.write(
"[INFO]", "Screenshot not present for this production")
else:
logger.write(
"[WARN]", "directory already present. Skipping " + prod.slug + "...")
logger.write("[WARN]:", f"Directory already exists for {prod.slug}. Skipping...")
return 1
return 0



def fix_extentions(desired_extentions):
'''
given a theorical list of extensions, it returns a list containing additional correct extensions (like CGB, AGB)
Expand Down

0 comments on commit 96fae00

Please sign in to comment.