Skip to content

Commit a4a8d71

Browse files
authored
feat: Reconnect (#39)
1 parent b5a5156 commit a4a8d71

File tree

10 files changed

+278
-3
lines changed

10 files changed

+278
-3
lines changed

apps/integration-tests/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"dependencies": {
1212
"@types/ws": "^8.18.1",
1313
"centrifuge": "^5.3.5",
14+
"toxiproxy-node-client": "^4.0.0",
1415
"vitest": "^3.2.4",
1516
"ws": "^8.18.3"
1617
}

apps/integration-tests/src/end-to-end.integration.test.ts

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
/** biome-ignore-all lint/suspicious/noExplicitAny: test code */
2+
/** biome-ignore-all lint/suspicious/noShadowRestrictedNames: test code */
23
import { type ConnectionMode, type IKeyManager, type IKVStore, type KeyPair, type SessionRequest, SessionStore, WebSocketTransport } from "@metamask/mobile-wallet-protocol-core";
34
import { DappClient, type OtpRequiredPayload } from "@metamask/mobile-wallet-protocol-dapp-client";
45
import { WalletClient } from "@metamask/mobile-wallet-protocol-wallet-client";
56
import { decrypt, encrypt, PrivateKey } from "eciesjs";
7+
import { type Proxy, Toxiproxy } from "toxiproxy-node-client";
68
import * as t from "vitest";
79
import WebSocket from "ws";
810

911
const RELAY_URL = "ws://localhost:8000/connection/websocket";
12+
const PROXY_RELAY_URL = "ws://localhost:8001/connection/websocket";
13+
const TOXIPROXY_URL = "http://localhost:8474";
1014

1115
class InMemoryKVStore implements IKVStore {
1216
private store = new Map<string, string>();
@@ -69,6 +73,12 @@ async function connectClients(dappClient: DappClient, walletClient: WalletClient
6973
await Promise.all([dappConnectPromise, walletConnectPromise]);
7074
}
7175

76+
// Helper to assert that a promise does NOT resolve within a given time
77+
async function assertPromiseNotResolve(promise: Promise<unknown>, timeout: number, message: string) {
78+
const timeoutPromise = new Promise((_, reject) => setTimeout(() => reject(new Error(message)), timeout));
79+
await t.expect(Promise.race([promise, timeoutPromise])).rejects.toThrow(message);
80+
}
81+
7282
t.describe("E2E Integration Test", () => {
7383
let dappClient: DappClient;
7484
let walletClient: WalletClient;
@@ -199,4 +209,114 @@ t.describe("E2E Integration Test", () => {
199209
await resumedDappClient.sendRequest(testPayload);
200210
await t.expect(messagePromise).resolves.toEqual(testPayload);
201211
});
212+
213+
});
214+
215+
t.describe("E2E Integration Test via Proxy", () => {
216+
let dappClient: DappClient;
217+
let walletClient: WalletClient;
218+
let dappKvStore: InMemoryKVStore;
219+
let walletKvStore: InMemoryKVStore;
220+
let dappSessionStore: SessionStore;
221+
let walletSessionStore: SessionStore;
222+
223+
// Toxiproxy setup
224+
let toxiproxy: Toxiproxy;
225+
let proxy: Proxy;
226+
const proxyConfig = {
227+
listen: "0.0.0.0:8001",
228+
upstream: "centrifugo:8000",
229+
};
230+
231+
t.beforeAll(async () => {
232+
toxiproxy = new Toxiproxy(TOXIPROXY_URL);
233+
try {
234+
proxy = await toxiproxy.get("centrifugo_proxy");
235+
await proxy.remove();
236+
} catch {
237+
// Proxy doesn't exist, which is fine
238+
}
239+
proxy = await toxiproxy.createProxy({
240+
name: "centrifugo_proxy",
241+
...proxyConfig,
242+
});
243+
});
244+
245+
t.beforeEach(async () => {
246+
// Ensure the proxy is enabled before each test
247+
await proxy.update({ ...proxyConfig, enabled: true });
248+
249+
dappKvStore = new InMemoryKVStore();
250+
walletKvStore = new InMemoryKVStore();
251+
dappSessionStore = new SessionStore(dappKvStore);
252+
walletSessionStore = new SessionStore(walletKvStore);
253+
254+
// DApp connects directly, Wallet connects through proxy
255+
const dappTransport = await WebSocketTransport.create({ url: RELAY_URL, kvstore: dappKvStore, websocket: WebSocket });
256+
const walletTransport = await WebSocketTransport.create({ url: PROXY_RELAY_URL, kvstore: walletKvStore, websocket: WebSocket });
257+
const keyManager = new KeyManager();
258+
259+
dappClient = new DappClient({ transport: dappTransport, sessionstore: dappSessionStore, keymanager: keyManager });
260+
walletClient = new WalletClient({ transport: walletTransport, sessionstore: walletSessionStore, keymanager: keyManager });
261+
});
262+
263+
t.afterEach(async () => {
264+
// Reset proxy state after each test
265+
if (proxy) {
266+
await proxy.update({ ...proxyConfig, enabled: true });
267+
}
268+
try {
269+
await dappClient?.disconnect();
270+
} catch (e) {
271+
console.error("Error disconnecting dappClient:", e);
272+
}
273+
try {
274+
await walletClient?.disconnect();
275+
} catch (e) {
276+
console.error("Error disconnecting walletClient:", e);
277+
}
278+
});
279+
280+
t.test(
281+
"should recover from a one-sided stale connection and receive queued messages after reconnect",
282+
async () => {
283+
// 1. Establish a normal connection and exchange a message to confirm it works
284+
await connectClients(dappClient, walletClient, "trusted");
285+
const initialMessage = { step: "initial_message" };
286+
const initialMessagePromise = new Promise((resolve) => walletClient.once("message", resolve));
287+
await dappClient.sendRequest(initialMessage);
288+
await t.expect(initialMessagePromise).resolves.toEqual(initialMessage);
289+
t.expect((walletClient as any).state).toBe("CONNECTED");
290+
291+
// 2. Create a ONE-SIDED network partition using toxiproxy.
292+
// This cuts off the Wallet's connection but leaves the DApp's connection intact.
293+
await proxy.update({ ...proxyConfig, enabled: false });
294+
295+
// 3. Dapp sends a message. Its transport is still live, so it successfully publishes to the relay.
296+
// The Wallet, however, is now on a stale connection and should NOT receive it.
297+
const missedMessage = { step: "missed_message" };
298+
const missedMessagePromise = new Promise((resolve) => walletClient.once("message", resolve));
299+
await dappClient.sendRequest(missedMessage);
300+
301+
// Assert that the message is NOT received by the wallet within 2 seconds.
302+
await assertPromiseNotResolve(missedMessagePromise, 2000, "Wallet incorrectly received message during network partition.");
303+
304+
// 4. Restore the network path and trigger the wallet's reconnect logic.
305+
await proxy.update({ ...proxyConfig, enabled: true });
306+
await walletClient.reconnect();
307+
308+
// 5. The wallet should now re-establish its connection. The transport's recovery logic
309+
// should fetch the missed message from the channel's history.
310+
const receivedMissedMessage = await missedMessagePromise;
311+
t.expect(receivedMissedMessage).toEqual(missedMessage);
312+
t.expect((walletClient as any).state).toBe("CONNECTED");
313+
314+
// 6. Send a final message to confirm the live connection is fully restored and working.
315+
const finalMessage = { step: "final_message" };
316+
const finalMessagePromise = new Promise((resolve) => walletClient.once("message", resolve));
317+
await dappClient.sendRequest(finalMessage);
318+
await t.expect(finalMessagePromise).resolves.toEqual(finalMessage);
319+
},
320+
15000,
321+
);
202322
});

apps/rn-demo/context/WalletContext.tsx

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import "react-native-get-random-values";
33
import { type Session, SessionStore } from "@metamask/mobile-wallet-protocol-core";
44
import Constants from "expo-constants";
55
import { createContext, type ReactNode, useCallback, useContext, useEffect, useState } from "react";
6-
import { Platform } from "react-native";
6+
import { AppState, type AppStateStatus, Platform } from "react-native";
77
import { AsyncStorageKVStore } from "@/lib/AsyncStorageKVStore";
88
import { type GlobalActivityLogEntry, SessionManager } from "@/lib/SessionManager";
99

@@ -127,6 +127,22 @@ export function WalletProvider({ children }: { children: ReactNode }) {
127127
};
128128
}, [addLog]);
129129

