Skip to content

Commit

Permalink
feat: Websocket support (#196)
Browse files Browse the repository at this point in the history
Adds websocket support to the node SDK.
  • Loading branch information
tjholm committed Jul 12, 2023
2 parents bee37fb + 28cb578 commit 4481889
Show file tree
Hide file tree
Showing 12 changed files with 596 additions and 3 deletions.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"license:header:check": "license-check-and-add check -f ./licenseconfig.json",
"license:check": "licensee --production",
"download:contracts": "curl -L https://github.com/nitrictech/nitric/releases/download/${npm_package_nitric}/contracts.tgz -o contracts.tgz && tar xvzf contracts.tgz && rm contracts.tgz",
"gen:proto": "yarn run download:contracts && mkdir -p ./src/gen && grpc_tools_node_protoc --ts_out=service=grpc-node,mode=grpc-js:./src/gen --js_out=import_style=commonjs,binary:./src/gen --grpc_out=grpc_js:./src/gen -I ./contracts ./contracts/**/*.proto ./contracts/proto/**/*/*.proto"
"gen:proto": "yarn run download:contracts && yarn run gen:sources",
"gen:sources": "mkdir -p ./src/gen && grpc_tools_node_protoc --ts_out=service=grpc-node,mode=grpc-js:./src/gen --js_out=import_style=commonjs,binary:./src/gen --grpc_out=grpc_js:./src/gen -I ./contracts ./contracts/**/*.proto ./contracts/proto/**/*/*.proto"
},
"contributors": [
"Jye Cusch <[email protected]>",
Expand Down Expand Up @@ -72,6 +73,7 @@
"ts-jest": "^26.4.3",
"ts-node": "^10.9.1",
"ts-protoc-gen": "^0.15.0",
"tsconfig-paths": "^4.2.0",
"tsup": "^6.5.0",
"typescript": "^4.4"
},
Expand Down
13 changes: 13 additions & 0 deletions src/api/websocket/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2021, Nitric Technologies Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14 changes: 14 additions & 0 deletions src/api/websocket/v0/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2021, Nitric Technologies Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
export * from './websocket';
101 changes: 101 additions & 0 deletions src/api/websocket/v0/websocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2021, Nitric Technologies Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { SERVICE_BIND } from '../../../constants';
import { WebsocketServiceClient } from '@nitric/api/proto/websocket/v1/websocket_grpc_pb';
import {
WebsocketSendRequest,
WebsocketCloseRequest,
} from '@nitric/api/proto/websocket/v1/websocket_pb';
import * as grpc from '@grpc/grpc-js';
import { fromGrpcError } from '../../errors';

/**
* Nitric websocket client, facilitates sending messages to connections on this websocket.
*/
export class Websocket {
client: WebsocketServiceClient;

constructor() {
this.client = new WebsocketServiceClient(
SERVICE_BIND,
grpc.ChannelCredentials.createInsecure()
);
}

async send(
socket: string,
connectionId: string,
message: string | Uint8Array | Record<string, any>
): Promise<void> {
let payload: Uint8Array;

// handle all message types
if (typeof message === 'string') {
payload = new TextEncoder().encode(message);
} else if (message instanceof Uint8Array) {
payload = message;
} else {
payload = new TextEncoder().encode(JSON.stringify(message));
}

const sendRequest = new WebsocketSendRequest();

sendRequest.setSocket(socket);
sendRequest.setConnectionId(connectionId);
sendRequest.setData(payload);

return new Promise((res, rej) => {
this.client.send(sendRequest, (error, data) => {
if (error) {
rej(fromGrpcError(error));
}

res();
});
});
}

async close(socket: string, connectionId: string): Promise<void> {
const closeRequest = new WebsocketCloseRequest();

closeRequest.setSocket(socket);
closeRequest.setConnectionId(connectionId);

return new Promise((res, rej) => {
this.client.close(closeRequest, (error) => {
if (error) {
rej(fromGrpcError(error));
}

res();
});
});
}
}

// Websocket client singleton
let WEBSOCKET = undefined;

