Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,9 @@ TWILIO_PHONE_NUMBER=+1234567890
# Owner Configuration (for auto-reclaim)
# Owner Account (for executing reclaim transactions)
OWNER_MNEMONIC=

# Real-Time Rent Payment Tracker (Issue #16)
# Cron expression for how often to poll Horizon for new payments (default: every minute)
PAYMENT_TRACKER_CRON=* * * * *
# Override the Horizon base URL used by the payment tracker (defaults to testnet)
LEASEFLOW_HORIZON_URL=https://horizon-testnet.stellar.org
109 changes: 109 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const { NotificationService } = require('./src/services/notificationService');
const { SorobanLeaseService } = require('./src/services/sorobanLeaseService');
const { LeaseRenewalService } = require('./src/services/leaseRenewalService');
const { LeaseRenewalJob, startLeaseRenewalScheduler } = require('./src/jobs/leaseRenewalJob');
const { RentPaymentTrackerService } = require('./services/rentPaymentTrackerService');
const { startPaymentTrackerJob } = require('./src/jobs/paymentTrackerJob');
const { createPaymentRoutes } = require('./src/routes/paymentRoutes');
const { getUSDCToFiatRates, getXLMToUSDCPath } = require('./services/priceFeedService');
const AvailabilityService = require('./services/availabilityService');
const AssetMetadataService = require('./services/assetMetadataService');
Expand Down Expand Up @@ -80,6 +83,65 @@ function createApp(dependencies = {}) {

// Middleware
app.use(cors());
app.use(express.json());
const express = require('express');
const cors = require('cors');
const { randomUUID } = require('crypto');

const multer = require('multer');
const path = require('path');
const fs = require('fs');
const sharp = require('sharp');
const app = express();
const port = 3000;
const creditScoreAggregator = new TenantCreditScoreAggregator();
const listings = [];
const HORIZON_URL = process.env.HORIZON_URL || 'https://horizon.stellar.org';

const uploadDir = path.join(__dirname, 'uploads');
if (!fs.existsSync(uploadDir)) {
fs.mkdirSync(uploadDir, { recursive: true });
}

const storage = multer.diskStorage({
destination: (req, file, cb) => cb(null, uploadDir),
filename: (req, file, cb) => cb(null, `${Date.now()}_${file.originalname}`)
});
const upload = multer({ storage });

// Middleware
app.use(cors());
app.use(express.json({ limit: '50mb' }));
app.use(express.urlencoded({ extended: true, limit: '50mb' }));

// Routes
app.use('/api/leases', leaseRoutes);
app.use('/api/owners', ownerRoutes);
app.use('/api', createPaymentRoutes(database));

app.get('/', (req, res) => {
res.json({
project: 'LeaseFlow Protocol Backend',
description: 'Secure Lease Indexer and Storage Facilitator',
status: 'Operational',
version: '1.0.0',
contract_id: process.env.CONTRACT_ID || 'CAEGD57WVTVQSYWYB23AISBW334QO7WNA5XQ56S45GH6BP3D2AVHKUG4',
endpoints: {
upload_lease: 'POST /api/leases/upload',
view_lease_handshake: 'GET /api/leases/:leaseCID/handshake',
top_owners: 'GET /api/owners/top'
}
app.use(express.json());
app.use('/uploads', express.static(path.join(__dirname, 'uploads')));
const {
createConditionProofService,
ConditionProofError,
} = require('./services/conditionProofService');
const {
createFileConditionProofStore,
} = require('./services/conditionProofStore');

const port = process.env.PORT || 3000;
app.use(express.json({ limit: '50mb' }));
app.use(express.urlencoded({ extended: true, limit: '50mb' }));

Expand Down Expand Up @@ -253,6 +315,53 @@ if (require.main === module) {
}).catch(err => {
console.warn('AutoReclaimWorker failed to initialize:', err.message);
});
},
);

return app;
}

const app = createApp();

if (require.main === module) {
let scheduler;

if (config.jobs.renewalJobEnabled) {
const database = new AppDatabase(config.database.filename);
const notificationService = new NotificationService(database);
const sorobanLeaseService = new SorobanLeaseService(config);
const leaseRenewalService = new LeaseRenewalService(
database,
notificationService,
sorobanLeaseService,
config,
);
scheduler = startLeaseRenewalScheduler(new LeaseRenewalJob(leaseRenewalService), config);
}

const paymentTrackerDb = new AppDatabase(config.database.filename);
const paymentTrackerService = new RentPaymentTrackerService(paymentTrackerDb, {
contractAccountId: config.contracts.defaultContractId,
});
startPaymentTrackerJob(paymentTrackerService, {
cronExpression: process.env.PAYMENT_TRACKER_CRON || '* * * * *',
});

app.listen(port, () => {
console.log(`LeaseFlow Backend running at http://localhost:${port}`);
console.log(`Lease Encryption Service: Active`);
console.log(`IPFS Storage Service: Initialized (Host: ${process.env.IPFS_HOST || 'ipfs.infura.io'})`);
console.log(`LeaseFlow Backend listening at http://localhost:${port}`);
if (scheduler) {
console.log(`Lease renewal scheduler running every ${config.jobs.intervalMs}ms`);
}
const autoReclaimWorker = new AutoReclaimWorker();

autoReclaimWorker.initialize().then(() => {
autoReclaimWorker.start();
app.listen(port, () => {
console.log(`LeaseFlow Backend listening at http://localhost:${port}`);
console.log('Auto-Reclaim Worker started');
});
});
}
Expand Down
30 changes: 30 additions & 0 deletions migrations/002_add_payment_history.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Migration: Add payment_history table and extend leases with payment tracking columns
-- Issue #16: Real-Time Rent Payment Tracker

-- payment_history stores every detected Horizon payment event
CREATE TABLE IF NOT EXISTS payment_history (
id TEXT PRIMARY KEY,
horizon_op_id TEXT NOT NULL UNIQUE,
lease_id TEXT,
tenant_account_id TEXT NOT NULL,
amount TEXT NOT NULL,
asset_code TEXT NOT NULL DEFAULT 'XLM',
asset_issuer TEXT,
transaction_hash TEXT NOT NULL,
paid_at TEXT NOT NULL,
recorded_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_payment_history_lease_id
ON payment_history (lease_id);

CREATE INDEX IF NOT EXISTS idx_payment_history_tenant_account
ON payment_history (tenant_account_id);

CREATE INDEX IF NOT EXISTS idx_payment_history_paid_at
ON payment_history (paid_at DESC);

-- Extend leases table with payment-tracking columns
ALTER TABLE leases ADD COLUMN IF NOT EXISTS tenant_account_id TEXT;
ALTER TABLE leases ADD COLUMN IF NOT EXISTS payment_status TEXT NOT NULL DEFAULT 'pending';
ALTER TABLE leases ADD COLUMN IF NOT EXISTS last_payment_at TEXT;
7 changes: 6 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@
},
"dependencies": {
"@stellar/stellar-sdk": "^14.6.1",
"algosdk": "^2.0.0",
"axios": "^1.13.6",
"busboy": "^1.6.0",
"algosdk": "^2.0.0",
"cors": "^2.8.6",
"crypto-js": "^4.2.0",
"dotenv": "^17.3.1",
"express": "^5.2.1",
"lodash": "^4.17.23",
"eciesjs": "^0.4.18",
"ipfs-http-client": "^60.0.1",
"lodash": "^4.17.23",
"multer": "^2.1.1",
"node-cron": "^3.0.3",
"pg": "^8.11.3",
"sharp": "^0.34.5"
"multer": "^2.1.1",
"sharp": "^0.34.5",
"pg": "^8.11.3",
Expand Down
154 changes: 154 additions & 0 deletions services/rentPaymentTrackerService.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* RentPaymentTrackerService
*
* Monitors the Stellar Horizon API for `payment` operations directed at the
* LeaseFlow contract account. When a new payment is detected it is recorded
* in the `payment_history` table and the parent lease `payment_status` is
* updated accordingly — giving landlords a "Stripe-like" real-time view of
* tenant rent payments.
*/

const axios = require('axios');

const HORIZON_BASE_URL =
process.env.HORIZON_URL || 'https://horizon-testnet.stellar.org';

/** How many operations to fetch per Horizon page (max 200). */
const PAGE_LIMIT = 200;

class RentPaymentTrackerService {
/**
* @param {import('../src/db/appDatabase').AppDatabase} database
* @param {{contractAccountId?: string}} [options]
*/
constructor(database, options = {}) {
this.database = database;
/** The Stellar account (contract or landlord escrow) to watch. */
this.contractAccountId =
options.contractAccountId ||
process.env.SOROBAN_CONTRACT_ID ||
process.env.CONTRACT_ID ||
'CAEGD57WVTVQSYWYB23AISBW334QO7WNA5XQ56S45GH6BP3D2AVHKUG4';

/**
* Horizon paging_token of the last successfully processed operation.
* Stored in-memory so each poll only fetches genuinely new operations
* instead of re-fetching (and skipping) the same page every minute.
* @type {string|null}
*/
this._lastPagingToken = null;
}

/**
* Poll Horizon for new payments made TO the contract account.
* Cursor-based — only fetches operations newer than the last poll.
* Idempotent — repeated calls do not create duplicate records.
*
* @returns {Promise<{processed: number, skipped: number, errors: Array<{id: string, message: string}>}>}
*/
async poll() {
const result = { processed: 0, skipped: 0, errors: [] };

// Use ascending order + cursor so we only get operations we haven't seen yet.
let url =
`${HORIZON_BASE_URL}/accounts/${encodeURIComponent(this.contractAccountId)}` +
`/payments?limit=${PAGE_LIMIT}&order=asc&include_failed=false`;

if (this._lastPagingToken) {
url += `&cursor=${encodeURIComponent(this._lastPagingToken)}`;
}

const response = await this._fetchHorizon(url);
const records = response?._embedded?.records ?? [];

let lastToken = null;
for (const op of records) {
try {
const outcome = await this._processPaymentOperation(op);
if (outcome === 'recorded') {
result.processed += 1;
} else {
result.skipped += 1;
}
} catch (err) {
result.errors.push({ id: op.id, message: err.message });
}
// Always advance the cursor, even for skipped ops, so we never
// re-scan the same page on the next poll.
if (op.paging_token) {
lastToken = op.paging_token;
}
}

if (lastToken) {
this._lastPagingToken = lastToken;
}

return result;
}

/**
* Process a single Horizon payment operation record.
*
* @param {object} op Horizon payment operation object.
* @returns {Promise<'recorded'|'skipped'>}
*/
async _processPaymentOperation(op) {
// We only care about incoming credit operations (payment / path_payment).
if (!['payment', 'path_payment_strict_send', 'path_payment_strict_receive'].includes(op.type)) {
return 'skipped';
}

// The payment must be directed *to* the contract account.
if (op.to !== this.contractAccountId) {
return 'skipped';
}

// Deduplicate — skip if this Horizon operation id is already recorded.
if (this.database.getPaymentByHorizonOpId(op.id)) {
return 'skipped';
}

// Try to resolve the lease by matching the sender (tenant Stellar account).
const tenantAccountId = op.from;
const lease = this.database.getActiveLeaseByTenantAccount(tenantAccountId);

const leaseId = lease?.id ?? null;

const payment = {
horizonOperationId: op.id,
leaseId,
tenantAccountId,
amount: op.amount,
assetCode: op.asset_code || 'XLM',
assetIssuer: op.asset_issuer || null,
transactionHash: op.transaction_hash,
paidAt: op.created_at,
};

this.database.insertPayment(payment);

// Update the lease payment status if we matched a lease.
if (leaseId) {
this.database.updateLeasePaymentStatus(leaseId, 'paid', op.created_at);
}

return 'recorded';
}

/**
* Fetch a URL from Horizon and return parsed JSON.
*
* @param {string} url
* @returns {Promise<object>}
*/
async _fetchHorizon(url) {
const response = await axios.get(url, {
headers: { Accept: 'application/json' },
timeout: 10_000,
});
return response.data;
}
}

module.exports = { RentPaymentTrackerService };
Loading
Loading