Skip to content

Commit

Permalink
Merge pull request #70 from nitrictech/feature/multiple-handlers
Browse files Browse the repository at this point in the history
feat: allow multiple handlers in faas functions
  • Loading branch information
tjholm committed Oct 10, 2021
2 parents efc414a + cbffb6e commit 72cfe34
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 39 deletions.
85 changes: 47 additions & 38 deletions src/faas/v0/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,21 @@ import {
HeaderValue,
TriggerResponse,
TopicResponseContext,
InitRequest
InitRequest,
} from '@nitric/api/proto/faas/v1/faas_pb';

import { EventMiddleware, GenericMiddleware, HttpMiddleware, TriggerContext, TriggerMiddleware } from '.';

import {
createHandler,
EventMiddleware,
GenericMiddleware,
HttpContext,
HttpMiddleware,
TriggerContext,
TriggerMiddleware,
} from '.';

/**
*
*
*/
class Faas {
private httpHandler?: HttpMiddleware;
Expand All @@ -39,16 +46,16 @@ class Faas {
/**
* Add an event handler to this Faas server
*/
event(handler: EventMiddleware): Faas {
this.eventHandler = handler;
event(...handlers: EventMiddleware[]): Faas {
this.eventHandler = createHandler(...handlers);
return this;
}

/**
* Add a http handler to this Faas server
*/
http(handler: HttpMiddleware): Faas {
this.httpHandler = handler;
http(...handlers: HttpMiddleware[]): Faas {
this.httpHandler = createHandler(...handlers);
return this;
}

Expand All @@ -69,21 +76,21 @@ class Faas {
/**
* Start the Faas server
*/
async start(handler?: TriggerMiddleware): Promise<void> {
this.anyHandler = handler;
if(!this.httpHandler && !this.eventHandler && !this.anyHandler) {
throw new Error("A handler function must be provided.");
async start(...handlers: TriggerMiddleware[]): Promise<void> {
this.anyHandler = handlers.length && createHandler(...handlers);
if (!this.httpHandler && !this.eventHandler && !this.anyHandler) {
throw new Error('A handler function must be provided.');
}

// Actually start the server...
const faasClient = new FaasServiceClient(
SERVICE_BIND,
grpc.ChannelCredentials.createInsecure()
);

// Begin Bi-Di streaming
const faasStream = faasClient.triggerStream();

faasStream.on('data', async (message: ServerMessage) => {
// We have an init response from the membrane
if (message.hasInitResponse()) {
Expand All @@ -94,39 +101,43 @@ class Faas {
// We want to handle a function here...
const triggerRequest = message.getTriggerRequest();
const responseMessage = new ClientMessage();

responseMessage.setId(message.getId());

try {
const ctx = TriggerContext.fromGrpcTriggerRequest(triggerRequest);

let handler: GenericMiddleware<TriggerContext> = undefined;
let triggerType = "Unknown";
let triggerType = 'Unknown';
if (ctx.http) {
triggerType = "HTTP";
triggerType = 'HTTP';
handler = this.getHttpHandler() as GenericMiddleware<TriggerContext>;
} else if (ctx.event) {
triggerType = "Event";
triggerType = 'Event';
handler = this.getEventHandler() as GenericMiddleware<TriggerContext>;
} else {
console.error(`received an unexpected trigger type, are you using an outdated version of the SDK?`);
console.error(
`received an unexpected trigger type, are you using an outdated version of the SDK?`
);
}
if(!handler) {

if (!handler) {
// No handler defined for the trigger type received.
console.error(`no handler defined for ${triggerType} triggers`);
faasStream.cancel();
}

const result = await handler(ctx, async (ctx) => ctx);
responseMessage.setTriggerResponse(TriggerContext.toGrpcTriggerResponse(result));
responseMessage.setTriggerResponse(
TriggerContext.toGrpcTriggerResponse(result)
);
} catch (e) {
console.error(e);
// generic error handling
console.error(e);
const triggerResponse = new TriggerResponse();
responseMessage.setTriggerResponse(triggerResponse);

if (triggerRequest.hasHttp()) {
const httpResponse = new HttpResponseContext();
triggerResponse.setHttp(httpResponse);
Expand All @@ -140,7 +151,7 @@ class Faas {
triggerResponse.setData('Internal Server Error');
} else if (triggerRequest.hasTopic()) {
const topicResponse = new TopicResponseContext();

topicResponse.setSuccess(false);
triggerResponse.setTopic(topicResponse);
triggerResponse.setData('Internal Server Error');
Expand All @@ -150,13 +161,13 @@ class Faas {
faasStream.write(responseMessage);
}
});

// Let the membrane know we're ready to start
const initRequest = new InitRequest();
const initMessage = new ClientMessage();
initMessage.setInitRequest(initRequest);
faasStream.write(initMessage);

// Block until the stream has closed...
await new Promise<void>((res) => {
// The server has determined this stream must close
Expand All @@ -173,25 +184,23 @@ let INSTANCE: Faas = undefined;

const getInstance = (): Faas => {
INSTANCE = INSTANCE || new Faas();
return INSTANCE;
}

return INSTANCE;
};

/**
* Register a HTTP handler
*/
export const http = (handler: HttpMiddleware): Faas =>
getInstance().http(handler);

export const http = (...handlers: HttpMiddleware[]): Faas =>
getInstance().http(...handlers);

/**
* Register an event handler
*/
export const event = (handler: EventMiddleware): Faas =>
getInstance().event(handler);
export const event = (...handlers: EventMiddleware[]): Faas =>
getInstance().event(...handlers);

/**
* Start the FaaS server with a universal handler
*/
export const start = async (handler: TriggerMiddleware): Promise<void> =>
await getInstance().start(handler);
export const start = async (...handlers: TriggerMiddleware[]): Promise<void> =>
await getInstance().start(...handlers);
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"esModuleInterop": true,
"declaration": true,
"module": "commonjs",
"target": "ES5",
"target": "es6",
"outDir": "lib",
"allowJs": true,
"types": ["node", "jest"],
Expand Down

0 comments on commit 72cfe34

Please sign in to comment.