Skip to content

Commit

Permalink
Expose EventSchemas in Inngest instances (#657)
Browse files Browse the repository at this point in the history
## Summary
<!-- Succinctly describe your change, providing context, what you've
changed, and why. -->

Exposes "runtime schemas" on an `Inngest` client, allowing middleware to
use it to add custom validation using the schemas passed to
`EventSchemas`.

We'll add this as a top-level option later, but this allows us to
explore the functionality while we settle on APIs, default
functionality, and supporting many validation libraries.

As an example, here is a custom middleware that makes use of this to add
runtime validation to any Zod schemas that exist within your client's
`EventSchemas`. It assumes you have `zod` installed.

```sh
# Use this PR to test
npm install inngest@pr-657
```

```ts
export const inngest = new Inngest({
  id: "my-app",
  schemas,
  middleware: [experimentalValidationMiddleware()],
});
```

```ts
import {
  InngestMiddleware,
  internalEvents,
  type EventPayload,
  type InngestFunction,
} from "inngest";
import { z, ZodType } from "zod";

/**
 * Experimental middleware that validates events using Zod schemas passed using
 * `EventSchemas.fromZod()`.
 */
export const experimentalValidationMiddleware = (opts?: {
  /**
   * Disallow events that don't have a schema defined.
   */
  disallowSchemalessEvents?: boolean;

  /**
   * Disallow events that have a schema defined, but the schema is unknown and
   * not handled in this code.
   */
  disallowUnknownSchemas?: boolean;

  /**
   * Disable validation of incoming events.
   */
  disableIncomingValidation?: boolean;

  /**
   * Disable validation of outgoing events using `inngest.send()` or
   * `step.sendEvent()`.
   */
  disableOutgoingValidation?: boolean;
}) => {
  const mw = new InngestMiddleware({
    name: "Inngest Experimental: Runtime schema validation",
    init({ client }) {
      /**
       * Given an `event`, validate it against its schema.
       */
      const validateEvent = async (
        event: EventPayload,
        potentialInvokeEvents: string[] = []
      ): Promise<EventPayload> => {
        let schemasToAttempt = new Set<string>([event.name]);
        let hasSchema = false;

        /**
         * Trust internal events; don't allow overwriting their typing.
         */
        if (event.name.startsWith("inngest/")) {
          if (event.name !== internalEvents.FunctionInvoked) {
            return event;
          }

          /**
           * If this is an `inngest/function.invoked` event, try validating the
           * payload against one of the function's schemas.
           */
          schemasToAttempt = new Set<string>(potentialInvokeEvents);

          hasSchema = Boolean(
            schemasToAttempt.intersection(
              new Set<string>(
                Object.keys(client["schemas"]?.["runtimeSchemas"] || {})
              )
            ).size
          );
        } else {
          hasSchema = Boolean(
            client["schemas"]?.["runtimeSchemas"][event.name]
          );
        }

        if (!hasSchema) {
          if (opts?.disallowSchemalessEvents) {
            throw new Error(
              `Event "${event.name}" has no schema defined; disallowing`
            );
          }

          return event;
        }

        const errors: Record<string, Error> = {};

        for (const schemaName of schemasToAttempt) {
          try {
            const schema = client["schemas"]?.["runtimeSchemas"][schemaName];

            /**
             * The schema could be a full Zod object.
             */
            if (helpers.isZodObject(schema)) {
              const { success, data, error } = await schema
                .passthrough()
                .safeParseAsync(event);

              if (success) {
                return data as unknown as EventPayload;
              }

              throw new Error(`${error.name}: ${error.message}`);
            }

            /**
             * The schema could also be a regular object with Zod objects inside.
             */
            if (helpers.isObject(schema)) {
              // It could be a partial schema; validate each field
              return await Object.keys(schema).reduce<Promise<EventPayload>>(
                async (acc, key) => {
                  const fieldSchema = schema[key];
                  const eventField = event[key as keyof EventPayload];

                  if (!helpers.isZodObject(fieldSchema) || !eventField) {
                    return acc;
                  }

                  const { success, data, error } = await fieldSchema
                    .passthrough()
                    .safeParseAsync(eventField);

                  if (success) {
                    return { ...(await acc), [key]: data };
                  }

                  throw new Error(`${error.name}: ${error.message}`);
                },
                Promise.resolve<EventPayload>({ ...event })
              );
            }

            /**
             * Didn't find anything? Throw or warn.
             *
             * We only allow this for assessing single schemas, as otherwise we're
             * assessing an invocation would could be multiple.
             */
            if (opts?.disallowUnknownSchemas && schemasToAttempt.size === 1) {
              throw new Error(
                `Event "${event.name}" has an unknown schema; disallowing`
              );
            } else {
              console.warn(
                "Unknown schema found; cannot validate, but allowing"
              );
            }
          } catch (err) {
            errors[schemaName] = err as Error;
          }
        }

        if (Object.keys(errors).length) {
          throw new Error(
            `Event "${event.name}" failed validation:\n\n${Object.keys(errors)
              .map((key) => `Using ${key}: ${errors[key].message}`)
              .join("\n\n")}`
          );
        }

        return event;
      };

      return {
        ...(opts?.disableIncomingValidation
          ? {}
          : {
              async onFunctionRun({ fn }) {
                const backupEvents = (
                  (fn.opts as InngestFunction.Options).triggers || []
                ).reduce<string[]>((acc, trigger) => {
                  if (trigger.event) {
                    return [...acc, trigger.event];
                  }

                  return acc;
                }, []);

                return {
                  async transformInput({ ctx: { events } }) {
                    const validatedEvents = await Promise.all(
                      events.map((event) => {
                        return validateEvent(event, backupEvents);
                      })
                    );

                    return {
                      ctx: {
                        event: validatedEvents[0],
                        events: validatedEvents,
                      } as {},
                    };
                  },
                };
              },
            }),

        ...(opts?.disableOutgoingValidation
          ? {}
          : {
              async onSendEvent() {
                return {
                  async transformInput({ payloads }) {
                    return {
                      payloads: await Promise.all(
                        payloads.map((payload) => {
                          return validateEvent(payload);
                        })
                      ),
                    };
                  },
                };
              },
            }),
      };
    },
  });

  return mw;
};

const helpers = {
  isZodObject: (value: unknown): value is z.ZodObject<any> => {
    return value instanceof ZodType && value._def.typeName === "ZodObject";
  },

  isObject: (value: unknown): value is Record<string, any> => {
    return typeof value === "object" && value !== null && !Array.isArray(value);
  },
};
```

## Checklist
<!-- Tick these items off as you progress. -->
<!-- If an item isn't applicable, ideally please strikeout the item by
wrapping it in "~~"" and suffix it with "N/A My reason for skipping
this." -->
<!-- e.g. "- [ ] ~~Added tests~~ N/A Only touches docs" -->

- [ ] ~Added a [docs PR](https://github.com/inngest/website) that
references this PR~ N/A
- [ ] ~Added unit/integration tests~ N/A
- [x] Added changesets if applicable

## Related

- Partially addresses #410
  • Loading branch information
jpwilliams authored Oct 21, 2024
1 parent 9438960 commit 7ca9537
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/mean-brooms-pull.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"inngest": patch
---

Expose `EventSchemas` in `Inngest` instances
30 changes: 29 additions & 1 deletion packages/inngest/src/components/EventSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,15 @@ export class EventSchemas<
[internalEvents.ScheduledTimer]: ScheduledTimerEventPayload;
}>,
> {
protected runtimeSchemas: Record<string, unknown> = {};

private addRuntimeSchemas(schemas: Record<string, unknown>) {
this.runtimeSchemas = {
...this.runtimeSchemas,
...schemas,
};
}

/**
* Use generated Inngest types to type events.
*/
Expand Down Expand Up @@ -343,7 +352,6 @@ export class EventSchemas<
* ```
*/
public fromZod<T extends ZodEventSchemas | LiteralZodEventSchemas>(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
schemas: T
): EventSchemas<
Combine<
Expand All @@ -353,6 +361,26 @@ export class EventSchemas<
>
>
> {
let runtimeSchemas: Record<string, unknown>;

if (Array.isArray(schemas)) {
runtimeSchemas = schemas.reduce((acc, schema) => {
const {
name: { value: name },
...rest
} = schema.shape;

return {
...acc,
[name]: rest,
};
}, {});
} else {
runtimeSchemas = schemas;
}

this.addRuntimeSchemas(runtimeSchemas);

return this;
}
}
4 changes: 4 additions & 0 deletions packages/inngest/src/components/Inngest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ export class Inngest<TClientOpts extends ClientOptions = ClientOptions> {
*/
private _mode!: Mode;

protected readonly schemas?: NonNullable<TClientOpts["schemas"]>;

get apiBaseUrl(): string | undefined {
return this._apiBaseUrl;
}
Expand Down Expand Up @@ -192,6 +194,7 @@ export class Inngest<TClientOpts extends ClientOptions = ClientOptions> {
logger = new DefaultLogger(),
middleware,
isDev,
schemas,
} = this.options;

if (!id) {
Expand All @@ -216,6 +219,7 @@ export class Inngest<TClientOpts extends ClientOptions = ClientOptions> {
mode: this.mode,
});

this.schemas = schemas;
this.loadModeEnvVars();

this.logger = logger;
Expand Down

0 comments on commit 7ca9537

Please sign in to comment.