diff --git a/lib/socket.ts b/lib/socket.ts index d37e099017..6e7dbfd6e6 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -236,6 +236,8 @@ export class Socket< private readonly adapter: Adapter; private acks: Map void> = new Map(); private fns: Array<(event: Event, next: (err?: Error) => void) => void> = []; + private outFns: Array<(event: Event, next: (err?: Error) => void) => void> = + []; private flags: BroadcastFlags = {}; private _anyListeners?: Array<(...args: any[]) => void>; private _anyOutgoingListeners?: Array<(...args: any[]) => void>; @@ -337,32 +339,39 @@ export class Socket< type: PacketType.EVENT, data: data, }; + // running the middlewares for outgoing events + this.runOutgoing(data, (err) => { + // if error, abbort the event + if (err) { + this.emitReserved("error", err); + return; + } - // access last argument to see if it's an ACK callback - if (typeof data[data.length - 1] === "function") { - const id = this.nsp._ids++; - debug("emitting packet with ack id %d", id); - - this.registerAckCallback(id, data.pop()); - packet.id = id; - } + // access last argument to see if it's an ACK callback + if (typeof data[data.length - 1] === "function") { + const id = this.nsp._ids++; + debug("emitting packet with ack id %d", id); - const flags = Object.assign({}, this.flags); - this.flags = {}; + this.registerAckCallback(id, data.pop()); + packet.id = id; + } - // @ts-ignore - if (this.nsp.server.opts.connectionStateRecovery) { - // this ensures the packet is stored and can be transmitted upon reconnection - this.adapter.broadcast(packet, { - rooms: new Set([this.id]), - except: new Set(), - flags, - }); - } else { - this.notifyOutgoingListeners(packet); - this.packet(packet, flags); - } + const flags = Object.assign({}, this.flags); + this.flags = {}; + // @ts-ignore + if (this.nsp.server.opts.connectionStateRecovery) { + // this ensures the packet is stored and can be transmitted upon reconnection + this.adapter.broadcast(packet, { + rooms: new Set([this.id]), + except: new Set(), + flags, + }); + } else { + this.notifyOutgoingListeners(packet); + this.packet(packet, flags); + } + }); return true; } @@ -954,6 +963,34 @@ export class Socket< return this; } + /** + * Sets up a socketmiddleware for outgoing requests. + * + * Can be used to hook right before acknowlegement callbacks get ran. + * @example + * io.on("connection", (socket) => { + * socket.useOutgoing((event, next) => { + * if (event.length && typeof event[event.length - 1] === 'function') { + * const callback = event[event.length - 1]; + * event[event.length - 1] = (...args) => { + * // able to hook right before the acknowledgement callback is executed. + * callback(...args); + * } + * } + * // do not forget to call next + * next(); + * }); + * }); + * @param {Function} fn - middleware function (event, next) + * @returns {Socket} self + */ + public useOutgoing( + fn: (event: Event, next: (err?: Error) => void) => void + ): this { + this.outFns.push(fn); + return this; + } + /** * Executes the middleware for an incoming event. * @@ -981,6 +1018,33 @@ export class Socket< run(0); } + /** + * Executes the middleware for an outgoing event. + * + * @param {Array} event - event that will get emitted to client + * @param {Function} fn - last fn call in the middleware + * @private + */ + private runOutgoing(event, fn: (err: Error | null) => void): void { + const outFns = this.outFns.slice(0); + if (!outFns.length) return fn(null); + + function run(i: number) { + outFns[i](event, function (err) { + // upon error, short-circuit + if (err) return fn(err); + + // if no middleware left, summon callback + if (!outFns[i + 1]) return fn(null); + + // go on to next + run(i + 1); + }); + } + + run(0); + } + /** * Whether the socket is currently disconnected */ diff --git a/test/index.ts b/test/index.ts index 7e21589f62..de32dd8ebc 100644 --- a/test/index.ts +++ b/test/index.ts @@ -16,6 +16,7 @@ describe("socket.io", () => { require("./messaging-many"); require("./middleware"); require("./socket-middleware"); + require("./socket-outgoing-middleware"); require("./v2-compatibility"); require("./socket-timeout"); require("./uws"); diff --git a/test/socket-outgoing-middleware.ts b/test/socket-outgoing-middleware.ts new file mode 100644 index 0000000000..fe5ecdb8a7 --- /dev/null +++ b/test/socket-outgoing-middleware.ts @@ -0,0 +1,97 @@ +import { Server } from ".."; +import expect from "expect.js"; +import { success, createClient } from "./support/util"; + +describe("socket outgoing middleware", () => { + it("should call functions before sending event", (done) => { + const io = new Server(0); + let run = 0; + const clientSocket = createClient(io, "/", { multiplex: false }); + io.on("connection", (socket) => { + socket.useOutgoing((event, next) => { + expect(event).to.eql(["join", "woot"]); + event.unshift("wrap"); + run++; + next(); + }); + socket.useOutgoing((event, next) => { + expect(event).to.eql(["wrap", "join", "woot"]); + run++; + next(); + }); + socket.onAnyOutgoing((arg1, arg2, arg3) => { + expect(arg1).to.be("wrap"); + expect(arg2).to.be("join"); + expect(arg3).to.be("woot"); + expect(run).to.be(2); + + success(done, io, clientSocket); + }); + + socket.emit("join", "woot"); + }); + }); + + it("should pass errors", (done) => { + const io = new Server(0); + const clientSocket = createClient(io, "/", { multiplex: false }); + + io.on("connection", (socket) => { + socket.useOutgoing((event, next) => { + next(new Error("Filtering error")); + }); + socket.useOutgoing((event, next) => { + done(new Error("should not happen")); + }); + socket.on("join", () => { + done(new Error("should not happen")); + }); + socket.on("error", (err) => { + expect(err).to.be.an(Error); + expect(err.message).to.eql("Filtering error"); + + success(done, io, clientSocket); + }); + socket.emit("join", "woot"); + }); + }); + + it("should allow wrapping the acknowledgement callback", (done) => { + const io = new Server(0); + let run = 0; + const clientSocket = createClient(io, "/", { multiplex: false }); + clientSocket.on("join", (arg1, acknowledge) => { + expect(typeof acknowledge).to.be("function"); + run++; + expect(run).to.be(2); + acknowledge(); + }); + io.on("connection", (socket) => { + socket.useOutgoing((event, next) => { + const callback = event[event.length - 1]; + expect(typeof callback).to.be("function"); + event[event.length - 1] = (...args) => { + run++; + expect(run).to.be(3); + callback(...args); + }; + + run++; + expect(run).to.be(1); + next(); + }); + + socket + .emitWithAck("join", "woot") + .then(() => { + run++; + expect(run).to.be(4); + success(done, io, clientSocket); + }) + .catch((err) => { + if (err) done(err); + else done(new Error("acknowledgement rejected")); + }); + }); + }); +});