Skip to content

Commit

Permalink
Separate input from output
Browse files Browse the repository at this point in the history
  • Loading branch information
PabloSzx committed Jul 13, 2023
1 parent 37411c1 commit ff78cf7
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 17 deletions.
5 changes: 5 additions & 0 deletions .changeset/famous-moles-shave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@soundxyz/redis-pubsub": major
---

Separate input from output of inputSchema and outputSchema
30 changes: 14 additions & 16 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export const EventCodes = {
SUBSCRIPTION_ABORTED: "SUBSCRIPTION_ABORTED",
} as const;

export type EventCodes = typeof EventCodes[keyof typeof EventCodes];
export type EventCodes = (typeof EventCodes)[keyof typeof EventCodes];

export function RedisPubSub({
publisher,
Expand Down Expand Up @@ -270,7 +270,7 @@ export function RedisPubSub({
);
}

function createChannel<Input, Output>({
function createChannel<PublishInput, ChannelData, SubscriberData>({
name,
isLazy = true,
...schemas
Expand All @@ -282,12 +282,12 @@ export function RedisPubSub({
isLazy?: boolean;
} & (
| {
inputSchema: ZodSchema<Input, ZodTypeDef, Input>;
outputSchema: ZodSchema<Output, ZodTypeDef, Input>;
inputSchema: ZodSchema<ChannelData, ZodTypeDef, PublishInput>;
outputSchema: ZodSchema<SubscriberData, ZodTypeDef, ChannelData>;
schema?: never;
}
| {
schema: ZodSchema<Output, ZodTypeDef, Input>;
schema: ZodSchema<SubscriberData, ZodTypeDef, PublishInput>;
inputSchema?: never;
outputSchema?: never;
}
Expand Down Expand Up @@ -315,8 +315,6 @@ export function RedisPubSub({
unsubscribe,
publish,
unsubscribeAll,
inputSchema,
outputSchema,
};

function getSubscriptionValue({
Expand All @@ -339,23 +337,23 @@ export function RedisPubSub({
});
}

function subscribe<FilteredValue extends Output>(subscribeArguments: {
function subscribe<FilteredValue extends SubscriberData>(subscribeArguments: {
abortSignal?: AbortSignal;
filter: (value: Output) => value is FilteredValue;
filter: (value: SubscriberData) => value is FilteredValue;
identifier?: string | number;
}): AsyncGenerator<FilteredValue, void, unknown>;
function subscribe(subscribeArguments?: {
abortSignal?: AbortSignal;
filter?: (value: Output) => unknown;
filter?: (value: SubscriberData) => unknown;
identifier?: string | number;
}): AsyncGenerator<Output, void, unknown>;
}): AsyncGenerator<SubscriberData, void, unknown>;
async function* subscribe({
abortSignal,
filter,
identifier,
}: {
abortSignal?: AbortSignal;
filter?: (value: Output) => unknown;
filter?: (value: SubscriberData) => unknown;
identifier?: string | number;
} = {}) {
const channel = identifier ? name + identifier : name;
Expand Down Expand Up @@ -428,7 +426,7 @@ export function RedisPubSub({
while (true) {
await dataPromise.current.promise;

for (const value of dataPromise.current.values as Output[]) {
for (const value of dataPromise.current.values as SubscriberData[]) {
if (filter && !filter(value)) {
if (enabledLogEvents?.SUBSCRIPTION_MESSAGE_FILTERED_OUT) {
logMessage("SUBSCRIPTION_MESSAGE_FILTERED_OUT", {
Expand Down Expand Up @@ -498,15 +496,15 @@ export function RedisPubSub({

async function publish(
...values: [
{ value: Input; identifier?: string | number },
...{ value: Input; identifier?: string | number }[]
{ value: PublishInput; identifier?: string | number },
...{ value: PublishInput; identifier?: string | number }[],
]
) {
await Promise.all(
values.map(async ({ value, identifier }) => {
const tracing = enabledLogEvents?.PUBLISH_MESSAGE_EXECUTION_TIME ? getTracing() : null;

let parsedValue: Input | Output;
let parsedValue: ChannelData | SubscriberData;

try {
parsedValue = await inputSchema.parseAsync(value);
Expand Down
2 changes: 1 addition & 1 deletion test/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type LoggedEvents, RedisPubSub,type RedisPubSubOptions } from "../src";
import { type LoggedEvents, RedisPubSub, type RedisPubSubOptions } from "../src";
import Pino from "pino";
import Redis from "ioredis";

Expand Down

0 comments on commit ff78cf7

Please sign in to comment.