/**
* Websocket API client.
*
* @returns a Websocket API client.
*/
export const websocket = (): Websocket => {
if (!WEBSOCKET) {
WEBSOCKET = new Websocket();
}

return WEBSOCKET;
};
87 changes: 87 additions & 0 deletions src/faas/v0/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
NotificationResponseContext,
TopicResponseContext,
BucketNotificationType as ProtoBucketNotificationType,
WebsocketResponseContext,
} from '@nitric/api/proto/faas/v1/faas_pb';
import * as api from '@opentelemetry/api';
import * as jspb from 'google-protobuf';
Expand Down Expand Up @@ -68,6 +69,15 @@ export abstract class TriggerContext<
return undefined;
}

/**
* Noop base context websocket method
*
* @returns undefined
*/
public get websocket(): WebsocketNotificationContext<unknown> | undefined {
return undefined;
}

/**
* Return the request object from this context.
*
Expand Down Expand Up @@ -107,6 +117,8 @@ export abstract class TriggerContext<
trigger,
options as BucketNotificationWorkerOptions
);
} else if (trigger.hasWebsocket()) {
return WebsocketNotificationContext.fromGrpcTriggerRequest(trigger);
}
throw new Error('Unsupported trigger request type');
}
Expand All @@ -118,6 +130,8 @@ export abstract class TriggerContext<
return EventContext.toGrpcTriggerResponse(ctx);
} else if (ctx.bucketNotification) {
return BucketNotificationContext.toGrpcTriggerResponse(ctx);
} else if (ctx.websocket) {
return WebsocketNotificationContext.toGrpcTriggerResponse(ctx);
}

throw new Error('Unsupported trigger context type');
Expand Down Expand Up @@ -595,3 +609,76 @@ export class FileNotificationRequest extends BucketNotificationRequest {
export interface BucketNotificationResponse {
success: boolean;
}

// WEBSOCKET NOTIFICATION CONTEXT

export class WebsocketNotificationContext<T> extends TriggerContext<
WebsocketNotificationRequest<T>,
WebsocketNotificationResponse
> {
public get websocket(): WebsocketNotificationContext<T> {
return this;
}

static fromGrpcTriggerRequest(
trigger: TriggerRequest
): WebsocketNotificationContext<any> {
const ctx = new WebsocketNotificationContext();

ctx.request = new WebsocketNotificationRequest(
trigger.getData_asU8(),
getTraceContext(trigger.getTraceContext()),
trigger.getWebsocket().getSocket(),
trigger.getWebsocket().getEvent(),
trigger.getWebsocket().getConnectionid()
);

ctx.response = {
success: true,
};

return ctx;
}

static toGrpcTriggerResponse(
ctx: TriggerContext<AbstractRequest, any>
): TriggerResponse {
const notifyCtx = ctx.websocket;
const triggerResponse = new TriggerResponse();
const notificationResponse = new WebsocketResponseContext();
notificationResponse.setSuccess(notifyCtx.res.success);
triggerResponse.setWebsocket(notificationResponse);
return triggerResponse;
}
}

export enum WebsocketNotificationType {
Connected,
Disconnected,
Message,
}

export class WebsocketNotificationRequest<T> extends AbstractRequest<T> {
public readonly socket: string;
public readonly notificationType: WebsocketNotificationType;
public readonly connectionId: string;

constructor(
data: string | Uint8Array,
traceContext: api.Context,
socket: string,
notificationType: WebsocketNotificationType,
connectionId: string
) {
super(data, traceContext);

// Get reference to the bucket
this.socket = socket;
this.notificationType = notificationType;
this.connectionId = connectionId;
}
}

export interface WebsocketNotificationResponse {
success: boolean;
}
4 changes: 4 additions & 0 deletions src/faas/v0/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import {
EventContext,
BucketNotificationContext,
FileNotificationContext,
WebsocketNotificationContext,
JSONTypes,
} from '.';

export type GenericHandler<Ctx> = (ctx: Ctx) => Promise<Ctx> | Ctx;
Expand All @@ -35,6 +37,8 @@ export type GenericMiddleware<Ctx> = (

export type TriggerMiddleware = GenericMiddleware<TriggerContext>;
export type HttpMiddleware = GenericMiddleware<HttpContext>;
export type WebsocketMiddleware<T extends JSONTypes = Record<string, any>> =
GenericMiddleware<WebsocketNotificationContext<T>>;
export type EventMiddleware<
T extends Record<string, any> = Record<string, any>
> = GenericMiddleware<EventContext<NitricEvent<T>>>;
Expand Down
43 changes: 43 additions & 0 deletions src/faas/v0/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import {
BucketNotificationWorker,
BucketNotificationConfig,
HttpWorker,
WebsocketResponseContext,
WebsocketWorker,
WebsocketEvent,
} from '@nitric/api/proto/faas/v1/faas_pb';

import {
Expand All @@ -45,6 +48,7 @@ import {
TriggerContext,
TriggerMiddleware,
FileNotificationMiddleware,
WebsocketMiddleware,
} from '.';

import newTracerProvider from './traceProvider';
Expand All @@ -59,6 +63,7 @@ import {

import * as grpc from '@grpc/grpc-js';
import { HttpWorkerOptions } from '@nitric/sdk/resources/http';
import { WebsocketWorkerOptions } from '@nitric/sdk/resources/websocket';

export class FaasWorkerOptions {}

Expand All @@ -75,6 +80,7 @@ type FaasClientOptions =
*/
export class Faas {
private httpHandler?: HttpMiddleware;
private websocketHandler?: WebsocketMiddleware;
private eventHandler?: EventMiddleware | ScheduleMiddleware;
private bucketNotificationHandler?:
| BucketNotificationMiddleware
Expand Down Expand Up @@ -108,6 +114,17 @@ export class Faas {
return this;
}

/**
* Add a websocket handler to this Faas server
*
* @param handlers the functions to call to respond to http requests
* @returns self
*/
websocket(...handlers: WebsocketMiddleware[]): Faas {
this.websocketHandler = createHandler(...handlers);
return this;
}

/**
* Add a notification handler to this Faas server
*
Expand Down Expand Up @@ -152,6 +169,18 @@ export class Faas {
return this.bucketNotificationHandler || this.anyHandler;
}

/**
* Get websocket handler for this server
*
* @returns the registered websocket handler
*/
private getWebsocketHandler():
| WebsocketMiddleware
| TriggerMiddleware
| undefined {
return this.websocketHandler || this.anyHandler;
}

/**
* Start the Faas server
*
Expand All @@ -167,6 +196,7 @@ export class Faas {
!this.httpHandler &&
!this.eventHandler &&
!this.bucketNotificationHandler &&
!this.websocketHandler &&
!this.anyHandler
) {
throw new Error('A handler function must be provided.');
Expand Down Expand Up @@ -222,6 +252,10 @@ export class Faas {
triggerType = 'Notification';
handler =
this.getBucketNotificationHandler() as GenericMiddleware<TriggerContext>;
} else if (ctx.websocket) {
triggerType = 'Websocket';
handler =
this.getWebsocketHandler() as GenericMiddleware<TriggerContext>;
} else {
console.error(
`received an unexpected trigger type, are you using an outdated version of the SDK?`
Expand Down Expand Up @@ -266,6 +300,10 @@ export class Faas {
const notificationResponse = new NotificationResponseContext();
notificationResponse.setSuccess(false);
triggerResponse.setNotification(notificationResponse);
} else if (triggerRequest.hasWebsocket()) {
const notificationResponse = new WebsocketResponseContext();
notificationResponse.setSuccess(false);
triggerResponse.setWebsocket(notificationResponse);
}
}
// Send the response back to the membrane
Expand Down Expand Up @@ -330,6 +368,11 @@ export class Faas {
const httpWorker = new HttpWorker();
httpWorker.setPort(this.options.port);
initRequest.setHttpWorker(httpWorker);
} else if (this.options instanceof WebsocketWorkerOptions) {
const websocketWorker = new WebsocketWorker();
websocketWorker.setSocket(this.options.socket);
websocketWorker.setEvent(this.options.eventType);
initRequest.setWebsocket(websocketWorker);
}
// Original faas workers should return a blank InitRequest for compatibility.

Expand Down
Loading

0 comments on commit 4481889

Please sign in to comment.