Skip to content

Commit

Permalink
feat: Add generic typing for queues (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjholm committed May 1, 2023
2 parents 796e12f + ae7c395 commit 096041b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 76 deletions.
65 changes: 36 additions & 29 deletions src/api/queues/v0/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
// limitations under the License.
import { QueueServiceClient } from '@nitric/api/proto/queue/v1/queue_grpc_pb';
import {
NitricTask,
NitricTask as NitricTaskPb,
QueueSendRequest,
QueueSendBatchRequest,
QueueReceiveRequest,
QueueCompleteRequest,
} from '@nitric/api/proto/queue/v1/queue_pb';
import { SERVICE_BIND } from '../../../constants';
import * as grpc from '@grpc/grpc-js';
import type { Task } from '../../../types';
import { NitricTask } from '../../../types';
import { Struct } from 'google-protobuf/google/protobuf/struct_pb';
import {
fromGrpcError,
Expand All @@ -32,8 +32,8 @@ import {
/**
* A message that has failed to be enqueued
*/
interface FailedMessage {
task: Task;
interface FailedMessage<T> {
task: NitricTask<T>;
message: string;
}

Expand All @@ -44,8 +44,8 @@ interface FailedMessage {
* @param task to convert
* @returns the wire representation of the task
*/
function taskToWire(task: Task) {
const wireTask = new NitricTask();
function taskToWire(task: NitricTask) {
const wireTask = new NitricTaskPb();

wireTask.setId(task.id);
wireTask.setPayloadType(task.payloadType);
Expand Down Expand Up @@ -76,16 +76,16 @@ export class Queueing {
this.QueueServiceClient = newQueueServiceClient();
}

queue = (name: string): Queue => {
queue = <T>(name: string): Queue<T> => {
if (!name) {
throw new InvalidArgumentError('A queue name is needed to use a Queue.');
}

return new Queue(this, name);
return new Queue<T>(this, name);
};
}

export class Queue {
export class Queue<T extends Record<string, any> = Record<string, any>> {
queueing: Queueing;
name: string;

Expand Down Expand Up @@ -120,17 +120,21 @@ export class Queue {
* };
* });
*/
public async send(tasks: Task[]): Promise<FailedMessage[]>;
public async send(tasks: Task): Promise<void>;
public async send(tasks: Task | Task[]): Promise<void | FailedMessage[]> {
public async send(tasks: T[] | NitricTask<T>[]): Promise<FailedMessage<T>[]>;
public async send(tasks: T | NitricTask<T>): Promise<void>;
public async send(
tasks: T[] | T | NitricTask<T> | NitricTask<T>[]
): Promise<void | FailedMessage<T>[]> {
return new Promise((resolve, reject) => {
const request = new QueueSendBatchRequest();

request.setTasksList(
Array.isArray(tasks)
? tasks.map((task) => taskToWire(task))
: [taskToWire(tasks)]
// Convert to NitricTask if not specified
const tasksArray = Array.isArray(tasks) ? tasks : [tasks];
const nitricTasksArray = tasksArray.map((t) =>
t instanceof NitricTask ? t : new NitricTask({ payload: t })
);

request.setTasksList(nitricTasksArray.map(taskToWire));
request.setQueue(this.name);

this.queueing.QueueServiceClient.sendBatch(request, (error, response) => {
Expand All @@ -139,11 +143,11 @@ export class Queue {
return;
}
const failedTasks = response.getFailedtasksList().map((m) => ({
task: {
task: new NitricTask<T>({
id: m.getTask().getId(),
payload: m.getTask().getPayload().toJavaScript(),
payloadType: m.getTask().getPayloadType(),
},
payload: m.getTask().getPayload().toJavaScript() as T,
}),
message: m.getMessage(),
}));
if (!Array.isArray(tasks)) {
Expand Down Expand Up @@ -181,7 +185,7 @@ export class Queue {
* // do something with task
* ```
*/
public async receive(depth?: number): Promise<ReceivedTask[]> {
public async receive(depth?: number): Promise<ReceivedTask<T>[]> {
return new Promise((resolve, reject) => {
const request = new QueueReceiveRequest();

Expand All @@ -201,7 +205,7 @@ export class Queue {
response.getTasksList().map((m) => {
return new ReceivedTask({
id: m.getId(),
payload: m.getPayload().toJavaScript(),
payload: m.getPayload().toJavaScript() as T,
payloadType: m.getPayloadType(),
leaseId: m.getLeaseId(),
queue: this,
Expand All @@ -214,11 +218,10 @@ export class Queue {
}
}

export class ReceivedTask implements Task {
id: string;
export class ReceivedTask<
T extends Record<string, any> = Record<string, any>
> extends NitricTask<T> {
leaseId: string;
payloadType?: string;
payload?: Record<string, any>;
queue: Queue;

constructor({
Expand All @@ -227,11 +230,15 @@ export class ReceivedTask implements Task {
payload,
payloadType,
queue,
}: Task & { id: string; leaseId: string; queue: Queue }) {
this.id = id;
}: {
id: string;
payload: T;
payloadType: string;
leaseId: string;
queue: Queue;
}) {
super({ id, payloadType, payload });
this.leaseId = leaseId;
this.payloadType = payloadType;
this.payload = payload;
this.queue = queue;
}

Expand Down
12 changes: 9 additions & 3 deletions src/resources/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ export type QueuePermission = 'sending' | 'receiving';
/**
* Queue resource for async send/receive messaging
*/
export class QueueResource extends SecureResource<QueuePermission> {
export class QueueResource<
T extends Record<string, any> = Record<string, any>
> extends SecureResource<QueuePermission> {
/**
* Register this queue as a required resource for the calling function/container.
*
Expand Down Expand Up @@ -91,11 +93,15 @@ export class QueueResource extends SecureResource<QueuePermission> {
* @param perms the access that the currently scoped function is requesting to this resource.
* @returns a useable queue.
*/
public for(...perms: QueuePermission[]): Queue {
public for(...perms: QueuePermission[]): Queue<T> {
this.registerPolicy(...perms);

return queues().queue(this.name);
}
}

export const queue = make(QueueResource);
export const queue = make(QueueResource) as <
T extends Record<string, any> = Record<string, any>
>(
name: string
) => QueueResource<T>;
62 changes: 18 additions & 44 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// export interface NitricEvent<
// T extends Record<string, any> = Record<string, any>
// > {
// /**
// * Uniquely identifies the event.
// *
// * Within your app you must ensure the ID is unique.
// * Subscribers can assume events with the same ID are duplicates and avoid reprocessing them
// */
// id?: string;
// /**
// * An optional description of the event type.
// *
// * Can be useful for de-serialization, routing or observability. The format of this value is determined by the producer.
// */
// payloadType?: string;
// /**
// * The event's payload data, with details of the event.
// */
// payload: T;
// }

export class NitricEvent<T extends Record<string, any> = Record<string, any>> {
public readonly payload: T;
Expand All @@ -45,29 +24,24 @@ export class NitricEvent<T extends Record<string, any> = Record<string, any>> {
}
}

export interface Task<T extends Record<string, any> = Record<string, any>> {
/**
* Uniquely identifies the task.
*
* Within your app you must ensure the ID is unique.
*/
id?: string;
/**
* The ID for the current lease of this task.
*
* A task may be leased multiple times, resulting in new lease IDs.
*/
leaseId?: string;
/**
* An optional description of the task type.
*
* Can be useful for de-serialization, routing or observability. The format of this value is determined by the producer.
*/
payloadType?: string;
/**
* The task's payload data, with details of the task or work to be done.
*/
payload?: Record<string, any>;
export class NitricTask<T extends Record<string, any> = Record<string, any>> {
public readonly id: string | undefined;
public readonly payloadType: string;
public readonly payload: T;

constructor({
id = undefined,
payload,
payloadType = 'none',
}: {
id?: string;
payloadType?: string;
payload: T;
}) {
this.id = id;
this.payload = payload;
this.payloadType = payloadType;
}
}

export type WhereQueryOperator =
Expand Down

0 comments on commit 096041b

Please sign in to comment.