Fix Demozoo importer #257

Merged: 2 commits, Dec 18, 2024
35 changes: 16 additions & 19 deletions scrapers/py_importers/demozoo.py
@@ -3,17 +3,8 @@
# URL is structured in this way:
# https://demozoo.org/productions/?platform={internal_no_platform}&production_type={internal_prodtype_number}

import sys
import re
import os
import json
import shutil
import zipfile
import fnmatch
import urllib3

import requests
import unicodedata
from urllib.request import urlopen
from bs4 import BeautifulSoup

from py_common.Logger import Logger
@@ -71,13 +62,15 @@ def scrape(platform):
page = requests.get(baseurl + "/productions/?platform=" + str(PLATFORMS[platform][0]) + "&page=1", timeout=None)
soup = BeautifulSoup(page.content, 'html.parser')

# get total number of pages
span_pages = soup.find("span", {"class":"current"})
numberofpages = int(str.strip(span_pages.text).split(" ")[-1].split(".")[0])
logger.write("[INFO]", "Total number of pages: " + str(numberofpages) )

# parsing every page
for i in range(0, numberofpages):
enough_page = True
i = 0
while enough_page:
if soup.find('a', {"title": "Next_page"}):
enough_page = True
else:
enough_page = False

logger.write("[INFO]", "Parsing page: " + str(i+1) )
#TODO: dont call twice this page, as it is called before

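Note (not part of the diff): the change above drops the precomputed page count and instead keeps paginating while a "Next_page" link is present. A minimal, self-contained sketch of that idea follows; iter_pages is a hypothetical helper name, while the selector {"title": "Next_page"} and timeout=None are taken from the diff.

import requests
from bs4 import BeautifulSoup

def iter_pages(baseurl, platform_id):
    # Yield (page_index, parsed_soup) until the listing stops advertising a next page.
    i = 0
    while True:
        page = requests.get(f"{baseurl}/productions/?platform={platform_id}&page={i + 1}", timeout=None)
        soup = BeautifulSoup(page.content, "html.parser")
        yield i, soup
        if not soup.find("a", {"title": "Next_page"}):
            break
        i += 1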
Expand Down Expand Up @@ -107,7 +100,7 @@ def scrape(platform):

# check if it could be added to database or not
# building files
ret = utils.build(prod, entrypath, ["GB", "GBC"]) # TODO: GBA, add GBA to this list
ret = utils.build(prod, entrypath, ["gb", "gbc"]) # TODO: GBA, add GBA to this list

# make required JSON file
if ret != 1:
Expand Down Expand Up @@ -165,7 +158,7 @@ def scrape_page(slug, url, platform):

# fetching screenshot
screen_obj = soup.find('a', {"class": "screenshot"})
if screen_obj != None:
if screen_obj is not None:
screenshot = screen_obj.get("href")
else:
screenshot = "None"
@@ -178,7 +171,7 @@

# fetching url (if present)
url = soup.find('ul', {"class": "download_links"})
if url != None:
if url is not None:
url = url.findChildren("a")
else:
# it doesn't make any sense to have a prod without DL link
@@ -196,11 +189,15 @@
elif len(url) >= 2:
# because almost always the prod will have the secondary mirror as scene.org or smth like that
url = url[1].get("href")
if "scene.org" in url and "view" in url:
url = url.replace("view", "get")

# fetching video
video = soup.find(lambda tag: tag.name == "a" and "youtube" in tag.text.lower())
video = video.get("href") if video else ""

files = [f"{slug}.{platform.lower()}"]

return Production(title, slug, developer, platform, typetag, screenshots, files, video, repository=source, url=url)

def main():
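Note (not part of the diff): when a production has a secondary mirror on scene.org, the hunk above rewrites the "view" page URL into a direct "get" download. A tiny sketch of that rewrite as a standalone helper; normalize_download_url is a hypothetical name used only for illustration.

def normalize_download_url(url: str) -> str:
    # scene.org "view" pages are HTML listings; "get" serves the file directly.
    if "scene.org" in url and "view" in url:
        return url.replace("view", "get")
    return url

# Example: https://files.scene.org/view/parties/demo.zip -> https://files.scene.org/get/parties/demo.zip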
227 changes: 102 additions & 125 deletions scrapers/py_importers/py_common/utils.py
@@ -1,28 +1,20 @@
import sys
import py_common.utils
import re
import json
import shutil
import zipfile
import fnmatch
import urllib3
import requests
import unicodedata
import contextlib
import urllib
from urllib.request import urlopen
import imghdr
from PIL import Image

import os
from os import listdir
from os.path import isfile, join

from bs4 import BeautifulSoup
from unidecode import unidecode

from py_common.Logger import Logger
from py_common.Production import Production
import py7zr

###########################
### GLOBAL VAR AND CONS ###
@@ -115,139 +107,124 @@ def fetch_prod_name(prod, suffix, filepath):
return path


def build(prod: Production, entrypath: str, desired_extentions: list):

def build(prod, entrypath: str, desired_extensions: list):
'''
given a prod "Production" object containing
all production's data, create a proper named folder, fetches all files (screenshot + rom)
and properly organize everything
Given a prod "Production" object containing
all production's data, create a properly named folder, fetch all files (screenshot + ROM),
and organize everything.
'''
if not os.path.exists(entrypath + prod.slug):
#############
# PROD FILE #
#############
# make its own folder
os.mkdir(entrypath + prod.slug, 0o777)

# figuring out the suffix
suffix = str.lower(prod.url.split(".")[-1])
if suffix not in desired_extentions:
suffix = "gb"

# building the filepath
filepath = entrypath + prod.slug + "/"

# download the file
# in case of http
if prod.url.startswith("http"):
try:
r = requests.get(prod.url, allow_redirects=True,
timeout=None, verify=False, headers=headers)
if r.status_code != 200:
logger.write("[ERR]:", str(r.status_code) +
": " + prod.slug + " - " + prod.url)

# cleaning in case of error
shutil.rmtree(entrypath + prod.slug)
return 1
except ConnectionError as e:
logger.write("[ERR]:", str(r.status_code) +
": " + prod.slug + " - " + prod.url)
logger.write("[ERR]:", "REASON: " + e)

# cleaning in case of error
shutil.rmtree(entrypath + prod.slug)
return 1
open(filepath + prod.slug + "." + suffix, 'wb').write(r.content)
else:
with contextlib.closing(urllib.request.urlopen(prod.url)) as r:
with open(filepath + prod.slug + "." + suffix, 'wb') as f:
shutil.copyfileobj(r, f)

# unzip in case of zip
if prod.url.endswith(".zip") or prod.url.endswith(".ZIP"):
# download and unzip
try:
with zipfile.ZipFile(filepath + prod.slug + "." + suffix, "r") as zip_ref:
zip_ref.extractall(filepath + "unzippedfolder")
# Create folder if not already present
target_folder = os.path.join(entrypath, prod.slug)
if not os.path.exists(target_folder):
os.mkdir(target_folder, 0o777)

# manage all extensions, and it doesn't matter if they have uppercase or lowercase
path = [] # eventually the file
# Extract file extension
suffix = prod.url.split(".")[-1].lower()

if suffix not in desired_extensions and suffix not in ["zip", "7z", "mp4"]:
print(f"ERROR: {prod.slug} extension is not in {desired_extensions}")
suffix = "gb" # Fallback extension

extentions = fix_extentions(desired_extentions)
for extension in extentions:
path = fetch_prod_name(prod, extension, filepath)
if path != []:
break
# Build the file path
filepath = os.path.join(target_folder, f"{prod.slug}.{suffix}")

# proper renaming and moving the file
if path != []:
os.rename(path[0], filepath + prod.slug +
"." + extension.lower())
# Download the file
try:
if prod.url.startswith("http"):
r = requests.get(prod.url, allow_redirects=True, timeout=None, verify=False)
if r.status_code != 200:
raise Exception(f"HTTP Error {r.status_code}")
with open(filepath, 'wb') as f:
f.write(r.content)
else:
with contextlib.closing(urllib.request.urlopen(prod.url)) as r:
with open(filepath, 'wb') as f:
shutil.copyfileobj(r, f)
except Exception as e:
logger.write("[ERR]:", f"Error downloading {prod.slug}: {e}")
shutil.rmtree(target_folder)
return 1

# Unzip and handle files
if suffix in ["zip", "7z"]:
unzipped_path = os.path.join(target_folder, "unzippedfolder")
os.makedirs(unzipped_path, exist_ok=True)

# update production object file
prod.files.append(prod.slug + "." + extension.lower())
else:
logger.write(
"[WARN]", prod.title + " extension is not a " + prod.platform + " file.")
shutil.rmtree(entrypath + prod.slug)
return 1

# cleaning up unneeded files
shutil.rmtree(filepath + "unzippedfolder")
if CLEANZIP:
os.remove(filepath + prod.slug + "." + "zip")
except zipfile.BadZipFile as e:
logger.write("[ERR] ", str(e) + " bad zip file")
shutil.rmtree(entrypath + prod.slug)
try:
if suffix == "zip":
with zipfile.ZipFile(filepath, "r") as zip_ref:
zip_ref.extractall(unzipped_path)
elif suffix == "7z":
with py7zr.SevenZipFile(filepath, mode='r') as z:
z.extractall(unzipped_path)
except Exception as e:
logger.write("[ERR]:", f"Failed to extract {suffix} file: {e}")
shutil.rmtree(target_folder)
return 1
else:
# it is a proper gb file -> just write the filename in its own structure field
pass

# download the screenshot
if prod.screenshots != None and prod.screenshots != [] and prod.screenshots[0] != "None":
r = requests.get(
prod.screenshots[0], allow_redirects=True, timeout=None)

# figuring out what kind of screenshots I am dealing with
screen_file_path = filepath + prod.slug + "."

# screenshot fileext
screen_ext = prod.screenshots[0].split(".")[-1]
logger.write("[INFO]", " The screenshot is in " +
screen_ext + " format")

if screen_ext.lower() == "png":
screen_file_path += "png"
else:
screen_file_path += screen_ext

open(screen_file_path, 'wb').write(r.content)
# Search for desired extensions in the extracted folder
valid_file_found = False

# Recursively search all files under the unzipped path
for root, _, files in os.walk(unzipped_path):
for file in files:
ext = file.split(".")[-1].lower()
if ext in desired_extensions:
extracted_file = os.path.join(root, file)
final_file = os.path.join(target_folder, f"{prod.slug}.{ext}")

# Move the valid file to the target folder
shutil.move(extracted_file, final_file)
prod.files.append(f"{prod.slug}.{ext}")

valid_file_found = True
break

if valid_file_found:
break

if screen_ext != "png":
im = Image.open(screen_file_path).convert("RGB")
im.save(filepath + prod.slug + ".png", "png")
if not valid_file_found:
logger.write("[WARN]:", f"No valid files with extensions {desired_extensions} found.")
shutil.rmtree(target_folder)
return 1

logger.write(
"[INFO]", " Screenshot has been converted into a PNG file.")
logger.write("[INFO]", " Removing screenshot " +
screen_ext + " file...")
# Clean up unzipped files and original archive
shutil.rmtree(unzipped_path)
if CLEANZIP:
os.remove(filepath)
else:
prod.files.append(f"{prod.slug}.{suffix}")

os.remove(screen_file_path)
# Handle screenshots
if prod.screenshots and prod.screenshots[0] != "None":
try:
r = requests.get(prod.screenshots[0], allow_redirects=True, timeout=None)
screen_ext = prod.screenshots[0].split(".")[-1].lower()
screen_file = os.path.join(target_folder, f"{prod.slug}.{screen_ext}")
with open(screen_file, 'wb') as f:
f.write(r.content)

# Convert to PNG if necessary
if screen_ext != "png":
img = Image.open(screen_file).convert("RGB")
png_file = os.path.join(target_folder, f"{prod.slug}.png")
img.save(png_file, "PNG")
os.remove(screen_file)
prod.screenshots[0] = f"{prod.slug}.png"
else:
prod.screenshots[0] = f"{prod.slug}.png"
except Exception as e:
logger.write("[ERR]:", f"Failed to download screenshot for {prod.slug}: {e}")
prod.screenshots = []

open(filepath + prod.slug + "." + "png", 'wb').write(r.content)
prod.screenshots[0] = prod.slug + "." + "png"
else:
prod.screenshots = []
logger.write(
"[INFO]", "Screenshot not present for this production")
else:
logger.write(
"[WARN]", "directory already present. Skipping " + prod.slug + "...")
logger.write("[WARN]:", f"Directory already exists for {prod.slug}. Skipping...")
return 1
return 0



def fix_extentions(desired_extentions):
'''
given a theorical list of extensions, it returns a list containing additional correct extensions (like CGB, AGB)
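Note (not part of the diff): the rewritten build() extracts zip archives with zipfile, 7z archives with the newly added py7zr, and then walks the extracted tree looking for the first file whose extension is in the desired list (e.g. ["gb", "gbc"]). A compact sketch of that flow under those assumptions; extract_and_find is a hypothetical helper, not a function in this repository.

import os
import zipfile

import py7zr

def extract_and_find(archive_path, out_dir, desired_extensions):
    # Extract the archive, then return the path of the first file matching a desired extension.
    os.makedirs(out_dir, exist_ok=True)
    if archive_path.lower().endswith(".zip"):
        with zipfile.ZipFile(archive_path, "r") as zf:
            zf.extractall(out_dir)
    elif archive_path.lower().endswith(".7z"):
        with py7zr.SevenZipFile(archive_path, mode="r") as sz:
            sz.extractall(out_dir)
    for root, _, files in os.walk(out_dir):
        for name in files:
            if name.split(".")[-1].lower() in desired_extensions:
                return os.path.join(root, name)
    return None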
1 change: 1 addition & 0 deletions scrapers/py_importers/requirements.txt
@@ -12,3 +12,4 @@ webencodings==0.5.1
wget==3.2
webptools==0.0.5
pillow==8.3.2
py7zr==0.22.0
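Note (not part of the diff): py7zr is the new dependency that backs the 7z handling in utils.build(). A minimal usage check, assuming py7zr==0.22.0 as pinned above and a local archive named prod.7z.

import py7zr

with py7zr.SevenZipFile("prod.7z", mode="r") as archive:
    print(archive.getnames())          # list archive members
    archive.extractall(path="unzippedfolder")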