Skip to content

Commit

Permalink
fixed a bug related to the processing of several events again
Browse files Browse the repository at this point in the history
  • Loading branch information
GLEF1X committed Jul 14, 2021
1 parent 5ce4c41 commit ae99ae6
Showing 1 changed file with 77 additions and 96 deletions.
173 changes: 77 additions & 96 deletions glQiwiApi/utils/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Coroutine,
Any,
cast,
NoReturn,
)

from aiohttp import ClientTimeout, web
Expand All @@ -36,21 +37,26 @@
if TYPE_CHECKING:
from glQiwiApi.qiwi.client import QiwiWrapper


class BadCallback(Exception):
...


T = TypeVar("T")


def start_webhook(
client: QiwiWrapper,
*,
host: str = "localhost",
port: int = 8080,
path: Optional[Path] = None,
on_startup: Optional[Callable[[QiwiWrapper], Awaitable[None]]] = None,
on_shutdown: Optional[Callable[[QiwiWrapper], Awaitable[None]]] = None,
tg_app: Optional[TelegramWebhookProxy] = None,
app: Optional["web.Application"] = None,
ssl_context: Optional[SSLContext] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
client: QiwiWrapper,
*,
host: str = "localhost",
port: int = 8080,
path: Optional[Path] = None,
on_startup: Optional[Callable[[QiwiWrapper], Awaitable[None]]] = None,
on_shutdown: Optional[Callable[[QiwiWrapper], Awaitable[None]]] = None,
tg_app: Optional[TelegramWebhookProxy] = None,
app: Optional["web.Application"] = None,
ssl_context: Optional[SSLContext] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
"""
Blocking function that listens for webhooks
Expand All @@ -71,19 +77,17 @@ def start_webhook(
_setup_callbacks(executor, on_startup, on_shutdown)
if isinstance(tg_app, TelegramWebhookProxy) and ssl_context is None:
ssl_context = tg_app.ssl_context
executor.start_webhook(
host=host, port=port, path=path, app=app, ssl_context=ssl_context
)
executor.start_webhook(host=host, port=port, path=path, app=app, ssl_context=ssl_context)


def start_polling(
client: QiwiWrapper,
*,
get_updates_from: Optional[datetime] = None,
timeout: Union[float, int, ClientTimeout] = 5,
on_startup: Optional[Callable[[QiwiWrapper], Any]] = None,
on_shutdown: Optional[Callable[[QiwiWrapper], Any]] = None,
tg_app: Optional[BaseProxy] = None,
client: QiwiWrapper,
*,
get_updates_from: Optional[datetime] = None,
timeout: Union[float, int, ClientTimeout] = 5,
on_startup: Optional[Callable[[QiwiWrapper], Any]] = None,
on_shutdown: Optional[Callable[[QiwiWrapper], Any]] = None,
tg_app: Optional[BaseProxy] = None,
) -> None:
"""
Setup for long-polling mode
Expand All @@ -105,19 +109,24 @@ def start_polling(
executor.start_polling(get_updates_from=get_updates_from, timeout=timeout)


async def _inspect_and_execute_callback(
client: "QiwiWrapper", callback: Callable[[QiwiWrapper], Any]
) -> None:
async def _inspect_and_execute_callback(client: "QiwiWrapper", callback: Callable[[QiwiWrapper], Any]) -> None:
if inspect.iscoroutinefunction(callback):
await callback(client)
else:
callback(client)


def _check_callback(callback: Callable[[QiwiWrapper], Any]) -> NoReturn:
if not isinstance(callback, types.FunctionType):
raise BadCallback("Колбек, переданный в on_startup/on_shutdown не является функцией") # NOQA
if len(inspect.getfullargspec(callback).args) < 1:
raise BadCallback("Функция on_startup или on_shutdown должна принимать аргумент - экземпляр класса QiwiWrapper")


def _setup_callbacks(
executor: Executor,
on_startup: Optional[Callable[..., Any]] = None,
on_shutdown: Optional[Callable[..., Any]] = None,
executor: Executor,
on_startup: Optional[Callable[[QiwiWrapper], Any]] = None,
on_shutdown: Optional[Callable[[QiwiWrapper], Any]] = None,
) -> None:
"""
Function, which setup callbacks and set it to dispatcher object
Expand All @@ -127,9 +136,9 @@ def _setup_callbacks(
:param on_shutdown:
"""
if on_startup is not None:
executor["on_startup"] = on_startup
executor.add_startup_callback(on_startup)
if on_shutdown is not None:
executor["on_shutdown"] = on_shutdown
executor.add_shutdown_callback(on_shutdown)


def parse_timeout(timeout: Union[float, int, ClientTimeout]) -> float:
Expand Down Expand Up @@ -158,10 +167,10 @@ class Executor:
"""

def __init__(
self,
client: QiwiWrapper,
tg_app: Optional[BaseProxy],
loop: Optional[asyncio.AbstractEventLoop] = None,
self,
client: QiwiWrapper,
tg_app: Optional[BaseProxy],
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
"""
Expand All @@ -177,7 +186,7 @@ def __init__(
"level": logging.DEBUG,
}
self._tg_app = tg_app
self.offset: Optional[int] = None
self.last_handled_txn_id: Optional[int] = None
self.get_updates_until: Optional[datetime] = None
self.get_updates_from: Optional[datetime] = None
self.client: QiwiWrapper = client
Expand All @@ -191,23 +200,15 @@ def telegram_proxy_application(self) -> Optional[BaseProxy]:

@property
def loop(self) -> asyncio.AbstractEventLoop:
return cast(
asyncio.AbstractEventLoop, getattr(self, "_loop", asyncio.get_event_loop())
)

def __setitem__(self, key: str, callback: Callable) -> None:
if key not in ["on_shutdown", "on_startup"]:
raise TypeError("to __setitem__ you can only pass callbacks")
return cast(asyncio.AbstractEventLoop, getattr(self, "_loop", asyncio.get_event_loop()))

if not isinstance(callback, types.FunctionType):
raise TypeError(
"Invalid type of callback, expected function, got %s" % type(callback)
)
def add_shutdown_callback(self, callback: Callable[[QiwiWrapper], Any]) -> None:
_check_callback(callback)
self._on_shutdown_calls.append(callback)

if key == "on_shutdown":
return self._on_shutdown_calls.append(callback)
else:
return self._on_startup_calls.append(callback)
def add_startup_callback(self, callback: Callable[[QiwiWrapper], Any]) -> None:
_check_callback(callback)
self._on_startup_calls.append(callback)

async def _pre_process(self, get_updates_from: Optional[datetime]) -> None:
"""
Expand All @@ -227,9 +228,7 @@ async def _get_history(self) -> List[Transaction]:
class Transaction - raise exception
"""
history = await self.client.transactions(
end_date=self.get_updates_until, start_date=self.get_updates_from
)
history = await self.client.transactions(end_date=self.get_updates_until, start_date=self.get_updates_from)
if not history or not all(isinstance(txn, Transaction) for txn in history):
raise NoUpdatesToExecute()
return history
Expand All @@ -242,20 +241,18 @@ async def _pool_process(self, get_updates_from: Optional[datetime]) -> None:
"""
await self._pre_process(get_updates_from)
try:
history: List[Transaction] = await self._get_history()
# Here we get transactions from old to new like [3, 2, 1](a list of mock id's of events)
history_from_last_to_first: List[Transaction] = await self._get_history()
except NoUpdatesToExecute:
return
# Convert it to list of transactions with id's like [1, 2, 3] and work with it
history_from_first_to_last: List[Transaction] = history_from_last_to_first[::-1]

last_payment = history[0]
last_txn_id = last_payment.transaction_id
if self.last_handled_txn_id is None:
first_payment: Transaction = history_from_first_to_last[0]
self.last_handled_txn_id = first_payment.transaction_id - 1

if self.offset is None:
first_payment: Transaction = history[-1]
self.offset = first_payment.transaction_id - 1

await self._parse_history_and_process_events(
history=history, last_payment_id=last_txn_id
)
await self._parse_history_and_process_events(history=history_from_first_to_last)

async def _start_polling(self, **kwargs: Any) -> None:
"""
Expand All @@ -276,7 +273,7 @@ async def _start_polling(self, **kwargs: Any) -> None:

def _on_shutdown(self, loop: asyncio.AbstractEventLoop) -> None:
"""
On shutdown, we gracefully cancel all tasks, close event loop
On shutdown, executor gracefully cancel all tasks, close event loop
and call `close` method to clear resources
"""
coroutines: List[Coroutine] = [self.goodbye(), self.client.close()]
Expand All @@ -285,47 +282,33 @@ def _on_shutdown(self, loop: asyncio.AbstractEventLoop) -> None:
loop.run_until_complete(asyncio.gather(*coroutines, loop=loop))

async def _shutdown_tg_app(self) -> None:
"""
Gracefully shutdown tg application
"""
self.tg_app.dispatcher.stop_polling() # type: ignore
await self.tg_app.dispatcher.storage.close() # type: ignore
await self.tg_app.dispatcher.storage.wait_closed() # type: ignore
await self.tg_app.dispatcher.bot.session.close() # type: ignore

async def _parse_history_and_process_events(
self, history: List[Transaction], last_payment_id: int
) -> None:
async def _parse_history_and_process_events(self, history: List[Transaction]) -> None:
"""
Processing events and send callbacks to handlers
:param history: [list] list of :class:`Transaction`
:param last_payment_id: id of last payment in history
"""
history_iterator = iter(history[::-1])

while cast(int, self.offset) < last_payment_id:
try:
payment = next(history_iterator)
await self.dispatcher.process_event(payment)
self.offset = payment.transaction_id
for event in history:
if cast(int, self.last_handled_txn_id) < event.transaction_id:
await self.dispatcher.process_event(event)
self.last_handled_txn_id = event.transaction_id
self.get_updates_until = self.get_updates_from
except StopIteration: # handle exhausted iterator
break

def start_polling(
self,
*,
get_updates_from: Optional[datetime] = None,
timeout: Union[float, int, ClientTimeout] = DEFAULT_TIMEOUT,
self,
*,
get_updates_from: Optional[datetime] = None,
timeout: Union[float, int, ClientTimeout] = DEFAULT_TIMEOUT,
) -> None:
loop: asyncio.AbstractEventLoop = self.loop
try:
loop.run_until_complete(self.welcome())
loop.create_task(
self._start_polling(get_updates_from=get_updates_from, timeout=timeout)
)
loop.create_task(self._start_polling(get_updates_from=get_updates_from, timeout=timeout))
if isinstance(self.telegram_proxy_application, BaseProxy):
self.telegram_proxy_application.setup(loop=loop)
loop.run_forever()
Expand All @@ -336,13 +319,13 @@ def start_polling(
self._on_shutdown(loop=loop)

def start_webhook(
self,
*,
host: str = "localhost",
port: int = 8080,
path: Optional[Path] = None,
app: Optional[web.Application] = None,
ssl_context: Optional[SSLContext] = None,
self,
*,
host: str = "localhost",
port: int = 8080,
path: Optional[Path] = None,
app: Optional[web.Application] = None,
ssl_context: Optional[SSLContext] = None,
):
loop: asyncio.AbstractEventLoop = self.loop
application = app or web.Application()
Expand All @@ -365,13 +348,11 @@ def start_webhook(
self._on_shutdown(loop=loop)

async def welcome(self) -> None:
""" Execute on_startup callback"""
self.dispatcher.logger.debug("Start polling!")
for callback in self._on_startup_calls:
await _inspect_and_execute_callback(callback=callback, client=self.client) # pragma: no cover

async def goodbye(self) -> None:
""" Execute on_shutdown callback """
self.dispatcher.logger.debug("Goodbye!")
for callback in self._on_shutdown_calls:
await _inspect_and_execute_callback(callback=callback, client=self.client) # pragma: no cover

0 comments on commit ae99ae6

Please sign in to comment.