Skip to content

Commit

Permalink
Implement more robust connection mechanism for proxy and non-proxy co…
Browse files Browse the repository at this point in the history
…nnections (#16)
  • Loading branch information
bboerst authored Sep 17, 2024
1 parent 894114a commit f8a36e2
Showing 1 changed file with 90 additions and 66 deletions.
156 changes: 90 additions & 66 deletions collector/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ def __init__(self, url, userpass, pool_name, rabbitmq_host, rabbitmq_port, rabbi
self.use_proxy = use_proxy
self.proxy_host = proxy_host
self.proxy_port = proxy_port
self.init_socket()
self.sock = None
self.connection = None
self.channel = None
self.max_retries = 5
self.retry_delay = 5

def parse_url(self, url):
purl = urlparse(url)
Expand Down Expand Up @@ -68,14 +70,16 @@ def init_socket(self):
LOG.info("Socket initialized with 600 seconds timeout")

def close(self):
LOG.info(f"Closing connection to {self.purl.geturl()}")
try:
self.sock.shutdown(socket.SHUT_RDWR)
LOG.info("Socket shutdown successful")
except OSError as e:
LOG.warning(f"Error during socket shutdown: {e}")
self.sock.close()
LOG.info(f"Socket closed. Disconnected from {self.purl.geturl()}")
if self.sock:
LOG.info(f"Closing connection to {self.purl.geturl()}")
try:
self.sock.shutdown(socket.SHUT_RDWR)
LOG.info("Socket shutdown successful")
except OSError as e:
LOG.warning(f"Error during socket shutdown: {e}")
self.sock.close()
LOG.info(f"Socket closed. Disconnected from {self.purl.geturl()}")
self.sock = None

def get_msg(self):
while True:
Expand All @@ -87,10 +91,9 @@ def get_msg(self):
new_buf = self.sock.recv(4096)
except Exception as e:
LOG.debug(f"Error receiving data: {e}")
self.close()
raise EOFError
if len(new_buf) == 0:
self.close()
raise EOFError
self.buf += new_buf
continue
try:
Expand All @@ -107,10 +110,9 @@ def get_msg(self):
new_buf = self.sock.recv(4096)
except Exception as e:
LOG.debug(f"Error receiving data: {e}")
self.close()
raise EOFError
if len(new_buf) == 0:
self.close()
raise EOFError
self.buf += new_buf

def send_jsonrpc(self, method, params):
Expand All @@ -133,72 +135,94 @@ def send_jsonrpc(self, method, params):
LOG.debug(f"Received: {resp}")

def connect_to_rabbitmq(self):
credentials = pika.PlainCredentials(self.rabbitmq_username, self.rabbitmq_password)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(self.rabbitmq_host, self.rabbitmq_port, '/', credentials))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.rabbitmq_exchange, exchange_type='fanout', durable=True)
for attempt in range(self.max_retries):
try:
credentials = pika.PlainCredentials(self.rabbitmq_username, self.rabbitmq_password)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(self.rabbitmq_host, self.rabbitmq_port, '/', credentials))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.rabbitmq_exchange, exchange_type='fanout', durable=True)
LOG.info("Successfully connected to RabbitMQ")
return
except Exception as e:
LOG.error(f"Failed to connect to RabbitMQ (attempt {attempt + 1}/{self.max_retries}): {e}")
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay)
else:
raise

def publish_to_rabbitmq(self, message):
LOG.info(f"Publishing message to RabbitMQ: {json.dumps(message)}")
self.channel.basic_publish(exchange=self.rabbitmq_exchange, routing_key='', body=json.dumps(message))

def get_stratum_work(self, keep_alive=False):
LOG.info("Starting get_stratum_work")
self.sock.setblocking(True)
LOG.info("Socket set to blocking mode")
if self.use_proxy:
LOG.info(f"Connecting through proxy: {self.proxy_host}:{self.proxy_port}")
LOG.info(f"Attempting to connect to {self.purl.hostname}:{self.purl.port}")
try:
self.sock.connect((self.purl.hostname, self.purl.port))
LOG.info(f"Successfully connected to server {self.purl.geturl()}")
except Exception as e:
LOG.error(f"Failed to connect to server: {e}")
raise
for attempt in range(self.max_retries):
try:
LOG.info(f"Publishing message to RabbitMQ: {json.dumps(message)}")
self.channel.basic_publish(exchange=self.rabbitmq_exchange, routing_key='', body=json.dumps(message))
return
except Exception as e:
LOG.error(f"Failed to publish to RabbitMQ (attempt {attempt + 1}/{self.max_retries}): {e}")
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay)
self.connect_to_rabbitmq() # Try to reconnect
else:
raise