130+
useEffect(() => {
131+
const handleAppStateChange = (nextAppState: AppStateStatus) => {
132+
console.log(`App state changed to: ${nextAppState}`);
133+
if (nextAppState === "active") {
134+
console.log("App is in the foreground, reconnecting all clients...");
135+
sessionManager?.reconnectAllClients();
136+
}
137+
};
138+
139+
const subscription = AppState.addEventListener("change", handleAppStateChange);
140+
141+
return () => {
142+
subscription.remove();
143+
};
144+
}, [sessionManager]);
145+
130146
const value = { sessionManager, sessions, globalActivityLog, isInitializing, error, addLog, otpToDisplay, clearOtpDisplay };
131147

132148
return <WalletContext.Provider value={value}>{children}</WalletContext.Provider>;

apps/rn-demo/lib/SessionManager.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ export class SessionManager extends EventEmitter {
103103
await Promise.all(allSessions.map((id) => this.deleteClient(id)));
104104
}
105105

106+
public async reconnectAllClients(): Promise<void> {
107+
console.log("SessionManager: Reconnecting all clients...");
108+
const reconnectPromises = Array.from(this.clients.values()).map((client) => client.reconnect());
109+
await Promise.all(reconnectPromises);
110+
}
111+
106112
public getClient(sessionId: string): WalletClient | undefined {
107113
return this.clients.get(sessionId);
108114
}

