From 8e427cf2c4f7022fa6a195827af5759e290cc005 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=B4mulo=20Vitoi?= Date: Fri, 17 Feb 2023 07:48:33 -0300 Subject: [PATCH] fix: close service subscriptions (#61) --- lib/gateway.js | 12 +++ lib/gateway/build-gateway.js | 1 + lib/gateway/make-resolver.js | 1 + lib/gateway/service-map.js | 2 + test/subscription.js | 189 +++++++++++++++++++++++++++++++++++ 5 files changed, 205 insertions(+) diff --git a/lib/gateway.js b/lib/gateway.js index d8107b6..2e171c3 100644 --- a/lib/gateway.js +++ b/lib/gateway.js @@ -232,6 +232,18 @@ async function createGateway (gatewayOpts, app) { context.gateway = assignApplicationLifecycleHooksToContext(assignLifeCycleHooksToContext(gateway, gateway[kGatewayHooks]), gateway[kGatewayHooks]) }) + fastifyGraphQl.addHook('onSubscriptionEnd', async (context, id) => { + if (!id || !gateway.subscriptionMap.has(id)) { + return + } + + const { serviceName, subscriptionId } = gateway.subscriptionMap.get(id) + const service = gateway.serviceMap[serviceName] + service.unsubscribe(subscriptionId) + + gateway.subscriptionMap.delete(id) + }) + return gateway } catch (e) { for (const service of Object.values(serviceMap)) { diff --git a/lib/gateway/build-gateway.js b/lib/gateway/build-gateway.js index 27da395..8e14b9a 100644 --- a/lib/gateway/build-gateway.js +++ b/lib/gateway/build-gateway.js @@ -510,6 +510,7 @@ async function buildGateway (serviceMap, gatewayOpts, app, lruGatewayResolvers) return { schema, serviceMap, + subscriptionMap: new Map(), entityResolversFactory: factory, pollingInterval: gatewayOpts.pollingInterval, serviceFn: typeof gatewayOpts.services === 'function' ? gatewayOpts.services : undefined, diff --git a/lib/gateway/make-resolver.js b/lib/gateway/make-resolver.js index 4cea017..9e394cf 100644 --- a/lib/gateway/make-resolver.js +++ b/lib/gateway/make-resolver.js @@ -605,6 +605,7 @@ function makeResolver ({ pubsub.publish.bind(pubsub), context ) + context.gateway.subscriptionMap.set(context.id, { serviceName: service.name, subscriptionId }) return pubsub.subscribe(`${service.name}_${subscriptionId}`) } diff --git a/lib/gateway/service-map.js b/lib/gateway/service-map.js index f37100d..f7708fa 100644 --- a/lib/gateway/service-map.js +++ b/lib/gateway/service-map.js @@ -169,6 +169,7 @@ async function buildServiceMap (serviceMap, services, errorHandler, log) { serviceConfig.client = client serviceConfig.createSubscription = client.createSubscription.bind(client) + serviceConfig.unsubscribe = client.unsubscribe.bind(client) } }, async init () { @@ -213,6 +214,7 @@ async function buildServiceMap (serviceMap, services, errorHandler, log) { client.connect() serviceConfig.client = client serviceConfig.createSubscription = client.createSubscription.bind(client) + serviceConfig.unsubscribe = client.unsubscribe.bind(client) } serviceMap[service.name] = serviceConfig diff --git a/test/subscription.js b/test/subscription.js index 7468364..26764ad 100644 --- a/test/subscription.js +++ b/test/subscription.js @@ -308,6 +308,195 @@ test('gateway subscription handling works correctly', t => { .then(() => runSubscription()) }) +test('gateway subscription properly closes service subscriptions', async t => { + t.plan(2) + let testService + let gateway + let client + + async function createTestService () { + testService = Fastify() + await testService.register(GQL, { + schema: buildFederationSchema(` + type Notification { + id: String! + } + type Query { + notifications: [Notification] + } + type Mutation { + addNotification(id: String!): Notification + } + type Subscription { + notificationAdded(id: String!): Notification + } + `), + resolvers: { + Query: { + notifications: () => [] + }, + Mutation: { + addNotification: async (_, args, { pubsub }) => { + const notification = { + id: args.id + } + await pubsub.publish({ + topic: 'NOTIFICATION_ADDED', + payload: { notificationAdded: notification } + }) + return notification + } + }, + Subscription: { + notificationAdded: { + subscribe: GQL.withFilter( + (_, __, { pubsub }) => { + return pubsub.subscribe('NOTIFICATION_ADDED') + }, + (payload, args) => { + t.equal(args.id, 'n2') + return args.id === payload.notificationAdded.id + } + ) + } + } + }, + subscription: true + }) + await testService.listen({ port: 0 }) + } + + async function createGatewayApp () { + const testServicePort = testService.server.address().port + + gateway = Fastify() + + await gateway.register(plugin, { + gateway: { + services: [ + { + name: 'user', + url: `http://localhost:${testServicePort}/graphql`, + wsUrl: `ws://localhost:${testServicePort}/graphql` + } + ] + }, + subscription: true, + jit: 1 + }) + + await gateway.listen({ port: 0 }) + } + + function runSubscription () { + return new Promise(resolve => { + const ws = new WebSocket( + `ws://localhost:${gateway.server.address().port}/graphql`, + 'graphql-ws' + ) + client = WebSocket.createWebSocketStream(ws, { + encoding: 'utf8', + objectMode: true + }) + + client.setEncoding('utf8') + + client.write( + JSON.stringify({ + type: 'connection_init' + }) + ) + + client.write( + JSON.stringify({ + id: 1, + type: 'start', + payload: { + query: ` + subscription { + notificationAdded(id: "n1") { + id + } + } + ` + } + }) + ) + + client.write( + JSON.stringify({ + id: 2, + type: 'start', + payload: { + query: ` + subscription { + notificationAdded(id: "n2") { + id + } + } + ` + } + }) + ) + + client.on('data', async chunk => { + const data = JSON.parse(chunk) + + if (data.type === 'connection_ack') { + client.write( + JSON.stringify({ + id: 1, + type: 'stop' + }) + ) + } else if (data.id === 1 && data.type === 'complete') { + gateway.inject({ + method: 'POST', + url: '/graphql', + body: { + query: ` + mutation { + addNotification(id: "n2") { + id + } + } + ` + } + }) + } else if (data.type === 'data') { + t.equal( + chunk, + JSON.stringify({ + type: 'data', + id: 2, + payload: { + data: { + notificationAdded: { + id: 'n2' + } + } + } + }) + ) + + await client.end() + resolve() + } + }) + }) + } + + await createTestService() + await createGatewayApp() + await runSubscription() + + t.teardown(async () => { + await client.destroy() + await gateway.close() + await testService.close() + }) +}) + test('gateway wsConnectionParams object is passed to SubscriptionClient', t => { t.plan(1)