LOG.info("Sending mining.subscribe request")
self.send_jsonrpc("mining.subscribe", [])
LOG.info("Successfully subscribed to pool notifications")
def connect_to_stratum(self):
for attempt in range(self.max_retries):
try:
self.init_socket()
LOG.info(f"Attempting to connect to {self.purl.hostname}:{self.purl.port}")
self.sock.connect((self.purl.hostname, self.purl.port))
LOG.info(f"Successfully connected to server {self.purl.geturl()}")

LOG.info("Sending mining.subscribe request")
self.send_jsonrpc("mining.subscribe", [])
LOG.info("Successfully subscribed to pool notifications")

LOG.info("Sending mining.authorize request")
self.send_jsonrpc("mining.authorize", self.userpass.split(":"))
LOG.info("Successfully authorized with the pool")
LOG.info("Sending mining.authorize request")
self.send_jsonrpc("mining.authorize", self.userpass.split(":"))
LOG.info("Successfully authorized with the pool")

return
except Exception as e:
LOG.error(f"Failed to connect to stratum server (attempt {attempt + 1}/{self.max_retries}): {e}")
self.close()
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay)
else:
raise

def get_stratum_work(self, keep_alive=False):
LOG.info("Starting get_stratum_work")
last_subscribe_time = time.time()

while True:
try:
if not self.sock:
self.connect_to_stratum()

n = self.get_msg()
LOG.debug(f"Received notification: {n}")
except Exception as e:
LOG.error(f"Error receiving message: {e}")
self.close()
return

if "method" in n and n["method"] == "mining.notify":
LOG.info("Received mining.notify message")
document = create_notification_document(n, self.pool_name, self.extranonce1, self.extranonce2_length)
insert_notification(document, self.db_url, self.db_name, self.db_username, self.db_password)
self.publish_to_rabbitmq(document)
if "method" in n and n["method"] == "mining.notify":
LOG.info("Received mining.notify message")
document = create_notification_document(n, self.pool_name, self.extranonce1, self.extranonce2_length)
insert_notification(document, self.db_url, self.db_name, self.db_username, self.db_password)
self.publish_to_rabbitmq(document)

if keep_alive and time.time() - last_subscribe_time > 480:
LOG.info("Keep-alive interval reached")
self.send_jsonrpc("mining.subscribe", [])
last_subscribe_time = time.time()
LOG.info("Keep-alive cycle completed")

if keep_alive and time.time() - last_subscribe_time > 480:
LOG.info("Keep-alive interval reached")
LOG.info(f"Disconnecting from server for keep alive {self.purl.geturl()}")
except (EOFError, ConnectionResetError, socket.timeout) as e:
LOG.error(f"Connection error: {e}")
self.close()
time.sleep(1)
LOG.info("Reinitializing socket")
self.init_socket()
LOG.info(f"Reconnecting to server {self.purl.geturl()}")
self.sock.connect((self.purl.hostname, self.purl.port))
LOG.info(f"Successfully reconnected to server {self.purl.geturl()}")
LOG.info("Resubscribing to pool notifications")
self.send_jsonrpc("mining.subscribe", [])
LOG.info("Reauthorizing with the pool")
self.send_jsonrpc("mining.authorize", self.userpass.split(":"))
LOG.info("Sending subscribe request to keep connection alive")
self.send_jsonrpc("mining.subscribe", [])
last_subscribe_time = time.time()
LOG.info("Keep-alive cycle completed")
time.sleep(self.retry_delay)
self.connect_to_stratum()
except Exception as e:
LOG.error(f"Unexpected error: {e}")
self.close()
time.sleep(self.retry_delay)
self.connect_to_stratum()

def create_notification_document(data, pool_name, extranonce1, extranonce2_length):
notification_id = str(uuid.uuid4())
Expand Down Expand Up @@ -318,4 +342,4 @@ def main():
w.connection.close()

if __name__ == "__main__":
main()
main()

0 comments on commit f8a36e2

Please sign in to comment.