From 6a33fedb7cccb07d741a98433385a5aae3dad8bc Mon Sep 17 00:00:00 2001 From: boerst Date: Wed, 11 Sep 2024 20:04:03 -0500 Subject: [PATCH] Various updates, including new columns: "Template revision count for block" and "Time since last revision" (#14) --- webapp/main.py | 121 +++++++++++++++++++++++++----------- webapp/static/main.js | 85 ++++++++++++++++++++++++- webapp/templates/index.html | 11 ++++ 3 files changed, 177 insertions(+), 40 deletions(-) diff --git a/webapp/main.py b/webapp/main.py index 46104cb..7332b3e 100644 --- a/webapp/main.py +++ b/webapp/main.py @@ -47,6 +47,17 @@ def add_headers(response): rabbitmq_exchange = os.environ.get('RABBITMQ_EXCHANGE') def process_row_data(row): + if 'counters' not in globals(): + globals()['counters'] = {} + + pool_name = row['pool_name'] + if pool_name not in counters or counters[pool_name]['height'] != row['height']: + counters[pool_name] = {'height': row['height'], 'count': 1} + else: + counters[pool_name]['count'] += 1 + + template_revision = counters[pool_name]['count'] + coinbase1 = row['coinbase1'] coinbase2 = row['coinbase2'] extranonce1 = row['extranonce1'] @@ -73,10 +84,19 @@ def process_row_data(row): value = tx_out.coin_value / 1e8 coinbase_outputs.append({"address": address, "value": value}) + current_time = time.time() + if 'last_revision_time' not in counters[pool_name]: + counters[pool_name]['last_revision_time'] = current_time + time_since_last_revision = 0 + else: + time_since_last_revision = current_time - counters[pool_name]['last_revision_time'] + counters[pool_name]['last_revision_time'] = current_time + processed_row = { 'pool_name': row['pool_name'], 'timestamp': row['timestamp'], 'height': height, + 'template_revision': template_revision, 'prev_block_hash': prev_block_hash, 'block_version': block_version, 'coinbase_raw': coinbase_hex, @@ -90,7 +110,8 @@ def process_row_data(row): 'merkle_branches': merkle_branches, 'merkle_branch_colors': merkle_branch_colors, 'coinbase_output_value': output_value, - 'coinbase_outputs': coinbase_outputs + 'coinbase_outputs': coinbase_outputs, + 'time_since_last_revision': time_since_last_revision } return processed_row @@ -148,7 +169,10 @@ def extract_coinbase_script_ascii(coinbase_tx): # Get the script_sig in hex from the input of the coinbase transaction script_sig_hex = coinbase_tx.txs_in[0].script.hex() - # Convert the script_sig hex to ASCII and filter out non-printable characters + # Remove the first 8 characters (4 bytes) which represent the block height + script_sig_hex = script_sig_hex[8:] + + # Convert the remaining script_sig hex to ASCII and filter out non-printable characters return ''.join(filter(lambda x: x in string.printable, bytes.fromhex(script_sig_hex).decode('ascii', 'replace'))) def precompute_merkle_branch_colors(merkle_branches): @@ -164,52 +188,72 @@ def precompute_merkle_branch_colors(merkle_branches): def hash_code(text): return sum(ord(char) for char in text) -def consume_messages(): +def initialize_queue_connection(): global connection, channel + try: + credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_password) + connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host, rabbitmq_port, '/', credentials)) + channel = connection.channel() - credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_password) - connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host, rabbitmq_port, '/', credentials)) - channel = connection.channel() - - # Declare the exchange as a fanout exchange - channel.exchange_declare(exchange=rabbitmq_exchange, exchange_type='fanout', durable=True) + # Declare the exchange as a fanout exchange + channel.exchange_declare(exchange=rabbitmq_exchange, exchange_type='fanout', durable=True) - # Let RabbitMQ generate a unique queue name for each consumer - result = channel.queue_declare(queue='', exclusive=True) - queue_name = result.method.queue + # Let RabbitMQ generate a unique queue name for each consumer + result = channel.queue_declare(queue='', exclusive=True) + queue_name = result.method.queue - # Bind the generated queue to the fanout exchange - channel.queue_bind(exchange=rabbitmq_exchange, queue=queue_name) + # Bind the generated queue to the fanout exchange + channel.queue_bind(exchange=rabbitmq_exchange, queue=queue_name) - def callback(ch, method, properties, body): - try: - message = json.loads(body) - processed_message = process_row_data(message) - for client_id in connected_clients: - socketio.emit('mining_data', processed_message, room=client_id) - except Exception as e: - logger.exception(f"Error processing message: {e}") + logger.info('RabbitMQ connection initialized') + return queue_name + except Exception as e: + logger.exception(f"Error initializing RabbitMQ connection: {e}") + return None - channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) +def consume_messages(queue_name): + try: + def callback(ch, method, properties, body): + try: + message = json.loads(body) + processed_message = process_row_data(message) + socketio.emit('mining_data', processed_message, to=None) + except Exception as e: + logger.exception(f"Error processing message: {e}") + + channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) + logger.info('Started consuming messages from the exchange') + channel.start_consuming() + except Exception as e: + logger.exception(f"Error in consume_messages: {e}") + +# Initialize queue connection +queue_name = initialize_queue_connection() +if queue_name: + # Start consuming messages in a background thread + socketio.start_background_task(target=consume_messages, queue_name=queue_name) - logger.info('Started consuming messages from the exchange') - channel.start_consuming() - @socketio.on('connect') def handle_connect(): - logger.info('Client connected') - if len(connected_clients) == 0: - socketio.start_background_task(target=consume_messages) - connected_clients.add(request.sid) + try: + logger.info('Client connected') + connected_clients.add(request.sid) + except Exception as e: + logger.warning(f"Error during client connect: {e}") @socketio.on('disconnect') def handle_disconnect(): - logger.info('Client disconnected') - connected_clients.remove(request.sid) - if len(connected_clients) == 0: - if channel and connection: - channel.stop_consuming() - connection.close() + try: + logger.info('Client disconnected') + connected_clients.remove(request.sid) + except KeyError: + logger.info('Client disconnected (already removed)') + except Exception as e: + logger.warning(f"Error during client disconnect: {e}") + +@socketio.on('heartbeat') +def handle_heartbeat(): + pass # Simply acknowledge the heartbeat def gzip_response(response): accept_encoding = request.headers.get('Accept-Encoding', '') @@ -248,6 +292,7 @@ def handle_sigterm(*args): logger.info("Received SIGTERM signal. Shutting down gracefully.") socketio.stop() sys.exit(0) - + if __name__ == "__main__": - socketio.run(app) + signal.signal(signal.SIGTERM, handle_sigterm) + socketio.run(app, debug=True, use_reloader=False) diff --git a/webapp/static/main.js b/webapp/static/main.js index 1a37bf6..51e6f71 100644 --- a/webapp/static/main.js +++ b/webapp/static/main.js @@ -1,5 +1,57 @@ document.addEventListener('DOMContentLoaded', () => { - const socket = io(SOCKET_URL); + function connectWebSocket() { + const socket = io(SOCKET_URL, { + reconnection: true, + reconnectionAttempts: Infinity, + reconnectionDelay: 1000, + reconnectionDelayMax: 5000, + randomizationFactor: 0.5, + }); + + socket.on('connect', () => { + console.log('WebSocket connected'); + hideReconnectionMessage(); + }); + + socket.on('disconnect', (reason) => { + console.log('WebSocket disconnected:', reason); + showReconnectionMessage(); + }); + + socket.on('connect_error', (error) => { + console.error('WebSocket connection error:', error); + showReconnectionMessage(); + }); + + return socket; + } + + function showReconnectionMessage() { + const message = document.getElementById('reconnection-message'); + if (!message) { + const newMessage = document.createElement('div'); + newMessage.id = 'reconnection-message'; + newMessage.innerHTML = 'Attempting to reconnect...'; + newMessage.style.position = 'fixed'; + newMessage.style.top = '10px'; + newMessage.style.right = '10px'; + newMessage.style.backgroundColor = 'rgba(255, 0, 0, 0.7)'; + newMessage.style.color = 'white'; + newMessage.style.padding = '10px'; + newMessage.style.borderRadius = '5px'; + newMessage.style.zIndex = '1000'; + document.body.appendChild(newMessage); + } + } + + function hideReconnectionMessage() { + const message = document.getElementById('reconnection-message'); + if (message) { + message.remove(); + } + } + + const socket = connectWebSocket(); let isPaused = false; const pauseButton = document.getElementById('pause-button'); @@ -52,6 +104,27 @@ document.addEventListener('DOMContentLoaded', () => { field: 'pool_name', width: 130, }, + { + title: 'Template Revision for Current Block', + field: 'template_revision', + width: 50, + sorter: 'number', + visible: true, + }, + { + title: 'Time Since Last Template Revision', + field: 'time_since_last_revision', + width: 80, + sorter: 'number', + formatter: function(cell) { + const value = cell.getValue(); + if (value === undefined || value === null) return ''; + const seconds = Math.floor(value); + const milliseconds = Math.round((value - seconds) * 1000); + return `${seconds}.${milliseconds.toString().padStart(3, '0')}s`; + }, + visible: true, + }, { title: 'Timestamp', field: 'timestamp', @@ -81,7 +154,6 @@ document.addEventListener('DOMContentLoaded', () => { .filter(output => !output.address.includes("nulldata")) .map(output => `${output.address}:${output.value}`) .join('|'); - const color = generateColorFromOutputs(outputs); cell.getElement().style.backgroundColor = color; cell.getElement().style.whiteSpace = 'nowrap'; @@ -299,4 +371,13 @@ document.addEventListener('DOMContentLoaded', () => { return text.split('').reduce((prevHash, currVal) => (((prevHash << 5) - prevHash) + currVal.charCodeAt(0))|0, 0); } + + setInterval(() => { + if (socket.connected) { + socket.emit('heartbeat'); + } else { + console.log('WebSocket not connected, attempting to reconnect...'); + socket.connect(); + } + }, 30000); // Send heartbeat every 30 seconds }); \ No newline at end of file diff --git a/webapp/templates/index.html b/webapp/templates/index.html index 2c2929b..b0b1a9a 100644 --- a/webapp/templates/index.html +++ b/webapp/templates/index.html @@ -233,6 +233,17 @@ animation: flash-border 1s infinite; border: 2px solid red; } + + #reconnection-message { + position: fixed; + top: 10px; + right: 10px; + background-color: rgba(255, 0, 0, 0.7); + color: white; + padding: 10px; + border-radius: 5px; + z-index: 1000; + }