diff --git a/package.json b/package.json index 3a3d98c0..be5de870 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,8 @@ "license:header:check": "license-check-and-add check -f ./licenseconfig.json", "license:check": "licensee --production", "download:contracts": "curl -L https://github.com/nitrictech/nitric/releases/download/${npm_package_nitric}/contracts.tgz -o contracts.tgz && tar xvzf contracts.tgz && rm contracts.tgz", - "gen:proto": "yarn run download:contracts && mkdir -p ./src/gen && grpc_tools_node_protoc --ts_out=service=grpc-node,mode=grpc-js:./src/gen --js_out=import_style=commonjs,binary:./src/gen --grpc_out=grpc_js:./src/gen -I ./contracts ./contracts/**/*.proto ./contracts/proto/**/*/*.proto" + "gen:proto": "yarn run download:contracts && yarn run gen:sources", + "gen:sources": "mkdir -p ./src/gen && grpc_tools_node_protoc --ts_out=service=grpc-node,mode=grpc-js:./src/gen --js_out=import_style=commonjs,binary:./src/gen --grpc_out=grpc_js:./src/gen -I ./contracts ./contracts/**/*.proto ./contracts/proto/**/*/*.proto" }, "contributors": [ "Jye Cusch ", @@ -72,6 +73,7 @@ "ts-jest": "^26.4.3", "ts-node": "^10.9.1", "ts-protoc-gen": "^0.15.0", + "tsconfig-paths": "^4.2.0", "tsup": "^6.5.0", "typescript": "^4.4" }, diff --git a/src/api/websocket/index.ts b/src/api/websocket/index.ts new file mode 100644 index 00000000..1909a328 --- /dev/null +++ b/src/api/websocket/index.ts @@ -0,0 +1,13 @@ +// Copyright 2021, Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. diff --git a/src/api/websocket/v0/index.ts b/src/api/websocket/v0/index.ts new file mode 100644 index 00000000..dc0a9182 --- /dev/null +++ b/src/api/websocket/v0/index.ts @@ -0,0 +1,14 @@ +// Copyright 2021, Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +export * from './websocket'; diff --git a/src/api/websocket/v0/websocket.ts b/src/api/websocket/v0/websocket.ts new file mode 100644 index 00000000..2e68f25e --- /dev/null +++ b/src/api/websocket/v0/websocket.ts @@ -0,0 +1,101 @@ +// Copyright 2021, Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +import { SERVICE_BIND } from '../../../constants'; +import { WebsocketServiceClient } from '@nitric/api/proto/websocket/v1/websocket_grpc_pb'; +import { + WebsocketSendRequest, + WebsocketCloseRequest, +} from '@nitric/api/proto/websocket/v1/websocket_pb'; +import * as grpc from '@grpc/grpc-js'; +import { fromGrpcError } from '../../errors'; + +/** + * Nitric websocket client, facilitates sending messages to connections on this websocket. + */ +export class Websocket { + client: WebsocketServiceClient; + + constructor() { + this.client = new WebsocketServiceClient( + SERVICE_BIND, + grpc.ChannelCredentials.createInsecure() + ); + } + + async send( + socket: string, + connectionId: string, + message: string | Uint8Array | Record + ): Promise { + let payload: Uint8Array; + + // handle all message types + if (typeof message === 'string') { + payload = new TextEncoder().encode(message); + } else if (message instanceof Uint8Array) { + payload = message; + } else { + payload = new TextEncoder().encode(JSON.stringify(message)); + } + + const sendRequest = new WebsocketSendRequest(); + + sendRequest.setSocket(socket); + sendRequest.setConnectionId(connectionId); + sendRequest.setData(payload); + + return new Promise((res, rej) => { + this.client.send(sendRequest, (error, data) => { + if (error) { + rej(fromGrpcError(error)); + } + + res(); + }); + }); + } + + async close(socket: string, connectionId: string): Promise { + const closeRequest = new WebsocketCloseRequest(); + + closeRequest.setSocket(socket); + closeRequest.setConnectionId(connectionId); + + return new Promise((res, rej) => { + this.client.close(closeRequest, (error) => { + if (error) { + rej(fromGrpcError(error)); + } + + res(); + }); + }); + } +} + +// Websocket client singleton +let WEBSOCKET = undefined; + +/** + * Websocket API client. + * + * @returns a Websocket API client. + */ +export const websocket = (): Websocket => { + if (!WEBSOCKET) { + WEBSOCKET = new Websocket(); + } + + return WEBSOCKET; +}; diff --git a/src/faas/v0/context.ts b/src/faas/v0/context.ts index f59835b3..7ef161ff 100644 --- a/src/faas/v0/context.ts +++ b/src/faas/v0/context.ts @@ -20,6 +20,7 @@ import { NotificationResponseContext, TopicResponseContext, BucketNotificationType as ProtoBucketNotificationType, + WebsocketResponseContext, } from '@nitric/api/proto/faas/v1/faas_pb'; import * as api from '@opentelemetry/api'; import * as jspb from 'google-protobuf'; @@ -68,6 +69,15 @@ export abstract class TriggerContext< return undefined; } + /** + * Noop base context websocket method + * + * @returns undefined + */ + public get websocket(): WebsocketNotificationContext | undefined { + return undefined; + } + /** * Return the request object from this context. * @@ -107,6 +117,8 @@ export abstract class TriggerContext< trigger, options as BucketNotificationWorkerOptions ); + } else if (trigger.hasWebsocket()) { + return WebsocketNotificationContext.fromGrpcTriggerRequest(trigger); } throw new Error('Unsupported trigger request type'); } @@ -118,6 +130,8 @@ export abstract class TriggerContext< return EventContext.toGrpcTriggerResponse(ctx); } else if (ctx.bucketNotification) { return BucketNotificationContext.toGrpcTriggerResponse(ctx); + } else if (ctx.websocket) { + return WebsocketNotificationContext.toGrpcTriggerResponse(ctx); } throw new Error('Unsupported trigger context type'); @@ -595,3 +609,76 @@ export class FileNotificationRequest extends BucketNotificationRequest { export interface BucketNotificationResponse { success: boolean; } + +// WEBSOCKET NOTIFICATION CONTEXT + +export class WebsocketNotificationContext extends TriggerContext< + WebsocketNotificationRequest, + WebsocketNotificationResponse +> { + public get websocket(): WebsocketNotificationContext { + return this; + } + + static fromGrpcTriggerRequest( + trigger: TriggerRequest + ): WebsocketNotificationContext { + const ctx = new WebsocketNotificationContext(); + + ctx.request = new WebsocketNotificationRequest( + trigger.getData_asU8(), + getTraceContext(trigger.getTraceContext()), + trigger.getWebsocket().getSocket(), + trigger.getWebsocket().getEvent(), + trigger.getWebsocket().getConnectionid() + ); + + ctx.response = { + success: true, + }; + + return ctx; + } + + static toGrpcTriggerResponse( + ctx: TriggerContext + ): TriggerResponse { + const notifyCtx = ctx.websocket; + const triggerResponse = new TriggerResponse(); + const notificationResponse = new WebsocketResponseContext(); + notificationResponse.setSuccess(notifyCtx.res.success); + triggerResponse.setWebsocket(notificationResponse); + return triggerResponse; + } +} + +export enum WebsocketNotificationType { + Connected, + Disconnected, + Message, +} + +export class WebsocketNotificationRequest extends AbstractRequest { + public readonly socket: string; + public readonly notificationType: WebsocketNotificationType; + public readonly connectionId: string; + + constructor( + data: string | Uint8Array, + traceContext: api.Context, + socket: string, + notificationType: WebsocketNotificationType, + connectionId: string + ) { + super(data, traceContext); + + // Get reference to the bucket + this.socket = socket; + this.notificationType = notificationType; + this.connectionId = connectionId; + } +} + +export interface WebsocketNotificationResponse { + success: boolean; +} diff --git a/src/faas/v0/handler.ts b/src/faas/v0/handler.ts index 816dd3c1..a3b59cb2 100644 --- a/src/faas/v0/handler.ts +++ b/src/faas/v0/handler.ts @@ -18,6 +18,8 @@ import { EventContext, BucketNotificationContext, FileNotificationContext, + WebsocketNotificationContext, + JSONTypes, } from '.'; export type GenericHandler = (ctx: Ctx) => Promise | Ctx; @@ -35,6 +37,8 @@ export type GenericMiddleware = ( export type TriggerMiddleware = GenericMiddleware; export type HttpMiddleware = GenericMiddleware; +export type WebsocketMiddleware> = + GenericMiddleware>; export type EventMiddleware< T extends Record = Record > = GenericMiddleware>>; diff --git a/src/faas/v0/start.ts b/src/faas/v0/start.ts index cc51d423..28a3f75b 100644 --- a/src/faas/v0/start.ts +++ b/src/faas/v0/start.ts @@ -33,6 +33,9 @@ import { BucketNotificationWorker, BucketNotificationConfig, HttpWorker, + WebsocketResponseContext, + WebsocketWorker, + WebsocketEvent, } from '@nitric/api/proto/faas/v1/faas_pb'; import { @@ -45,6 +48,7 @@ import { TriggerContext, TriggerMiddleware, FileNotificationMiddleware, + WebsocketMiddleware, } from '.'; import newTracerProvider from './traceProvider'; @@ -59,6 +63,7 @@ import { import * as grpc from '@grpc/grpc-js'; import { HttpWorkerOptions } from '@nitric/sdk/resources/http'; +import { WebsocketWorkerOptions } from '@nitric/sdk/resources/websocket'; export class FaasWorkerOptions {} @@ -75,6 +80,7 @@ type FaasClientOptions = */ export class Faas { private httpHandler?: HttpMiddleware; + private websocketHandler?: WebsocketMiddleware; private eventHandler?: EventMiddleware | ScheduleMiddleware; private bucketNotificationHandler?: | BucketNotificationMiddleware @@ -108,6 +114,17 @@ export class Faas { return this; } + /** + * Add a websocket handler to this Faas server + * + * @param handlers the functions to call to respond to http requests + * @returns self + */ + websocket(...handlers: WebsocketMiddleware[]): Faas { + this.websocketHandler = createHandler(...handlers); + return this; + } + /** * Add a notification handler to this Faas server * @@ -152,6 +169,18 @@ export class Faas { return this.bucketNotificationHandler || this.anyHandler; } + /** + * Get websocket handler for this server + * + * @returns the registered websocket handler + */ + private getWebsocketHandler(): + | WebsocketMiddleware + | TriggerMiddleware + | undefined { + return this.websocketHandler || this.anyHandler; + } + /** * Start the Faas server * @@ -167,6 +196,7 @@ export class Faas { !this.httpHandler && !this.eventHandler && !this.bucketNotificationHandler && + !this.websocketHandler && !this.anyHandler ) { throw new Error('A handler function must be provided.'); @@ -222,6 +252,10 @@ export class Faas { triggerType = 'Notification'; handler = this.getBucketNotificationHandler() as GenericMiddleware; + } else if (ctx.websocket) { + triggerType = 'Websocket'; + handler = + this.getWebsocketHandler() as GenericMiddleware; } else { console.error( `received an unexpected trigger type, are you using an outdated version of the SDK?` @@ -266,6 +300,10 @@ export class Faas { const notificationResponse = new NotificationResponseContext(); notificationResponse.setSuccess(false); triggerResponse.setNotification(notificationResponse); + } else if (triggerRequest.hasWebsocket()) { + const notificationResponse = new WebsocketResponseContext(); + notificationResponse.setSuccess(false); + triggerResponse.setWebsocket(notificationResponse); } } // Send the response back to the membrane @@ -330,6 +368,11 @@ export class Faas { const httpWorker = new HttpWorker(); httpWorker.setPort(this.options.port); initRequest.setHttpWorker(httpWorker); + } else if (this.options instanceof WebsocketWorkerOptions) { + const websocketWorker = new WebsocketWorker(); + websocketWorker.setSocket(this.options.socket); + websocketWorker.setEvent(this.options.eventType); + initRequest.setWebsocket(websocketWorker); } // Original faas workers should return a blank InitRequest for compatibility. diff --git a/src/resources/http.ts b/src/resources/http.ts index 3bd90e9c..775f3a5a 100644 --- a/src/resources/http.ts +++ b/src/resources/http.ts @@ -23,15 +23,22 @@ interface NodeApplication { listen: ListenerFunction; } +// eslint-disable-next-line +const NO_OP = () => {}; + export class HttpWorkerOptions { public readonly app: NodeApplication; public readonly port: number; public readonly callback: () => void; - constructor(app: NodeApplication, port: number, callback?: () => void) { + constructor( + app: NodeApplication, + port: number, + callback: () => void = NO_OP + ) { this.app = app; this.port = port; - this.callback = callback || (() => {}); + this.callback = callback; } } diff --git a/src/resources/index.ts b/src/resources/index.ts index e46e8d0f..ab9df236 100644 --- a/src/resources/index.ts +++ b/src/resources/index.ts @@ -19,3 +19,4 @@ export * from './bucket'; export * from './schedule'; export * from './secret'; export * from './http'; +export * from './websocket'; diff --git a/src/resources/websocket.test.ts b/src/resources/websocket.test.ts new file mode 100644 index 00000000..933c2628 --- /dev/null +++ b/src/resources/websocket.test.ts @@ -0,0 +1,130 @@ +// Copyright 2021, Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { ResourceServiceClient } from '@nitric/api/proto/resource/v1/resource_grpc_pb'; +import { UnimplementedError } from '../api/errors'; +import { websocket } from '.'; +import { ResourceDeclareResponse } from '@nitric/api/proto/resource/v1/resource_pb'; +import * as faas from '../faas/index'; + +jest.mock('../faas/index'); + +describe('Registering websocket resources', () => { + describe('Given declare returns an error from the resource server', () => { + const MOCK_ERROR = { + code: 2, + message: 'UNIMPLEMENTED', + }; + + const validName = 'my-websocket'; + let declareSpy; + + beforeAll(() => { + declareSpy = jest + .spyOn(ResourceServiceClient.prototype, 'declare') + .mockImplementationOnce((request, callback: any) => { + callback(MOCK_ERROR, null); + + return null as any; + }); + }); + + afterAll(() => { + declareSpy.mockClear(); + }); + + it('Should throw the error', async () => { + await expect(websocket(validName)['registerPromise']).rejects.toEqual( + new UnimplementedError('UNIMPLEMENTED') + ); + }); + + it('Should call the resource server', () => { + expect(declareSpy).toBeCalledTimes(1); + }); + }); + + describe('Given declare succeeds on the resource server', () => { + describe('When the service succeeds', () => { + const validName = 'my-websocket2'; + let otherSpy; + + beforeAll(() => { + otherSpy = jest + .spyOn(ResourceServiceClient.prototype, 'declare') + .mockImplementation((request, callback: any) => { + const response = new ResourceDeclareResponse(); + callback(null, response); + return null as any; + }); + }); + + afterAll(() => { + jest.resetAllMocks(); + }); + + it('Should succeed', async () => { + await expect( + websocket(validName)['registerPromise'] + ).resolves.not.toBeNull(); + }); + + it('Should call the resource server twice', () => { + expect(otherSpy).toBeCalledTimes(2); + }); + }); + }); + + describe('Given a topic is already registered', () => { + const websocketName = 'already-exists'; + let websocketResource; + let existsSpy; + + beforeEach(() => { + // ensure a success is returned and calls can be counted. + existsSpy = jest + .spyOn(ResourceServiceClient.prototype, 'declare') + .mockImplementation((request, callback: any) => { + const response = new ResourceDeclareResponse(); + callback(null, response); + return null as any; + }); + + // register the resource for the first time + websocketResource = websocket(websocketName); + }); + + afterEach(() => { + jest.resetAllMocks(); + }); + + describe('When registering a topic with the same name', () => { + let secondWebsocket; + + beforeEach(() => { + // make sure the initial registration isn't counted for these tests. + existsSpy.mockClear(); + secondWebsocket = websocket(websocketName); + }); + + it('Should not call the server again', () => { + expect(existsSpy).not.toBeCalled(); + }); + + it('Should return the same resource object', () => { + expect(websocketResource === secondWebsocket).toEqual(true); + }); + }); + }); +}); diff --git a/src/resources/websocket.ts b/src/resources/websocket.ts new file mode 100644 index 00000000..76113f56 --- /dev/null +++ b/src/resources/websocket.ts @@ -0,0 +1,182 @@ +// Copyright 2021, Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +import { fromGrpcError } from '../api/errors'; +import { Faas, JSONTypes, WebsocketMiddleware } from '../faas'; +import { + Websocket as WsClient, + websocket as wsClient, +} from '../api/websocket/v0'; +import { WebsocketEvent } from '../gen/proto/faas/v1/faas_pb'; +import { + Action, + PolicyResource, + Resource, + ResourceDeclareRequest, + ResourceDetailsResponse, + ResourceType, +} from '../gen/proto/resource/v1/resource_pb'; +import resourceClient from './client'; +import { make, Resource as Base } from './common'; + +const WebsocketEventTypeMap = { + connect: WebsocketEvent.CONNECT, + disconnect: WebsocketEvent.DISCONNECT, + message: WebsocketEvent.MESSAGE, +}; + +type WebsocketEventType = keyof typeof WebsocketEventTypeMap; + +export class WebsocketWorkerOptions { + public readonly socket: string; + public readonly eventType: (typeof WebsocketEventTypeMap)[WebsocketEventType]; + + constructor(socket: string, eventType: WebsocketEventType) { + this.socket = socket; + this.eventType = WebsocketEventTypeMap[eventType]; + } +} + +export class Websocket { + private readonly faas: Faas; + + constructor( + socket: string, + eventType: WebsocketEventType, + ...middleware: WebsocketMiddleware[] + ) { + this.faas = new Faas(new WebsocketWorkerOptions(socket, eventType)); + this.faas.websocket(...middleware); + } + + private async start(): Promise { + return this.faas.start(); + } +} + +/** + * Websocket resource for bi-di HTTP communication. + */ +export class WebsocketResource extends Base { + private readonly wsClient: WsClient; + + constructor(name: string) { + super(name); + this.wsClient = wsClient(); + } + + /** + * Register this websocket as a required resource for the calling function/container. + * + * @returns a promise that resolves when the registration is complete + */ + protected async register(): Promise { + const req = new ResourceDeclareRequest(); + const resource = new Resource(); + resource.setName(this.name); + resource.setType(ResourceType.WEBSOCKET); + + req.setResource(resource); + + const res = await new Promise((resolve, reject) => { + resourceClient.declare(req, (error, _: ResourceDeclareRequest) => { + if (error) { + reject(fromGrpcError(error)); + } else { + resolve(resource); + } + }); + }); + + const defaultPrincipal = new Resource(); + defaultPrincipal.setType(ResourceType.FUNCTION); + + const policyResource = new Resource(); + policyResource.setType(ResourceType.POLICY); + const policyReq = new ResourceDeclareRequest(); + const policy = new PolicyResource(); + policy.setActionsList([Action.WEBSOCKETMANAGE]); + policy.setPrincipalsList([defaultPrincipal]); + policy.setResourcesList([resource]); + policyReq.setPolicy(policy); + policyReq.setResource(policyResource); + + await new Promise((resolve, reject) => { + resourceClient.declare(policyReq, (error, _: ResourceDeclareRequest) => { + if (error) { + reject(fromGrpcError(error)); + } else { + resolve(resource); + } + }); + }); + + return res; + } + + async send( + connectionId: string, + // TODO: add less raw data types + data: string | Uint8Array | Record + ): Promise { + await this.wsClient.send(this.name, connectionId, data); + } + + async close(connectionId: string): Promise { + await this.wsClient.close(this.name, connectionId); + } + + /** + * Retrieves the Invocation URL of this Websocket at runtime. + * + * @returns Promise that returns the URL of this Websocket + */ + async url(): Promise { + const { + details: { url }, + } = await this.details(); + + return url; + } + + /** + * Register and start a websocket event handler that will be called for all matching events on this websocket + * + * @param eventType the notification type that should trigger the middleware, either 'connect', 'disconnect' or 'message' + * @param middleware handler middleware which will be run for every incoming event + * @returns Promise which resolves when the handler server terminates + */ + on>( + eventType: WebsocketEventType, + ...middleware: WebsocketMiddleware[] + ): Promise { + const notification = new Websocket(this.name, eventType, ...middleware); + return notification['start'](); + } + + protected resourceType() { + return ResourceType.WEBSOCKET; + } + + protected unwrapDetails(resp: ResourceDetailsResponse) { + if (resp.hasWebsocket()) { + return { + url: resp.getWebsocket().getUrl(), + }; + } + + throw new Error('Unexpected details in response. Expected API details'); + } +} + +export const websocket = make(WebsocketResource); diff --git a/yarn.lock b/yarn.lock index 3358cf90..d47c6b64 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6348,6 +6348,15 @@ ts-protoc-gen@^0.15.0: dependencies: google-protobuf "^3.15.5" +tsconfig-paths@^4.2.0: + version "4.2.0" + resolved "https://registry.yarnpkg.com/tsconfig-paths/-/tsconfig-paths-4.2.0.tgz#ef78e19039133446d244beac0fd6a1632e2d107c" + integrity sha512-NoZ4roiN7LnbKn9QqE1amc9DJfzvZXxF4xDavcOWt1BPkdx+m+0gJuPM+S0vCe7zTJMYUP0R8pO2XMr+Y8oLIg== + dependencies: + json5 "^2.2.2" + minimist "^1.2.6" + strip-bom "^3.0.0" + tslib@^1.8.1: version "1.14.1" resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"