Skip to content

Commit

Permalink
feat: bucket notifications (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjholm committed May 9, 2023
2 parents 42a806f + 6c7c759 commit 35eee72
Show file tree
Hide file tree
Showing 17 changed files with 8,357 additions and 776 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@nitric/sdk",
"description": "Nitric NodeJS client sdk",
"nitric": "v0.22.0",
"nitric": "v0.27.0-rc.4",
"author": "Nitric <https://github.com/nitrictech>",
"repository": "https://github.com/nitrictech/node-sdk",
"main": "lib/index.js",
Expand Down
69 changes: 69 additions & 0 deletions src/api/storage/v0/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import {
File,
} from '@nitric/api/proto/storage/v1/storage_pb';
import { UnimplementedError } from '../../errors';
import { BucketNotificationWorkerOptions, bucket } from '@nitric/sdk/resources';
import { faas } from '@nitric/sdk';

describe('Storage Client Tests', () => {
describe('Given nitric.api.storage.StorageClient.Write throws an error', () => {
Expand Down Expand Up @@ -422,3 +424,70 @@ describe('Storage Client Tests', () => {
});
});
});

jest.mock('../../../faas/index');

describe('bucket notification', () => {
const startSpy = jest
.spyOn(faas.Faas.prototype, 'start')
.mockReturnValue(Promise.resolve());
const mockFn = jest.fn();

afterAll(() => {
jest.clearAllMocks();
});

describe('When registering a bucket notification for creating', () => {
afterAll(() => {
jest.resetAllMocks();
});

beforeAll(async () => {
await bucket('test-bucket').on('created:test.png', mockFn);
});

it('should create a new FaasClient', () => {
expect(faas.Faas).toBeCalledTimes(1);
});

it('should provide Faas with BucketNotificationWorkerOptions', () => {
const expectedOpts = new BucketNotificationWorkerOptions(
'test-bucket',
'created',
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
});

it('should call FaasClient::start()', () => {
expect(startSpy).toBeCalledTimes(1);
});
});

describe('When registering a bucket notification for deleting', () => {
afterAll(() => {
jest.resetAllMocks();
});

beforeAll(async () => {
await bucket('test-bucket').on('deleted:test.png', mockFn);
});

it('should create a new FaasClient', () => {
expect(faas.Faas).toBeCalledTimes(1);
});

it('should provide Faas with BucketNotificationWorkerOptions', () => {
const expectedOpts = new BucketNotificationWorkerOptions(
'test-bucket',
'deleted',
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
});

it('should call FaasClient::start()', () => {
expect(startSpy).toBeCalledTimes(1);
});
});
});
2 changes: 1 addition & 1 deletion src/api/storage/v0/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class Storage {
* A reference to a storage bucket.
*/
export class Bucket {
storage: Storage;
private storage: Storage;
name: string;

constructor(storage: Storage, name: string) {
Expand Down
2 changes: 1 addition & 1 deletion src/faas/v0/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
QueryValue,
TriggerResponse,
} from '@nitric/api/proto/faas/v1/faas_pb';
import { TriggerContext, HttpContext, EventContext } from './context';
import { TriggerContext, HttpContext, EventContext } from '.';

describe('NitricTrigger.fromGrpcTriggerRequest', () => {
describe('From a HttpTriggerRequest', () => {
Expand Down
123 changes: 113 additions & 10 deletions src/faas/v0/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
import {
TriggerRequest,
TriggerResponse,
TopicResponseContext,
HttpResponseContext,
HeaderValue,
TraceContext,
HeaderValue,
HttpResponseContext,
NotificationResponseContext,
TopicResponseContext,
BucketNotificationType as ProtoBucketNotificationType,
} from '@nitric/api/proto/faas/v1/faas_pb';
import * as api from '@opentelemetry/api';
import * as jspb from 'google-protobuf';
import { jsonResponse } from './json';

export abstract class TriggerContext<
Expand Down Expand Up @@ -47,6 +50,15 @@ export abstract class TriggerContext<
return undefined;
}

/**
* Noop base context bucketNotification method
*
* @returns undefined
*/
public get bucketNotification(): BucketNotificationContext | undefined {
return undefined;
}

/**
* Return the request object from this context.
*
Expand Down Expand Up @@ -74,6 +86,8 @@ export abstract class TriggerContext<
return HttpContext.fromGrpcTriggerRequest(trigger);
} else if (trigger.hasTopic()) {
return EventContext.fromGrpcTriggerRequest(trigger);
} else if (trigger.hasNotification()) {
return BucketNotificationContext.fromGrpcTriggerRequest(trigger);
}
throw new Error('Unsupported trigger request type');
}
Expand All @@ -83,6 +97,8 @@ export abstract class TriggerContext<
return HttpContext.toGrpcTriggerResponse(ctx);
} else if (ctx.event) {
return EventContext.toGrpcTriggerResponse(ctx);
} else if (ctx.bucketNotification) {
return BucketNotificationContext.toGrpcTriggerResponse(ctx);
}

throw new Error('Unsupported trigger context type');
Expand Down Expand Up @@ -129,7 +145,7 @@ export abstract class AbstractRequest<
}
}

interface EventResponse {
export interface EventResponse {
success: boolean;
}

Expand Down Expand Up @@ -214,17 +230,24 @@ export class EventRequest<T> extends AbstractRequest<T> {
}

// Propagate the context to the root context
const getTraceContext = (traceContext: TraceContext): api.Context => {
const traceContextObject: Record<string, string> = traceContext
? traceContext
.getValuesMap()
.toObject()
.reduce((prev, [k, v]) => (prev[k] = v), {})
export const getTraceContext = (traceContext: TraceContext): api.Context => {
const traceContextObject = traceContext
? objectFromMap(traceContext.getValuesMap())
: {};

return api.propagation.extract(api.context.active(), traceContextObject);
};

const objectFromMap = (map: jspb.Map<string, string>) => {
return map
? map.toObject().reduce((prev, [k, v]) => {
prev[k] = v;
return prev;
}, {})
: {};
};

// HTTP CONTEXT
export class HttpContext extends TriggerContext<HttpRequest, HttpResponse> {
public get http(): HttpContext {
return this;
Expand Down Expand Up @@ -407,3 +430,83 @@ export class EventContext<T> extends TriggerContext<
return triggerResponse;
}
}

// BUCKET NOTIFICATION CONTEXT
export class BucketNotificationContext extends TriggerContext<
BucketNotificationRequest,
BucketNotificationResponse
> {
public get notification(): BucketNotificationContext {
return this;
}

static fromGrpcTriggerRequest(
trigger: TriggerRequest
): BucketNotificationContext {
const ctx = new BucketNotificationContext();
const bucketConfig = trigger.getNotification().getBucket();

ctx.request = new BucketNotificationRequest(
trigger.getData_asU8(),
getTraceContext(trigger.getTraceContext()),
bucketConfig.getKey(),
bucketConfig.getType()
);

ctx.response = {
success: true,
};

return ctx;
}

static toGrpcTriggerResponse(
ctx: TriggerContext<AbstractRequest, any>
): TriggerResponse {
const notifyCtx = ctx.bucketNotification;
const triggerResponse = new TriggerResponse();
const notificationResponse = new NotificationResponseContext();
notificationResponse.setSuccess(notifyCtx.res.success);
triggerResponse.setNotification(notificationResponse);
return triggerResponse;
}
}

export enum BucketNotificationType {
Created,
Deleted,
}

export class BucketNotificationRequest extends AbstractRequest {
key: string;
type: BucketNotificationType;

constructor(
data: string | Uint8Array,
traceContext: api.Context,
key: string,
type: number
) {
super(data, traceContext);

this.key = key;
this.type = this.eventTypeToNotificationType(type);
}

private eventTypeToNotificationType = (
eventType: number
): BucketNotificationType => {
switch (eventType) {
case ProtoBucketNotificationType.CREATED:
return BucketNotificationType.Created;
case ProtoBucketNotificationType.DELETED:
return BucketNotificationType.Deleted;
default:
throw new Error(`event type unsupported: ${eventType}`);
}
};
}

export interface BucketNotificationResponse {
success: boolean;
}
9 changes: 8 additions & 1 deletion src/faas/v0/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import { NitricEvent } from '../../types';
import { TriggerContext, HttpContext, EventContext } from '.';
import {
TriggerContext,
HttpContext,
EventContext,
BucketNotificationContext,
} from '.';

export type GenericHandler<Ctx> = (ctx: Ctx) => Promise<Ctx> | Ctx;

Expand All @@ -33,6 +38,8 @@ export type EventMiddleware<
T extends Record<string, any> = Record<string, any>
> = GenericMiddleware<EventContext<NitricEvent<T>>>;
export type ScheduleMiddleware = GenericMiddleware<EventContext<undefined>>;
export type BucketNotificationMiddleware =
GenericMiddleware<BucketNotificationContext>;

/**
* createHandler
Expand Down
Loading

0 comments on commit 35eee72

Please sign in to comment.