diff --git a/yowsup/layers/network/dispatcher/dispatcher_asyncio.py b/yowsup/layers/network/dispatcher/dispatcher_asyncio.py new file mode 100644 index 000000000..9e93a22b0 --- /dev/null +++ b/yowsup/layers/network/dispatcher/dispatcher_asyncio.py @@ -0,0 +1,72 @@ +from yowsup.layers.network.dispatcher.dispatcher import YowConnectionDispatcher +import asyncio +import logging +import socket +import traceback + +logger = logging.getLogger(__name__) + +class AsyncioConnectionDispatcher(YowConnectionDispatcher): + def __init__(self, connectionCallbacks): + super().__init__(connectionCallbacks) + self._connected = False + self.loop = asyncio.get_event_loop() + self.sock = None + + async def _send_data(self, data): + if self._connected: + self.sock.sendall(data) + else: + logger.warning("Attempted to send %d bytes while still not connected", len(data)) + + def sendData(self, data): + asyncio.run(self._send_data(data)) + + async def _connect(self, host): + logger.debug("connect(%s)", str(host)) + self.connectionCallbacks.onConnecting() + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setblocking(False) + try: + await self.loop.sock_connect(self.sock, host) + self._connected = True + self.connectionCallbacks.onConnected() + except Exception as e: + logger.error("Connection failed: %s", str(e)) + self.connectionCallbacks.onDisconnected() + + def connect(self, host): + self.loop.run_until_complete(self._connect(host)) + + def handle_connect(self): + logger.debug("handle_connect") + if not self._connected: + self._connected = True + self.connectionCallbacks.onConnected() + + def handle_close(self): + logger.debug("handle_close") + if self._connected and self.sock: + self.sock.close() + self._connected = False + self.connectionCallbacks.onDisconnected() + + def handle_error(self): + logger.error(traceback.format_exc()) + self.handle_close() + + async def _handle_read(self): + if self._connected: + try: + data = await self.loop.sock_recv(self.sock, 1024) + self.connectionCallbacks.onRecvData(data) + except Exception as e: + logger.error("Read failed: %s", str(e)) + self.handle_close() + + def handle_read(self): + asyncio.run(self._handle_read()) + + def disconnect(self): + logger.debug("disconnect") + self.handle_close() diff --git a/yowsup/layers/network/dispatcher/dispatcher_asyncore.py b/yowsup/layers/network/dispatcher/dispatcher_asyncore.py deleted file mode 100644 index b69929138..000000000 --- a/yowsup/layers/network/dispatcher/dispatcher_asyncore.py +++ /dev/null @@ -1,52 +0,0 @@ -from yowsup.layers.network.dispatcher.dispatcher import YowConnectionDispatcher -import asyncore -import logging -import socket -import traceback - -logger = logging.getLogger(__name__) - - -class AsyncoreConnectionDispatcher(YowConnectionDispatcher, asyncore.dispatcher_with_send): - def __init__(self, connectionCallbacks): - super(AsyncoreConnectionDispatcher, self).__init__(connectionCallbacks) - asyncore.dispatcher_with_send.__init__(self) - self._connected = False - - def sendData(self, data): - if self._connected: - self.out_buffer = self.out_buffer + data - self.initiate_send() - else: - logger.warn("Attempted to send %d bytes while still not connected" % len(data)) - - def connect(self, host): - logger.debug("connect(%s)" % str(host)) - self.connectionCallbacks.onConnecting() - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - asyncore.dispatcher_with_send.connect(self, host) - asyncore.loop(timeout=1) - - def handle_connect(self): - logger.debug("handle_connect") - if not self._connected: - self._connected = True - self.connectionCallbacks.onConnected() - - def handle_close(self): - logger.debug("handle_close") - self.close() - self._connected = False - self.connectionCallbacks.onDisconnected() - - def handle_error(self): - logger.error(traceback.format_exc()) - self.handle_close() - - def handle_read(self): - data = self.recv(1024) - self.connectionCallbacks.onRecvData(data) - - def disconnect(self): - logger.debug("disconnect") - self.handle_close() diff --git a/yowsup/layers/network/layer.py b/yowsup/layers/network/layer.py index 2dec35b36..45d2b0a8d 100644 --- a/yowsup/layers/network/layer.py +++ b/yowsup/layers/network/layer.py @@ -3,7 +3,7 @@ from yowsup.layers.network.dispatcher.dispatcher import ConnectionCallbacks from yowsup.layers.network.dispatcher.dispatcher import YowConnectionDispatcher from yowsup.layers.network.dispatcher.dispatcher_socket import SocketConnectionDispatcher -from yowsup.layers.network.dispatcher.dispatcher_asyncore import AsyncoreConnectionDispatcher +from yowsup.layers.network.dispatcher.dispatcher_asyncio import AsyncioConnectionDispatcher import logging logger = logging.getLogger(__name__) @@ -42,7 +42,7 @@ def __init__(self): def __create_dispatcher(self, dispatcher_type): if dispatcher_type == self.DISPATCHER_ASYNCORE: logger.debug("Created asyncore dispatcher") - return AsyncoreConnectionDispatcher(self) + return AsyncioConnectionDispatcher(self) else: logger.debug("Created socket dispatcher") return SocketConnectionDispatcher(self)