backend/docker-compose.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,13 @@ services:
66
- 8000:8000
77
volumes:
88
- ./config.json:/centrifugo/config.json
9-
command: centrifugo -c config.json
9+
command: centrifugo -c config.json
10+
11+
toxiproxy:
12+
image: shopify/toxiproxy:latest
13+
ports:
14+
# The test client will connect through this proxy port
15+
- 8001:8001
16+
# The test runner will control the proxy via this HTTP API port
17+
- 8474:8474
18+
command: -host 0.0.0.0

packages/core/src/base-client.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,25 @@ export abstract class BaseClient extends EventEmitter {
5050
});
5151
}
5252

53+
/**
54+
* Proactively refreshes the underlying transport connection.
55+
* This is the recommended method for mobile clients to call when the application
56+
* returns to the foreground to ensure the connection is not stale.
57+
*/
58+
public async reconnect(): Promise<void> {
59+
if (this.state === ClientState.CONNECTING || !this.session || !this.transport.reconnect) return;
60+
61+
try {
62+
this.state = ClientState.CONNECTING;
63+
await this.transport.reconnect();
64+
this.state = ClientState.CONNECTED;
65+
this.emit("connected");
66+
} catch {
67+
this.state = ClientState.DISCONNECTED;
68+
throw new TransportError(ErrorCode.TRANSPORT_RECONNECT_FAILED, "Failed to reconnect");
69+
}
70+
}
71+
5372
/**
5473
* Resumes an existing session by loading it from storage and connecting to the
5574
* transport on the session's secure channel.

packages/core/src/domain/errors.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export enum ErrorCode {
1111
TRANSPORT_SUBSCRIBE_FAILED = "TRANSPORT_SUBSCRIBE_FAILED",
1212
TRANSPORT_HISTORY_FAILED = "TRANSPORT_HISTORY_FAILED",
1313
TRANSPORT_PARSE_FAILED = "TRANSPORT_PARSE_FAILED",
14+
TRANSPORT_RECONNECT_FAILED = "TRANSPORT_RECONNECT_FAILED",
1415

1516
// Crypto errors
1617
DECRYPTION_FAILED = "DECRYPTION_FAILED",

packages/core/src/domain/transport.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,14 @@ export interface ITransport {
4949
* @param channel The channel to clear.
5050
*/
5151
clear(channel: string): Promise<void>;
52+
53+
/**
54+
* Forcibly drops the current connection and attempts to re-establish it.
55+
* This is designed to recover from stale or zombie connections without losing
56+
* the client's subscription state. It should trigger the transport's built-in
57+
* recovery mechanisms upon a successful new connection.
58+
*
59+
* @returns A promise that resolves when the connection is re-established.
60+
*/
61+
reconnect?(): Promise<void>;
5262
}

packages/core/src/transport/websocket/index.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,25 @@ export class WebSocketTransport extends EventEmitter implements ITransport {
130130
});
131131
}
132132

133+
/**
134+
* Disconnects and immediately reconnects the underlying Centrifuge client.
135+
* This is a proactive way to force a fresh connection while preserving all
136+
* existing subscription objects in memory, allowing for automatic recovery.
137+
*/
138+
public reconnect(): Promise<void> {
139+
if (this.state === "connecting") {
140+
return new Promise((resolve) => this.centrifuge.once("connected", () => resolve()));
141+
}
142+
143+
this.centrifuge.disconnect();
144+
145+
return new Promise((resolve, reject) => {
146+
this.centrifuge.once("connected", () => resolve());
147+
this.centrifuge.once("error", (ctx) => reject(new TransportError(ErrorCode.TRANSPORT_RECONNECT_FAILED, ctx.error.message)));
148+
this.centrifuge.connect();
149+
});
150+
}
151+
133152
/**
134153
* Subscribes to a channel and fetches historical messages and sends any queued messages.
135154
*/

0 commit comments

Comments
 (0)