Skip to content

Commit 3451cb4

Browse files
authored
Merge pull request #2 from nanopay/migrate-payments-notifier
Migrate payments notifier and refactor router
2 parents 05c913b + 27b6612 commit 3451cb4

19 files changed

+234
-340
lines changed

README.md

-6
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ npm install
3636
```bash
3737
wrangler queues create payment-listener-queue
3838
wrangler queues create payment-write-queue
39-
wrangler queues create payment-pusher-queue
4039
wrangler queues create payment-receiver-queue
4140
wrangler queues create payment-sender-queue
4241
wrangler queues create webhook-delivery-queue
@@ -73,11 +72,6 @@ wrangler secret put RPC_URL
7372

7473
# A Nano node worker RPC url with PoW generation support (can be the same as RPC_URL)
7574
wrangler secret put WORKER_URLS
76-
77-
# Your Pusher crendentials
78-
wrangler secret put PUSHER_APP_ID
79-
wrangler secret put PUSHER_KEY
80-
wrangler secret put PUSHER_SECRET
8175
```
8276

8377
## Running locally

example.dev.vars

-4
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@ SUPABASE_SECRET_KEY="xxx.xxx.xx-xxx"
55

66
NANO_WEBSOCKET_URL="wss://xx.xxxx.xx"
77

8-
PUSHER_APP_ID="xxxx"
9-
PUSHER_KEY="xxxx"
10-
PUSHER_SECRET="xxxx"
11-
128
HOT_WALLET_SEED="xxxxxxxx"
139
REPRESENTATIVE="nano_3xxx"
1410
RPC_URLS=https://rpc.xxxx.xx

package-lock.json

+13-17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-3
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,18 @@
1111
"dependencies": {
1212
"@supabase/supabase-js": "^2.8.0",
1313
"bignumber.js": "^9.1.1",
14-
"crypto-js": "^4.1.1",
1514
"nanocurrency": "^2.5.0",
1615
"path-to-regexp": "^7.1.0",
1716
"uuid": "^9.0.1",
1817
"zod": "^3.22.4"
1918
},
2019
"devDependencies": {
2120
"@cloudflare/workers-types": "^4.20240718.0",
22-
"@types/crypto-js": "^4.1.1",
2321
"@types/uuid": "^9.0.8",
2422
"typescript": "^5.5.2",
2523
"wrangler": "3.65.1"
2624
},
2725
"resolutions": {
2826
"@types/node": "18.15.3"
2927
}
30-
}
28+
}

src/durable/payment-listener.ts

+34-13
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import { Invoice, MessageBody, Payment, Service, Webhook } from '../types';
44
import { rawToNano } from '../utils';
55
import { logger } from '../logger';
66
import { INVOICE_MIN_AMOUNT } from '../constants';
7+
import { PaymentNotifier } from './payment-notifier';
78

