feat: postgresql pipeline & excel pipeline & csv pipeline
ConlinH committed Sep 22, 2023
1 parent 000f69f commit 4325f34
Showing 30 changed files with 1,925 additions and 1,075 deletions.
2 changes: 1 addition & 1 deletion aioscrapy/VERSION
@@ -1 +1 @@
-1.2.17
+1.3.1
1 change: 1 addition & 0 deletions aioscrapy/core/downloader/handlers/httpx.py
@@ -38,6 +38,7 @@ async def download_request(self, request: Request, _) -> HtmlResponse:
         kwargs['headers'] = headers
 
         session_args = self.httpx_client_session_args.copy()
+        session_args.setdefault('http2', True)
         session_args.update({
             'verify': request.meta.get('verify_ssl', self.verify_ssl),
             'follow_redirects': self.settings.getbool('REDIRECT_ENABLED', True) if request.meta.get(
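This change makes the httpx handler negotiate HTTP/2 by default (httpx needs its optional h2 extra installed for HTTP/2 to actually be used). Because the flag is applied with setdefault, a project can still opt out through its client-session args. A minimal sketch, assuming the settings key is HTTPX_CLIENT_SESSION_ARGS — the handler attribute name suggests this, but the key itself does not appear in this diff:

    # settings.py — hypothetical key name inferred from self.httpx_client_session_args
    HTTPX_CLIENT_SESSION_ARGS = {
        'http2': False,  # overrides the handler's new setdefault('http2', True)
    }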
123 changes: 62 additions & 61 deletions aioscrapy/core/downloader/handlers/pyhttpx.py
@@ -1,61 +1,62 @@
 import asyncio
 import logging
 
 import pyhttpx
 
 from aioscrapy import Request
 from aioscrapy.core.downloader.handlers import BaseDownloadHandler
 from aioscrapy.http import HtmlResponse
 from aioscrapy.settings import Settings
 
 logger = logging.getLogger(__name__)
 
 
 class PyhttpxDownloadHandler(BaseDownloadHandler):
 
     def __init__(self, settings):
         self.settings: Settings = settings
         self.pyhttpx_client_args: dict = self.settings.get('PYHTTPX_CLIENT_ARGS', {})
-        self.verify_ssl: bool = self.settings.get("VERIFY_SSL")
+        self.verify_ssl = self.settings.get("VERIFY_SSL", True)
         self.loop = asyncio.get_running_loop()
 
     @classmethod
     def from_settings(cls, settings: Settings):
         return cls(settings)
 
     async def download_request(self, request: Request, _) -> HtmlResponse:
         kwargs = {
             'timeout': self.settings.get('DOWNLOAD_TIMEOUT'),
             'cookies': dict(request.cookies),
             'verify': self.verify_ssl,
             'allow_redirects': self.settings.getbool('REDIRECT_ENABLED', True) if request.meta.get(
                 'dont_redirect') is None else request.meta.get('dont_redirect')
         }
         post_data = request.body or None
         if isinstance(post_data, dict):
             kwargs['json'] = post_data
         else:
             kwargs['data'] = post_data
 
         headers = request.headers or self.settings.get('DEFAULT_REQUEST_HEADERS')
         kwargs['headers'] = headers
 
         proxy = request.meta.get("proxy")
         if proxy:
             kwargs["proxies"] = {'https': proxy}
             logger.debug(f"use proxy {proxy}: {request.url}")
 
         session_args = self.pyhttpx_client_args.copy()
+        session_args.setdefault('http2', True)
         with pyhttpx.HttpSession(**session_args) as session:
             response = await asyncio.to_thread(session.request, request.method, request.url, **kwargs)
             return HtmlResponse(
                 request.url,
                 status=response.status_code,
                 headers=response.headers,
                 body=response.content,
                 cookies=dict(response.cookies),
                 encoding=response.encoding
             )
 
     async def close(self):
         pass
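To route requests through this handler, a project can point its download handlers at the class. A sketch assuming aioscrapy keeps Scrapy's DOWNLOAD_HANDLERS convention (the setting name is not confirmed by this diff), with PYHTTPX_CLIENT_ARGS taken from the code above:

    # settings.py — the DOWNLOAD_HANDLERS mapping is an assumption, not shown in this diff
    DOWNLOAD_HANDLERS = {
        'http': 'aioscrapy.core.downloader.handlers.pyhttpx.PyhttpxDownloadHandler',
        'https': 'aioscrapy.core.downloader.handlers.pyhttpx.PyhttpxDownloadHandler',
    }
    PYHTTPX_CLIENT_ARGS = {'http2': False}  # overrides the handler's new setdefault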
147 changes: 74 additions & 73 deletions aioscrapy/db/__init__.py
@@ -1,73 +1,74 @@
 import logging
 from importlib import import_module
 from typing import Any
 
 import aioscrapy
 from aioscrapy.db.absmanager import AbsDBPoolManager
 from aioscrapy.db.aioredis import redis_manager
 from aioscrapy.utils.misc import load_object
 
 logger = logging.getLogger(__name__)
 
 __all__ = ['db_manager', 'get_pool', 'get_manager']
 
 DB_MODULE_MAP = {
     'redis': ('redis', 'aioscrapy.db.aioredis.redis_manager'),
     'aiomysql': ('mysql', 'aioscrapy.db.aiomysql.mysql_manager'),
     'aio_pika': ('rabbitmq', 'aioscrapy.db.aiorabbitmq.rabbitmq_manager'),
     'motor': ('mongo', 'aioscrapy.db.aiomongo.mongo_manager'),
+    'asyncpg': ('pg', 'aioscrapy.db.aiopg.pg_manager'),
 }
 
 db_manager_map = {}
 
 # Register a pool manager only when its driver module is importable.
 for module_name, (manager_key, class_path) in DB_MODULE_MAP.items():
     try:
         import_module(module_name)
     except ImportError:
         pass
     else:
         db_manager_map[manager_key] = load_object(class_path)
 
 
 class DBManager:
 
     @staticmethod
     def get_manager(db_type: str) -> AbsDBPoolManager:
         manager = db_manager_map.get(db_type)
         assert manager is not None, f"Unsupported db type: {db_type}"
         return manager
 
     def get_pool(self, db_type: str, alias='default') -> Any:
         manager = self.get_manager(db_type)
         return manager.get_pool(alias)
 
     @staticmethod
     async def close_all() -> None:
         for manager in db_manager_map.values():
             await manager.close_all()
 
     @staticmethod
     async def from_dict(db_args: dict) -> None:
         for db_type, args in db_args.items():
             manager = db_manager_map.get(db_type)
             if manager is None:
                 logger.warning(f'Unsupported db type: {db_type}; only {", ".join(db_manager_map.keys())} are supported')
                 continue  # skip unsupported types instead of calling from_dict on None
             await manager.from_dict(args)
 
     @staticmethod
     async def from_settings(settings: aioscrapy.Settings) -> None:
         for manager in db_manager_map.values():
             await manager.from_settings(settings)
 
     async def from_crawler(self, crawler: "aioscrapy.Crawler") -> None:
         return await self.from_settings(crawler.settings)
 
     def __getattr__(self, db_type: str) -> Any:
         if db_type not in db_manager_map:
             raise AttributeError(f'Unsupported db type: {db_type}')
         return db_manager_map[db_type]
 
 
 db_manager = DBManager()
 get_manager = db_manager.get_manager
 get_pool = db_manager.get_pool
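With the asyncpg entry registered, PostgreSQL pools are configured the same way as the other backends. A short sketch; PG_ARGS is the settings key read by pg_manager.from_settings() in the new aiopg.py below, while the credential values are placeholders:

    # settings.py
    PG_ARGS = {
        'default': {
            'user': 'username',   # placeholder credentials
            'password': 'pwd',
            'database': 'dbname',
            'host': '127.0.0.1',
        }
    }

Once created, the pool is reachable as db_manager.get_pool('pg') or, via DBManager.__getattr__, as db_manager.pg.executor('default').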
112 changes: 112 additions & 0 deletions aioscrapy/db/aiopg.py
@@ -0,0 +1,112 @@
import logging
from contextlib import asynccontextmanager

from asyncpg.pool import create_pool

import aioscrapy
from aioscrapy.db.absmanager import AbsDBPoolManager

logger = logging.getLogger(__name__)


class PGExecutor:
    def __init__(self, alias: str, pool_manager: "AioPGPoolManager"):
        self.alias = alias
        self.pool_manager = pool_manager

    async def insert(self, sql, value):
        async with self.pool_manager.get(self.alias) as connect:
            # asyncpg connections have no rollback() method; run the batch
            # inside an explicit transaction so a failure rolls back automatically.
            async with connect.transaction():
                return await connect.executemany(sql, value)

    async def fetch(self, sql: str):
        async with self.pool_manager.get(self.alias) as connect:
            return await connect.fetch(sql)

    async def query(self, sql: str):
        return await self.fetch(sql)


class AioPGPoolManager(AbsDBPoolManager):
    _clients = {}

    async def create(self, alias: str, params: dict):
        if alias in self._clients:
            return self._clients[alias]

        params = params.copy()
        params.setdefault('timeout', 30)
        pg_pool = await create_pool(**params)
        return self._clients.setdefault(alias, pg_pool)

    def get_pool(self, alias: str):
        pg_pool = self._clients.get(alias)
        assert pg_pool is not None, f"No PG pool was created with alias {alias}"
        return pg_pool

    @asynccontextmanager
    async def get(self, alias: str):
        """Acquire a connection from the PG pool, releasing it on exit."""
        pg_pool = self.get_pool(alias)
        conn = await pg_pool.acquire()
        try:
            yield conn
        finally:
            await pg_pool.release(conn)

    def executor(self, alias: str) -> PGExecutor:
        return PGExecutor(alias, self)

    async def close(self, alias: str):
        pg_pool = self._clients.pop(alias, None)
        if pg_pool:
            await pg_pool.close()

    async def close_all(self):
        for alias in list(self._clients.keys()):
            await self.close(alias)

    async def from_dict(self, db_args: dict):
        for alias, pg_args in db_args.items():
            await self.create(alias, pg_args)

    async def from_settings(self, settings: aioscrapy.Settings):
        for alias, pg_args in settings.getdict('PG_ARGS').items():
            await self.create(alias, pg_args)


pg_manager = AioPGPoolManager()

if __name__ == '__main__':
    import asyncio


    async def test():
        pg_pool = await pg_manager.create(
            'default',
            dict(
                user='username',
                password='pwd',
                database='dbname',
                host='127.0.0.1'
            )
        )

        # Option 1: acquire and release the connection manually
        conn = await pg_pool.acquire()
        try:
            result = await conn.fetch('select 1')
            print(tuple(result[0]))
        finally:
            await pg_pool.release(conn)

        # Option 2: use the context-manager helper
        async with pg_manager.get('default') as conn:
            result = await conn.fetch('select 1')
            print(tuple(result[0]))

    asyncio.run(test())
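Beyond the self-test above, the executor wraps batch inserts. A hypothetical usage sketch — the table and columns are made up, and asyncpg expects $1-style positional placeholders:

    executor = pg_manager.executor('default')
    await executor.insert(
        'INSERT INTO items (title, price) VALUES ($1, $2)',  # asyncpg positional params
        [('book', 9.99), ('pen', 1.50)],                     # one tuple per row
    )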
