diff --git a/aioscrapy/VERSION b/aioscrapy/VERSION index 21344eb..6261a05 100644 --- a/aioscrapy/VERSION +++ b/aioscrapy/VERSION @@ -1 +1 @@ -1.2.17 \ No newline at end of file +1.3.1 \ No newline at end of file diff --git a/aioscrapy/core/downloader/handlers/httpx.py b/aioscrapy/core/downloader/handlers/httpx.py index b9ecde1..6febb40 100644 --- a/aioscrapy/core/downloader/handlers/httpx.py +++ b/aioscrapy/core/downloader/handlers/httpx.py @@ -38,6 +38,7 @@ async def download_request(self, request: Request, _) -> HtmlResponse: kwargs['headers'] = headers session_args = self.httpx_client_session_args.copy() + session_args.setdefault('http2', True) session_args.update({ 'verify': request.meta.get('verify_ssl', self.verify_ssl), 'follow_redirects': self.settings.getbool('REDIRECT_ENABLED', True) if request.meta.get( diff --git a/aioscrapy/core/downloader/handlers/pyhttpx.py b/aioscrapy/core/downloader/handlers/pyhttpx.py index b1fa102..a555ebc 100644 --- a/aioscrapy/core/downloader/handlers/pyhttpx.py +++ b/aioscrapy/core/downloader/handlers/pyhttpx.py @@ -1,61 +1,62 @@ -import asyncio -import logging - -import pyhttpx - -from aioscrapy import Request -from aioscrapy.core.downloader.handlers import BaseDownloadHandler -from aioscrapy.http import HtmlResponse -from aioscrapy.settings import Settings - -logger = logging.getLogger(__name__) - - -class PyhttpxDownloadHandler(BaseDownloadHandler): - - def __init__(self, settings): - self.settings: Settings = settings - self.pyhttpx_client_args: dict = self.settings.get('PYHTTPX_CLIENT_ARGS', {}) - self.verify_ssl: bool = self.settings.get("VERIFY_SSL") - self.loop = asyncio.get_running_loop() - - @classmethod - def from_settings(cls, settings: Settings): - return cls(settings) - - async def download_request(self, request: Request, _) -> HtmlResponse: - kwargs = { - 'timeout': self.settings.get('DOWNLOAD_TIMEOUT'), - 'cookies': dict(request.cookies), - 'verify': self.verify_ssl, - 'allow_redirects': self.settings.getbool('REDIRECT_ENABLED', True) if request.meta.get( - 'dont_redirect') is None else request.meta.get('dont_redirect') - } - post_data = request.body or None - if isinstance(post_data, dict): - kwargs['json'] = post_data - else: - kwargs['data'] = post_data - - headers = request.headers or self.settings.get('DEFAULT_REQUEST_HEADERS') - kwargs['headers'] = headers - - proxy = request.meta.get("proxy") - if proxy: - kwargs["proxies"] = {'https': proxy} - logger.debug(f"use proxy {proxy}: {request.url}") - - session_args = self.pyhttpx_client_args.copy() - with pyhttpx.HttpSession(**session_args) as session: - response = await asyncio.to_thread(session.request, request.method, request.url, **kwargs) - return HtmlResponse( - request.url, - status=response.status_code, - headers=response.headers, - body=response.content, - cookies=dict(response.cookies), - encoding=response.encoding - ) - - async def close(self): - pass +import asyncio +import logging + +import pyhttpx + +from aioscrapy import Request +from aioscrapy.core.downloader.handlers import BaseDownloadHandler +from aioscrapy.http import HtmlResponse +from aioscrapy.settings import Settings + +logger = logging.getLogger(__name__) + + +class PyhttpxDownloadHandler(BaseDownloadHandler): + + def __init__(self, settings): + self.settings: Settings = settings + self.pyhttpx_client_args: dict = self.settings.get('PYHTTPX_CLIENT_ARGS', {}) + self.verify_ssl = self.settings.get("VERIFY_SSL", True) + self.loop = asyncio.get_running_loop() + + @classmethod + def from_settings(cls, settings: 
Settings): + return cls(settings) + + async def download_request(self, request: Request, _) -> HtmlResponse: + kwargs = { + 'timeout': self.settings.get('DOWNLOAD_TIMEOUT'), + 'cookies': dict(request.cookies), + 'verify': self.verify_ssl, + 'allow_redirects': self.settings.getbool('REDIRECT_ENABLED', True) if request.meta.get( + 'dont_redirect') is None else request.meta.get('dont_redirect') + } + post_data = request.body or None + if isinstance(post_data, dict): + kwargs['json'] = post_data + else: + kwargs['data'] = post_data + + headers = request.headers or self.settings.get('DEFAULT_REQUEST_HEADERS') + kwargs['headers'] = headers + + proxy = request.meta.get("proxy") + if proxy: + kwargs["proxies"] = {'https': proxy} + logger.debug(f"use proxy {proxy}: {request.url}") + + session_args = self.pyhttpx_client_args.copy() + session_args.setdefault('http2', True) + with pyhttpx.HttpSession(**session_args) as session: + response = await asyncio.to_thread(session.request, request.method, request.url, **kwargs) + return HtmlResponse( + request.url, + status=response.status_code, + headers=response.headers, + body=response.content, + cookies=dict(response.cookies), + encoding=response.encoding + ) + + async def close(self): + pass diff --git a/aioscrapy/db/__init__.py b/aioscrapy/db/__init__.py index db51f81..0f537fe 100644 --- a/aioscrapy/db/__init__.py +++ b/aioscrapy/db/__init__.py @@ -1,73 +1,74 @@ -import logging -from importlib import import_module -from typing import Any - -import aioscrapy -from aioscrapy.db.absmanager import AbsDBPoolManager -from aioscrapy.db.aioredis import redis_manager -from aioscrapy.utils.misc import load_object - -logger = logging.getLogger(__name__) - -__all__ = ['db_manager', 'get_pool', 'get_manager'] - -DB_MODULE_MAP = { - 'redis': ('redis', 'aioscrapy.db.aioredis.redis_manager'), - 'aiomysql': ('mysql', 'aioscrapy.db.aiomysql.mysql_manager'), - 'aio_pika': ('rabbitmq', 'aioscrapy.db.aiorabbitmq.rabbitmq_manager'), - 'motor': ('mongo', 'aioscrapy.db.aiomongo.mongo_manager'), -} - -db_manager_map = {} - -for module_name, (manager_key, class_path) in DB_MODULE_MAP.items(): - try: - import_module(module_name) - except ImportError: - pass - else: - db_manager_map[manager_key] = load_object(class_path) - - -class DBManager: - - @staticmethod - def get_manager(db_type: str) -> AbsDBPoolManager: - manager = db_manager_map.get(db_type) - assert manager is not None, f"Not support db type:{db_type}" - return manager - - def get_pool(self, db_type: str, alias='default') -> Any: - manager = self.get_manager(db_type) - return manager.get_pool(alias) - - @staticmethod - async def close_all() -> None: - for manager in db_manager_map.values(): - await manager.close_all() - - @staticmethod - async def from_dict(db_args: dict) -> None: - for db_type, args in db_args.items(): - manager = db_manager_map.get(db_type) - if manager is None: - logger.warning(f'Not support db type: {db_type}; Only {", ".join(db_manager_map.keys())} supported') - await manager.from_dict(args) - - @staticmethod - async def from_settings(settings: aioscrapy.Settings) -> None: - for manager in db_manager_map.values(): - await manager.from_settings(settings) - - async def from_crawler(self, crawler: "aioscrapy.Crawler") -> None: - return await self.from_settings(crawler.settings) - - def __getattr__(self, db_type: str) -> Any: - if db_type not in db_manager_map: - raise AttributeError(f'Not support db type: {db_type}') - return db_manager_map[db_type] - - -db_manager = DBManager() -get_manager = 
db_manager.get_manager -get_pool = db_manager.get_pool +import logging +from importlib import import_module +from typing import Any + +import aioscrapy +from aioscrapy.db.absmanager import AbsDBPoolManager +from aioscrapy.db.aioredis import redis_manager +from aioscrapy.utils.misc import load_object + +logger = logging.getLogger(__name__) + +__all__ = ['db_manager', 'get_pool', 'get_manager'] + +DB_MODULE_MAP = { + 'redis': ('redis', 'aioscrapy.db.aioredis.redis_manager'), + 'aiomysql': ('mysql', 'aioscrapy.db.aiomysql.mysql_manager'), + 'aio_pika': ('rabbitmq', 'aioscrapy.db.aiorabbitmq.rabbitmq_manager'), + 'motor': ('mongo', 'aioscrapy.db.aiomongo.mongo_manager'), + 'asyncpg': ('pg', 'aioscrapy.db.aiopg.pg_manager'), +} + +db_manager_map = {} + +for module_name, (manager_key, class_path) in DB_MODULE_MAP.items(): + try: + import_module(module_name) + except ImportError: + pass + else: + db_manager_map[manager_key] = load_object(class_path) + + +class DBManager: + + @staticmethod + def get_manager(db_type: str) -> AbsDBPoolManager: + manager = db_manager_map.get(db_type) + assert manager is not None, f"Not support db type:{db_type}" + return manager + + def get_pool(self, db_type: str, alias='default') -> Any: + manager = self.get_manager(db_type) + return manager.get_pool(alias) + + @staticmethod + async def close_all() -> None: + for manager in db_manager_map.values(): + await manager.close_all() + + @staticmethod + async def from_dict(db_args: dict) -> None: + for db_type, args in db_args.items(): + manager = db_manager_map.get(db_type) + if manager is None: + logger.warning(f'Not support db type: {db_type}; Only {", ".join(db_manager_map.keys())} supported') + await manager.from_dict(args) + + @staticmethod + async def from_settings(settings: aioscrapy.Settings) -> None: + for manager in db_manager_map.values(): + await manager.from_settings(settings) + + async def from_crawler(self, crawler: "aioscrapy.Crawler") -> None: + return await self.from_settings(crawler.settings) + + def __getattr__(self, db_type: str) -> Any: + if db_type not in db_manager_map: + raise AttributeError(f'Not support db type: {db_type}') + return db_manager_map[db_type] + + +db_manager = DBManager() +get_manager = db_manager.get_manager +get_pool = db_manager.get_pool diff --git a/aioscrapy/db/aiopg.py b/aioscrapy/db/aiopg.py new file mode 100644 index 0000000..e0bb86a --- /dev/null +++ b/aioscrapy/db/aiopg.py @@ -0,0 +1,112 @@ +import logging +from contextlib import asynccontextmanager + +from asyncpg.pool import create_pool + +import aioscrapy +from aioscrapy.db.absmanager import AbsDBPoolManager + +logger = logging.getLogger(__name__) + + +class PGExecutor: + def __init__(self, alias: str, pool_manager: "AioPGPoolManager"): + self.alias = alias + self.pool_manager = pool_manager + + async def insert(self, sql, value): + async with self.pool_manager.get(self.alias) as connect: + try: + result = await connect.executemany(sql, value) + return result + except Exception as e: + await connect.rollback() + raise Exception from e + + async def fetch(self, sql: str): + async with self.pool_manager.get(self.alias) as connect: + return await connect.fetch(sql) + + async def query(self, sql: str): + return await self.fetch(sql) + + +class AioPGPoolManager(AbsDBPoolManager): + _clients = {} + + async def create(self, alias: str, params: dict): + if alias in self._clients: + return self._clients[alias] + + params = params.copy() + params.setdefault('timeout', 30) + pg_pool = await create_pool(**params) + return 
self._clients.setdefault(alias, pg_pool)
+
+    def get_pool(self, alias: str):
+        pg_pool = self._clients.get(alias)
+        assert pg_pool is not None, f"No PG pool named {alias} has been created"
+        return pg_pool
+
+    @asynccontextmanager
+    async def get(self, alias: str):
+        """ Get a connection from the pg pool """
+        pg_pool = self.get_pool(alias)
+        conn = await pg_pool.acquire()
+        try:
+            yield conn
+        finally:
+            await pg_pool.release(conn)
+
+    def executor(self, alias: str) -> PGExecutor:
+        return PGExecutor(alias, self)
+
+    async def close(self, alias: str):
+        pg_pool = self._clients.pop(alias, None)
+        if pg_pool:
+            await pg_pool.close()
+
+    async def close_all(self):
+        for alias in list(self._clients.keys()):
+            await self.close(alias)
+
+    async def from_dict(self, db_args: dict):
+        for alias, pg_args in db_args.items():
+            await self.create(alias, pg_args)
+
+    async def from_settings(self, settings: aioscrapy.Settings):
+        for alias, pg_args in settings.getdict('PG_ARGS').items():
+            await self.create(alias, pg_args)
+
+
+pg_manager = AioPGPoolManager()
+
+if __name__ == '__main__':
+    import asyncio
+
+
+    async def test():
+        pg_pool = await pg_manager.create(
+            'default',
+            dict(
+                user='username',
+                password='pwd',
+                database='dbname',
+                host='127.0.0.1'
+            )
+        )
+
+        # Option 1: acquire and release the connection manually
+        conn = await pg_pool.acquire()
+        try:
+            result = await conn.fetch('select 1')
+            print(tuple(result[0]))
+        finally:
+            await pg_pool.release(conn)
+
+        # Option 2: use the context manager provided by pg_manager
+        async with pg_manager.get('default') as conn:
+            result = await conn.fetch('select 1')
+            print(tuple(result[0]))
+
+    asyncio.run(test())
diff --git a/aioscrapy/libs/pipelines/__init__.py b/aioscrapy/libs/pipelines/__init__.py
index e69de29..90dcf44 100644
--- a/aioscrapy/libs/pipelines/__init__.py
+++ b/aioscrapy/libs/pipelines/__init__.py
@@ -0,0 +1,142 @@
+import asyncio
+import logging
+from typing import Optional
+
+
+logger = logging.getLogger(__name__)
+
+
+class SqlFormat:
+
+    @staticmethod
+    def pg_insert(table: str, fields: list, *args) -> str:
+        # Build the placeholders from the field count before joining the names,
+        # otherwise len() would count the characters of the joined string.
+        placeholder = ','.join([f'${i + 1}' for i in range(len(fields))])
+        fields = ','.join(fields)
+        return f'''INSERT INTO {table} ({fields}) VALUES ({placeholder})'''
+
+    @staticmethod
+    def pg_ignore_insert(table: str, fields: list, *args) -> str:
+        placeholder = ','.join([f'${i + 1}' for i in range(len(fields))])
+        fields = ','.join(fields)
+        return f'INSERT INTO {table} ({fields}) VALUES ({placeholder}) ON CONFLICT DO NOTHING'
+
+    @staticmethod
+    def pg_update_insert(table: str, fields: list, update_fields: list, on_conflict: str, *args) -> str:
+        assert on_conflict is not None, "on_conflict must be str, eg: 'id'"
+        placeholder = ','.join([f'${i + 1}' for i in range(len(fields))])
+        if not update_fields:
+            update_fields = fields
+        update_fields = ','.join([f"{key} = excluded.{key}" for key in update_fields])
+        fields = ','.join(fields)
+        return f'INSERT INTO {table} ({fields}) VALUES ({placeholder}) ON CONFLICT({on_conflict}) DO UPDATE SET {update_fields}'
+
+    @staticmethod
+    def mysql_insert(table: str, fields: list, *args) -> str:
+        placeholder = ','.join(['%s'] * len(fields))
+        fields = ','.join(fields)
+        return f'''INSERT INTO {table} ({fields}) VALUES ({placeholder})'''
+
+    @staticmethod
+    def mysql_ignore_insert(table: str, fields: list, *args) -> str:
+        placeholder = ','.join(['%s'] * len(fields))
+        fields = ','.join(fields)
+        return f'''INSERT IGNORE INTO {table} ({fields}) VALUES ({placeholder})'''
+
+    @staticmethod
+    def mysql_update_insert(table: str, fields: list, update_fields: list, *args) -> str:
+        placeholder = 
','.join(['%s'] * len(fields)) + if not update_fields: + update_fields = fields + update_fields = ','.join([f"{key} = VALUES({key})" for key in update_fields]) + fields = ','.join(fields) + return f'INSERT INTO {table} ({fields}) VALUES ({placeholder}) ON DUPLICATE KEY UPDATE {update_fields}' + + def __call__(self, *args, db_type='mysql', insert_type='insert'): + if getattr(self, f'{db_type}_{insert_type}'): + return getattr(self, f'{db_type}_{insert_type}')(*args) + raise Exception(f"This write type is not supported: {db_type}_{insert_type}") + + +get_sql = SqlFormat() + + +class ItemCacheMixin: + def __init__(self, db_type: str): + self.db_type = db_type + self.item_cache = {} + self.fields_cache = {} + self.table_cache = {} + self.insert_sql_cache = {} + self.db_alias_cache = {} + + def parse_item_to_cache(self, item: dict, save_info): + table_name = save_info.get('table_name') + assert table_name is not None, 'Missing table_name' + insert_type = save_info.get('insert_type', 'insert') + update_fields = save_info.get('update_fields', []) + db_alias = save_info.get('db_alias', ['default']) + on_conflict = save_info.get('on_conflict') + if isinstance(db_alias, str): + db_alias = [db_alias] + + fields = list(item.keys()) + cache_key = ''.join(fields + update_fields + db_alias) + insert_type + table_name + (on_conflict or '') + + if self.fields_cache.get(cache_key) is None: + self.db_alias_cache[cache_key] = db_alias + self.table_cache[cache_key] = table_name + self.fields_cache[cache_key] = fields + self.item_cache[cache_key] = [] + self.insert_sql_cache[cache_key] = get_sql( + table_name, fields, update_fields, on_conflict, + db_type=self.db_type, + insert_type=insert_type, + ) + + self.item_cache[cache_key].append([item[field] for field in self.fields_cache[cache_key]]) + return cache_key, len(self.item_cache[cache_key]) + + +class DBPipelineBase(ItemCacheMixin): + def __init__(self, settings, db_type: str): + super().__init__(db_type) + self.cache_num = settings.getint('SAVE_CACHE_NUM', 500) + self.save_cache_interval = settings.getint('SAVE_CACHE_INTERVAL', 10) + self.lock = asyncio.Lock() + self.running: bool = True + self.item_save_key: str = f'__{db_type}__' + + async def open_spider(self, spider): + asyncio.create_task(self.save_heartbeat()) + + async def save_heartbeat(self): + while self.running: + await asyncio.sleep(self.save_cache_interval) + asyncio.create_task(self.save_all()) + + async def process_item(self, item, spider): + save_info = item.pop(self.item_save_key, None) + if save_info is None: + logger.warning(f"item Missing key {self.item_save_key}, not stored") + return item + + await self.save_item(item, save_info) + return item + + async def close_spider(self, spider): + self.running = False + await self.save_all() + + async def save_all(self): + async with self.lock: + for cache_key, items in self.item_cache.items(): + items and await self._save(cache_key) + + async def save_item(self, item: dict, save_info: dict): + async with self.lock: + cache_key, cache_count = self.parse_item_to_cache(item, save_info) + if cache_count >= self.cache_num: + await self._save(cache_key) + + async def _save(self, cache_key): + raise NotImplementedError diff --git a/aioscrapy/libs/pipelines/sink/__init__.py b/aioscrapy/libs/pipelines/sink/__init__.py index 7d32c9f..c824fc4 100644 --- a/aioscrapy/libs/pipelines/sink/__init__.py +++ b/aioscrapy/libs/pipelines/sink/__init__.py @@ -1,3 +1,6 @@ # -*- coding: utf-8 -*- from .mysql import MysqlPipeline from .mongo import MongoPipeline +from 
.pg import PGPipeline +from .execl import ExeclPipeline +from ._csv import CsvPipeline diff --git a/aioscrapy/libs/pipelines/sink/_csv.py b/aioscrapy/libs/pipelines/sink/_csv.py new file mode 100644 index 0000000..478cebc --- /dev/null +++ b/aioscrapy/libs/pipelines/sink/_csv.py @@ -0,0 +1,87 @@ +import asyncio +import csv +import logging +from typing import Optional + +logger = logging.getLogger(__name__) + + +class CsvSinkMixin: + csv_writer = {} + + async def save_item( + self, + item: dict, + *, + filename: Optional[str] = None, + ): + assert filename is not None, "请传入filename参数" + if '.csv' not in filename: + filename = filename + '.csv' + try: + writer = self._get_writer(filename, item) + writer.writerow(item.values()) + except Exception as e: + logger.exception(f'Save csv Error, filename:{filename}, item:{item}, errMsg: {e}') + + def _get_writer(self, filename, item): + writer, *_ = self.csv_writer.get(filename, (None, None)) + if writer is None: + file = open(filename, 'w', encoding="UTF8", newline='') + writer = csv.writer(file) + writer.writerow(item.keys()) + self.csv_writer[filename] = (writer, file) + return writer + + def close_csv(self, filename=None): + *_, file = self.csv_writer.pop(filename, (None, None)) + if file is not None: + logger.info(f'Closing csv: {filename}') + file.close() + + def close(self): + for filename in list(self.csv_writer.keys()): + self.close_csv(filename) + + +class CsvPipeline(CsvSinkMixin): + def __init__(self, settings): + self.lock = asyncio.Lock() + + @classmethod + def from_settings(cls, settings): + return cls(settings) + + async def process_item(self, item, spider): + execl_kw: Optional[dict] = item.pop('__csv__', None) + if not execl_kw: + logger.warning(f"item Missing key __csv__, not stored") + return item + + execl_kw.setdefault('filename', spider.name) + async with self.lock: + await self.save_item(item, **execl_kw) + + async def close_spider(self, spider): + self.close() + + +if __name__ == '__main__': + class TestSpider: + name = 'TestSpider' + + + async def test(): + p = CsvPipeline({}) + await p.process_item({ + 'title': '测试', + 'img': '//www.baidu.com/img/flexible/logo/pc/result.png', + '__csv__': { + 'filename': 'test', + } + }, TestSpider()) + + await p.close_spider(None) + + + asyncio.run(test()) diff --git a/aioscrapy/libs/pipelines/sink/execl.py b/aioscrapy/libs/pipelines/sink/execl.py new file mode 100644 index 0000000..6a31593 --- /dev/null +++ b/aioscrapy/libs/pipelines/sink/execl.py @@ -0,0 +1,167 @@ +import asyncio +import logging +import math +from io import BytesIO +from typing import Tuple, Optional + +import requests +import xlsxwriter +from PIL import Image, ImageFile + +logger = logging.getLogger(__name__) +try: + resample = Image.LANCZOS +except: + resample = Image.ANTIALIAS +ImageFile.LOAD_TRUNCATED_IMAGES = True + + +class ExeclSinkMixin: + ws_cache = {} + wb_cache = {} + fields_cache = {} + y_cache = {} + + @staticmethod + async def deal_img(url: str, img_size: Optional[Tuple[int, int]]) -> Optional[BytesIO]: + if url.startswith('//'): + url = 'https:' + url + try: + img_bytes = requests.get(url).content + except Exception as e: + logger.error(f"download img error: {e}") + return + im = Image.open(BytesIO(img_bytes)) + im_format = im.format + if img_size: + temp = max(im.size[0] / img_size[0], im.size[1] / img_size[1]) + img_size = (math.ceil(im.size[0] / temp), math.ceil(im.size[1] / temp)) + im = im.resize(img_size, resample).convert('P') + result = BytesIO() + im.save(result, format=im_format) + return result 
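+
+    # deal_img() fetches the image synchronously via requests, scales it to fit
+    # inside img_size while keeping the aspect ratio (dividing by the larger of
+    # the width/height ratios), converts it to palette mode and re-encodes it
+    # into a BytesIO buffer. It returns None when the download fails, in which
+    # case save_item() below falls back to writing the raw URL into the cell.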
+ + async def save_item( + self, + item: dict, + *, + filename: Optional[str] = None, + date_fields: Optional[list] = None, + date_format: str = 'yyyy-mm-dd HH:MM:SS', + img_fields: Optional[list] = None, + img_size: Optional[Tuple[int, int]] = None, + **options + ): + assert filename is not None, "请传入filename参数" + if '.xlsx' not in filename: + filename = filename + '.xlsx' + try: + wb, ws, fields, y = self._get_write_class(filename, item, **options) + bold_format_1 = wb.add_format({'align': 'left', 'border': 1, 'valign': 'vcenter'}) + bold_format_2 = wb.add_format({'align': 'left', 'border': 1, 'valign': 'vcenter', 'fg_color': '#D0D3D4'}) + for x, field in enumerate(fields): + if x % 2 == 0: + bold_format = bold_format_1 + else: + bold_format = bold_format_2 + if date_fields is not None and field in date_fields: + ws.write_datetime(y, x, item.get(field), wb.add_format({'num_format': date_format})) + + elif img_fields is not None and field in img_fields: + img_size and ws.set_column_pixels(x, x, width=math.ceil(img_size[0])) + url = item.get(field) + img_bytes = await self.deal_img(url, img_size) + if img_bytes is None or ws.insert_image(y, x, '', {'image_data': img_bytes}) == -1: + ws.write(y, x, url, bold_format) + else: + ws.write(y, x, item.get(field), bold_format) + if img_size is not None: + ws.set_column_pixels(0, len(fields), width=math.ceil(img_size[0])) + ws.set_row_pixels(y, height=math.ceil(img_size[1])) + except Exception as e: + logger.exception(f'Save Execl Error, filename:{filename}, item:{item}, errMsg: {e}') + + def _get_write_class(self, filename, item, sheet='sheet1', **options): + filename_sheet = filename + sheet + if self.ws_cache.get(filename_sheet) is None: + if self.wb_cache.get(filename) is None: + logger.info(f'Create Execl: {filename}') + wb = xlsxwriter.Workbook(filename, options=options) + self.wb_cache[filename] = wb + else: + wb = self.wb_cache[filename] + ws = wb.add_worksheet(sheet) + bold_format = wb.add_format( + {'bold': True, 'font_size': 12, 'border': 1, 'align': 'center', 'valign': 'vcenter'}) + fields = list(item.keys()) + ws.write_row('A1', fields, cell_format=bold_format) + ws.set_row(0, height=30) + self.fields_cache[filename_sheet] = fields + self.ws_cache[filename_sheet] = ws + self.y_cache[filename_sheet] = 0 + self.y_cache[filename_sheet] += 1 + return self.wb_cache[filename], \ + self.ws_cache[filename_sheet], \ + self.fields_cache[filename_sheet], \ + self.y_cache[filename_sheet] + + def close_execl(self, filename=None): + if filename in self.wb_cache.keys(): + logger.info(f'Closing Execl: {filename}') + self.wb_cache.pop(filename, None).close() + for filename_sheet in list(self.ws_cache.keys()): + if not filename_sheet.startswith(filename): + continue + self.ws_cache.pop(filename_sheet, None) + self.y_cache.pop(filename_sheet, None) + self.fields_cache.pop(filename_sheet, None) + + def close(self): + for filename in list(self.wb_cache.keys()): + self.close_execl(filename) + + +class ExeclPipeline(ExeclSinkMixin): + def __init__(self, settings): + self.lock = asyncio.Lock() + + @classmethod + def from_settings(cls, settings): + return cls(settings) + + async def process_item(self, item, spider): + execl_kw: Optional[dict] = item.pop('__execl__', None) + if not execl_kw: + logger.warning(f"item Missing key __execl__, not stored") + return item + + execl_kw.setdefault('filename', spider.name) + async with self.lock: + await self.save_item(item, **execl_kw) + + async def close_spider(self, spider): + self.close() + + +if __name__ == 
'__main__': + + class TestSpider: + name = 'TestSpider' + + + async def test(): + p = ExeclPipeline({}) + await p.process_item({ + 'title': 'tttt', + 'img': '//www.baidu.com/img/flexible/logo/pc/result.png', + '__execl__': { + 'sheet': 'sheet1', + # 'filename': 'test', + # 'img_fields': ['img'], + # 'img_size': (100, 500) + } + }, TestSpider()) + await p.close_spider(None) + + + asyncio.run(test()) diff --git a/aioscrapy/libs/pipelines/sink/mongo.py b/aioscrapy/libs/pipelines/sink/mongo.py index 4f02249..9f42f0a 100644 --- a/aioscrapy/libs/pipelines/sink/mongo.py +++ b/aioscrapy/libs/pipelines/sink/mongo.py @@ -1,84 +1,53 @@ -import asyncio -import logging -from aioscrapy.db import db_manager - -logger = logging.getLogger(__name__) - - -class MongoPipeline: - def __init__(self, settings): - self.cache_num = settings.getint('SAVE_CACHE_NUM', 500) - self.save_cache_interval = settings.getint('SAVE_CACHE_INTERVAL', 10) - self.lock = asyncio.Lock() - self.running: bool = True - self.db_alias_cache = {} - self.table_cache = {} - self.item_cache = {} - self.db_cache = {} - - @classmethod - def from_settings(cls, settings): - return cls(settings) - - async def open_spider(self, spider): - asyncio.create_task(self.save_heartbeat()) - - async def save_heartbeat(self): - while self.running: - await asyncio.sleep(self.save_cache_interval) - asyncio.create_task(self.save_all()) - - async def process_item(self, item, spider): - await self.save_item(item) - return item - - async def close_spider(self, spider): - self.running = False - await self.save_all() - - def parse_item_to_cache(self, item: dict): - item.pop('save_insert_type', None) - db_name = item.pop('save_db_name', None) - table_name = item.pop('save_table_name', None) - assert table_name is not None, Exception('please set save_table_name') - save_db_alias = item.pop('save_db_alias', ['default']) - if isinstance(save_db_alias, str): - save_db_alias = [save_db_alias] - - cache_key = ''.join(save_db_alias) + (db_name or '') + table_name - - if self.table_cache.get(cache_key) is None: - self.db_alias_cache[cache_key] = save_db_alias - self.table_cache[cache_key] = table_name - self.db_cache[cache_key] = db_name - self.item_cache[cache_key] = [] - - self.item_cache[cache_key].append(item) - return cache_key, len(self.item_cache[cache_key]) - - async def save_all(self): - async with self.lock: - for cache_key, items in self.item_cache.items(): - items and await self._save(cache_key) - - async def save_item(self, item: dict): - async with self.lock: - cache_key, cache_count = self.parse_item_to_cache(item) - if cache_count >= self.cache_num: - await self._save(cache_key) - - async def _save(self, cache_key): - table_name = self.table_cache[cache_key] - try: - for alias in self.db_alias_cache[cache_key]: - try: - executor = db_manager.mongo.executor(alias) - result = await executor.insert( - table_name, self.item_cache[cache_key], db_name=self.db_cache[cache_key] - ) - logger.info( - f'table:{alias}->{table_name} sum:{len(self.item_cache[cache_key])} ok:{len(result.inserted_ids)}') - except Exception as e: - logger.exception(f'save data error, table:{alias}->{table_name}, err_msg:{e}') - finally: - self.item_cache[cache_key] = [] +import logging + +from aioscrapy.db import db_manager +from aioscrapy.libs.pipelines import DBPipelineBase + +logger = logging.getLogger(__name__) + + +class MongoPipeline(DBPipelineBase): + + def __init__(self, settings, db_type: str): + super().__init__(settings, db_type) + self.db_cache = {} + + @classmethod + def 
from_settings(cls, settings): + return cls(settings, 'mongo') + + def parse_item_to_cache(self, item: dict, save_info: dict): + db_name = save_info.get('db_name') + table_name = save_info.get('table_name') + assert table_name is not None, 'please set table_name' + db_alias = save_info.get('db_alias', ['default']) + if isinstance(db_alias, str): + db_alias = [db_alias] + + cache_key = ''.join(db_alias) + (db_name or '') + table_name + + if self.table_cache.get(cache_key) is None: + self.db_alias_cache[cache_key] = db_alias + self.table_cache[cache_key] = table_name + self.db_cache[cache_key] = db_name + self.item_cache[cache_key] = [] + + self.item_cache[cache_key].append(item) + return cache_key, len(self.item_cache[cache_key]) + + async def _save(self, cache_key): + table_name = self.table_cache[cache_key] + try: + for alias in self.db_alias_cache[cache_key]: + try: + executor = db_manager.mongo.executor(alias) + result = await executor.insert( + table_name, self.item_cache[cache_key], db_name=self.db_cache[cache_key] + ) + logger.info( + f'table:{alias}->{table_name} sum:{len(self.item_cache[cache_key])} ok:{len(result.inserted_ids)}' + ) + except Exception as e: + logger.exception(f'save data error, table:{alias}->{table_name}, err_msg:{e}') + finally: + self.item_cache[cache_key] = [] diff --git a/aioscrapy/libs/pipelines/sink/mysql.py b/aioscrapy/libs/pipelines/sink/mysql.py index d1ba30a..37334b9 100644 --- a/aioscrapy/libs/pipelines/sink/mysql.py +++ b/aioscrapy/libs/pipelines/sink/mysql.py @@ -1,125 +1,11 @@ -import asyncio import logging + from aioscrapy.db import db_manager +from aioscrapy.libs.pipelines import DBPipelineBase logger = logging.getLogger(__name__) -class SqlFormat: - - @staticmethod - def ck_insert(table: str, fields: list, *args) -> str: - fields = ','.join(fields) - return f'''INSERT INTO {table} ({fields}) VALUES ''' - - @staticmethod - def mysql_insert(table: str, fields: list, *args) -> str: - placeholder = ','.join(['%s'] * len(fields)) - fields = ','.join(fields) - return f'''INSERT INTO {table} ({fields}) VALUES ({placeholder})''' - - @staticmethod - def mysql_ignore_insert(table: str, fields: list, *args) -> str: - placeholder = ','.join(['%s'] * len(fields)) - fields = ','.join(fields) - return f'''INSERT IGNORE INTO {table} ({fields}) VALUES ({placeholder})''' - - @staticmethod - def mysql_update_insert(table: str, fields: list, update_fields: list, *args) -> str: - placeholder = ','.join(['%s'] * len(fields)) - if not update_fields: - update_fields = fields - update_fields = ','.join([f"{key} = VALUES({key})" for key in update_fields]) - fields = ','.join(fields) - return f'INSERT INTO {table} ({fields}) VALUES ({placeholder}) ON DUPLICATE KEY UPDATE {update_fields}' - - def __call__(self, *args, db_type='mysql', insert_type='insert'): - if getattr(self, f'{db_type}_{insert_type}'): - func = getattr(self, f'{db_type}_{insert_type}') - return func(*args) - raise Exception(f"This write type is not supported: {db_type}_{insert_type}") - - -get_sql = SqlFormat() - - -class ItemCache(object): - def __init__(self, db_type): - self.db_type = db_type - self.item_cache = {} - self.fields_cache = {} - self.table_cache = {} - self.insert_sql_cache = {} - self.db_alias_cache = {} - - def parse_item_to_cache(self, item: dict): - table_name = item.pop('save_table_name') - if table_name is None: - raise Exception('please set save_table_name') - insert_type = item.pop('save_insert_type', 'insert') - update_fields = item.pop('save_update_fields', []) - save_db_alias = 
item.pop('save_db_alias', ['default']) - if isinstance(save_db_alias, str): - save_db_alias = [save_db_alias] - - fields = list(item.keys()) - cache_key = ''.join(fields + update_fields + save_db_alias) + insert_type + table_name - - if self.fields_cache.get(cache_key) is None: - self.db_alias_cache[cache_key] = save_db_alias - self.table_cache[cache_key] = table_name - self.fields_cache[cache_key] = fields - self.item_cache[cache_key] = [] - self.insert_sql_cache[cache_key] = get_sql( - table_name, fields, update_fields, - db_type=self.db_type, - insert_type=insert_type - ) - - self.item_cache[cache_key].append([item[field] for field in self.fields_cache[cache_key]]) - return cache_key, len(self.item_cache[cache_key]) - - -class DBPipelineBase: - def __init__(self, settings, db_type: str): - self.cache_num = settings.getint('SAVE_CACHE_NUM', 500) - self.save_cache_interval = settings.getint('SAVE_CACHE_INTERVAL', 10) - self.db_type = db_type - self.lock = asyncio.Lock() - self.running: bool = True - self.cache = ItemCache(db_type) - - async def open_spider(self, spider): - asyncio.create_task(self.save_heartbeat()) - - async def save_heartbeat(self): - while self.running: - await asyncio.sleep(self.save_cache_interval) - asyncio.create_task(self.save_all()) - - async def process_item(self, item, spider): - await self.save_item(item) - return item - - async def close_spider(self, spider): - self.running = False - await self.save_all() - - async def save_all(self): - async with self.lock: - for cache_key, items in self.cache.item_cache.items(): - items and await self._save(cache_key) - - async def save_item(self, item: dict): - async with self.lock: - cache_key, cache_count = self.cache.parse_item_to_cache(item) - if cache_count >= self.cache_num: - await self._save(cache_key) - - async def _save(self, cache_key): - raise NotImplementedError - - class MysqlPipeline(DBPipelineBase): @classmethod @@ -127,17 +13,16 @@ def from_settings(cls, settings): return cls(settings, 'mysql') async def _save(self, cache_key): - table_name = self.cache.table_cache[cache_key] + table_name = self.table_cache[cache_key] try: - for alias in self.cache.db_alias_cache[cache_key]: + for alias in self.db_alias_cache[cache_key]: async with db_manager.mysql.get(alias, ping=True) as (conn, cursor): try: num = await cursor.executemany( - self.cache.insert_sql_cache[cache_key], self.cache.item_cache[cache_key]) - await conn.commit() - logger.info(f'table:{alias}->{table_name} sum:{len(self.cache.item_cache[cache_key])} ok:{num}') + self.insert_sql_cache[cache_key], self.item_cache[cache_key] + ) + logger.info(f'table:{alias}->{table_name} sum:{len(self.item_cache[cache_key])} ok:{num}') except Exception as e: - await conn.rollback() logger.exception(f'save data error, table:{alias}->{table_name}, err_msg:{e}') finally: - self.cache.item_cache[cache_key] = [] + self.item_cache[cache_key] = [] diff --git a/aioscrapy/libs/pipelines/sink/pg.py b/aioscrapy/libs/pipelines/sink/pg.py new file mode 100644 index 0000000..5395743 --- /dev/null +++ b/aioscrapy/libs/pipelines/sink/pg.py @@ -0,0 +1,28 @@ +import logging + +from aioscrapy.db import db_manager +from aioscrapy.libs.pipelines import DBPipelineBase + +logger = logging.getLogger(__name__) + + +class PGPipeline(DBPipelineBase): + + @classmethod + def from_settings(cls, settings): + return cls(settings, 'pg') + + async def _save(self, cache_key): + table_name = self.table_cache[cache_key] + try: + for alias in self.db_alias_cache[cache_key]: + async with 
db_manager.pg.get(alias) as conn: + try: + num = await conn.executemany( + self.insert_sql_cache[cache_key], self.item_cache[cache_key] + ) + logger.info(f'table:{alias}->{table_name} sum:{len(self.item_cache[cache_key])} ok:{num}') + except Exception as e: + logger.exception(f'save data error, table:{alias}->{table_name}, err_msg:{e}') + finally: + self.item_cache[cache_key] = [] diff --git a/aioscrapy/middleware/itempipeline.py b/aioscrapy/middleware/itempipeline.py index 3c2fec1..5f562d0 100644 --- a/aioscrapy/middleware/itempipeline.py +++ b/aioscrapy/middleware/itempipeline.py @@ -1,24 +1,18 @@ -import logging - -from aioscrapy.utils.conf import build_component_list - -from aioscrapy.middleware.absmanager import AbsMiddlewareManager - - -logger = logging.getLogger(__name__) - - -class ItemPipelineManager(AbsMiddlewareManager): - component_name = 'item pipeline' - - @classmethod - def _get_mwlist_from_settings(cls, settings): - return build_component_list(settings.getwithbase('ITEM_PIPELINES')) - - def _add_middleware(self, pipe): - super()._add_middleware(pipe) - if hasattr(pipe, 'process_item'): - self.methods['process_item'].append(pipe.process_item) - - async def process_item(self, item, spider): - return await self._process_chain('process_item', item, spider) +from aioscrapy.middleware.absmanager import AbsMiddlewareManager +from aioscrapy.utils.conf import build_component_list + + +class ItemPipelineManager(AbsMiddlewareManager): + component_name = 'item pipeline' + + @classmethod + def _get_mwlist_from_settings(cls, settings): + return build_component_list(settings.getwithbase('ITEM_PIPELINES')) + + def _add_middleware(self, pipe): + super()._add_middleware(pipe) + if hasattr(pipe, 'process_item'): + self.methods['process_item'].append(pipe.process_item) + + async def process_item(self, item, spider): + return await self._process_chain('process_item', item, spider) diff --git a/doc/documentation.md b/doc/documentation.md index 6e192bd..7ae68d6 100644 --- a/doc/documentation.md +++ b/doc/documentation.md @@ -1,188 +1,273 @@ - -English | [中文](./documentation_zh.md) -### Scheduler Queue -`SCHEDULER_QUEUE_CLASS`: gets the queue type of the request task, The default type is `memory`. -##### memory -```python -SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.memory.SpiderPriorityQueue' -``` -##### reids -```python -SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.redis.SpiderPriorityQueue' - -# redis parameter -REDIS_ARGS = { - 'queue': { - 'url': 'redis://192.168.234.128:6379/1', - 'max_connections': 2, - 'timeout': None, - 'retry_on_timeout': True, - 'health_check_interval': 30, - } -} -``` -##### rabbitMq -```python -SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.rabbitmq.SpiderPriorityQueue' -# RabbitMq parameter -RABBITMQ_ARGS = { - 'queue': { - 'url': "amqp://guest:guest@192.168.234.128:5673/", - 'connection_max_size': 2, - 'channel_max_size': 10, - } -} -``` - -### Dupefilters -`DUPEFILTER_CLASS`: filter duplicate urls, No default configuration. - -##### disk -Save URL fingerprint information to disk. -```python -DUPEFILTER_CLASS = 'aioscrapy.dupefilters.disk.RFPDupeFilter' -``` -##### redis with hash -Save URL fingerprint information to redis, Hash the URL. -```python -DUPEFILTER_CLASS = 'aioscrapy.dupefilters.redis.RFPDupeFilter' -``` -##### redis with Bloom filter -Save URL fingerprint information to redis, use Bloom filter. 
- -```python -DUPEFILTER_CLASS = 'aioscrapy.dupefilters.redis.BloomDupeFilter' -``` - -### Close Sipder -`CLOSE_SPIDER_ON_IDLE`: Whether to close crawler when queue has no work, Default `False`. - -### Scrapyd -How to deploy distributed crawler of aio-scrapy with scrapyd - -Install scrapyd -```shell -pip install scrapyd -``` -Modify scrapyd configuration - -default_scrapyd.conf -```ini -[scrapyd] -eggs_dir = eggs -logs_dir = logs -items_dir = -jobs_to_keep = 5 -dbs_dir = dbs -max_proc = 0 -max_proc_per_cpu = 4 -finished_to_keep = 100 -poll_interval = 5.0 -bind_address = 127.0.0.1 -http_port = 6800 -debug = off -# runner = scrapyd.runner # The original configuration -runner = aioscrapy.scrapyd.runner # Replace runner with aio-scrapy runner -application = scrapyd.app.application -launcher = scrapyd.launcher.Launcher -webroot = scrapyd.website.Root - -[services] -schedule.json = scrapyd.webservice.Schedule -cancel.json = scrapyd.webservice.Cancel -addversion.json = scrapyd.webservice.AddVersion -listprojects.json = scrapyd.webservice.ListProjects -listversions.json = scrapyd.webservice.ListVersions -listspiders.json = scrapyd.webservice.ListSpiders -delproject.json = scrapyd.webservice.DeleteProject -delversion.json = scrapyd.webservice.DeleteVersion -listjobs.json = scrapyd.webservice.ListJobs -daemonstatus.json = scrapyd.webservice.DaemonStatus - -``` -Start scrapyd -```shell -scrapyd & -``` -Please refer to scrapyd's documentation for more details. - -### Other - -##### MysqlPipeline -Mysql Bulk Storage Middleware -```python -ITEM_PIPELINES = { - 'aioscrapy.libs.pipelines.db.MysqlPipeline': 100, -} - -# mysql parameter -MYSQL_ARGS = { - # "default" is alias of the mysql pool - # Use: - # from aioscrapy.db import db_manager - # async with db_manager.get('default') as (conn, cur): - # print(await cur.execute('select 1')) - 'default': { - 'db': 'test', - 'user': 'root', - 'password': '123456', - 'host': '192.168.234.128', - 'port': 3306, - 'charset': 'utf8mb4', - }, - - # # "dev" is alias of the mysql pool - # 'dev': { - # 'db': 'test2', - # 'user': 'root', - # 'password': 'root', - # 'host': '127.0.0.1', - # 'port': 3306, - # 'charset': 'utf8mb4', - # } -} -SAVE_CACHE_NUM = 1000 # Trigger mysql storage every 1000 item. -SAVE_CACHE_INTERVAL = 10 # Trigger mysql storage every 10 seconds. -""" -# Format requirements for item -item = { - 'save_table_name': 'baidu', # table name of mysql - 'save_insert_type': 'insert', # Save type for mysql - 'save_db_alias': ['default'], # Alias of mysql to save - - # Below are the item fields - 'title': "title", - } -""" - -``` - -##### MongoPipeline - -Mongo Bulk Storage Middleware - -```python -ITEM_PIPELINES = { - 'aioscrapy.libs.pipelines.db.MongoPipeline': 100, -} - -MONGO_ARGS = { - 'default': { - 'host': 'mongodb://root:root@192.168.234.128:27017', - 'db': 'test', - } -} -SAVE_CACHE_NUM = 1000 # Trigger mysql storage every 1000 item. -SAVE_CACHE_INTERVAL = 10 # Trigger mysql storage every 10 seconds. -""" -# Format requirements for item -item = { - 'save_table_name': 'article', # table name of mongo - 'save_db_alias': 'default', # Alias of mongo to save - # 'save_db_name': 'xxx', # db name of mongo, If not specified, the default value is "MONGO_ARGS" in "db" - - # Below are the item fields - 'title': "title", -} -""" + +English | [中文](./documentation_zh.md) +### Scheduler Queue +`SCHEDULER_QUEUE_CLASS`: gets the queue type of the request task, The default type is `memory`. 
+##### memory
+```python
+SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.memory.SpiderPriorityQueue'
+```
+##### redis
+```python
+SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.redis.SpiderPriorityQueue'
+
+# redis parameter
+REDIS_ARGS = {
+    'queue': {
+        'url': 'redis://192.168.234.128:6379/1',
+        'max_connections': 2,
+        'timeout': None,
+        'retry_on_timeout': True,
+        'health_check_interval': 30,
+    }
+}
+```
+##### RabbitMQ
+```python
+SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.rabbitmq.SpiderPriorityQueue'
+# RabbitMq parameter
+RABBITMQ_ARGS = {
+    'queue': {
+        'url': "amqp://guest:guest@192.168.234.128:5673/",
+        'connection_max_size': 2,
+        'channel_max_size': 10,
+    }
+}
+```
+
+### Dupefilters
+`DUPEFILTER_CLASS`: filters duplicate URLs. No default configuration.
+
+##### disk
+Save URL fingerprint information to disk.
+```python
+DUPEFILTER_CLASS = 'aioscrapy.dupefilters.disk.RFPDupeFilter'
+```
+##### redis with hash
+Save URL fingerprint information to redis, hashing each URL.
+```python
+DUPEFILTER_CLASS = 'aioscrapy.dupefilters.redis.RFPDupeFilter'
+```
+##### redis with Bloom filter
+Save URL fingerprint information to redis, using a Bloom filter.
+
+```python
+DUPEFILTER_CLASS = 'aioscrapy.dupefilters.redis.BloomDupeFilter'
+```
+
+### Close Spider
+`CLOSE_SPIDER_ON_IDLE`: Whether to close the crawler when the queue is idle. Default `False`.
+
+### Scrapyd
+How to deploy a distributed aio-scrapy crawler with scrapyd.
+
+Install scrapyd
+```shell
+pip install scrapyd
+```
+Modify the scrapyd configuration
+
+default_scrapyd.conf
+```ini
+[scrapyd]
+eggs_dir = eggs
+logs_dir = logs
+items_dir =
+jobs_to_keep = 5
+dbs_dir = dbs
+max_proc = 0
+max_proc_per_cpu = 4
+finished_to_keep = 100
+poll_interval = 5.0
+bind_address = 127.0.0.1
+http_port = 6800
+debug = off
+# runner = scrapyd.runner    # The original configuration
+runner = aioscrapy.scrapyd.runner    # Replace the runner with the aio-scrapy runner
+application = scrapyd.app.application
+launcher = scrapyd.launcher.Launcher
+webroot = scrapyd.website.Root
+
+[services]
+schedule.json = scrapyd.webservice.Schedule
+cancel.json = scrapyd.webservice.Cancel
+addversion.json = scrapyd.webservice.AddVersion
+listprojects.json = scrapyd.webservice.ListProjects
+listversions.json = scrapyd.webservice.ListVersions
+listspiders.json = scrapyd.webservice.ListSpiders
+delproject.json = scrapyd.webservice.DeleteProject
+delversion.json = scrapyd.webservice.DeleteVersion
+listjobs.json = scrapyd.webservice.ListJobs
+daemonstatus.json = scrapyd.webservice.DaemonStatus
+
+```
+Start scrapyd
+```shell
+scrapyd &
+```
+Please refer to scrapyd's documentation for more details.
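+
+Once the project is deployed to scrapyd (for example with `scrapyd-client`), jobs can be scheduled through the webservices configured above. A minimal sketch; the project and spider names below are placeholders:
+
+```python
+import requests
+
+# schedule.json starts a spider run and returns a job id
+resp = requests.post(
+    'http://127.0.0.1:6800/schedule.json',
+    data={'project': 'myproject', 'spider': 'myspider'},
+)
+print(resp.json())  # e.g. {'status': 'ok', 'jobid': '...'}
+
+# listjobs.json reports pending/running/finished jobs for a project
+jobs = requests.get(
+    'http://127.0.0.1:6800/listjobs.json',
+    params={'project': 'myproject'},
+)
+print(jobs.json())
+```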
+
+### Other
+
+##### CsvPipeline
+CSV Bulk Storage Middleware
+
+```python
+ITEM_PIPELINES = {
+    'aioscrapy.libs.pipelines.sink.CsvPipeline': 100,
+}
+"""
+# Format requirements for item
+item = {
+    '__csv__': {
+        'filename': 'article',  # File name, or full path and file name, e.g. D:\article.csv
+    },
+
+    # Below are the item fields
+    'title': "title",
+}
+"""
+```
+
+##### ExeclPipeline
+Excel Bulk Storage Middleware
+
+```python
+ITEM_PIPELINES = {
+    'aioscrapy.libs.pipelines.sink.ExeclPipeline': 100,
+}
+
+"""
+# Format requirements for item
+item = {
+    '__execl__': {
+        'filename': 'article',  # File name, or full path and file name, e.g. D:\article.xlsx
+        'sheet': 'sheet1',  # sheet name, default: sheet1
+
+        # 'img_fields': ['img'],   # Image fields; the images will be downloaded and embedded
+        # 'img_size': (100, 100)   # resize the images to this size
+    },
+
+    # Below are the item fields
+    'title': "title",
+    'img': "https://domain/test.png",
+}
+"""
+```
+
+##### MysqlPipeline
+Mysql Bulk Storage Middleware
+```python
+ITEM_PIPELINES = {
+    'aioscrapy.libs.pipelines.sink.MysqlPipeline': 100,
+}
+
+# mysql parameter
+MYSQL_ARGS = {
+    # "default" is alias of the mysql pool
+    # Use:
+    #       from aioscrapy.db import db_manager
+    #       async with db_manager.get('default') as (conn, cur):
+    #            print(await cur.execute('select 1'))
+    'default': {
+        'db': 'test',
+        'user': 'root',
+        'password': '123456',
+        'host': '192.168.234.128',
+        'port': 3306,
+        'charset': 'utf8mb4',
+    },
+
+    # # "dev" is alias of the mysql pool
+    # 'dev': {
+    #     'db': 'test2',
+    #     'user': 'root',
+    #     'password': 'root',
+    #     'host': '127.0.0.1',
+    #     'port': 3306,
+    #     'charset': 'utf8mb4',
+    # }
+}
+SAVE_CACHE_NUM = 1000  # Trigger mysql storage every 1000 items.
+SAVE_CACHE_INTERVAL = 10  # Trigger mysql storage every 10 seconds.
+"""
+# Format requirements for item
+item = {
+    '__mysql__': {
+        'table_name': 'baidu',  # table name of mysql
+        'insert_type': 'insert',  # Save type for mysql
+        'db_alias': ['default'],  # Alias of the mysql pool to save to
+    },
+
+    # Below are the item fields
+    'title': "title",
+}
+"""
+
+```
+
+##### MongoPipeline
+
+Mongo Bulk Storage Middleware
+
+```python
+ITEM_PIPELINES = {
+    'aioscrapy.libs.pipelines.sink.MongoPipeline': 100,
+}
+
+MONGO_ARGS = {
+    'default': {
+        'host': 'mongodb://root:root@192.168.234.128:27017',
+        'db': 'test',
+    }
+}
+SAVE_CACHE_NUM = 1000  # Trigger mongo storage every 1000 items.
+SAVE_CACHE_INTERVAL = 10  # Trigger mongo storage every 10 seconds.
+"""
+# Format requirements for item
+item = {
+    '__mongo__': {
+        'db_alias': 'default',  # Alias of the mongo pool to save to
+        'table_name': 'article',  # collection (table) name of mongo
+        # 'db_name': 'xxx',   # mongo db name; if not specified, defaults to the "db" value in MONGO_ARGS
+
+    },
+    # Below are the item fields
+    'title': "title",
+}
+"""
+```
+
+
+##### PGPipeline
+PostgreSQL Bulk Storage Middleware
+
+```python
+ITEM_PIPELINES = {
+    'aioscrapy.libs.pipelines.sink.PGPipeline': 100,
+}
+PG_ARGS = {
+    'default': {
+        'user': 'user',
+        'password': 'password',
+        'database': 'spider_db',
+        'host': '127.0.0.1'
+    }
+}
+SAVE_CACHE_NUM = 1000  # Trigger PostgreSQL storage every 1000 items.
+SAVE_CACHE_INTERVAL = 10  # Trigger PostgreSQL storage every 10 seconds.
+"""
+# Format requirements for item
+item = {
+    '__pg__': {
+        'db_alias': 'default',  # Alias of the PostgreSQL pool to save to
+        'table_name': 'spider_db.article',  # schema and table name, separated by "."
+ + 'insert_type': 'insert', # Save type for PostgreSQL + # 'on_conflict': 'id', + } + + # Below are the item fields + 'title': "title", +} +""" ``` \ No newline at end of file diff --git a/doc/documentation_zh.md b/doc/documentation_zh.md index d2e4882..71130d3 100644 --- a/doc/documentation_zh.md +++ b/doc/documentation_zh.md @@ -1,210 +1,303 @@ -[英文](./documentation.md)| 中文 - -### 调度的队列 - -`SCHEDULER_QUEUE_CLASS`:获取请求任务的队列类型,默认为`memory` - -##### memory - -```python -SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.memory.SpiderPriorityQueue' -``` - -##### reids - -```python -SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.redis.SpiderPriorityQueue' - -# redis parameter -REDIS_ARGS = { - 'queue': { - 'url': 'redis://192.168.234.128:6379/1', - 'max_connections': 2, - 'timeout': None, - 'retry_on_timeout': True, - 'health_check_interval': 30, - } -} -``` - -##### rabbitMq - -```python -SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.rabbitmq.SpiderPriorityQueue' -# RabbitMq parameter -RABBITMQ_ARGS = { - 'queue': { - 'url': "amqp://guest:guest@192.168.234.128:5673/", - 'connection_max_size': 2, - 'channel_max_size': 10, - } -} -``` - -### 过滤重复请求 - -`DUPEFILTER_CLASS`:配置url的去重类, 默认不配 - -##### disk - -将url指纹信息存放在磁盘 - -```python -DUPEFILTER_CLASS = 'aioscrapy.dupefilters.disk.RFPDupeFilter' -``` - -##### redis with hash - -将url指纹信息放到redis, 对url进行hash - -```python -DUPEFILTER_CLASS = 'aioscrapy.dupefilters.redis.RFPDupeFilter' -``` - -##### redis with Bloom filter - -将url指纹信息放到redis,使用布隆过滤 - -```python -DUPEFILTER_CLASS = 'aioscrapy.dupefilters.redis.BloomDupeFilter' -``` - -### 关闭爬虫 - -`CLOSE_SPIDER_ON_IDLE`: 当没有队列任务的时候是否关闭爬虫, 默认 `False`. - -### Scrapyd - -如可使用scrapyd部署aio-scrapy的分布式爬虫 - -安装scrapyd - -```shell -pip install scrapyd -``` - -修改scrapyd配置如下 default_scrapyd.conf - -```ini -[scrapyd] -eggs_dir = eggs -logs_dir = logs -items_dir = -jobs_to_keep = 5 -dbs_dir = dbs -max_proc = 0 -max_proc_per_cpu = 4 -finished_to_keep = 100 -poll_interval = 5.0 -bind_address = 127.0.0.1 -http_port = 6800 -debug = off -# runner = scrapyd.runner # 原配置 -runner = aioscrapy.scrapyd.runner # 将runner替换为aio-scrapy提供的runner -application = scrapyd.app.application -launcher = scrapyd.launcher.Launcher -webroot = scrapyd.website.Root - -[services] -schedule.json = scrapyd.webservice.Schedule -cancel.json = scrapyd.webservice.Cancel -addversion.json = scrapyd.webservice.AddVersion -listprojects.json = scrapyd.webservice.ListProjects -listversions.json = scrapyd.webservice.ListVersions -listspiders.json = scrapyd.webservice.ListSpiders -delproject.json = scrapyd.webservice.DeleteProject -delversion.json = scrapyd.webservice.DeleteVersion -listjobs.json = scrapyd.webservice.ListJobs -daemonstatus.json = scrapyd.webservice.DaemonStatus - -``` - -启动scrapyd - -```shell -scrapyd & -``` - -更多具体操作请参考scrapyd的文档 - -### 其它 - -##### MysqlPipeline - -Mysql批量存储中间件 - -```python -ITEM_PIPELINES = { - 'aioscrapy.libs.pipelines.db.MysqlPipeline': 100, -} - -# mysql parameter -MYSQL_ARGS = { - # "default" is alias of the mysql pool - # Use: - # from aioscrapy.db import db_manager - # async with db_manager.get('default') as (conn, cur): - # print(await cur.execute('select 1')) - 'default': { - 'db': 'test', - 'user': 'root', - 'password': '123456', - 'host': '192.168.234.128', - 'port': 3306, - 'charset': 'utf8mb4', - }, - - # # "dev" is alias of the mysql pool - # 'dev': { - # 'db': 'test2', - # 'user': 'root', - # 'password': 'root', - # 'host': '127.0.0.1', - # 'port': 3306, - # 'charset': 'utf8mb4', - # } -} -SAVE_CACHE_NUM = 1000 # 每1000个item触发一次存储 
-SAVE_CACHE_INTERVAL = 10 # 每10s触发一次存储 -""" -# item的格式要求如下 -item = { - 'save_table_name': 'baidu', # 要存储的表名字 - 'save_insert_type': 'insert', # 存储的方式 - 'save_db_alias': ['default'], # 要存储的mysql的库, mysql的alias - - # 下面为存储的字段 - 'title': "title", - } -""" -``` - -##### MongoPipeline - -Mongo批量存储中间件 - -```python -ITEM_PIPELINES = { - 'aioscrapy.libs.pipelines.db.MongoPipeline': 100, -} - -MONGO_ARGS = { - 'default': { - 'host': 'mongodb://root:root@192.168.234.128:27017', - 'db': 'test', - } -} -SAVE_CACHE_NUM = 1000 # 每1000个item触发一次存储 -SAVE_CACHE_INTERVAL = 10 # 每10s触发一次存储 -""" -# item的格式要求如下 -item = { - 'save_table_name': 'article', # 要存储的表名字 - 'save_db_alias': 'default', # 要存储的mongo, 参数“MONGO_ARGS”的key - # 'save_db_name': 'xxx', # 要存储的mongo的库名, 不指定则默认为“MONGO_ARGS”中的“db”值 - - # 下面为存储的字段 - 'title': "title", -} -""" -``` +[英文](./documentation.md)| 中文 + +### 调度的队列 + +`SCHEDULER_QUEUE_CLASS`:获取请求任务的队列类型,默认为`memory` + +##### memory + +```python +SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.memory.SpiderPriorityQueue' +``` + +##### reids + +```python +SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.redis.SpiderPriorityQueue' + +# redis parameter +REDIS_ARGS = { + 'queue': { + 'url': 'redis://192.168.234.128:6379/1', + 'max_connections': 2, + 'timeout': None, + 'retry_on_timeout': True, + 'health_check_interval': 30, + } +} +``` + +##### rabbitMq + +```python +SCHEDULER_QUEUE_CLASS = 'aioscrapy.queue.rabbitmq.SpiderPriorityQueue' +# RabbitMq parameter +RABBITMQ_ARGS = { + 'queue': { + 'url': "amqp://guest:guest@192.168.234.128:5673/", + 'connection_max_size': 2, + 'channel_max_size': 10, + } +} +``` + +### 过滤重复请求 + +`DUPEFILTER_CLASS`:配置url的去重类, 默认不配 + +##### disk + +将url指纹信息存放在磁盘 + +```python +DUPEFILTER_CLASS = 'aioscrapy.dupefilters.disk.RFPDupeFilter' +``` + +##### redis with hash + +将url指纹信息放到redis, 对url进行hash + +```python +DUPEFILTER_CLASS = 'aioscrapy.dupefilters.redis.RFPDupeFilter' +``` + +##### redis with Bloom filter + +将url指纹信息放到redis,使用布隆过滤 + +```python +DUPEFILTER_CLASS = 'aioscrapy.dupefilters.redis.BloomDupeFilter' +``` + +### 关闭爬虫 + +`CLOSE_SPIDER_ON_IDLE`: 当没有队列任务的时候是否关闭爬虫, 默认 `False`. 
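+
+A minimal sketch of enabling the idle shutdown per spider (the spider name and start URL are placeholders), matching how the example spiders in this repository set it through `custom_settings`:
+
+```python
+from aioscrapy.spiders import Spider
+
+
+class DemoIdleCloseSpider(Spider):
+    name = 'DemoIdleCloseSpider'
+    custom_settings = dict(
+        CLOSE_SPIDER_ON_IDLE=True,  # stop the spider once the task queue stays empty
+    )
+    start_urls = ['https://quotes.toscrape.com']
+
+    async def parse(self, response):
+        yield {'title': response.xpath('//title/text()').get()}
+
+
+if __name__ == '__main__':
+    DemoIdleCloseSpider.start()
+```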
+ +### Scrapyd + +如可使用scrapyd部署aio-scrapy的分布式爬虫 + +安装scrapyd + +```shell +pip install scrapyd +``` + +修改scrapyd配置如下 default_scrapyd.conf + +```ini +[scrapyd] +eggs_dir = eggs +logs_dir = logs +items_dir = +jobs_to_keep = 5 +dbs_dir = dbs +max_proc = 0 +max_proc_per_cpu = 4 +finished_to_keep = 100 +poll_interval = 5.0 +bind_address = 127.0.0.1 +http_port = 6800 +debug = off +# runner = scrapyd.runner # 原配置 +runner = aioscrapy.scrapyd.runner # 将runner替换为aio-scrapy提供的runner +application = scrapyd.app.application +launcher = scrapyd.launcher.Launcher +webroot = scrapyd.website.Root + +[services] +schedule.json = scrapyd.webservice.Schedule +cancel.json = scrapyd.webservice.Cancel +addversion.json = scrapyd.webservice.AddVersion +listprojects.json = scrapyd.webservice.ListProjects +listversions.json = scrapyd.webservice.ListVersions +listspiders.json = scrapyd.webservice.ListSpiders +delproject.json = scrapyd.webservice.DeleteProject +delversion.json = scrapyd.webservice.DeleteVersion +listjobs.json = scrapyd.webservice.ListJobs +daemonstatus.json = scrapyd.webservice.DaemonStatus + +``` + +启动scrapyd + +```shell +scrapyd & +``` + +更多具体操作请参考scrapyd的文档 + +### 其它 + +##### CsvPipeline +csv存储中间件 + +```python +ITEM_PIPELINES = { + 'aioscrapy.libs.pipelines.sink.CsvPipeline': 100, +} +""" +# item的格式要求如下 +item = { + '__csv__': { + 'filename': 'article', # 文件名 或 存储的路径及文件名 如:D:\article.xlsx + }, + + # 下面为存储的字段 + 'title': "title", +} +""" +``` + +##### ExeclPipeline +execl存储中间件 + +```python +ITEM_PIPELINES = { + 'aioscrapy.libs.pipelines.sink.ExeclPipeline': 100, +} + +""" +# item的格式要求如下 +item = { + '__execl__': { + 'filename': 'article', # 文件名 或 存储的路径及文件名 如:D:\article.xlsx + 'sheet': 'sheet1', # 表格的sheet名字 不指定默认为sheet1 + + # 'img_fields': ['img'], # 图片字段 当指定图片字段时 自行下载图片 并保存到表格里 + # 'img_size': (100, 100) # 指定图片大小时 自动将图片转换为指定大小 + }, + + # 下面为存储的字段 + 'title': "title", + 'img': "https://domain/test.png", +} +""" +``` + +##### MysqlPipeline + +Mysql批量存储中间件 + +```python +ITEM_PIPELINES = { + 'aioscrapy.libs.pipelines.sink.MysqlPipeline': 100, +} + +# mysql parameter +MYSQL_ARGS = { + # "default" is alias of the mysql pool + # Use: + # from aioscrapy.db import db_manager + # async with db_manager.get('default') as (conn, cur): + # print(await cur.execute('select 1')) + 'default': { + 'db': 'test', + 'user': 'root', + 'password': '123456', + 'host': '192.168.234.128', + 'port': 3306, + 'charset': 'utf8mb4', + }, + + # # "dev" is alias of the mysql pool + # 'dev': { + # 'db': 'test2', + # 'user': 'root', + # 'password': 'root', + # 'host': '127.0.0.1', + # 'port': 3306, + # 'charset': 'utf8mb4', + # } +} +SAVE_CACHE_NUM = 1000 # 每1000个item触发一次存储 +SAVE_CACHE_INTERVAL = 10 # 每10s触发一次存储 +""" +# item的格式要求如下 +item = { + '__mysql__': { + 'db_alias': 'default', # 要存储的mysql, 参数“MYSQL_ARGS”的key + 'table_name': 'article', # 要存储的表名字 + + # 写入数据库的方式: 默认insert方式 + # insert:普通写入 出现主键或唯一键冲突时抛出异常 + # update_insert:更新插入 出现主键或唯一键冲突时,更新写入 + # ignore_insert:忽略写入 写入时出现冲突 丢掉该条数据 不抛出异常 + 'insert_type': 'update_insert', + } + + # 下面为存储的字段 + 'title': "title", +} +""" +``` + +##### MongoPipeline + +Mongo批量存储中间件 + +```python +ITEM_PIPELINES = { + 'aioscrapy.libs.pipelines.sink.MongoPipeline': 100, +} + +MONGO_ARGS = { + 'default': { + 'host': 'mongodb://root:root@192.168.234.128:27017', + 'db': 'test', + } +} +SAVE_CACHE_NUM = 1000 # 每1000个item触发一次存储 +SAVE_CACHE_INTERVAL = 10 # 每10s触发一次存储 +""" +# item的格式要求如下 +item = { + '__mongo__': { + 'db_alias': 'default', # 要存储的mongo, 参数“MONGO_ARGS”的key + 'table_name': 'article', # 要存储的表名字 + # 
'db_name': 'xxx', # 要存储的mongo的库名, 不指定则默认为“MONGO_ARGS”中的“db”值 + } + + # 下面为存储的字段 + 'title': "title", +} +""" +``` + +##### PGPipeline +PostpreSQL批量存储中间件 + +```python +ITEM_PIPELINES = { + 'aioscrapy.libs.pipelines.sink.PGPipeline': 100, +} +PG_ARGS = { + 'default': { + 'user': 'user', + 'password': 'password', + 'database': 'spider_db', + 'host': '127.0.0.1' + } +} +SAVE_CACHE_NUM = 1000 # 每1000个item触发一次存储 +SAVE_CACHE_INTERVAL = 10 # 每10s触发一次存储 +""" +# item的格式要求如下 +item = { + '__pg__': { + 'db_alias': 'default', # 要存储的PostgreSQL, 参数“PG_ARGS”的key + 'table_name': 'spider_db.article', # 要存储的schema和表名字,用.隔开 + + # 写入数据库的方式: + # insert:普通写入 出现主键或唯一键冲突时抛出异常 + # update_insert:更新插入 出现on_conflict指定的冲突时,更新写入 + # ignore_insert:忽略写入 写入时出现冲突 丢掉该条数据 不抛出异常 + 'insert_type': 'update_insert', + 'on_conflict': 'id', # update_insert方式下的约束 + } + + # 下面为存储的字段 + 'title': "title", +} +""" +``` diff --git a/example/singlespider/demo_duplicate.py b/example/singlespider/demo_duplicate.py index 3a74909..22e9eef 100644 --- a/example/singlespider/demo_duplicate.py +++ b/example/singlespider/demo_duplicate.py @@ -16,9 +16,9 @@ class DemoDuplicateSpider(Spider): "CLOSE_SPIDER_ON_IDLE": True, # 'LOG_FILE': 'test.log', - # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter', - # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.redis.RFPDupeFilter', - 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.redis.BloomDupeFilter', + # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter', # 本地文件存储指纹去重 + # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.redis.RFPDupeFilter', # redis set去重 + 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.redis.BloomDupeFilter', # 布隆过滤器去重 'SCHEDULER_QUEUE_CLASS': 'aioscrapy.queue.redis.SpiderPriorityQueue', 'SCHEDULER_SERIALIZER': 'aioscrapy.serializer.JsonSerializer', diff --git a/example/singlespider/demo_memory.py b/example/singlespider/demo_queue_memory.py similarity index 100% rename from example/singlespider/demo_memory.py rename to example/singlespider/demo_queue_memory.py diff --git a/example/singlespider/demo_rabbitmq.py b/example/singlespider/demo_queue_rabbitmq.py similarity index 100% rename from example/singlespider/demo_rabbitmq.py rename to example/singlespider/demo_queue_rabbitmq.py diff --git a/example/singlespider/demo_redis.py b/example/singlespider/demo_queue_redis.py similarity index 92% rename from example/singlespider/demo_redis.py rename to example/singlespider/demo_queue_redis.py index c804e9c..572045c 100644 --- a/example/singlespider/demo_redis.py +++ b/example/singlespider/demo_queue_redis.py @@ -33,6 +33,9 @@ class DemoRedisSpider(Spider): 'queue': { 'url': 'redis://192.168.18.129:6379/0', 'max_connections': 2, + 'timeout': None, + 'retry_on_timeout': True, + 'health_check_interval': 30 } } } diff --git a/example/singlespider/demo_request_aiohttp.py b/example/singlespider/demo_request_aiohttp.py new file mode 100644 index 0000000..2362da5 --- /dev/null +++ b/example/singlespider/demo_request_aiohttp.py @@ -0,0 +1,60 @@ +import logging + +from aioscrapy import Request +from aioscrapy.spiders import Spider + +logger = logging.getLogger(__name__) + + +class DemoAiohttpSpider(Spider): + name = 'DemoAiohttpSpider' + custom_settings = dict( + USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36", + # DOWNLOAD_DELAY=3, + # RANDOMIZE_DOWNLOAD_DELAY=True, + # CONCURRENT_REQUESTS=1, + LOG_LEVEL='INFO', + CLOSE_SPIDER_ON_IDLE=True, + HTTPERROR_ALLOW_ALL=True, + # DOWNLOAD_HANDLERS={ + # 'http': 
'aioscrapy.core.downloader.handlers.aiohttp.AioHttpDownloadHandler', + # 'https': 'aioscrapy.core.downloader.handlers.aiohttp.AioHttpDownloadHandler', + # }, + DOWNLOAD_HANDLERS_TYPE="aiohttp", + ) + + start_urls = ['https://quotes.toscrape.com'] + + @staticmethod + async def process_request(request, spider): + """ request middleware """ + pass + + @staticmethod + async def process_response(request, response, spider): + """ response middleware """ + return response + + @staticmethod + async def process_exception(request, exception, spider): + """ exception middleware """ + pass + + async def parse(self, response): + for quote in response.css('div.quote'): + yield { + 'author': quote.xpath('span/small/text()').get(), + 'text': quote.css('span.text::text').get(), + } + + next_page = response.css('li.next a::attr("href")').get() + if next_page is not None: + # yield response.follow(next_page, self.parse) + yield Request(f"https://quotes.toscrape.com/{next_page}", callback=self.parse) + + async def process_item(self, item): + print(item) + + +if __name__ == '__main__': + DemoAiohttpSpider.start() diff --git a/example/singlespider/demo_httpx.py b/example/singlespider/demo_request_httpx.py similarity index 78% rename from example/singlespider/demo_httpx.py rename to example/singlespider/demo_request_httpx.py index a7a1139..e72008d 100644 --- a/example/singlespider/demo_httpx.py +++ b/example/singlespider/demo_request_httpx.py @@ -9,19 +9,19 @@ class DemoHttpxSpider(Spider): name = 'DemoHttpxSpider' custom_settings = dict( - USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", + USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36", # DOWNLOAD_DELAY=3, # RANDOMIZE_DOWNLOAD_DELAY=True, # CONCURRENT_REQUESTS=1, LOG_LEVEL='INFO', CLOSE_SPIDER_ON_IDLE=True, - DOWNLOAD_HANDLERS={ - 'http': 'aioscrapy.core.downloader.handlers.httpx.HttpxDownloadHandler', - 'https': 'aioscrapy.core.downloader.handlers.httpx.HttpxDownloadHandler', - }, - HTTPX_CLIENT_SESSION_ARGS={ - 'http2': True - } + # DOWNLOAD_HANDLERS={ + # 'http': 'aioscrapy.core.downloader.handlers.httpx.HttpxDownloadHandler', + # 'https': 'aioscrapy.core.downloader.handlers.httpx.HttpxDownloadHandler', + # }, + DOWNLOAD_HANDLERS_TYPE="httpx", + HTTPX_CLIENT_SESSION_ARGS={'http2': True}, + HTTPERROR_ALLOW_ALL=True ) start_urls = ['https://quotes.toscrape.com'] diff --git a/example/singlespider/demo_playwright.py b/example/singlespider/demo_request_playwright.py similarity index 91% rename from example/singlespider/demo_playwright.py rename to example/singlespider/demo_request_playwright.py index 5ea5f32..66224b9 100644 --- a/example/singlespider/demo_playwright.py +++ b/example/singlespider/demo_request_playwright.py @@ -1,94 +1,95 @@ -import logging - -from aioscrapy import Request -from aioscrapy.spiders import Spider -from aioscrapy.http import PlaywrightResponse -from aioscrapy.core.downloader.handlers.playwright import PlaywrightDriver - -logger = logging.getLogger(__name__) - - -class DemoPlaywrightSpider(Spider): - name = 'DemoPlaywrightSpider' - - custom_settings = dict( - USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", - # DOWNLOAD_DELAY=3, - # RANDOMIZE_DOWNLOAD_DELAY=True, - CONCURRENT_REQUESTS=1, - LOG_LEVEL='INFO', - CLOSE_SPIDER_ON_IDLE=True, - DOWNLOAD_HANDLERS={ - 'http': 
'aioscrapy.core.downloader.handlers.playwright.PlaywrightHandler', - 'https': 'aioscrapy.core.downloader.handlers.playwright.PlaywrightHandler', - }, - PLAYWRIGHT_CLIENT_ARGS=dict( - driver_type="chromium", # chromium、firefox、webkit - wait_until="networkidle", # 等待页面加载完成的事件,可选值:"commit", "domcontentloaded", "load", "networkidle" - window_size=(1024, 800), - # url_regexes=["xxxx"], - # proxy='http://user:pwd@127.0.0.1:7890', - browser_args=dict( - executable_path=None, channel=None, args=None, ignore_default_args=None, handle_sigint=None, - handle_sigterm=None, handle_sighup=None, timeout=None, env=None, headless=False, devtools=None, - downloads_path=None, slow_mo=None, traces_dir=None, chromium_sandbox=None, - firefox_user_prefs=None, - ), - context_args=dict( - no_viewport=None, ignore_https_errors=None, java_script_enabled=None, - bypass_csp=None, user_agent=None, locale=None, timezone_id=None, geolocation=None, permissions=None, - extra_http_headers=None, offline=None, http_credentials=None, device_scale_factor=None, - is_mobile=None, has_touch=None, color_scheme=None, reduced_motion=None, forced_colors=None, - accept_downloads=None, default_browser_type=None, record_har_path=None, - record_har_omit_content=None, record_video_dir=None, record_video_size=None, storage_state=None, - base_url=None, strict_selectors=None, service_workers=None, record_har_url_filter=None, - record_har_mode=None, record_har_content=None, - ), - ) - - ) - - start_urls = ['https://hanyu.baidu.com/zici/s?wd=黄&query=黄'] - - @staticmethod - async def process_request(request, spider): - """ request middleware """ - pass - - @staticmethod - async def process_response(request, response, spider): - """ response middleware """ - return response - - @staticmethod - async def process_exception(request, exception, spider): - """ exception middleware """ - pass - - async def parse(self, response: PlaywrightResponse): - # res = response.get_response("xxxx") - # print(res.text[:100]) - - img_bytes = response.get_response('action_result') - yield { - 'pingyin': response.xpath('//div[@id="pinyin"]/span/b/text()').get(), - 'fan': response.xpath('//*[@id="traditional"]/span/text()').get(), - 'img': img_bytes, - } - - new_character = response.xpath('//a[@class="img-link"]/@href').getall() - for character in new_character: - new_url = 'https://hanyu.baidu.com/zici' + character - yield Request(new_url, callback=self.parse, dont_filter=True) - - async def process_action(self, driver: PlaywrightDriver): - """ Do some operations after function page.goto """ - img_bytes = await driver.page.screenshot(type="jpeg", quality=50) - return img_bytes - - # async def process_item(self, item): - # print(item) - - -if __name__ == '__main__': - DemoPlaywrightSpider.start() +import logging + +from aioscrapy import Request +from aioscrapy.spiders import Spider +from aioscrapy.http import PlaywrightResponse +from aioscrapy.core.downloader.handlers.playwright import PlaywrightDriver + +logger = logging.getLogger(__name__) + + +class DemoPlaywrightSpider(Spider): + name = 'DemoPlaywrightSpider' + + custom_settings = dict( + USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", + # DOWNLOAD_DELAY=3, + # RANDOMIZE_DOWNLOAD_DELAY=True, + CONCURRENT_REQUESTS=1, + LOG_LEVEL='INFO', + CLOSE_SPIDER_ON_IDLE=True, + # DOWNLOAD_HANDLERS={ + # 'http': 'aioscrapy.core.downloader.handlers.playwright.PlaywrightHandler', + # 'https': 
'aioscrapy.core.downloader.handlers.playwright.PlaywrightHandler', + # }, + DOWNLOAD_HANDLERS_TYPE="playwright", + PLAYWRIGHT_CLIENT_ARGS=dict( + driver_type="chromium", # chromium、firefox、webkit + wait_until="networkidle", # 等待页面加载完成的事件,可选值:"commit", "domcontentloaded", "load", "networkidle" + window_size=(1024, 800), + # url_regexes=["xxxx"], + # proxy='http://user:pwd@127.0.0.1:7890', + browser_args=dict( + executable_path=None, channel=None, args=None, ignore_default_args=None, handle_sigint=None, + handle_sigterm=None, handle_sighup=None, timeout=None, env=None, headless=False, devtools=None, + downloads_path=None, slow_mo=None, traces_dir=None, chromium_sandbox=None, + firefox_user_prefs=None, + ), + context_args=dict( + no_viewport=None, ignore_https_errors=None, java_script_enabled=None, + bypass_csp=None, user_agent=None, locale=None, timezone_id=None, geolocation=None, permissions=None, + extra_http_headers=None, offline=None, http_credentials=None, device_scale_factor=None, + is_mobile=None, has_touch=None, color_scheme=None, reduced_motion=None, forced_colors=None, + accept_downloads=None, default_browser_type=None, record_har_path=None, + record_har_omit_content=None, record_video_dir=None, record_video_size=None, storage_state=None, + base_url=None, strict_selectors=None, service_workers=None, record_har_url_filter=None, + record_har_mode=None, record_har_content=None, + ), + ) + + ) + + start_urls = ['https://hanyu.baidu.com/zici/s?wd=黄&query=黄'] + + @staticmethod + async def process_request(request, spider): + """ request middleware """ + pass + + @staticmethod + async def process_response(request, response, spider): + """ response middleware """ + return response + + @staticmethod + async def process_exception(request, exception, spider): + """ exception middleware """ + pass + + async def parse(self, response: PlaywrightResponse): + # res = response.get_response("xxxx") + # print(res.text[:100]) + + img_bytes = response.get_response('action_result') + yield { + 'pingyin': response.xpath('//div[@id="pinyin"]/span/b/text()').get(), + 'fan': response.xpath('//*[@id="traditional"]/span/text()').get(), + 'img': img_bytes, + } + + new_character = response.xpath('//a[@class="img-link"]/@href').getall() + for character in new_character: + new_url = 'https://hanyu.baidu.com/zici' + character + yield Request(new_url, callback=self.parse, dont_filter=True) + + async def process_action(self, driver: PlaywrightDriver): + """ Do some operations after function page.goto """ + img_bytes = await driver.page.screenshot(type="jpeg", quality=50) + return img_bytes + + # async def process_item(self, item): + # print(item) + + +if __name__ == '__main__': + DemoPlaywrightSpider.start() diff --git a/example/singlespider/demo_pyhttpx.py b/example/singlespider/demo_request_pyhttpx.py similarity index 79% rename from example/singlespider/demo_pyhttpx.py rename to example/singlespider/demo_request_pyhttpx.py index ffbf62a..13fed1b 100644 --- a/example/singlespider/demo_pyhttpx.py +++ b/example/singlespider/demo_request_pyhttpx.py @@ -1,55 +1,56 @@ -import logging - -from aioscrapy import Request -from aioscrapy.spiders import Spider -from aioscrapy.http import Response - -logger = logging.getLogger(__name__) - - -class DemoPyhttpxSpider(Spider): - name = 'DemoPyhttpxSpider' - - custom_settings = dict( - USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", - # DOWNLOAD_DELAY=3, - # RANDOMIZE_DOWNLOAD_DELAY=True, - 
CONCURRENT_REQUESTS=1, - LOG_LEVEL='INFO', - CLOSE_SPIDER_ON_IDLE=True, - DOWNLOAD_HANDLERS={ - 'http': 'aioscrapy.core.downloader.handlers.pyhttpx.PyhttpxDownloadHandler', - 'https': 'aioscrapy.core.downloader.handlers.pyhttpx.PyhttpxDownloadHandler', - }, - PYHTTPX_CLIENT_ARGS=dict( - browser_type='chrome', - http2=True - ) - ) - - start_urls = ['https://tls.peet.ws/api/all'] - - @staticmethod - async def process_request(request, spider): - """ request middleware """ - pass - - @staticmethod - async def process_response(request, response, spider): - """ response middleware """ - return response - - @staticmethod - async def process_exception(request, exception, spider): - """ exception middleware """ - pass - - async def parse(self, response: Response): - print(response.text) - - async def process_item(self, item): - print(item) - - -if __name__ == '__main__': - DemoPyhttpxSpider.start() +import logging + +from aioscrapy import Request +from aioscrapy.spiders import Spider +from aioscrapy.http import Response + +logger = logging.getLogger(__name__) + + +class DemoPyhttpxSpider(Spider): + name = 'DemoPyhttpxSpider' + + custom_settings = dict( + USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", + # DOWNLOAD_DELAY=3, + # RANDOMIZE_DOWNLOAD_DELAY=True, + CONCURRENT_REQUESTS=1, + LOG_LEVEL='INFO', + CLOSE_SPIDER_ON_IDLE=True, + # DOWNLOAD_HANDLERS={ + # 'http': 'aioscrapy.core.downloader.handlers.pyhttpx.PyhttpxDownloadHandler', + # 'https': 'aioscrapy.core.downloader.handlers.pyhttpx.PyhttpxDownloadHandler', + # }, + DOWNLOAD_HANDLERS_TYPE="pyhttpx", + PYHTTPX_CLIENT_ARGS=dict( + browser_type='chrome', + http2=True + ) + ) + + start_urls = ['https://tls.peet.ws/api/all'] + + @staticmethod + async def process_request(request, spider): + """ request middleware """ + pass + + @staticmethod + async def process_response(request, response, spider): + """ response middleware """ + return response + + @staticmethod + async def process_exception(request, exception, spider): + """ exception middleware """ + pass + + async def parse(self, response: Response): + print(response.text) + + async def process_item(self, item): + print(item) + + +if __name__ == '__main__': + DemoPyhttpxSpider.start() diff --git a/example/singlespider/demo_requests.py b/example/singlespider/demo_request_requests.py similarity index 81% rename from example/singlespider/demo_requests.py rename to example/singlespider/demo_request_requests.py index e6d8103..d870347 100644 --- a/example/singlespider/demo_requests.py +++ b/example/singlespider/demo_request_requests.py @@ -17,10 +17,11 @@ class DemoRequestsSpider(Spider): CONCURRENT_REQUESTS=1, LOG_LEVEL='INFO', CLOSE_SPIDER_ON_IDLE=True, - DOWNLOAD_HANDLERS={ - 'http': 'aioscrapy.core.downloader.handlers.requests.RequestsDownloadHandler', - 'https': 'aioscrapy.core.downloader.handlers.requests.RequestsDownloadHandler', - }, + # DOWNLOAD_HANDLERS={ + # 'http': 'aioscrapy.core.downloader.handlers.requests.RequestsDownloadHandler', + # 'https': 'aioscrapy.core.downloader.handlers.requests.RequestsDownloadHandler', + # }, + DOWNLOAD_HANDLERS_TYPE="requests", ) start_urls = ['https://quotes.toscrape.com'] diff --git a/example/singlespider/demo_sink_csv.py b/example/singlespider/demo_sink_csv.py new file mode 100644 index 0000000..22e8362 --- /dev/null +++ b/example/singlespider/demo_sink_csv.py @@ -0,0 +1,61 @@ +import datetime +import logging +import time + +from aioscrapy import Request +from 
aioscrapy.spiders import Spider + +logger = logging.getLogger(__name__) + + +class DemoCsvSpider(Spider): + name = 'DemoCsvSpider' + custom_settings = { + "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", + # 'DOWNLOAD_DELAY': 3, + # 'RANDOMIZE_DOWNLOAD_DELAY': True, + # 'CONCURRENT_REQUESTS': 1, + # 'LOG_LEVEL': 'INFO' + # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter', + "CLOSE_SPIDER_ON_IDLE": True, + + "ITEM_PIPELINES": { + 'aioscrapy.libs.pipelines.sink.CsvPipeline': 100, + }, + "SAVE_CACHE_NUM": 1000, # 每次存储1000条 + "SAVE_CACHE_INTERVAL": 10, # 每次10秒存储一次 + } + + start_urls = ['https://quotes.toscrape.com'] + + @staticmethod + async def process_request(request, spider): + """ request middleware """ + pass + + @staticmethod + async def process_response(request, response, spider): + """ response middleware """ + return response + + @staticmethod + async def process_exception(request, exception, spider): + """ exception middleware """ + pass + + async def parse(self, response): + for quote in response.css('div.quote'): + yield { + 'author': quote.xpath('span/small/text()').get(), + 'text': quote.css('span.text::text').get(), + '__csv__': { + 'filename': 'article', # 文件名 或 存储的路径及文件名 如:D:\article.xlsx + } + } + + async def process_item(self, item): + print(item) + + +if __name__ == '__main__': + DemoCsvSpider.start() diff --git a/example/singlespider/demo_sink_execl.py b/example/singlespider/demo_sink_execl.py new file mode 100644 index 0000000..570ea8c --- /dev/null +++ b/example/singlespider/demo_sink_execl.py @@ -0,0 +1,65 @@ +import datetime +import logging +import time + +from aioscrapy import Request +from aioscrapy.spiders import Spider + +logger = logging.getLogger(__name__) + + +class DemoExeclSpider(Spider): + name = 'DemoExeclSpider' + custom_settings = { + "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", + # 'DOWNLOAD_DELAY': 3, + # 'RANDOMIZE_DOWNLOAD_DELAY': True, + # 'CONCURRENT_REQUESTS': 1, + # 'LOG_LEVEL': 'INFO' + # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter', + "CLOSE_SPIDER_ON_IDLE": True, + + "ITEM_PIPELINES": { + 'aioscrapy.libs.pipelines.sink.ExeclPipeline': 100, + }, + "SAVE_CACHE_NUM": 1000, # 每次存储1000条 + "SAVE_CACHE_INTERVAL": 10, # 每次10秒存储一次 + } + + start_urls = ['https://quotes.toscrape.com'] + + @staticmethod + async def process_request(request, spider): + """ request middleware """ + pass + + @staticmethod + async def process_response(request, response, spider): + """ response middleware """ + return response + + @staticmethod + async def process_exception(request, exception, spider): + """ exception middleware """ + pass + + async def parse(self, response): + for quote in response.css('div.quote'): + yield { + 'author': quote.xpath('span/small/text()').get(), + 'text': quote.css('span.text::text').get(), + '__execl__': { + 'filename': 'article', # 文件名 或 存储的路径及文件名 如:D:\article.xlsx + 'sheet': 'sheet1', # 表格的sheet名字 不指定默认为sheet1 + + # 'img_fields': ['img'], # 图片字段 当指定图片字段时 自行下载图片 并保存到表格里 + # 'img_size': (100, 100) # 指定图片大小时 自动将图片转换为指定大小 + } + } + + async def process_item(self, item): + print(item) + + +if __name__ == '__main__': + DemoExeclSpider.start() diff --git a/example/singlespider/demo_sink_mongo.py b/example/singlespider/demo_sink_mongo.py index 52ec795..d8e9993 100644 --- a/example/singlespider/demo_sink_mongo.py +++ 
b/example/singlespider/demo_sink_mongo.py @@ -1,71 +1,72 @@ -import logging - -from aioscrapy import Request -from aioscrapy.spiders import Spider - -logger = logging.getLogger(__name__) - - -class DemoMongoSpider(Spider): - name = 'DemoMongoSpider' - custom_settings = { - "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", - # 'DOWNLOAD_DELAY': 3, - # 'RANDOMIZE_DOWNLOAD_DELAY': True, - # 'CONCURRENT_REQUESTS': 1, - # 'LOG_LEVEL': 'INFO' - # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter', - "CLOSE_SPIDER_ON_IDLE": True, - # mongo parameter - "MONGO_ARGS": { - 'default': { - 'host': 'mongodb://root:root@192.168.234.128:27017', - 'db': 'test', - } - }, - "ITEM_PIPELINES": { - 'aioscrapy.libs.pipelines.sink.MongoPipeline': 100, - }, - "SAVE_CACHE_NUM": 1000, # 每次存储1000条 - "SAVE_CACHE_INTERVAL": 10, # 每次10秒存储一次 - } - - start_urls = ['https://quotes.toscrape.com'] - - @staticmethod - async def process_request(request, spider): - """ request middleware """ - pass - - @staticmethod - async def process_response(request, response, spider): - """ response middleware """ - return response - - @staticmethod - async def process_exception(request, exception, spider): - """ exception middleware """ - pass - - async def parse(self, response): - for quote in response.css('div.quote'): - yield { - 'save_table_name': 'article', # 要存储的表名字 - 'save_db_alias': 'default', # 要存储的mongo, 参数“MONGO_ARGS”的key - # 'save_db_name': 'xxx', # 要存储的mongo的库名, 不指定则默认为“MONGO_ARGS”中的“db”值 - - 'author': quote.xpath('span/small/text()').get(), - 'text': quote.css('span.text::text').get(), - } - - next_page = response.css('li.next a::attr("href")').get() - if next_page is not None: - # yield response.follow(next_page, self.parse) - yield Request(f"https://quotes.toscrape.com{next_page}", callback=self.parse) - - async def process_item(self, item): - print(item) - - -if __name__ == '__main__': - DemoMongoSpider.start() +import logging + +from aioscrapy import Request +from aioscrapy.spiders import Spider + +logger = logging.getLogger(__name__) + + +class DemoMongoSpider(Spider): + name = 'DemoMongoSpider' + custom_settings = { + "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", + # 'DOWNLOAD_DELAY': 3, + # 'RANDOMIZE_DOWNLOAD_DELAY': True, + # 'CONCURRENT_REQUESTS': 1, + # 'LOG_LEVEL': 'INFO' + # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter', + "CLOSE_SPIDER_ON_IDLE": True, + # mongo parameter + "MONGO_ARGS": { + 'default': { + 'host': 'mongodb://root:root@192.168.234.128:27017', + 'db': 'test', + } + }, + "ITEM_PIPELINES": { + 'aioscrapy.libs.pipelines.sink.MongoPipeline': 100, + }, + "SAVE_CACHE_NUM": 1000, # 每次存储1000条 + "SAVE_CACHE_INTERVAL": 10, # 每次10秒存储一次 + } + + start_urls = ['https://quotes.toscrape.com'] + + @staticmethod + async def process_request(request, spider): + """ request middleware """ + pass + + @staticmethod + async def process_response(request, response, spider): + """ response middleware """ + return response + + @staticmethod + async def process_exception(request, exception, spider): + """ exception middleware """ + pass + + async def parse(self, response): + for quote in response.css('div.quote'): + yield { + 'author': quote.xpath('span/small/text()').get(), + 'text': quote.css('span.text::text').get(), + '__mongo__': { + 'db_alias': 'default', # 要存储的mongo, 参数“MONGO_ARGS”的key + 'table_name': 'article', # 要存储的表名字 + 
# 'db_name': 'xxx', # 要存储的mongo的库名, 不指定则默认为“MONGO_ARGS”中的“db”值 + } + } + + next_page = response.css('li.next a::attr("href")').get() + if next_page is not None: + # yield response.follow(next_page, self.parse) + yield Request(f"https://quotes.toscrape.com{next_page}", callback=self.parse) + + async def process_item(self, item): + print(item) + + +if __name__ == '__main__': + DemoMongoSpider.start() diff --git a/example/singlespider/demo_sink_mysql.py b/example/singlespider/demo_sink_mysql.py index cfb9951..1277cdf 100644 --- a/example/singlespider/demo_sink_mysql.py +++ b/example/singlespider/demo_sink_mysql.py @@ -1,75 +1,81 @@ -import logging - -from aioscrapy import Request -from aioscrapy.spiders import Spider - -logger = logging.getLogger(__name__) - - -class DemoMysqlSpider(Spider): - name = 'DemoMysqlSpider' - custom_settings = dict( - USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", - # OWNLOAD_DELAY=3, - # ANDOMIZE_DOWNLOAD_DELAY=True, - # ONCURRENT_REQUESTS=1, - # OG_LEVEL='INFO', - # UPEFILTER_CLASS='aioscrapy.dupefilters.disk.RFPDupeFilter', - CLOSE_SPIDER_ON_IDLE=True, - # mysql parameter - MYSQL_ARGS={ - 'default': { - 'host': '127.0.0.1', - 'user': 'root', - 'password': 'root', - 'port': 3306, - 'charset': 'utf8mb4', - 'db': 'test', - }, - }, - ITEM_PIPELINES={ - 'aioscrapy.libs.pipelines.sink.MysqlPipeline': 100, - }, - SAVE_CACHE_NUM=1000, # 每次存储1000条 - SAVE_CACHE_INTERVAL=10, # 每次10秒存储一次 - ) - - start_urls = ['https://quotes.toscrape.com'] - - @staticmethod - async def process_request(request, spider): - """ request middleware """ - pass - - @staticmethod - async def process_response(request, response, spider): - """ response middleware """ - return response - - @staticmethod - async def process_exception(request, exception, spider): - """ exception middleware """ - pass - - async def parse(self, response): - for quote in response.css('div.quote'): - yield { - 'save_table_name': 'article', # 要存储的表名字 - 'save_db_alias': 'default', # 要存储的mongo, 参数“MYSQL_ARGS”的key - # 'save_db_name': 'xxx', # 要存储的mongo的库名, 不指定则默认为“MYSQL_ARGS”中的“db”值 - - 'author': quote.xpath('span/small/text()').get(), - 'text': quote.css('span.text::text').get(), - } - - next_page = response.css('li.next a::attr("href")').get() - if next_page is not None: - # yield response.follow(next_page, self.parse) - yield Request(f"https://quotes.toscrape.com{next_page}", callback=self.parse) - - async def process_item(self, item): - print(item) - - -if __name__ == '__main__': - DemoMysqlSpider.start() +import logging + +from aioscrapy import Request +from aioscrapy.spiders import Spider + +logger = logging.getLogger(__name__) + + +class DemoMysqlSpider(Spider): + name = 'DemoMysqlSpider' + custom_settings = dict( + USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", + # OWNLOAD_DELAY=3, + # ANDOMIZE_DOWNLOAD_DELAY=True, + # ONCURRENT_REQUESTS=1, + # OG_LEVEL='INFO', + # UPEFILTER_CLASS='aioscrapy.dupefilters.disk.RFPDupeFilter', + CLOSE_SPIDER_ON_IDLE=True, + # mysql parameter + MYSQL_ARGS={ + 'default': { + 'host': '127.0.0.1', + 'user': 'root', + 'password': 'root', + 'port': 3306, + 'charset': 'utf8mb4', + 'db': 'test', + }, + }, + ITEM_PIPELINES={ + 'aioscrapy.libs.pipelines.sink.MysqlPipeline': 100, + }, + SAVE_CACHE_NUM=1000, # 每次存储1000条 + SAVE_CACHE_INTERVAL=10, # 每次10秒存储一次 + ) + + start_urls = ['https://quotes.toscrape.com'] + + @staticmethod + async def 
process_request(request, spider): + """ request middleware """ + pass + + @staticmethod + async def process_response(request, response, spider): + """ response middleware """ + return response + + @staticmethod + async def process_exception(request, exception, spider): + """ exception middleware """ + pass + + async def parse(self, response): + for quote in response.css('div.quote'): + yield { + 'author': quote.xpath('span/small/text()').get(), + 'text': quote.css('span.text::text').get(), + '__mysql__': { + 'db_alias': 'default', # 要存储的mysql, 参数“MYSQL_ARGS”的key + 'table_name': 'article', # 要存储的表名字 + + # 写入数据库的方式: 默认insert方式 + # insert:普通写入 出现主键或唯一键冲突时抛出异常 + # update_insert:更新插入 出现主键或唯一键冲突时,更新写入 + # ignore_insert:忽略写入 写入时出现冲突 丢掉该条数据 不抛出异常 + 'insert_type': 'update_insert', + } + } + + next_page = response.css('li.next a::attr("href")').get() + if next_page is not None: + # yield response.follow(next_page, self.parse) + yield Request(f"https://quotes.toscrape.com{next_page}", callback=self.parse) + + async def process_item(self, item): + print(item) + + +if __name__ == '__main__': + DemoMysqlSpider.start() diff --git a/example/singlespider/demo_sink_pg.py b/example/singlespider/demo_sink_pg.py new file mode 100644 index 0000000..79eee69 --- /dev/null +++ b/example/singlespider/demo_sink_pg.py @@ -0,0 +1,79 @@ +import logging + +from aioscrapy import Request +from aioscrapy.spiders import Spider + +logger = logging.getLogger(__name__) + + +class DemoPGSpider(Spider): + name = 'DemoPGSpider' + custom_settings = { + "USER_AGENT": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36", + # 'DOWNLOAD_DELAY': 3, + # 'RANDOMIZE_DOWNLOAD_DELAY': True, + # 'CONCURRENT_REQUESTS': 1, + # 'LOG_LEVEL': 'INFO' + # 'DUPEFILTER_CLASS': 'aioscrapy.dupefilters.disk.RFPDupeFilter', + "CLOSE_SPIDER_ON_IDLE": True, + # mongo parameter + "PG_ARGS": { + 'default': { + 'user': 'user', + 'password': 'password', + 'database': 'spider_db', + 'host': '127.0.0.1' + } + }, + "ITEM_PIPELINES": { + 'aioscrapy.libs.pipelines.sink.PGPipeline': 100, + }, + "SAVE_CACHE_NUM": 1000, # 每次存储1000条 + "SAVE_CACHE_INTERVAL": 10, # 每次10秒存储一次 + } + + start_urls = ['https://quotes.toscrape.com'] + + @staticmethod + async def process_request(request, spider): + """ request middleware """ + pass + + @staticmethod + async def process_response(request, response, spider): + """ response middleware """ + return response + + @staticmethod + async def process_exception(request, exception, spider): + """ exception middleware """ + pass + + async def parse(self, response): + for quote in response.css('div.quote'): + yield { + 'author': quote.xpath('span/small/text()').get(), + 'text': quote.css('span.text::text').get(), + '__pg__': { + 'db_alias': 'default', # 要存储的PostgreSQL, 参数“PG_ARGS”的key + 'table_name': 'spider_db.article', # 要存储的schema和表名字,用.隔开 + + # 写入数据库的方式: + # insert:普通写入 出现主键或唯一键冲突时抛出异常 + # update_insert:更新插入 出现on_conflict指定的冲突时,更新写入 + # ignore_insert:忽略写入 写入时出现冲突 丢掉该条数据 不抛出异常 + 'insert_type': 'update_insert', + 'on_conflict': 'id', # update_insert方式下的约束 + } + } + next_page = response.css('li.next a::attr("href")').get() + if next_page is not None: + # yield response.follow(next_page, self.parse) + yield Request(f"https://quotes.toscrape.com{next_page}", callback=self.parse) + + async def process_item(self, item): + print(item) + + +if __name__ == '__main__': + DemoPGSpider.start() diff --git a/setup.py b/setup.py index f09b7bf..a607ece 100644 --- a/setup.py +++ b/setup.py @@ 
-17,14 +17,18 @@ extras_require = { "all": [ "aiomysql>=0.1.1", "httpx[http2]>=0.23.0", "aio-pika>=8.1.1", - "cryptography", "motor>=3.1.1", "playwright>=1.31.1", "pyhttpx>=2.10.1" + "cryptography", "motor>=3.1.1", "playwright>=1.31.1", "pyhttpx>=2.10.1", + "asyncpg>=0.27.0", "XlsxWriter>=3.1.2", "pillow>=9.4.0", "requests>=2.28.2" ], "aiomysql": ["aiomysql>=0.1.1", "cryptography"], "httpx": ["httpx[http2]>=0.23.0"], "aio-pika": ["aio-pika>=8.1.1"], "mongo": ["motor>=3.1.1"], "playwright": ["playwright>=1.31.1"], - "pyhttpx": ["pyhttpx>=2.10.4"] + "pyhttpx": ["pyhttpx>=2.10.4"], + "requests": ["requests>=2.28.2"], + "pg": ["asyncpg>=0.27.0"], + "execl": ["XlsxWriter>=3.1.2", "pillow>=9.4.0"], } setup(
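As a usage note on the packaging hunk above: the new extras (`requests`, `pg`, `execl`) and the expanded `all` group are plain setuptools extras, so they would be pulled in with pip's extras syntax. A hedged sketch, assuming the distribution is published under the `aio-scrapy` name used elsewhere in these docs:

```shell
# PostgreSQL pipeline support (asyncpg)
pip install "aio-scrapy[pg]"

# Excel pipeline support (XlsxWriter + pillow)
pip install "aio-scrapy[execl]"

# every optional backend at once
pip install "aio-scrapy[all]"
```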