Skip to content

Commit

Permalink
replace deprecated low-level socket usage w/ R/W streams
Browse files Browse the repository at this point in the history
  • Loading branch information
bleepbop committed Dec 1, 2023
1 parent 1c944c6 commit f9fcc7e
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions app/contacts/contact_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from app.utility.base_world import BaseWorld
from plugins.manx.app.c_session import Session
from plugins.manx.app.c_connection import Connection


class Contact(BaseWorld):
Expand Down Expand Up @@ -60,7 +61,8 @@ async def refresh(self):
session = self.sessions[index]

try:
session.connection.send(str.encode(' '))
session.connection.writer.write(str.encode(' '))
await session.connection.writer.drain()
except socket.error:
self.log.debug('Error occurred when refreshing session %s. Removing from session pool.', session.id)
del self.sessions[index]
Expand All @@ -73,20 +75,20 @@ async def accept(self, reader, writer):
except Exception as e:
self.log.debug('Handshake failed: %s' % e)
return
connection = writer.get_extra_info('socket')
profile['executors'] = [e for e in profile['executors'].split(',') if e]
profile['contact'] = 'tcp'
agent, _ = await self.services.get('contact_svc').handle_heartbeat(**profile)
new_session = Session(id=self.generate_number(size=6), paw=agent.paw, connection=connection)
new_session = Session(id=self.generate_number(size=6), paw=agent.paw, connection=Connection(reader, writer))
self.sessions.append(new_session)
await self.send(new_session.id, agent.paw, timeout=5)

async def send(self, session_id: int, cmd: str, timeout: int = 60) -> Tuple[int, str, str, str]:
try:
conn = next(i.connection for i in self.sessions if i.id == int(session_id))
conn.send(str.encode(' '))
conn.writer.write(str.encode(' '))
time.sleep(0.01)
conn.send(str.encode('%s\n' % cmd))
conn.writer.write(str.encode('%s\n' % cmd))
await conn.writer.drain()
response = await self._attempt_connection(session_id, conn, timeout=timeout)
response = json.loads(response)
return response['status'], response['pwd'], response['response'], response.get('agent_reported_time', '')
Expand All @@ -106,7 +108,7 @@ async def _attempt_connection(self, session_id, connection, timeout):
time.sleep(0.1) # initial wait for fast operations.
while True:
try:
part = connection.recv(buffer)
part = await connection.reader.read(buffer)
data += part
if len(part) < buffer:
break
Expand Down

0 comments on commit f9fcc7e

Please sign in to comment.