feat: improve scrapy web data gathering
Move to v0.0.4.
cofiem committed Mar 16, 2023
1 parent 2d7e303 commit e077241
Showing 10 changed files with 286 additions and 206 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,12 @@
# Change log

## [v0.0.4](https://github.com/anotherbyte-net/gather-vision/releases/tag/v0.0.4)

[full change log](https://github.com/anotherbyte-net/gather-vision/compare/v0.0.3...v0.0.4)

- allow providing data storage path
- improve scrapy web data gathering

## [v0.0.3](https://github.com/anotherbyte-net/gather-vision/releases/tag/v0.0.3)

[full change log](https://github.com/anotherbyte-net/gather-vision/compare/v0.0.2...v0.0.3)
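The "allow providing data storage path" entry corresponds to the new `args.data_path` handling in `src/gather_vision/app.py` below, where `feeds/`, `files/`, and `http_cache/` directories are placed under the given path. A minimal sketch of supplying it from calling code, assuming `UpdateArgs` is a dataclass exposing `name` and `data_path` and that the application class in `app.py` is named `App` (neither assumption is shown directly in this diff):

```python
import pathlib

from gather_vision import app            # assumed module layout
from gather_vision.plugin import entry

# Assumption: UpdateArgs carries the plugin name filter and the storage path
# that the update() method reads as args.name and args.data_path.
args = entry.UpdateArgs(
    name=None,                            # None runs every installed plugin
    data_path=pathlib.Path("./gather-vision-data"),
)

result = app.App().update(args)           # "App" is an assumed class name
print(f"web sources: {len(result.web_data)}, local sources: {len(result.local_data)}")
```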
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -85,7 +85,7 @@ source .venv-test/bin/activate
python -m pip install --upgrade pip setuptools wheel
python -m pip install --upgrade -r requirements.txt

GATHER_VISION_VERSION='0.0.3'
GATHER_VISION_VERSION='0.0.4'
pip install --index-url https://test.pypi.org/simple/ --no-deps gather-vision==$GATHER_VISION_VERSION
# or
pip install dist/gather_vision-$GATHER_VISION_VERSION-py3-none-any.whl
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.0.3
0.0.4
248 changes: 130 additions & 118 deletions src/gather_vision/app.py
@@ -1,19 +1,16 @@
"""The main application features."""

import dataclasses
import logging
import pathlib
import pickle
import tempfile
import typing

import scrapy
from importlib_metadata import EntryPoint, entry_points
from scrapy.crawler import CrawlerProcess
from scrapy.exporters import BaseItemExporter
from scrapy.http import Response, TextResponse
import importlib_metadata
from scrapy import crawler, exporters, http

from gather_vision import utils
from gather_vision.plugin import data as plugin_data, entry as plugin_entry
from gather_vision.plugin import data, entry

logger = logging.getLogger(__name__)

@@ -22,12 +19,12 @@
class PluginItem:
"""Information about a plugin."""

entry_point: EntryPoint
entry_class: typing.Type[plugin_entry.Entry]
entry_instance: plugin_entry.Entry
entry_point: importlib_metadata.EntryPoint
entry_class: typing.Type[entry.Entry]
entry_instance: entry.Entry


class AppPickleItemExporter(BaseItemExporter):
class AppPickleItemExporter(exporters.BaseItemExporter):
def __init__(self, file, protocol=4, **kwargs):
super().__init__(**kwargs)
self.file = file
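The exporter's `export_item` body is collapsed in this view. The read-back loop later in this file calls `pickle.load` repeatedly until `EOFError`, which implies the exporter appends one pickled object per item. A sketch under that assumption (not necessarily the author's exact code):

```python
import pickle

from scrapy import exporters


class PickleAppendExporter(exporters.BaseItemExporter):
    """Sketch of a pickle exporter that writes one pickled object per item."""

    def __init__(self, file, protocol=4, **kwargs):
        super().__init__(**kwargs)
        self.file = file
        self.protocol = protocol

    def export_item(self, item):
        # Append each item as a separate pickle record; the reader side calls
        # pickle.load in a loop until EOFError to recover every record.
        pickle.dump(item, self.file, self.protocol)
```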
@@ -62,7 +59,7 @@ def load(self) -> typing.List[PluginItem]:
logger.info("Loaded %s plugins.", len(self._available))
return self._available

def update(self, args: plugin_entry.UpdateArgs) -> plugin_entry.UpdateResult:
def update(self, args: entry.UpdateArgs) -> entry.UpdateResult:
"""Execute the update action for all plugins or the plugin with the given name.
Args:
@@ -71,133 +68,145 @@ def update(self, args: plugin_entry.UpdateArgs) -> plugin_entry.UpdateResult:
Returns:
The result of running the plugin's update process.
"""
loaded_plugins = list(self.load())
named_plugins = [
i
for i in self.load()
for i in loaded_plugins
if args.name is None or i.entry_point.name == args.name
]
available_plugins = ", ".join(
sorted(i.entry_point.name for i in loaded_plugins)
)

if len(named_plugins) == 0:
named_count = len(named_plugins)
available_count = len(loaded_plugins)

if named_count == 0:
raise utils.GatherVisionException(
f"Could not find plugin named '{args.name}'."
f"Could not find plugin named '{args.name}'. "
f"Available plugins ({available_count}): {available_plugins}."
)

if args.name and len(named_plugins) > 1:
if args.name and named_count > 1:
raise utils.GatherVisionException(
f"Found multiple plugins named '{args.name}'."
f"Available plugins ({available_count}): {available_plugins}."
)

# load data from local sources first
local_data: typing.List[plugin_data.LocalData] = []
# get the data sources
local_data: typing.List[data.LocalData] = []
web_data: typing.List[data.WebData] = []
for named_plugin in named_plugins:
plugin_update_result = named_plugin.entry_instance.update(args)

# load data from local sources
for local_data_item in plugin_update_result.local_data:
local_data_item.data = list(local_data_item.load_resources())
local_data.extend(plugin_update_result.local_data)

# get the web data sources
web_data.extend(plugin_update_result.web_data)

logger.info("Loaded %s local data sources.", len(local_data))

# allow running multiple plugins at once
# gather WebData subclasses and run the spider
web_data: typing.List[plugin_data.WebData] = []
for named_plugin in named_plugins:
plugin_update_result = named_plugin.entry_instance.update(args)
web_data.extend(plugin_update_result.web_data)

# run the spider
logger.info("Starting %s web data sources.", len(web_data))
web_data_map = dict([(self._data_item_id(i), i) for i in web_data])

# run the web data to get items, using scrapy
# save the feed to a temp file, then read the items back in
feed_items = []
with tempfile.TemporaryDirectory() as temp_dir:
feed_path = pathlib.Path(temp_dir, "feed_%(name)s_%(time)s.pickle")
feed_path_setting = str(feed_path).replace("\\", "/")

files_path = pathlib.Path(temp_dir, "feed_%(name)s_%(time)s.files")
files_path_setting = str(files_path).replace("\\", "/")

process = CrawlerProcess(
settings={
"USER_AGENT": "gather-vision (+https://github.com/anotherbyte-net/gather-vision)",
# http cache
"HTTPCACHE_ENABLED": True,
"HTTPCACHE_DIR": ".httpcache",
"HTTPCACHE_POLICY": "scrapy.extensions.httpcache.DummyPolicy",
"HTTPCACHE_STORAGE": "scrapy.extensions.httpcache.FilesystemCacheStorage",
"EXTENSIONS": {
"scrapy.extensions.telnet.TelnetConsole": None,
},
# feed
"FEED_EXPORTERS": {
"pickle_raw": "gather_vision.app.AppPickleItemExporter",
},
"FEEDS": {
f"file:///{feed_path_setting}": {"format": "pickle_raw"},
},
"WEB_DATA_ITEMS": web_data,
# logs
"LOG_ENABLED": True,
"LOG_FILE": None,
"LOG_STDOUT": False,
"LOG_LEVEL": "ERROR",
# throttling requests
"DOWNLOAD_DELAY": 3,
"AUTOTHROTTLE_ENABLED": True,
"AUTOTHROTTLE_START_DELAY": 3,
"AUTOTHROTTLE_MAX_DELAY": 60,
"AUTOTHROTTLE_TARGET_CONCURRENCY": 1.0,
# pipelines
"ITEM_PIPELINES": {
"scrapy.pipelines.files.FilesPipeline": 1,
},
"FILES_STORE": files_path_setting,
"MEDIA_ALLOW_REDIRECTS": True,
# Set settings whose default value is deprecated to a future-proof value
"REQUEST_FINGERPRINTER_IMPLEMENTATION": "2.7",
"TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
"FEED_EXPORT_ENCODING": "utf-8",
},
install_root_handler=True,
)

process.crawl(WebDataFetch)
# build the output paths

logging.getLogger("scrapy").setLevel("ERROR")
logging.getLogger("py.warnings").setLevel("CRITICAL")
if not args.data_path:
raise ValueError(f"Invalid data path '{args.data_path}'.")

# the script will block here until the crawling is finished
process.start()
feed_path = args.data_path / "feeds" / "feed_%(name)s_%(time)s.pickle"
feed_path_setting = str(feed_path).replace("\\", "/")

# f = io.BytesIO()
# pickle.dump(items, f)
#
# f.seek(0)
# result = pickle.load(f)
files_dir = args.data_path / "files"
files_dir_setting = str(files_dir).replace("\\", "/")

# load the feed items
for item in feed_path.parent.iterdir():
if not item.is_file():
continue
if item.suffix != ".pickle":
continue
http_cache_dir = args.data_path / "http_cache"
http_cache_dir_setting = str(http_cache_dir).replace("\\", "/")

with item.open("rb") as f:
while True:
try:
feed_items.append(pickle.load(f))
except EOFError:
break
# run the web data to get items, using scrapy
# save the feed to a temp file, then read the items back in
process = crawler.CrawlerProcess(
settings={
"USER_AGENT": "gather-vision (+https://github.com/anotherbyte-net/gather-vision)",
# http cache
"HTTPCACHE_ENABLED": True,
"HTTPCACHE_DIR": http_cache_dir_setting,
"HTTPCACHE_POLICY": "scrapy.extensions.httpcache.DummyPolicy",
"HTTPCACHE_STORAGE": "scrapy.extensions.httpcache.FilesystemCacheStorage",
"EXTENSIONS": {
"scrapy.extensions.telnet.TelnetConsole": None,
},
# feed
"FEED_EXPORTERS": {
"pickle_raw": "gather_vision.app.AppPickleItemExporter",
},
"FEEDS": {
f"file:///{feed_path_setting}": {"format": "pickle_raw"},
},
"WEB_DATA_ITEMS": web_data,
# logs
"LOG_ENABLED": True,
"LOG_FILE": None,
"LOG_STDOUT": False,
"LOG_LEVEL": "ERROR",
# throttling requests
"DOWNLOAD_DELAY": 3,
"AUTOTHROTTLE_ENABLED": True,
"AUTOTHROTTLE_START_DELAY": 3,
"AUTOTHROTTLE_MAX_DELAY": 60,
"AUTOTHROTTLE_TARGET_CONCURRENCY": 1.0,
# pipelines
"ITEM_PIPELINES": {
"scrapy.pipelines.files.FilesPipeline": 1,
},
"FILES_STORE": files_dir_setting,
"MEDIA_ALLOW_REDIRECTS": True,
# Set settings whose default value is deprecated to a future-proof value
"REQUEST_FINGERPRINTER_IMPLEMENTATION": "2.7",
"TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
"FEED_EXPORT_ENCODING": "utf-8",
},
install_root_handler=True,
)

logger.info("Loaded %s data items from web data sources.", len(feed_items))
process.crawl(WebDataFetch)

logging.getLogger("scrapy").setLevel("ERROR")
logging.getLogger("py.warnings").setLevel("CRITICAL")

# the script will block here until the crawling is finished
process.start()

# load the feed items
feed_item_count = 0
for item in feed_path.parent.iterdir():
if not item.is_file():
continue
if item.suffix != ".pickle":
continue

with item.open("rb") as f:
while True:
try:
# store PluginDataItems in the related PluginWebData instance
web_data_item = pickle.load(f)
map_id = self._data_item_id(web_data_item)
web_data_map[map_id].data = [web_data_item]
feed_item_count += 1
except EOFError:
break

logger.info("Loaded %s data items from web data sources.", feed_item_count)
logger.info("Finished update.")

# TODO: still need to do something with the feed_items?

# TODO: save results?
return entry.UpdateResult(web_data=web_data, local_data=local_data)

return plugin_entry.UpdateResult(web_data=web_data, local_data=local_data)

def list(self, args: plugin_entry.ListArgs) -> plugin_entry.ListResult:
def list(self, args: entry.ListArgs) -> entry.ListResult:
"""List all available plugins.
Args:
@@ -210,12 +219,14 @@ def list(self, args: plugin_entry.ListArgs) -> plugin_entry.ListResult:
for plugin_item in self.load():
result = plugin_item.entry_instance.list(args)
items.update(result.items)
return plugin_entry.ListResult(items)
return entry.ListResult(items)

def _get_entry_points(self, group: str):
return entry_points(group=group)
return importlib_metadata.entry_points(group=group)

def _build_plugin_item(self, entry_point: EntryPoint) -> PluginItem:
def _build_plugin_item(
self, entry_point: importlib_metadata.EntryPoint
) -> PluginItem:
entry_class = entry_point.load()
item = PluginItem(
entry_point=entry_point,
@@ -224,14 +235,15 @@ def _build_plugin_item(self, entry_point: EntryPoint) -> PluginItem:
)
return item

def _data_item_id(self, item) -> str:
return "-".join([item.plugin_name, item.plugin_data_source])


class WebDataFetch(scrapy.Spider):
name = "web-data"

def start_requests(self):
web_data_items: typing.List[plugin_data.WebData] = self.settings.get(
"WEB_DATA_ITEMS"
)
web_data_items: typing.List[data.WebData] = self.settings.get("WEB_DATA_ITEMS")
for web_data_item in web_data_items:
for initial_url in web_data_item.initial_urls():
yield scrapy.Request(
@@ -240,19 +252,19 @@ def start_requests(self):
cb_kwargs={"web_data_item": web_data_item},
)

def parse(self, response: Response, **kwargs):
web_data_item: plugin_data.WebData = response.cb_kwargs.get("web_data_item")
def parse(self, response: http.Response, **kwargs):
web_data_item: data.WebData = response.cb_kwargs.get("web_data_item")

is_json = "json" in response.headers["Content-Type"].decode("utf-8").lower()

if isinstance(response, TextResponse):
if isinstance(response, http.TextResponse):
body_data = response.json() if is_json else None
selector = response.selector
else:
body_data = None
selector = None

data = plugin_data.WebDataAvailable(
web_data = data.WebDataAvailable(
request_url=response.request.url,
request_method=response.request.method,
response_url=response.url,
@@ -263,7 +275,7 @@ def parse(self, response: Response, **kwargs):
headers=response.headers,
meta=response.cb_kwargs,
)
for i in web_data_item.parse_response(data):
for i in web_data_item.parse_response(web_data):
if isinstance(i, str):
yield scrapy.Request(
url=i,
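To show how the `WebDataFetch` spider above is driven end to end, here is a rough sketch of a plugin-side `WebData` implementation, based only on the calls visible in this diff (`initial_urls()`, `parse_response()`, and the `plugin_name`/`plugin_data_source` attributes read by `_data_item_id`). The `WebDataAvailable` field used below (`body_data`) and the exact base-class signatures are assumptions, not part of this commit:

```python
import typing

from gather_vision.plugin import data


class ExampleWebData(data.WebData):
    """Hypothetical web data source, for illustration only."""

    # _data_item_id() joins these two values to pair exported items
    # back to their originating WebData instance.
    plugin_name = "example-plugin"
    plugin_data_source = "example-source"

    def initial_urls(self) -> typing.Iterable[str]:
        # WebDataFetch.start_requests() issues one scrapy.Request per URL.
        return ["https://example.com/api/items?page=1"]

    def parse_response(self, web_data) -> typing.Iterable:
        # Yielding a str asks the spider to follow that URL with the same
        # callback; anything else is written to the pickle feed.
        if web_data.body_data:  # assumed field, populated for JSON responses
            for row in web_data.body_data.get("items", []):
                yield row
            yield "https://example.com/api/items?page=2"
```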