diff --git a/.changeset/famous-moles-shave.md b/.changeset/famous-moles-shave.md new file mode 100644 index 0000000..a70bcfa --- /dev/null +++ b/.changeset/famous-moles-shave.md @@ -0,0 +1,5 @@ +--- +"@soundxyz/redis-pubsub": major +--- + +Separate input from output of inputSchema and outputSchema diff --git a/src/index.ts b/src/index.ts index bc11877..f286462 100644 --- a/src/index.ts +++ b/src/index.ts @@ -57,7 +57,7 @@ export const EventCodes = { SUBSCRIPTION_ABORTED: "SUBSCRIPTION_ABORTED", } as const; -export type EventCodes = typeof EventCodes[keyof typeof EventCodes]; +export type EventCodes = (typeof EventCodes)[keyof typeof EventCodes]; export function RedisPubSub({ publisher, @@ -270,7 +270,7 @@ export function RedisPubSub({ ); } - function createChannel({ + function createChannel({ name, isLazy = true, ...schemas @@ -282,12 +282,12 @@ export function RedisPubSub({ isLazy?: boolean; } & ( | { - inputSchema: ZodSchema; - outputSchema: ZodSchema; + inputSchema: ZodSchema; + outputSchema: ZodSchema; schema?: never; } | { - schema: ZodSchema; + schema: ZodSchema; inputSchema?: never; outputSchema?: never; } @@ -315,8 +315,6 @@ export function RedisPubSub({ unsubscribe, publish, unsubscribeAll, - inputSchema, - outputSchema, }; function getSubscriptionValue({ @@ -339,23 +337,23 @@ export function RedisPubSub({ }); } - function subscribe(subscribeArguments: { + function subscribe(subscribeArguments: { abortSignal?: AbortSignal; - filter: (value: Output) => value is FilteredValue; + filter: (value: SubscriberData) => value is FilteredValue; identifier?: string | number; }): AsyncGenerator; function subscribe(subscribeArguments?: { abortSignal?: AbortSignal; - filter?: (value: Output) => unknown; + filter?: (value: SubscriberData) => unknown; identifier?: string | number; - }): AsyncGenerator; + }): AsyncGenerator; async function* subscribe({ abortSignal, filter, identifier, }: { abortSignal?: AbortSignal; - filter?: (value: Output) => unknown; + filter?: (value: SubscriberData) => unknown; identifier?: string | number; } = {}) { const channel = identifier ? name + identifier : name; @@ -428,7 +426,7 @@ export function RedisPubSub({ while (true) { await dataPromise.current.promise; - for (const value of dataPromise.current.values as Output[]) { + for (const value of dataPromise.current.values as SubscriberData[]) { if (filter && !filter(value)) { if (enabledLogEvents?.SUBSCRIPTION_MESSAGE_FILTERED_OUT) { logMessage("SUBSCRIPTION_MESSAGE_FILTERED_OUT", { @@ -498,15 +496,15 @@ export function RedisPubSub({ async function publish( ...values: [ - { value: Input; identifier?: string | number }, - ...{ value: Input; identifier?: string | number }[] + { value: PublishInput; identifier?: string | number }, + ...{ value: PublishInput; identifier?: string | number }[], ] ) { await Promise.all( values.map(async ({ value, identifier }) => { const tracing = enabledLogEvents?.PUBLISH_MESSAGE_EXECUTION_TIME ? getTracing() : null; - let parsedValue: Input | Output; + let parsedValue: ChannelData | SubscriberData; try { parsedValue = await inputSchema.parseAsync(value); diff --git a/test/helpers.ts b/test/helpers.ts index f78c626..3edad15 100644 --- a/test/helpers.ts +++ b/test/helpers.ts @@ -1,4 +1,4 @@ -import { type LoggedEvents, RedisPubSub,type RedisPubSubOptions } from "../src"; +import { type LoggedEvents, RedisPubSub, type RedisPubSubOptions } from "../src"; import Pino from "pino"; import Redis from "ioredis";