diff --git a/.env.example b/.env.example index 4c046a2..cb3b996 100644 --- a/.env.example +++ b/.env.example @@ -6,4 +6,10 @@ DB_HOST=localhost DB_PORT=5432 DB_USERNAME=postgres DB_PASSWORD=password -DB_DATABASE=tradeflow \ No newline at end of file +DB_DATABASE=tradeflow + +# Soroban Event Indexer & WebSocket Configuration +SOROBAN_RPC_URL="https://soroban-testnet.stellar.org" +POOL_ADDRESS="CC..." # Replace with your Pool Contract ID +INDEXER_POLL_INTERVAL=5000 # Polling interval in ms +WS_PORT=3001 # WebSocket server port \ No newline at end of file diff --git a/package.json b/package.json index af8e4d6..53e55d8 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,9 @@ "prisma:generate": "prisma generate", "prisma:push": "prisma db push", "prisma:migrate": "prisma migrate dev", - "prisma:studio": "prisma studio" + "prisma:studio": "prisma studio", + "start:indexer": "node services/eventIndexer.js", + "start:all": "node server.js" }, "dependencies": { "@nestjs/common": "^10.4.22", diff --git a/server.js b/server.js new file mode 100644 index 0000000..9b8883a --- /dev/null +++ b/server.js @@ -0,0 +1,83 @@ +/** + * server.js + * + * Main Entry Point for combined Soroban Indexer and WebSocket streaming server. + * This file orchestrates the real-time data flow from the blockchain to the frontend. + */ + +const WebSocket = require('ws'); +const wsEvents = require('./services/wsEvents'); +const { startIndexer } = require('./services/eventIndexer'); + +// Config and Port Setup +const WS_PORT = process.env.WS_PORT || 3001; +const wss = new WebSocket.Server({ port: WS_PORT }); + +console.log('--- 🌐 TradeFlow Real-Time Stream Server ---'); +console.log(`📡 WebSocket server running on ws://localhost:${WS_PORT}`); + +// Connection tracking +let activeConnections = 0; + +/** + * Listen for incoming WebSocket connections. + */ +wss.on('connection', (ws) => { + activeConnections++; + console.log(`✅ New Web3 client connected. Active: ${activeConnections}`); + + // Initial Connection ACK + ws.send(JSON.stringify({ + event: 'INDEXER_CONNECTED', + status: 'ONLINE', + timestamp: new Date().toISOString() + })); + + ws.on('close', () => { + activeConnections--; + console.log(`❌ Web3 client disconnected. Active: ${activeConnections}`); + }); + + ws.on('error', (err) => { + console.error('⚠️ WS Socket Error:', err.message); + }); +}); + +/** + * BROADCASTER: Listens to the internal 'newTrade' event emitter. + * Broadcasts every new blockchain event caught by the Indexer daemon + * to all connected browser clients. + */ +wsEvents.on('newTrade', (tradeData) => { + console.log(`📣 BROADCASTING: New trade found in pool ${tradeData.poolId.slice(0, 8)}...`); + + const payload = JSON.stringify({ + event: 'NEW_TRADE_EVENT', + data: tradeData, + receivedAt: new Date().toISOString() + }); + + // Iterative broadcast to all active subscribers + wss.clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(payload); + } + }); +}); + +/** + * 🚀 START DAEMON ORCHESTRATION + * We start the Soroban Indexer in the same Node.js process to bridge + * the blockchain data with the WebSocket emitter via internal memory. + */ +startIndexer().catch((err) => { + console.error('❌ CRITICAL ERROR: Event Indexer failed to start:', err.message); + process.exit(1); +}); + +// Process Management +process.on('SIGTERM', () => { + console.log('🛑 Closing WebSocket connections and shutting down.'); + wss.close(); + process.exit(0); +}); diff --git a/services/eventIndexer.js b/services/eventIndexer.js new file mode 100644 index 0000000..f0882c0 --- /dev/null +++ b/services/eventIndexer.js @@ -0,0 +1,150 @@ +/** + * services/eventIndexer.js + * + * Background Soroban Event Listener daemon for TradeFlow-API. + * This service polls the Soroban RPC for events emitted by the specified Pool contract. + * When a 'Swap' event is detected, it parses the data and saves it to the database with Prisma. + */ + +const { rpc } = require('@stellar/stellar-sdk'); +const { PrismaClient } = require('@prisma/client'); +const { parseScVal } = require('./scValParser'); +const wsEvents = require('./wsEvents'); + +// In case dotenv is not installed as a top-level dependency, +// we try to load it safely. Most Node.js environments for this project should have it. +try { + require('dotenv').config(); +} catch (e) { + console.warn('⚠️ dotenv not loaded. Ensure environment variables are set manually.'); +} + +const prisma = new PrismaClient(); + +// Configuration +const RPC_URL = process.env.SOROBAN_RPC_URL || 'https://soroban-testnet.stellar.org'; +const POOL_ADDRESS = process.env.POOL_ADDRESS; +const POLL_INTERVAL = parseInt(process.env.INDEXER_POLL_INTERVAL || '5000'); + +if (!POOL_ADDRESS) { + console.error('❌ POOL_ADDRESS is not defined in environment variables.'); + console.error('Please add POOL_ADDRESS="YOUR_CONTRACT_ID" to your .env file.'); + process.exit(1); +} + +const server = new rpc.Server(RPC_URL); + +/** + * Main daemon loop to poll for Soroban events. + */ +async function startIndexer() { + console.log('--- 🚀 TradeFlow Soroban Event Indexer ---'); + console.log(`📡 RPC Node: ${RPC_URL}`); + console.log(`🎯 Pool Contract: ${POOL_ADDRESS}`); + console.log('-------------------------------------------'); + + // Start from the latest ledger initially + let currentLedgerSequence; + try { + const latestLedger = await server.getLatestLedger(); + currentLedgerSequence = latestLedger.sequence; + console.log(`Initial Start Ledger: ${currentLedgerSequence}`); + } catch (err) { + console.error('❌ Failed to connect to Soroban RPC. Verify your SOROBAN_RPC_URL.'); + process.exit(1); + } + + // Periodic polling + setInterval(async () => { + try { + const response = await server.getEvents({ + startLedger: currentLedgerSequence, + filters: [ + { + type: 'contract', + contractIds: [POOL_ADDRESS], + }, + ], + limit: 10, + }); + + if (response.events && response.events.length > 0) { + console.log(`Found ${response.events.length} new event(s). Processing...`); + + for (const event of response.events) { + // Process event + await handleContractEvent(event); + } + + // Advance ledger checkpoint + const latestProcessed = Math.max(...response.events.map(e => parseInt(e.ledger))); + currentLedgerSequence = latestProcessed + 1; + } + } catch (error) { + console.error('⚠️ Indexer Polling Error:', error.message); + } + }, POLL_INTERVAL); +} + +/** + * Handles an individual contract event. + * Filters for 'Swap' events and indexes them. + * + * @param {rpc.Api.GetEventsResponse.Event} event - The Soroban event from RPC. + */ +async function handleContractEvent(event) { + try { + // Decode topics to identify the event + const topics = event.topic.map(t => parseScVal(t)); + + // Check if topics contain "Swap" (case-insensitive) + const isSwapEvent = topics.some(topic => + typeof topic === 'string' && topic.toLowerCase() === 'swap' + ); + + if (isSwapEvent) { + console.log(`✅ Detected SwapEvent in ledger ${event.ledger}`); + + const payload = parseScVal(event.value); + if (!payload) return; + + console.log('Decoded Payload:', JSON.stringify(payload, null, 2)); + + // Map Soroban event data to our Prisma Trade model + // Expected structure from SwapEvent: { user, amount_in, amount_out } + const tradeData = { + poolId: event.contractId, + userAddress: payload.user || payload.address || 'Unknown', + amountIn: (payload.amount_in || payload.amountIn || '0').toString(), + amountOut: (payload.amount_out || payload.amountOut || '0').toString(), + timestamp: new Date(), + }; + + // Save to Database via Prisma + const savedTrade = await prisma.trade.create({ + data: tradeData + }); + + console.log(`💾 Indexed Trade saved. DB ID: ${savedTrade.id}`); + + // Trigger WebSocket broadcast + wsEvents.emit('newTrade', savedTrade); + } + } catch (error) { + console.error('❌ Failed to process event:', error.message); + } +} + +// Graceful Shut-off +process.on('SIGINT', async () => { + console.log('\n--- Indexer Shutting Down ---'); + await prisma.$disconnect(); + process.exit(0); +}); + +exports.startIndexer = startIndexer; + +// In standalone mode, starting the indexer automatically +if (require.main === module) { + startIndexer(); +} diff --git a/services/scValParser.js b/services/scValParser.js new file mode 100644 index 0000000..cc9d01e --- /dev/null +++ b/services/scValParser.js @@ -0,0 +1,58 @@ +/** + * services/scValParser.js + * + * Utility to decode Soroban XDR ScVal types into native JavaScript objects/JSON. + */ + +const { scValToNative } = require('@stellar/stellar-sdk'); + +/** + * Decodes a Soroban ScVal into its native JavaScript representation. + * Handles BigInt conversions to standard strings for JSON compatibility. + * + * @param {xdr.ScVal} scVal - The Soroban value to decode. + * @returns {any} - The native JavaScript value. + */ +function parseScVal(scVal) { + try { + const native = scValToNative(scVal); + return stringifyBigInts(native); + } catch (error) { + console.error('Error decoding Soroban XDR:', error.message); + return null; + } +} + +/** + * Recursively converts BigInt values to strings in an object/array. + * This is useful for Prisma and JSON serialization. + * + * @param {any} obj - The object to process. + * @returns {any} - The object with BigInts converted to strings. + */ +function stringifyBigInts(obj) { + if (typeof obj === 'bigint') { + return obj.toString(); + } + + if (Array.isArray(obj)) { + return obj.map(stringifyBigInts); + } + + if (obj !== null && typeof obj === 'object') { + const result = {}; + for (const key in obj) { + if (Object.prototype.hasOwnProperty.call(obj, key)) { + result[key] = stringifyBigInts(obj[key]); + } + } + return result; + } + + return obj; +} + +module.exports = { + parseScVal, + stringifyBigInts +}; diff --git a/services/wsEvents.js b/services/wsEvents.js new file mode 100644 index 0000000..29b36d4 --- /dev/null +++ b/services/wsEvents.js @@ -0,0 +1,4 @@ +const EventEmitter = require('events'); +const wsEvents = new EventEmitter(); + +module.exports = wsEvents;