Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for middlewares intercepting outgoing events #4885

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 86 additions & 22 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ export class Socket<
private readonly adapter: Adapter;
private acks: Map<number, () => void> = new Map();
private fns: Array<(event: Event, next: (err?: Error) => void) => void> = [];
private outFns: Array<(event: Event, next: (err?: Error) => void) => void> =
[];
private flags: BroadcastFlags = {};
private _anyListeners?: Array<(...args: any[]) => void>;
private _anyOutgoingListeners?: Array<(...args: any[]) => void>;
Expand Down Expand Up @@ -337,32 +339,39 @@ export class Socket<
type: PacketType.EVENT,
data: data,
};
// running the middlewares for outgoing events
this.runOutgoing(data, (err) => {
// if error, abbort the event
if (err) {
this.emitReserved("error", err);
return;
}

// access last argument to see if it's an ACK callback
if (typeof data[data.length - 1] === "function") {
const id = this.nsp._ids++;
debug("emitting packet with ack id %d", id);

this.registerAckCallback(id, data.pop());
packet.id = id;
}
// access last argument to see if it's an ACK callback
if (typeof data[data.length - 1] === "function") {
const id = this.nsp._ids++;
debug("emitting packet with ack id %d", id);

const flags = Object.assign({}, this.flags);
this.flags = {};
this.registerAckCallback(id, data.pop());
packet.id = id;
}

// @ts-ignore
if (this.nsp.server.opts.connectionStateRecovery) {
// this ensures the packet is stored and can be transmitted upon reconnection
this.adapter.broadcast(packet, {
rooms: new Set([this.id]),
except: new Set(),
flags,
});
} else {
this.notifyOutgoingListeners(packet);
this.packet(packet, flags);
}
const flags = Object.assign({}, this.flags);
this.flags = {};

// @ts-ignore
if (this.nsp.server.opts.connectionStateRecovery) {
// this ensures the packet is stored and can be transmitted upon reconnection
this.adapter.broadcast(packet, {
rooms: new Set([this.id]),
except: new Set(),
flags,
});
} else {
this.notifyOutgoingListeners(packet);
this.packet(packet, flags);
}
});
return true;
}

Expand Down Expand Up @@ -954,6 +963,34 @@ export class Socket<
return this;
}

/**
* Sets up a socketmiddleware for outgoing requests.
*
* Can be used to hook right before acknowlegement callbacks get ran.
* @example
* io.on("connection", (socket) => {
* socket.useOutgoing((event, next) => {
* if (event.length && typeof event[event.length - 1] === 'function') {
* const callback = event[event.length - 1];
* event[event.length - 1] = (...args) => {
* // able to hook right before the acknowledgement callback is executed.
* callback(...args);
* }
* }
* // do not forget to call next
* next();
* });
* });
* @param {Function} fn - middleware function (event, next)
* @returns {Socket} self
*/
public useOutgoing(
fn: (event: Event, next: (err?: Error) => void) => void
): this {
this.outFns.push(fn);
return this;
}

/**
* Executes the middleware for an incoming event.
*
Expand Down Expand Up @@ -981,6 +1018,33 @@ export class Socket<
run(0);
}

/**
* Executes the middleware for an outgoing event.
*
* @param {Array} event - event that will get emitted to client
* @param {Function} fn - last fn call in the middleware
* @private
*/
private runOutgoing(event, fn: (err: Error | null) => void): void {
const outFns = this.outFns.slice(0);
if (!outFns.length) return fn(null);

function run(i: number) {
outFns[i](event, function (err) {
// upon error, short-circuit
if (err) return fn(err);

// if no middleware left, summon callback
if (!outFns[i + 1]) return fn(null);

// go on to next
run(i + 1);
});
}

run(0);
}

/**
* Whether the socket is currently disconnected
*/
Expand Down
1 change: 1 addition & 0 deletions test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ describe("socket.io", () => {
require("./messaging-many");
require("./middleware");
require("./socket-middleware");
require("./socket-outgoing-middleware");
require("./v2-compatibility");
require("./socket-timeout");
require("./uws");
Expand Down
97 changes: 97 additions & 0 deletions test/socket-outgoing-middleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { Server } from "..";
import expect from "expect.js";
import { success, createClient } from "./support/util";

describe("socket outgoing middleware", () => {
it("should call functions before sending event", (done) => {
const io = new Server(0);
let run = 0;
const clientSocket = createClient(io, "/", { multiplex: false });
io.on("connection", (socket) => {
socket.useOutgoing((event, next) => {
expect(event).to.eql(["join", "woot"]);
event.unshift("wrap");
run++;
next();
});
socket.useOutgoing((event, next) => {
expect(event).to.eql(["wrap", "join", "woot"]);
run++;
next();
});
socket.onAnyOutgoing((arg1, arg2, arg3) => {
expect(arg1).to.be("wrap");
expect(arg2).to.be("join");
expect(arg3).to.be("woot");
expect(run).to.be(2);

success(done, io, clientSocket);
});

socket.emit("join", "woot");
});
});

it("should pass errors", (done) => {
const io = new Server(0);
const clientSocket = createClient(io, "/", { multiplex: false });

io.on("connection", (socket) => {
socket.useOutgoing((event, next) => {
next(new Error("Filtering error"));
});
socket.useOutgoing((event, next) => {
done(new Error("should not happen"));
});
socket.on("join", () => {
done(new Error("should not happen"));
});
socket.on("error", (err) => {
expect(err).to.be.an(Error);
expect(err.message).to.eql("Filtering error");

success(done, io, clientSocket);
});
socket.emit("join", "woot");
});
});

it("should allow wrapping the acknowledgement callback", (done) => {
const io = new Server(0);
let run = 0;
const clientSocket = createClient(io, "/", { multiplex: false });
clientSocket.on("join", (arg1, acknowledge) => {
expect(typeof acknowledge).to.be("function");
run++;
expect(run).to.be(2);
acknowledge();
});
io.on("connection", (socket) => {
socket.useOutgoing((event, next) => {
const callback = event[event.length - 1];
expect(typeof callback).to.be("function");
event[event.length - 1] = (...args) => {
run++;
expect(run).to.be(3);
callback(...args);
};

run++;
expect(run).to.be(1);
next();
});

socket
.emitWithAck("join", "woot")
.then(() => {
run++;
expect(run).to.be(4);
success(done, io, clientSocket);
})
.catch((err) => {
if (err) done(err);
else done(new Error("acknowledgement rejected"));
});
});
});
});