89
export class PaymentListener extends DurableObject<Env> {
910
private nanoWebsocket: NanoWebsocket;
1011
private pendingInvoices: { id: string; expiresAt: string; payAddress: string; payments: Payment[] }[] = [];
12+
private notifierNamespace: DurableObjectNamespace<PaymentNotifier>;
1113

1214
/**
1315
* The constructor is invoked once upon creation of the Durable Object, i.e. the first call to
@@ -19,6 +21,7 @@ export class PaymentListener extends DurableObject<Env> {
1921
constructor(ctx: DurableObjectState, env: Env) {
2022
super(ctx, env);
2123
this.nanoWebsocket = new NanoWebsocket(env.NANO_WEBSOCKET_URL);
24+
this.notifierNamespace = env.PAYMENT_NOTIFIER;
2225
}
2326

2427
async listen(message: MessageBody) {
@@ -58,6 +61,8 @@ export class PaymentListener extends DurableObject<Env> {
5861
payments: [],
5962
});
6063

64+
await this.startPaymentNotifier(invoice.id);
65+
6166
await this.alarm();
6267
}
6368

@@ -101,23 +106,23 @@ export class PaymentListener extends DurableObject<Env> {
101106

102107
payments.push(newPayment);
103108

104-
this.paymentNotify(payments, invoice);
109+
this.paymentNotify(newPayment, invoice.id);
105110
this.paymentWrite(newPayment, invoice, service, webhooks);
106111

107112
const paid_total = payments.reduce((acc, payment) => {
108113
return acc + payment.amount;
109114
}, 0);
110115

111116
if (paid_total >= invoice.price) {
112-
this.nanoWebsocket.unsubscribe(invoice.pay_address);
113-
this.removePendingInvoice(invoice.id);
114-
115-
this.paymentReceiver(payments, invoice);
117+
await this.removePendingInvoice(invoice.id, invoice.pay_address);
118+
await this.paymentReceiver(payments, invoice);
116119
}
117120
}
118121

119-
private removePendingInvoice(invoiceId: string) {
122+
private async removePendingInvoice(invoiceId: string, payAddress: string) {
123+
this.nanoWebsocket.unsubscribe(payAddress);
120124
this.pendingInvoices = this.pendingInvoices.filter((activeInvoice) => activeInvoice.id !== invoiceId);
125+
await this.stopPaymentNotifier(invoiceId);
121126
}
122127

123128
private async paymentWrite(payment: Payment, invoice: Invoice, service: Service, webhooks: Webhook[]) {
@@ -130,11 +135,28 @@ export class PaymentListener extends DurableObject<Env> {
130135
});
131136
}
132137

133-
private async paymentNotify(payments: Payment[], invoice: Invoice) {
134-
// Send the payment to the worker to push to the channel
135-
await this.env.PAYMENT_PUSHER_QUEUE.send({
136-
invoice,
137-
payments,
138+
private async startPaymentNotifier(invoiceId: string) {
139+
const notifierId = this.notifierNamespace.idFromName(invoiceId);
140+
const paymentNotifier = this.notifierNamespace.get(notifierId);
141+
await paymentNotifier.start();
142+
}
143+
144+
private async stopPaymentNotifier(invoiceId: string) {
145+
const notifierId = this.notifierNamespace.idFromName(invoiceId);
146+
const paymentNotifier = this.notifierNamespace.get(notifierId);
147+
await paymentNotifier.stop();
148+
}
149+
150+
private async paymentNotify(payment: Payment, invoiceId: string) {
151+
// Send the payment to the PaymentNotifier
152+
const notifierId = this.notifierNamespace.idFromName(invoiceId);
153+
const paymentNotifier = this.notifierNamespace.get(notifierId);
154+
await paymentNotifier.notify({
155+
from: payment.from,
156+
to: payment.to,
157+
amount: payment.amount,
158+
hash: payment.hash,
159+
timestamp: payment.timestamp,
138160
});
139161
}
140162

@@ -156,8 +178,7 @@ export class PaymentListener extends DurableObject<Env> {
156178
logger.info(`Invoice expired: ${activeInvoice.id}`, {
157179
activeInvoice,
158180
});
159-
this.nanoWebsocket.unsubscribe(activeInvoice.payAddress);
160-
this.removePendingInvoice(activeInvoice.id);
181+
await this.removePendingInvoice(activeInvoice.id, activeInvoice.payAddress);
161182
}
162183
}
163184
if (this.pendingInvoices.length > 0) {

src/durable/payment-notifier.ts

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import { DurableObject } from 'cloudflare:workers';
2+
import { logger } from '../logger';
3+
4+
export type PaymentNotification = {
5+
from: string;
6+
to: string;
7+
amount: number;
8+
hash: string;
9+
timestamp: number;
10+
};
11+
12+
/*
13+
* PaymentNotifier implements a Durable Object that coordinates notifications for an individual invoice.
14+
* Participants connect to the notifier using WebSockets, and the notifier broadcasts the payments for them.
15+
* The notifier only start to respond websockets connections after started.
16+
* It auto hibernates when no messages are being sent and resume when notify is called.
17+
*/
18+
export class PaymentNotifier extends DurableObject<Env> {
19+
sessions = new Set<WebSocket>();
20+
state: DurableObjectState;
21+
storage: DurableObjectStorage;
22+
23+
constructor(ctx: DurableObjectState, env: Env) {
24+
super(ctx, env);
25+
26+
this.state = ctx;
27+
this.storage = ctx.storage;
28+
29+
this.state.getWebSockets().forEach((webSocket) => {
30+
this.sessions.add(webSocket);
31+
});
32+
}
33+
34+
async fetch(request: Request) {
35+
// We only accept WebSocket requests.
36+
if (request.headers.get('Upgrade') != 'websocket') {
37+
return new Response('expected websocket', { status: 400 });
38+
}
39+
40+
const started = await this.storage.get('started');
41+
if (started !== 'true') {
42+
return new Response('not started', { status: 503 });
43+
}
44+
45+
// To accept the WebSocket request, we create a WebSocketPair
46+
const pair = new WebSocketPair();
47+
48+
// We're going to take pair[1] as our end, and return pair[0] to the client.
49+
await this.handleSession(pair[1]);
50+
51+
// Now we return the other end of the pair to the client.
52+
return new Response(null, { status: 101, webSocket: pair[0] });
53+
}
54+
55+
async start() {
56+
await this.storage.put('started', 'true');
57+
logger.debug('Started payment notifier');
58+
}
59+
60+
async stop() {
61+
this.sessions.forEach((session) => {
62+
session.close();
63+
});
64+
await this.clear();
65+
logger.debug('Stopped payment notifier');
66+
}
67+
68+
private async clear() {
69+
this.sessions.clear();
70+
await this.storage.deleteAll();
71+
}
72+
73+
private async handleSession(webSocket: WebSocket) {
74+
// Accept our end of the WebSocket. This tells the runtime that we'll be terminating the
75+
// WebSocket in JavaScript, not sending it elsewhere.
76+
this.state.acceptWebSocket(webSocket);
77+
78+
this.sessions.add(webSocket);
79+
80+
// Load the last 10 payments from the history stored on disk, and send them to the
81+
// client.
82+
const payments = await this.storage.list<Record<any, any>>({ reverse: true, limit: 10, prefix: 'payment_' });
83+
[...payments.values()].forEach((value) => {
84+
webSocket.send(JSON.stringify(value));
85+
});
86+
}
87+
88+
// On "close" and "error" events, remove the WebSocket from the sessions list and broadcast
89+
// a quit message.
90+
private async closeOrErrorHandler(webSocket: WebSocket) {
91+
this.sessions.delete(webSocket);
92+
}
93+
94+
// Implement DurableObject's webSocketClose event.
95+
async webSocketClose(webSocket: WebSocket, code: number, reason: string, wasClean: boolean) {
96+
this.closeOrErrorHandler(webSocket);
97+
}
98+
99+
// Implement DurableObject's webSocketError event.
100+
async webSocketError(webSocket: WebSocket, error: unknown) {
101+
this.closeOrErrorHandler(webSocket);
102+
}
103+
104+
// Broadcasts a payment to all clients.
105+
private async broadcast(payment: PaymentNotification) {
106+
// Iterate over all the sessions sending them messages.
107+
this.sessions.forEach((session) => {
108+
try {
109+
session.send(JSON.stringify(payment));
110+
} catch (err) {
111+
// Whoops, this connection is dead. Remove it from the map
112+
this.sessions.delete(session);
113+
}
114+
});
115+
}
116+
117+
public async notify(payment: PaymentNotification) {
118+
// Store the payment in the history.
119+
await this.storage.put(`payment_${payment.hash}`, payment);
120+
121+
// Broadcast the payment to all clients.
122+
this.broadcast(payment);
123+
}
124+
}

src/index.ts

+1-9
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { invoiceRouter } from './invoice/invoice-router';
22
import { logger } from './logger';
33
import { queue } from './queues';
4-
import { UnauthorizedException } from './responses';
54
import { isFalsyLike } from './utils';
65
import { Router } from './utils/router';
76
/*
87
* Export our Durable Object classes here.
98
*/
109
export { PaymentListener } from './durable/payment-listener';
10+
export { PaymentNotifier } from './durable/payment-notifier';
1111

1212
/*
1313
* Our API router.
@@ -23,14 +23,6 @@ router.route('/invoices', invoiceRouter);
2323
*/
2424
export default {
2525
async fetch(request: Request<unknown, IncomingRequestCfProperties<unknown>>, env: Env, ctx: ExecutionContext): Promise<Response> {
26-
// Check authorization
27-
const authorizationHeader = request.headers.get('Authorization');
28-
const bearerToken = authorizationHeader?.split(' ')[1];
29-
const authorized = bearerToken === env.AUTH_TOKEN;
30-
if (!authorized) {
31-
return UnauthorizedException();
32-
}
33-
3426
const localMode = !isFalsyLike(env.IS_LOCAL_MODE);
3527
logger.setLocalDev(localMode);
3628

0 commit comments

Comments
 (0)