Merge pull request #5 from gynzy/chore/upgrade-upstream
chore: upgrade to upstream 0.3.1
joelluijmes committed Jan 23, 2024
2 parents 19a4d35 + 40536d1 commit bb93687
Showing 10 changed files with 165 additions and 134 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,12 +1,23 @@
# History

- [0.3.1](#031-2024-01-10) (Jan 2024)
- [0.3.0](#030-2023-02-23) (Feb 2023)
- [0.2.1](#021-2022-05-03) (May 2022)
- [0.2.0](#020-2022-04-27) (Apr 2022)
- [0.1.0](#010-2021-06-01) (Jun 2021)

# Release notes

## [0.3.1](https://github.com/socketio/socket.io-mongo-adapter/compare/0.3.0...0.3.1) (2024-01-10)


### Bug Fixes

* add support for mongodb@6 ([1a04885](https://github.com/socketio/socket.io-mongo-adapter/commit/1a0488562f1e8b4171af20378aacfa43072980dd))
* properly handle promise rejections ([075216f](https://github.com/socketio/socket.io-mongo-adapter/commit/075216f7decac3e8660c39dc1009a27d786ca1ad))



## [0.3.0](https://github.com/socketio/socket.io-mongo-adapter/compare/0.2.1...0.3.0) (2023-02-23)


5 changes: 4 additions & 1 deletion README.md
@@ -11,7 +11,10 @@ To release the specified version as a production build, simply merge the PR into

The `@socket.io/mongo-adapter` package allows broadcasting packets between multiple Socket.IO servers.

![Adapter diagram](./assets/adapter.png)
<picture>
<source media="(prefers-color-scheme: dark)" srcset="./assets/adapter_dark.png">
<img alt="Diagram of Socket.IO packets forwarded through MongoDB" src="./assets/adapter.png">
</picture>

Unlike the existing [`socket.io-adapter-mongo`](https://github.com/lklepner/socket.io-adapter-mongo) package, which uses [tailable cursors](https://docs.mongodb.com/manual/core/tailable-cursors/), this package relies on [change streams](https://docs.mongodb.com/manual/changeStreams/) and thus requires a replica set or a sharded cluster.
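
For reference (not part of this commit's diff), the adapter's typical wiring looks roughly like the sketch below. The database/collection names and the capped-collection options are illustrative, not taken from this repository:

```ts
import { Server } from "socket.io";
import { MongoClient } from "mongodb";
import { createAdapter } from "@socket.io/mongo-adapter";

// Illustrative names — adjust to your deployment.
const DB = "mydb";
const COLLECTION = "socket.io-adapter-events";

async function main() {
  // The adapter needs change streams, so the URI must point at a replica set
  // (or sharded cluster), e.g. the single-node replica set from compose.yaml below.
  const mongoClient = new MongoClient(
    "mongodb://localhost:27017/?replicaSet=rs0&directConnection=true"
  );
  await mongoClient.connect();

  try {
    // A capped collection keeps the event log bounded; a TTL index is another option.
    await mongoClient.db(DB).createCollection(COLLECTION, {
      capped: true,
      size: 1e6,
    });
  } catch {
    // collection already exists
  }

  const io = new Server(3000);
  io.adapter(createAdapter(mongoClient.db(DB).collection(COLLECTION)));
}

main();
```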

Binary file modified assets/adapter.png
Binary file added assets/adapter_dark.png
7 changes: 5 additions & 2 deletions docker-compose.yml → compose.yaml
@@ -1,9 +1,12 @@
version: "3"

services:
mongo:
image: mongo:4.4
entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
# you'll need to run "rs.initiate()" on the node (https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/)
# commands:
# $ docker compose exec mongo /bin/bash
# $ mongosh
# $ rs.initiate()
# $ rs.status()
ports:
- "27017:27017"
44 changes: 26 additions & 18 deletions lib/index.ts
@@ -6,7 +6,7 @@ import {
Session,
} from "socket.io-adapter";
import { randomBytes } from "crypto";
import { ObjectId, MongoServerError } from "mongodb";
import { ObjectId, MongoServerError, WithId, Document } from "mongodb";
import type { Collection, ChangeStream, ResumeToken } from "mongodb";

const randomId = () => randomBytes(8).toString("hex");
@@ -140,6 +140,10 @@ const replaceBinaryObjectsByBuffers = (obj: any) => {
return obj;
};

function onPublishError(err: Error) {
debug("something went wrong when inserting the MongoDB document: %s", err);
}

/**
* Returns a function that will create a MongoAdapter instance.
*
@@ -217,12 +221,12 @@ export function createAdapter(

const defaultClose = adapter.close;

adapter.close = () => {
adapter.close = async () => {
adapters.delete(nsp.name);

if (adapters.size === 0) {
changeStream.removeAllListeners("close");
changeStream.close();
await changeStream.close();
// @ts-ignore
changeStream = null;
isClosed = true;
@@ -273,7 +277,7 @@ export class MongoAdapter extends Adapter {

this.publish({
type: EventType.INITIAL_HEARTBEAT,
});
}).catch(onPublishError);
}

close(): Promise<void> | void {
@@ -301,7 +305,7 @@
case EventType.INITIAL_HEARTBEAT: {
this.publish({
type: EventType.HEARTBEAT,
});
}).catch(onPublishError);
break;
}
case EventType.BROADCAST: {
@@ -400,7 +404,7 @@
data: socket.data,
})),
},
});
}).catch(onPublishError);
break;
}
case EventType.FETCH_SOCKETS_RESPONSE: {
@@ -477,7 +481,7 @@
debug("sending heartbeat");
this.publish({
type: EventType.HEARTBEAT,
});
}).catch(onPublishError);
this.scheduleHeartbeat();
}, this.heartbeatInterval);
}
@@ -591,7 +595,7 @@
requestId,
opts: MongoAdapter.serializeOptions(opts),
},
});
}).catch(onPublishError);

this.ackRequests.set(requestId, {
type: EventType.BROADCAST,
@@ -630,7 +634,7 @@
opts: MongoAdapter.serializeOptions(opts),
rooms,
},
});
}).catch(onPublishError);
}

delSockets(opts: BroadcastOptions, rooms: Room[]) {
@@ -647,7 +651,7 @@
opts: MongoAdapter.serializeOptions(opts),
rooms,
},
});
}).catch(onPublishError);
}

disconnectSockets(opts: BroadcastOptions, close: boolean) {
@@ -664,7 +668,7 @@
opts: MongoAdapter.serializeOptions(opts),
close,
},
});
}).catch(onPublishError);
}

private getExpectedResponseCount() {
@@ -736,7 +740,7 @@
data: {
packet,
},
});
}).catch(onPublishError);
}

private async serverSideEmitWithAck(packet: any[]) {
@@ -783,15 +787,15 @@
requestId, // the presence of this attribute defines whether an acknowledgement is needed
packet,
},
});
}).catch(onPublishError);
}

override persistSession(session: any) {
debug("persisting session: %j", session);
this.publish({
type: EventType.SESSION,
data: session,
});
}).catch(onPublishError);
}

override async restoreSession(
@@ -821,11 +825,15 @@
return Promise.reject("error while fetching session");
}

if (!results[0].value || !results[1]) {
const result = (results[0]?.ok
? results[0].value // mongodb@5
: results[0]) as unknown as WithId<Document>; // mongodb@6

if (!result || !results[1]) {
return Promise.reject("session or offset not found");
}

const session = results[0].value.data;
const session = result.data;

// could use a sparse index on [_id, nsp, data.opts.rooms, data.opts.except] (only index the documents whose type is EventType.BROADCAST)
/* addition daniel genis 20231026
@@ -862,12 +870,12 @@
session.missedPackets = [];

try {
await cursor.forEach((document: any) => {
for await (const document of cursor) {
const packetData = document?.data?.packet?.data;
if (packetData) {
session.missedPackets.push(packetData);
}
});
}
} catch (e) {
return Promise.reject("error while fetching missed packets");
}
98 changes: 49 additions & 49 deletions package.json
@@ -1,50 +1,50 @@
{
"name": "@gynzy/mongo-adapter",
"version": "0.3.0-gynzy2",
"description": "The Socket.IO MongoDB adapter, allowing to broadcast events between several Socket.IO servers",
"license": "MIT",
"repository": {
"type": "git",
"url": "[email protected]:socketio/socket.io-mongo-adapter.git"
},
"files": [
"dist/"
],
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts",
"format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'",
"format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'",
"build": "tsc"
},
"dependencies": {
"debug": "~4.3.1",
"mongodb": "*"
},
"peerDependencies": {
"socket.io-adapter": "^2.5.2"
},
"devDependencies": {
"@types/expect.js": "^0.3.29",
"@types/mocha": "^8.2.1",
"@types/node": "^14.14.7",
"expect.js": "0.3.1",
"mocha": "^10.2.0",
"nyc": "^15.1.0",
"prettier": "^2.1.2",
"socket.io": "^4.6.1",
"socket.io-client": "^4.6.1",
"ts-node": "^10.9.1",
"typescript": "^4.9.4"
},
"engines": {
"node": ">=10.0.0"
},
"keywords": [
"socket.io",
"mongodb",
"mongo",
"adapter"
]
}
"name": "@gynzy/mongo-adapter",
"version": "0.3.1-gynzy-1",
"description": "The Socket.IO MongoDB adapter, allowing to broadcast events between several Socket.IO servers",
"license": "MIT",
"repository": {
"type": "git",
"url": "[email protected]:socketio/socket.io-mongo-adapter.git"
},
"files": [
"dist/"
],
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"scripts": {
"test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts",
"format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'",
"format:fix": "prettier --parser typescript --write 'lib/**/*.ts' 'test/**/*.ts'",
"build": "tsc"
},
"dependencies": {
"debug": "~4.3.1",
"mongodb": "*"
},
"peerDependencies": {
"socket.io-adapter": "^2.5.2"
},
"devDependencies": {
"@types/expect.js": "^0.3.29",
"@types/mocha": "^8.2.1",
"@types/node": "^14.14.7",
"expect.js": "0.3.1",
"mocha": "^10.2.0",
"nyc": "^15.1.0",
"prettier": "^2.1.2",
"socket.io": "^4.6.1",
"socket.io-client": "^4.6.1",
"ts-node": "^10.9.1",
"typescript": "^4.9.4"
},
"engines": {
"node": ">=10.0.0"
},
"keywords": [
"socket.io",
"mongodb",
"mongo",
"adapter"
]
}
4 changes: 3 additions & 1 deletion test/connection-state-recovery.ts
@@ -15,7 +15,9 @@ describe("connection state recovery", () => {
servers = [];
ports = [];

mongoClient = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0");
mongoClient = new MongoClient(
"mongodb://localhost:27017/?replicaSet=rs0&directConnection=true"
);
await mongoClient.connect();

const collection = mongoClient.db("test").collection("events");
27 changes: 26 additions & 1 deletion test/index.ts
@@ -20,7 +20,9 @@ describe("@socket.io/mongodb-adapter", () => {
serverSockets = [];
clientSockets = [];

mongoClient = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0");
mongoClient = new MongoClient(
"mongodb://localhost:27017/?replicaSet=rs0&directConnection=true"
);
await mongoClient.connect();

const collection = mongoClient.db("test").collection("events");
@@ -242,6 +244,29 @@ describe("@socket.io/mongodb-adapter", () => {
done();
});
});

it("broadcasts with a single acknowledgement (local)", async () => {
clientSockets[0].on("test", () => expect().fail());
clientSockets[1].on("test", (cb) => cb(2));
clientSockets[2].on("test", () => expect().fail());

const response = await serverSockets[1].emitWithAck("test");
expect(response).to.eql(2);
});

// This test seems to be flaky: we cannot get it to pass on our CI, and it also appears to be flaky upstream.
// We do not hit this code path in our use case, so we are confident in disabling this test.
it.skip("broadcasts with a single acknowledgement (remote)", async () => {
clientSockets[0].on("test", () => expect().fail());
clientSockets[1].on("test", (cb) => cb(2));
clientSockets[2].on("test", () => expect().fail());

const sockets = await servers[0].in(serverSockets[1].id).fetchSockets();
expect(sockets.length).to.eql(1);

const response = await sockets[0].timeout(500).emitWithAck("test");
expect(response).to.eql(2);
});
});

describe("socketsJoin", () => {