Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/bucket notification #178

Merged
merged 13 commits into from
May 9, 2023
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@nitric/sdk",
"description": "Nitric NodeJS client sdk",
"nitric": "v0.22.0",
"nitric": "v0.27.0-rc.4",
"author": "Nitric <https://github.com/nitrictech>",
"repository": "https://github.com/nitrictech/node-sdk",
"main": "lib/index.js",
Expand Down
69 changes: 69 additions & 0 deletions src/api/storage/v0/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import {
File,
} from '@nitric/api/proto/storage/v1/storage_pb';
import { UnimplementedError } from '../../errors';
import { BucketNotificationWorkerOptions, bucket } from '@nitric/sdk/resources';
import { faas } from '@nitric/sdk';

describe('Storage Client Tests', () => {
describe('Given nitric.api.storage.StorageClient.Write throws an error', () => {
Expand Down Expand Up @@ -422,3 +424,70 @@ describe('Storage Client Tests', () => {
});
});
});

jest.mock('../../../faas/index');

describe('bucket notification', () => {
const startSpy = jest
.spyOn(faas.Faas.prototype, 'start')
.mockReturnValue(Promise.resolve());
const mockFn = jest.fn();

afterAll(() => {
jest.clearAllMocks();
});

describe('When registering a bucket notification for creating', () => {
afterAll(() => {
jest.resetAllMocks();
});

beforeAll(async () => {
await bucket('test-bucket').on('created:test.png', mockFn);
});

it('should create a new FaasClient', () => {
expect(faas.Faas).toBeCalledTimes(1);
});

it('should provide Faas with BucketNotificationWorkerOptions', () => {
const expectedOpts = new BucketNotificationWorkerOptions(
'test-bucket',
'created',
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
});

it('should call FaasClient::start()', () => {
expect(startSpy).toBeCalledTimes(1);
});
});

describe('When registering a bucket notification for deleting', () => {
afterAll(() => {
jest.resetAllMocks();
});

beforeAll(async () => {
await bucket('test-bucket').on('deleted:test.png', mockFn);
});

it('should create a new FaasClient', () => {
expect(faas.Faas).toBeCalledTimes(1);
});

it('should provide Faas with BucketNotificationWorkerOptions', () => {
const expectedOpts = new BucketNotificationWorkerOptions(
'test-bucket',
'deleted',
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
});

it('should call FaasClient::start()', () => {
expect(startSpy).toBeCalledTimes(1);
});
});
});
2 changes: 1 addition & 1 deletion src/api/storage/v0/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class Storage {
* A reference to a storage bucket.
*/
export class Bucket {
storage: Storage;
private storage: Storage;
name: string;

constructor(storage: Storage, name: string) {
Expand Down
2 changes: 1 addition & 1 deletion src/faas/v0/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
QueryValue,
TriggerResponse,
} from '@nitric/api/proto/faas/v1/faas_pb';
import { TriggerContext, HttpContext, EventContext } from './context';
import { TriggerContext, HttpContext, EventContext } from '.';

describe('NitricTrigger.fromGrpcTriggerRequest', () => {
describe('From a HttpTriggerRequest', () => {
Expand Down
123 changes: 113 additions & 10 deletions src/faas/v0/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
import {
TriggerRequest,
TriggerResponse,
TopicResponseContext,
HttpResponseContext,
HeaderValue,
TraceContext,
HeaderValue,
HttpResponseContext,
NotificationResponseContext,
TopicResponseContext,
BucketNotificationType as ProtoBucketNotificationType,
} from '@nitric/api/proto/faas/v1/faas_pb';
import * as api from '@opentelemetry/api';
import * as jspb from 'google-protobuf';
import { jsonResponse } from './json';

export abstract class TriggerContext<
Expand Down Expand Up @@ -47,6 +50,15 @@ export abstract class TriggerContext<
return undefined;
}

/**
* Noop base context bucketNotification method
*
* @returns undefined
*/
public get bucketNotification(): BucketNotificationContext | undefined {
return undefined;
}

/**
* Return the request object from this context.
*
Expand Down Expand Up @@ -74,6 +86,8 @@ export abstract class TriggerContext<
return HttpContext.fromGrpcTriggerRequest(trigger);
} else if (trigger.hasTopic()) {
return EventContext.fromGrpcTriggerRequest(trigger);
} else if (trigger.hasNotification()) {
return BucketNotificationContext.fromGrpcTriggerRequest(trigger);
}
throw new Error('Unsupported trigger request type');
}
Expand All @@ -83,6 +97,8 @@ export abstract class TriggerContext<
return HttpContext.toGrpcTriggerResponse(ctx);
} else if (ctx.event) {
return EventContext.toGrpcTriggerResponse(ctx);
} else if (ctx.bucketNotification) {
return BucketNotificationContext.toGrpcTriggerResponse(ctx);
}

throw new Error('Unsupported trigger context type');
Expand Down Expand Up @@ -129,7 +145,7 @@ export abstract class AbstractRequest<
}
}

interface EventResponse {
export interface EventResponse {
success: boolean;
}

Expand Down Expand Up @@ -214,17 +230,24 @@ export class EventRequest<T> extends AbstractRequest<T> {
}

// Propagate the context to the root context
const getTraceContext = (traceContext: TraceContext): api.Context => {
const traceContextObject: Record<string, string> = traceContext
? traceContext
.getValuesMap()
.toObject()
.reduce((prev, [k, v]) => (prev[k] = v), {})
export const getTraceContext = (traceContext: TraceContext): api.Context => {
const traceContextObject = traceContext
? objectFromMap(traceContext.getValuesMap())
: {};

return api.propagation.extract(api.context.active(), traceContextObject);
};

const objectFromMap = (map: jspb.Map<string, string>) => {
return map
? map.toObject().reduce((prev, [k, v]) => {
prev[k] = v;
return prev;
}, {})
: {};
};

// HTTP CONTEXT
export class HttpContext extends TriggerContext<HttpRequest, HttpResponse> {
public get http(): HttpContext {
return this;
Expand Down Expand Up @@ -407,3 +430,83 @@ export class EventContext<T> extends TriggerContext<
return triggerResponse;
}
}

// BUCKET NOTIFICATION CONTEXT
export class BucketNotificationContext extends TriggerContext<
BucketNotificationRequest,
BucketNotificationResponse
> {
public get notification(): BucketNotificationContext {
return this;
}

static fromGrpcTriggerRequest(
trigger: TriggerRequest
): BucketNotificationContext {
const ctx = new BucketNotificationContext();
const bucketConfig = trigger.getNotification().getBucket();

ctx.request = new BucketNotificationRequest(
trigger.getData_asU8(),
getTraceContext(trigger.getTraceContext()),
bucketConfig.getKey(),
bucketConfig.getType()
);

ctx.response = {
success: true,
};

return ctx;
}

static toGrpcTriggerResponse(
ctx: TriggerContext<AbstractRequest, any>
): TriggerResponse {
const notifyCtx = ctx.bucketNotification;
const triggerResponse = new TriggerResponse();
const notificationResponse = new NotificationResponseContext();
notificationResponse.setSuccess(notifyCtx.res.success);
triggerResponse.setNotification(notificationResponse);
return triggerResponse;
}
}

export enum BucketNotificationType {
Created,
Deleted,
}

export class BucketNotificationRequest extends AbstractRequest {
key: string;
type: BucketNotificationType;

constructor(
data: string | Uint8Array,
traceContext: api.Context,
key: string,
type: number
) {
super(data, traceContext);

this.key = key;
this.type = this.eventTypeToNotificationType(type);
}

private eventTypeToNotificationType = (
eventType: number
): BucketNotificationType => {
switch (eventType) {
case ProtoBucketNotificationType.CREATED:
return BucketNotificationType.Created;
case ProtoBucketNotificationType.DELETED:
return BucketNotificationType.Deleted;
default:
throw new Error(`event type unsupported: ${eventType}`);
}
};
}

export interface BucketNotificationResponse {
success: boolean;
}
9 changes: 8 additions & 1 deletion src/faas/v0/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import { NitricEvent } from '../../types';
import { TriggerContext, HttpContext, EventContext } from '.';
import {
TriggerContext,
HttpContext,
EventContext,
BucketNotificationContext,
} from '.';

export type GenericHandler<Ctx> = (ctx: Ctx) => Promise<Ctx> | Ctx;

Expand All @@ -33,6 +38,8 @@ export type EventMiddleware<
T extends Record<string, any> = Record<string, any>
> = GenericMiddleware<EventContext<NitricEvent<T>>>;
export type ScheduleMiddleware = GenericMiddleware<EventContext<undefined>>;
export type BucketNotificationMiddleware =
GenericMiddleware<BucketNotificationContext>;

/**
* createHandler
Expand Down
Loading