Skip to content

Commit

Permalink
feat: rework bucket notification DX (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
davemooreuws committed May 15, 2023
2 parents c7deace + ff9c003 commit dc7859c
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 73 deletions.
113 changes: 107 additions & 6 deletions src/api/storage/v0/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { FileMode, Storage } from './storage';
import { Bucket, FileMode, Storage } from './storage';
import { StorageServiceClient as GrpcStorageClient } from '@nitric/api/proto/storage/v1/storage_grpc_pb';
import {
StorageWriteResponse,
Expand All @@ -23,8 +23,15 @@ import {
File,
} from '@nitric/api/proto/storage/v1/storage_pb';
import { UnimplementedError } from '../../errors';
import { BucketNotificationWorkerOptions, bucket } from '@nitric/sdk/resources';
import {
BucketNotificationType,
BucketNotificationWorkerOptions,
FileNotificationWorkerOptions,
bucket,
} from '@nitric/sdk/resources';
import { faas } from '@nitric/sdk';
import { ResourceServiceClient } from '@nitric/sdk/gen/proto/resource/v1/resource_grpc_pb';
import { ResourceDeclareResponse } from '@nitric/sdk/gen/proto/resource/v1/resource_pb';

describe('Storage Client Tests', () => {
describe('Given nitric.api.storage.StorageClient.Write throws an error', () => {
Expand Down Expand Up @@ -443,7 +450,11 @@ describe('bucket notification', () => {
});

beforeAll(async () => {
await bucket('test-bucket').on('created:test.png', mockFn);
await bucket('test-bucket').on(
BucketNotificationType.Write,
'test.png',
mockFn
);
});

it('should create a new FaasClient', () => {
Expand All @@ -453,7 +464,7 @@ describe('bucket notification', () => {
it('should provide Faas with BucketNotificationWorkerOptions', () => {
const expectedOpts = new BucketNotificationWorkerOptions(
'test-bucket',
'created',
BucketNotificationType.Write,
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
Expand All @@ -470,7 +481,11 @@ describe('bucket notification', () => {
});

beforeAll(async () => {
await bucket('test-bucket').on('deleted:test.png', mockFn);
await bucket('test-bucket').on(
BucketNotificationType.Delete,
'test.png',
mockFn
);
});

it('should create a new FaasClient', () => {
Expand All @@ -480,7 +495,93 @@ describe('bucket notification', () => {
it('should provide Faas with BucketNotificationWorkerOptions', () => {
const expectedOpts = new BucketNotificationWorkerOptions(
'test-bucket',
'deleted',
BucketNotificationType.Delete,
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
});

it('should call FaasClient::start()', () => {
expect(startSpy).toBeCalledTimes(1);
});
});
});

describe('file notification', () => {
const startSpy = jest
.spyOn(faas.Faas.prototype, 'start')
.mockReturnValue(Promise.resolve());

const existsSpy = jest
.spyOn(ResourceServiceClient.prototype, 'declare')
.mockImplementation((_, callback: any) => {
const response = new ResourceDeclareResponse();
callback(null, response);
return null as any;
});

const mockFn = jest.fn();

describe('When registering a file notification for creating', () => {
let bucketResource: Bucket;
beforeAll(async () => {
bucketResource = bucket('test-bucket-create').for('reading');
await bucketResource.on(BucketNotificationType.Write, 'test.png', mockFn);
});

afterAll(() => {
jest.resetAllMocks();
});

it('should declare the new resource', () => {
expect(existsSpy).toBeCalledTimes(1);
});

it('should create a new FaasClient', () => {
expect(faas.Faas).toBeCalledTimes(1);
});

it('should provide Faas with FileNotificationWorkerOptions', () => {
const expectedOpts = new FileNotificationWorkerOptions(
bucketResource,
BucketNotificationType.Write,
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
});

it('should call FaasClient::start()', () => {
expect(startSpy).toBeCalledTimes(1);
});
});

describe('When registering a file notification for deleting', () => {
let bucketResource: Bucket;
beforeAll(async () => {
bucketResource = bucket('test-bucket-delete').for('reading');
await bucketResource.on(
BucketNotificationType.Delete,
'test.png',
mockFn
);
});

afterAll(() => {
jest.resetAllMocks();
});

it('should declare the new resource', () => {
expect(existsSpy).toBeCalledTimes(1);
});

it('should create a new FaasClient', () => {
expect(faas.Faas).toBeCalledTimes(1);
});

it('should provide Faas with FileNotificationWorkerOptions', () => {
const expectedOpts = new FileNotificationWorkerOptions(
bucketResource,
BucketNotificationType.Delete,
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
Expand Down
31 changes: 31 additions & 0 deletions src/api/storage/v0/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ import {
} from '@nitric/api/proto/storage/v1/storage_pb';
import * as grpc from '@grpc/grpc-js';
import { fromGrpcError, InvalidArgumentError } from '../../errors';
import {
BucketNotificationMiddleware,
FileNotificationMiddleware,
} from '@nitric/sdk/faas';
import {
BucketNotification,
BucketNotificationType,
FileNotification,
} from '@nitric/sdk/resources';

/**
* Nitric storage client, facilitates writing and reading from blob storage (buckets).
Expand Down Expand Up @@ -97,6 +106,28 @@ export class Bucket {
}
return new File(this.storage, this, name);
}

/**
* Register and start a bucket notification handler that will be called for all matching notification events on this bucket
*
* @param notificationType the notification type that should trigger the middleware, either 'write' or 'delete'
* @param notificationPrefixFilter the file name prefix that files must match to trigger a notification
* @param middleware handler middleware which will be run for every incoming event
* @returns Promise which resolves when the handler server terminates
*/
on(
notificationType: BucketNotificationType,
notificationPrefixFilter: string,
...middleware: FileNotificationMiddleware[]
): Promise<void> {
const notification = new FileNotification(
this,
notificationType,
notificationPrefixFilter,
...middleware
);
return notification['start']();
}
}

export enum FileMode {
Expand Down
100 changes: 91 additions & 9 deletions src/faas/v0/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ import {
import * as api from '@opentelemetry/api';
import * as jspb from 'google-protobuf';
import { jsonResponse } from './json';
import { Bucket, File } from '@nitric/sdk/api';
import {
ApiWorkerOptions,
BucketNotificationWorkerOptions,
FileNotificationWorkerOptions,
SubscriptionWorkerOptions,
bucket,
} from '@nitric/sdk';
import { FaasWorkerOptions } from './start';

export abstract class TriggerContext<
Req extends AbstractRequest = AbstractRequest,
Expand Down Expand Up @@ -79,15 +88,25 @@ export abstract class TriggerContext<

// Instantiate a concrete TriggerContext from the gRPC trigger model
static fromGrpcTriggerRequest(
trigger: TriggerRequest
trigger: TriggerRequest,
options?: FaasWorkerOptions
): TriggerContext<any, any> {
// create context
if (trigger.hasHttp()) {
return HttpContext.fromGrpcTriggerRequest(trigger);
} else if (trigger.hasTopic()) {
return EventContext.fromGrpcTriggerRequest(trigger);
} else if (trigger.hasNotification()) {
return BucketNotificationContext.fromGrpcTriggerRequest(trigger);
if (options instanceof FileNotificationWorkerOptions) {
return FileNotificationContext.fromGrpcTriggerRequest(
trigger,
options as FileNotificationWorkerOptions
);
}
return BucketNotificationContext.fromGrpcTriggerRequest(
trigger,
options as BucketNotificationWorkerOptions
);
}
throw new Error('Unsupported trigger request type');
}
Expand Down Expand Up @@ -253,7 +272,10 @@ export class HttpContext extends TriggerContext<HttpRequest, HttpResponse> {
return this;
}

static fromGrpcTriggerRequest(trigger: TriggerRequest): HttpContext {
static fromGrpcTriggerRequest(
trigger: TriggerRequest,
options?: ApiWorkerOptions
): HttpContext {
const http = trigger.getHttp();
const ctx = new HttpContext();

Expand Down Expand Up @@ -403,7 +425,8 @@ export class EventContext<T> extends TriggerContext<
}

static fromGrpcTriggerRequest(
trigger: TriggerRequest
trigger: TriggerRequest,
options?: SubscriptionWorkerOptions
): EventContext<unknown> {
const topic = trigger.getTopic();
const ctx = new EventContext();
Expand Down Expand Up @@ -441,7 +464,8 @@ export class BucketNotificationContext extends TriggerContext<
}

static fromGrpcTriggerRequest(
trigger: TriggerRequest
trigger: TriggerRequest,
options?: BucketNotificationWorkerOptions
): BucketNotificationContext {
const ctx = new BucketNotificationContext();
const bucketConfig = trigger.getNotification().getBucket();
Expand Down Expand Up @@ -478,19 +502,20 @@ export enum BucketNotificationType {
}

export class BucketNotificationRequest extends AbstractRequest {
key: string;
type: BucketNotificationType;
public readonly key: string;
public readonly notificationType: BucketNotificationType;

constructor(
data: string | Uint8Array,
traceContext: api.Context,
key: string,
type: number
notificationType: number
) {
super(data, traceContext);

// Get reference to the bucket
this.key = key;
this.type = this.eventTypeToNotificationType(type);
this.notificationType = this.eventTypeToNotificationType(notificationType);
}

private eventTypeToNotificationType = (
Expand All @@ -507,6 +532,63 @@ export class BucketNotificationRequest extends AbstractRequest {
};
}

export class FileNotificationContext extends TriggerContext<
FileNotificationRequest,
BucketNotificationResponse
> {
public get bucketNotification(): FileNotificationContext {
return this;
}

static fromGrpcTriggerRequest(
trigger: TriggerRequest,
options: FileNotificationWorkerOptions
): BucketNotificationContext {
const ctx = new FileNotificationContext();
const bucketConfig = trigger.getNotification().getBucket();

ctx.request = new FileNotificationRequest(
trigger.getData_asU8(),
getTraceContext(trigger.getTraceContext()),
bucketConfig.getKey(),
bucketConfig.getType(),
options.bucketRef
);

ctx.response = {
success: true,
};

return ctx;
}

static toGrpcTriggerResponse(
ctx: TriggerContext<AbstractRequest, any>
): TriggerResponse {
const notifyCtx = ctx.bucketNotification;
const triggerResponse = new TriggerResponse();
const notificationResponse = new NotificationResponseContext();
notificationResponse.setSuccess(notifyCtx.res.success);
triggerResponse.setNotification(notificationResponse);
return triggerResponse;
}
}
export class FileNotificationRequest extends BucketNotificationRequest {
public readonly file: File;

constructor(
data: string | Uint8Array,
traceContext: api.Context,
key: string,
notificationType: number,
bucket: Bucket
) {
super(data, traceContext, key, notificationType);

this.file = bucket.file(key);
}
}

export interface BucketNotificationResponse {
success: boolean;
}
3 changes: 3 additions & 0 deletions src/faas/v0/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
HttpContext,
EventContext,
BucketNotificationContext,
FileNotificationContext,
} from '.';

export type GenericHandler<Ctx> = (ctx: Ctx) => Promise<Ctx> | Ctx;
Expand All @@ -40,6 +41,8 @@ export type EventMiddleware<
export type ScheduleMiddleware = GenericMiddleware<EventContext<undefined>>;
export type BucketNotificationMiddleware =
GenericMiddleware<BucketNotificationContext>;
export type FileNotificationMiddleware =
GenericMiddleware<FileNotificationContext>;

/**
* createHandler
Expand Down
Loading

0 comments on commit dc7859c

Please sign in to comment.