Skip to content

Commit

Permalink
fix: close service subscriptions (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
RomuloVitoi committed Feb 17, 2023
1 parent 6295c58 commit 8e427cf
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 0 deletions.
12 changes: 12 additions & 0 deletions lib/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,18 @@ async function createGateway (gatewayOpts, app) {
context.gateway = assignApplicationLifecycleHooksToContext(assignLifeCycleHooksToContext(gateway, gateway[kGatewayHooks]), gateway[kGatewayHooks])
})

fastifyGraphQl.addHook('onSubscriptionEnd', async (context, id) => {
if (!id || !gateway.subscriptionMap.has(id)) {
return
}

const { serviceName, subscriptionId } = gateway.subscriptionMap.get(id)
const service = gateway.serviceMap[serviceName]
service.unsubscribe(subscriptionId)

gateway.subscriptionMap.delete(id)
})

return gateway
} catch (e) {
for (const service of Object.values(serviceMap)) {
Expand Down
1 change: 1 addition & 0 deletions lib/gateway/build-gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ async function buildGateway (serviceMap, gatewayOpts, app, lruGatewayResolvers)
return {
schema,
serviceMap,
subscriptionMap: new Map(),
entityResolversFactory: factory,
pollingInterval: gatewayOpts.pollingInterval,
serviceFn: typeof gatewayOpts.services === 'function' ? gatewayOpts.services : undefined,
Expand Down
1 change: 1 addition & 0 deletions lib/gateway/make-resolver.js
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ function makeResolver ({
pubsub.publish.bind(pubsub),
context
)
context.gateway.subscriptionMap.set(context.id, { serviceName: service.name, subscriptionId })
return pubsub.subscribe(`${service.name}_${subscriptionId}`)
}

Expand Down
2 changes: 2 additions & 0 deletions lib/gateway/service-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ async function buildServiceMap (serviceMap, services, errorHandler, log) {
serviceConfig.client = client
serviceConfig.createSubscription =
client.createSubscription.bind(client)
serviceConfig.unsubscribe = client.unsubscribe.bind(client)
}
},
async init () {
Expand Down Expand Up @@ -213,6 +214,7 @@ async function buildServiceMap (serviceMap, services, errorHandler, log) {
client.connect()
serviceConfig.client = client
serviceConfig.createSubscription = client.createSubscription.bind(client)
serviceConfig.unsubscribe = client.unsubscribe.bind(client)
}

serviceMap[service.name] = serviceConfig
Expand Down
189 changes: 189 additions & 0 deletions test/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,195 @@ test('gateway subscription handling works correctly', t => {
.then(() => runSubscription())
})

test('gateway subscription properly closes service subscriptions', async t => {
t.plan(2)
let testService
let gateway
let client

async function createTestService () {
testService = Fastify()
await testService.register(GQL, {
schema: buildFederationSchema(`
type Notification {
id: String!
}
type Query {
notifications: [Notification]
}
type Mutation {
addNotification(id: String!): Notification
}
type Subscription {
notificationAdded(id: String!): Notification
}
`),
resolvers: {
Query: {
notifications: () => []
},
Mutation: {
addNotification: async (_, args, { pubsub }) => {
const notification = {
id: args.id
}
await pubsub.publish({
topic: 'NOTIFICATION_ADDED',
payload: { notificationAdded: notification }
})
return notification
}
},
Subscription: {
notificationAdded: {
subscribe: GQL.withFilter(
(_, __, { pubsub }) => {
return pubsub.subscribe('NOTIFICATION_ADDED')
},
(payload, args) => {
t.equal(args.id, 'n2')
return args.id === payload.notificationAdded.id
}
)
}
}
},
subscription: true
})
await testService.listen({ port: 0 })
}

async function createGatewayApp () {
const testServicePort = testService.server.address().port

gateway = Fastify()

await gateway.register(plugin, {
gateway: {
services: [
{
name: 'user',
url: `http://localhost:${testServicePort}/graphql`,
wsUrl: `ws://localhost:${testServicePort}/graphql`
}
]
},
subscription: true,
jit: 1
})

await gateway.listen({ port: 0 })
}

function runSubscription () {
return new Promise(resolve => {
const ws = new WebSocket(
`ws://localhost:${gateway.server.address().port}/graphql`,
'graphql-ws'
)
client = WebSocket.createWebSocketStream(ws, {
encoding: 'utf8',
objectMode: true
})

client.setEncoding('utf8')

client.write(
JSON.stringify({
type: 'connection_init'
})
)

client.write(
JSON.stringify({
id: 1,
type: 'start',
payload: {
query: `
subscription {
notificationAdded(id: "n1") {
id
}
}
`
}
})
)

client.write(
JSON.stringify({
id: 2,
type: 'start',
payload: {
query: `
subscription {
notificationAdded(id: "n2") {
id
}
}
`
}
})
)

client.on('data', async chunk => {
const data = JSON.parse(chunk)

if (data.type === 'connection_ack') {
client.write(
JSON.stringify({
id: 1,
type: 'stop'
})
)
} else if (data.id === 1 && data.type === 'complete') {
gateway.inject({
method: 'POST',
url: '/graphql',
body: {
query: `
mutation {
addNotification(id: "n2") {
id
}
}
`
}
})
} else if (data.type === 'data') {
t.equal(
chunk,
JSON.stringify({
type: 'data',
id: 2,
payload: {
data: {
notificationAdded: {
id: 'n2'
}
}
}
})
)

await client.end()
resolve()
}
})
})
}

await createTestService()
await createGatewayApp()
await runSubscription()

t.teardown(async () => {
await client.destroy()
await gateway.close()
await testService.close()
})
})

test('gateway wsConnectionParams object is passed to SubscriptionClient', t => {
t.plan(1)

Expand Down

0 comments on commit 8e427cf

Please sign in to comment.