Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Tracker Architecture for Multi-Peer Connections #77

Merged
merged 18 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
435 changes: 219 additions & 216 deletions lib/fast-tracker.ts

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@

export type { UWebSocketsTracker } from "./uws-tracker.js";
export type { FastTracker } from "./fast-tracker.js";
export type { Tracker, PeerContext, TrackerError } from "./tracker.js";
export type {
Tracker,
SocketContext as PeerContext,
TrackerError,
} from "./tracker.js";
2 changes: 1 addition & 1 deletion lib/run-uws-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ function buildServer({
peersCount += swarm.peers.length;

const infoHashHex = Buffer.from(infoHash, "binary").toString("hex");
peersCountPerInfoHash[infoHashHex] = peersCount;
peersCountPerInfoHash[infoHashHex] = swarm.peers.length;
}

const serversStats = new Array<{
Expand Down
21 changes: 17 additions & 4 deletions lib/tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,29 @@
* limitations under the License.
*/

export interface SocketContext {
sendMessage: (json: object, peer: SocketContext) => void;
}

export type Swarm = {
infoHash: string;
completedPeers?: Set<string>;
peers: PeerContext[];
};

export interface PeerContext {
id?: string;
sendMessage: (json: object, peer: PeerContext) => void;
peerId: string;
sendMessage: (json: object, peer: SocketContext) => void;
socket: SocketContext;
lastAccessed: number;
swarm: Swarm;
}

export interface Tracker {
readonly swarms: ReadonlyMap<string, { peers: readonly PeerContext[] }>;
readonly settings: object;
processMessage: (json: object, peer: PeerContext) => void;
disconnectPeer: (peer: PeerContext) => void;
processMessage: (json: object, peer: SocketContext) => void;
disconnectPeersFromSocket: (peer: SocketContext) => void;
}

export class TrackerError extends Error {}
34 changes: 11 additions & 23 deletions lib/uws-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import {
HttpResponse,
} from "uWebSockets.js";
import Debug from "debug";
import { Tracker, TrackerError, PeerContext } from "./tracker.js";
import { Tracker, TrackerError, SocketContext } from "./tracker.js";
import {
ServerSettings,
WebSocketsSettings,
WebSocketsAccessSettings,
} from "./run-uws-tracker.js";

declare module "./tracker.js" {
interface PeerContext {
ws: WebSocket<PeerContext>;
interface SocketContext {
ws: WebSocket<SocketContext>;
}
}

Expand Down Expand Up @@ -186,7 +186,7 @@ export class UWebSocketsTracker {
idleTimeout: this.settings.websockets.idleTimeout,
open: this.onOpen,
upgrade: this.onUpgrade,
drain: (ws: WebSocket<PeerContext>) => {
drain: (ws: WebSocket<SocketContext>) => {
if (debugWebSocketsEnabled) {
debugWebSockets("drain", ws.getBufferedAmount());
}
Expand Down Expand Up @@ -276,7 +276,7 @@ export class UWebSocketsTracker {
);
}

response.upgrade<Pick<PeerContext, "sendMessage">>(
response.upgrade<Omit<SocketContext, "ws">>(
{
sendMessage,
},
Expand All @@ -288,7 +288,7 @@ export class UWebSocketsTracker {
};

private readonly onMessage = (
ws: WebSocket<PeerContext>,
ws: WebSocket<SocketContext>,
message: ArrayBuffer,
): void => {
debugWebSockets("message of size", message.byteLength);
Expand All @@ -308,13 +308,7 @@ export class UWebSocketsTracker {
}

if (debugMessagesEnabled) {
debugMessages(
"in",
userData.id === undefined
? "unknown peer"
: Buffer.from(userData.id).toString("hex"),
json,
);
debugMessages("in", json);
}

try {
Expand All @@ -330,28 +324,22 @@ export class UWebSocketsTracker {
};

private readonly onClose = (
ws: WebSocket<PeerContext>,
ws: WebSocket<SocketContext>,
code: number,
): void => {
this.webSocketsCount--;

if (ws.getUserData().sendMessage !== undefined) {
this.tracker.disconnectPeer(ws as unknown as PeerContext);
this.tracker.disconnectPeersFromSocket(ws as unknown as SocketContext);
}

debugWebSockets("closed with code", code);
};
}

function sendMessage(json: object, peerContext: PeerContext): void {
function sendMessage(json: object, peerContext: SocketContext): void {
peerContext.ws.send(JSON.stringify(json), false, false);
if (debugMessagesEnabled) {
debugMessages(
"out",
peerContext.id === undefined
? "unknown peer"
: Buffer.from(peerContext.id).toString("hex"),
json,
);
debugMessages("out", json);
}
}
Loading
Loading