diff --git a/packages/core/__tests__/index.js b/packages/core/__tests__/index.js index a6a4629f9..3a63627ae 100644 --- a/packages/core/__tests__/index.js +++ b/packages/core/__tests__/index.js @@ -1,6 +1,12 @@ import { setTimeout } from 'node:timers/promises' import test from 'ava' import sinon from 'sinon' +import { + createReadableStream, + createPassThroughStream, + createWritableStream, + pipejoin +} from '@datastream/core' import middy from '../index.js' const event = {} @@ -771,3 +777,113 @@ test('Should not invoke timeoutEarlyResponse on error', async (t) => { t.false(timeoutCalled) }) + +// streamifyResponse +globalThis.awslambda = { + streamifyResponse: (cb) => cb, + HttpResponseStream: { + from: (responseStream, metadata) => { + return responseStream + } + } +} + +test('Should return with streamifyResponse:true using undefined', async (t) => { + const input = '' + const handler = middy( + (event, context, { signal }) => { + return { + statusCode: 200, + headers: { + 'Content-Type': 'plain/text' + } + } + }, + { + streamifyResponse: true + } + ) + + let chunkResponse = '' + const responseStream = createWritableStream((chunk) => { + chunkResponse += chunk + }) + const response = await handler(event, responseStream, context) + t.is(response, undefined) + t.is(chunkResponse, input) +}) + +test('Should return with streamifyResponse:true using string', async (t) => { + const input = 'x'.repeat(1024 * 1024) + const handler = middy({ + streamifyResponse: true + }).handler((event, context, { signal }) => { + return { + statusCode: 200, + headers: { + 'Content-Type': 'plain/text' + }, + body: input + } + }) + + let chunkResponse = '' + const responseStream = createWritableStream((chunk) => { + chunkResponse += chunk + }) + const response = await handler(event, responseStream, context) + t.is(response, undefined) + t.is(chunkResponse, input) +}) + +test('Should return with streamifyResponse:true using ReadableStream', async (t) => { + const input = 'x'.repeat(1024 * 1024) + const handler = middy( + async (event, context, { signal }) => { + return { + statusCode: 200, + headers: { + 'Content-Type': 'plain/text' + }, + body: createReadableStream(input) + } + }, + { + streamifyResponse: true + } + ) + + let chunkResponse = '' + const responseStream = createWritableStream((chunk) => { + chunkResponse += chunk + }) + const response = await handler(event, responseStream, context) + t.is(response, undefined) + t.is(chunkResponse, input) +}) + +test('Should return with streamifyResponse:true using ReadableStream.pipe(...)', async (t) => { + const input = 'x'.repeat(1024 * 1024) + const handler = middy( + async (event, context, { signal }) => { + return { + statusCode: 200, + headers: { + 'Content-Type': 'plain/text' + }, + body: pipejoin([createReadableStream(input), createPassThroughStream()]) + } + }, + { + streamifyResponse: true + } + ) + + let chunkResponse = '' + const responseStream = createWritableStream((chunk) => { + chunkResponse += chunk + }) + const response = await handler(event, responseStream, context) + t.is(response, undefined) + t.is(chunkResponse, input) +}) diff --git a/packages/core/index.d.ts b/packages/core/index.d.ts index dd2e152de..131968864 100644 --- a/packages/core/index.d.ts +++ b/packages/core/index.d.ts @@ -6,7 +6,9 @@ import { declare type PluginHook = () => void declare type PluginHookWithMiddlewareName = (middlewareName: string) => void -declare type PluginHookPromise = (request: Request) => Promise | unknown +declare type PluginHookPromise = ( + request: Request +) => Promise | unknown interface PluginObject { internal?: any @@ -19,9 +21,15 @@ interface PluginObject { timeoutEarlyResponse?: PluginHook afterHandler?: PluginHook requestEnd?: PluginHookPromise + streamifyResponse?: Boolean } -export interface Request { +export interface Request< + TEvent = any, + TResult = any, + TErr = Error, + TContext extends LambdaContext = LambdaContext +> { event: TEvent context: TContext response: TResult | null @@ -31,9 +39,19 @@ export interface Request = (request: Request) => any +declare type MiddlewareFn< + TEvent = any, + TResult = any, + TErr = Error, + TContext extends LambdaContext = LambdaContext +> = (request: Request) => any -export interface MiddlewareObj { +export interface MiddlewareObj< + TEvent = unknown, + TResult = any, + TErr = Error, + TContext extends LambdaContext = LambdaContext +> { before?: MiddlewareFn after?: MiddlewareFn onError?: MiddlewareFn @@ -41,40 +59,101 @@ export interface MiddlewareObj so we have no choice but to follow and suppress the linter warning // eslint-disable-next-line @typescript-eslint/no-invalid-void-type -type MiddyInputHandler = (event: TEvent, context: TContext, callback: LambdaCallback) => void | Promise -type MiddyInputPromiseHandler = (event: TEvent, context: TContext,) => Promise +type MiddyInputHandler< + TEvent, + TResult, + TContext extends LambdaContext = LambdaContext +> = ( + event: TEvent, + context: TContext, + callback: LambdaCallback +) => // eslint-disable-next-line @typescript-eslint/no-invalid-void-type +void | Promise +type MiddyInputPromiseHandler< + TEvent, + TResult, + TContext extends LambdaContext = LambdaContext +> = (event: TEvent, context: TContext) => Promise -export interface MiddyfiedHandler extends MiddyInputHandler, +export interface MiddyfiedHandler< + TEvent = any, + TResult = any, + TErr = Error, + TContext extends LambdaContext = LambdaContext +> extends MiddyInputHandler, MiddyInputPromiseHandler { use: UseFn before: AttachMiddlewareFn after: AttachMiddlewareFn onError: AttachMiddlewareFn - handler: (handler: MiddlewareHandler, TContext>) => MiddyfiedHandler + handler: ( + handler: MiddlewareHandler< + LambdaHandler, + TContext + > + ) => MiddyfiedHandler } -declare type AttachMiddlewareFn = - (middleware: MiddlewareFn) => MiddyfiedHandler +declare type AttachMiddlewareFn< + TEvent = any, + TResult = any, + TErr = Error, + TContext extends LambdaContext = LambdaContext +> = ( + middleware: MiddlewareFn +) => MiddyfiedHandler -declare type AttachMiddlewareObj = - (middleware: MiddlewareObj) => MiddyfiedHandler +declare type AttachMiddlewareObj< + TEvent = any, + TResult = any, + TErr = Error, + TContext extends LambdaContext = LambdaContext +> = ( + middleware: MiddlewareObj +) => MiddyfiedHandler -declare type UseFn = - >(middlewares: TMiddleware | TMiddleware[]) => TMiddleware extends MiddlewareObj - ? MiddyfiedHandler // always true - : never +declare type UseFn< + TEvent = any, + TResult = any, + TErr = Error, + TContext extends LambdaContext = LambdaContext +> = >( + middlewares: TMiddleware | TMiddleware[] +) => TMiddleware extends MiddlewareObj< +infer TMiddlewareEvent, +any, +Error, +infer TMiddlewareContext +> + ? MiddyfiedHandler< + TMiddlewareEvent & TEvent, + TResult, + TErr, + TMiddlewareContext & TContext + > // always true + : never -declare type MiddlewareHandler, TContext extends LambdaContext = LambdaContext> = - THandler extends LambdaHandler // always true - ? MiddyInputHandler - : never +declare type MiddlewareHandler< + THandler extends LambdaHandler, + TContext extends LambdaContext = LambdaContext +> = THandler extends LambdaHandler // always true + ? MiddyInputHandler + : never /** * Middy factory function. Use it to wrap your existing handler to enable middlewares on it. * @param handler your original AWS Lambda function * @param plugin wraps around each middleware and handler to add custom lifecycle behaviours (e.g. to profile performance) */ -declare function middy (handler?: MiddlewareHandler, TContext>, plugin?: PluginObject): MiddyfiedHandler +declare function middy< + TEvent = unknown, + TResult = any, + TErr = Error, + TContext extends LambdaContext = LambdaContext +> ( + handler?: MiddlewareHandler, TContext>, + plugin?: PluginObject +): MiddyfiedHandler declare namespace middy { export { diff --git a/packages/core/index.js b/packages/core/index.js index 2a2782968..2899fab62 100644 --- a/packages/core/index.js +++ b/packages/core/index.js @@ -1,3 +1,6 @@ +/* global awslambda */ +import { Readable } from 'node:stream' +import { pipeline } from 'node:stream/promises' import { setTimeout } from 'node:timers/promises' const defaultLambdaHandler = () => {} @@ -5,7 +8,8 @@ const defaultPlugin = { timeoutEarlyInMillis: 5, timeoutEarlyResponse: () => { throw new Error('Timeout') - } + }, + streamifyResponse: false } const middy = (lambdaHandler = defaultLambdaHandler, plugin = {}) => { @@ -22,7 +26,7 @@ const middy = (lambdaHandler = defaultLambdaHandler, plugin = {}) => { const afterMiddlewares = [] const onErrorMiddlewares = [] - const middy = (event = {}, context = {}) => { + const middyHandler = (event = {}, context = {}) => { plugin.requestStart?.() const request = { event, @@ -41,6 +45,35 @@ const middy = (lambdaHandler = defaultLambdaHandler, plugin = {}) => { plugin ) } + const middy = plugin.streamifyResponse + ? awslambda.streamifyResponse(async (event, responseStream, context) => { + const response = await middyHandler(event, context) + response.body ??= '' + let { body } = response + + // Source @datastream/core (MIT) + if (typeof body === 'string') { + function * iterator (input) { + const size = 16 * 1024 // Node.js default + let position = 0 + const length = input.length + while (position < length) { + yield input.substring(position, position + size) + position += size + } + } + body = Readable.from(iterator(response.body)) + } + + // delete response.body // Not needed + responseStream = awslambda.HttpResponseStream.from( + responseStream, + response + ) + + await pipeline(body, responseStream) + }) + : middyHandler middy.use = (middlewares) => { if (!Array.isArray(middlewares)) { diff --git a/packages/core/package-lock.json b/packages/core/package-lock.json index fcc4e2b63..648501388 100644 --- a/packages/core/package-lock.json +++ b/packages/core/package-lock.json @@ -9,6 +9,7 @@ "version": "4.3.0", "license": "MIT", "devDependencies": { + "@datastream/core": "0.0.29", "@types/aws-lambda": "^8.10.76", "@types/node": "^18.0.0" }, @@ -20,6 +21,18 @@ "url": "https://github.com/sponsors/willfarrell" } }, + "node_modules/@datastream/core": { + "version": "0.0.29", + "resolved": "https://registry.npmjs.org/@datastream/core/-/core-0.0.29.tgz", + "integrity": "sha512-d74gAskHE0EZOjOG3ZgelKwp01n1Ygp0yQGxkZqaI1ayeRKGLoRLsoOp2qMbrcNsxFhMNLX6pjyFphS7PHsDPg==", + "dev": true, + "dependencies": { + "cloneable-readable": "3.0.0" + }, + "engines": { + "node": ">=18" + } + }, "node_modules/@types/aws-lambda": { "version": "8.10.114", "resolved": "https://registry.npmjs.org/@types/aws-lambda/-/aws-lambda-8.10.114.tgz", @@ -31,6 +44,133 @@ "resolved": "https://registry.npmjs.org/@types/node/-/node-18.15.9.tgz", "integrity": "sha512-dUxhiNzBLr6IqlZXz6e/rN2YQXlFgOei/Dxy+e3cyXTJ4txSUbGT2/fmnD6zd/75jDMeW5bDee+YXxlFKHoV0A==", "dev": true + }, + "node_modules/abort-controller": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/abort-controller/-/abort-controller-3.0.0.tgz", + "integrity": "sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg==", + "dev": true, + "dependencies": { + "event-target-shim": "^5.0.0" + }, + "engines": { + "node": ">=6.5" + } + }, + "node_modules/base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", + "dev": true, + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, + "node_modules/buffer": { + "version": "6.0.3", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz", + "integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==", + "dev": true, + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "dependencies": { + "base64-js": "^1.3.1", + "ieee754": "^1.2.1" + } + }, + "node_modules/cloneable-readable": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/cloneable-readable/-/cloneable-readable-3.0.0.tgz", + "integrity": "sha512-Lkfd9IRx1nfiBr7UHNxJSl/x7DOeUfYmxzCkxYJC2tyc/9vKgV75msgLGurGQsak/NvJDHMWcshzEXRlxfvhqg==", + "dev": true, + "dependencies": { + "readable-stream": "^4.0.0" + } + }, + "node_modules/event-target-shim": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", + "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==", + "dev": true, + "engines": { + "node": ">=6" + } + }, + "node_modules/events": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", + "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==", + "dev": true, + "engines": { + "node": ">=0.8.x" + } + }, + "node_modules/ieee754": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", + "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", + "dev": true, + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ] + }, + "node_modules/process": { + "version": "0.11.10", + "resolved": "https://registry.npmjs.org/process/-/process-0.11.10.tgz", + "integrity": "sha512-cdGef/drWFoydD1JsMzuFf8100nZl+GT+yacc2bEced5f9Rjk4z+WtFUTBu9PhOi9j/jfmBPu0mMEY4wIdAF8A==", + "dev": true, + "engines": { + "node": ">= 0.6.0" + } + }, + "node_modules/readable-stream": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-4.3.0.tgz", + "integrity": "sha512-MuEnA0lbSi7JS8XM+WNJlWZkHAAdm7gETHdFK//Q/mChGyj2akEFtdLZh32jSdkWGbRwCW9pn6g3LWDdDeZnBQ==", + "dev": true, + "dependencies": { + "abort-controller": "^3.0.0", + "buffer": "^6.0.3", + "events": "^3.3.0", + "process": "^0.11.10" + }, + "engines": { + "node": "^12.22.0 || ^14.17.0 || >=16.0.0" + } } } } diff --git a/packages/core/package.json b/packages/core/package.json index 1589370a7..daabdfe70 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -62,6 +62,7 @@ "url": "https://github.com/sponsors/willfarrell" }, "devDependencies": { + "@datastream/core": "0.0.29", "@types/aws-lambda": "^8.10.76", "@types/node": "^18.0.0" }, diff --git a/packages/http-content-encoding/__tests__/index.js b/packages/http-content-encoding/__tests__/index.js index f21df36ad..e250e9341 100644 --- a/packages/http-content-encoding/__tests__/index.js +++ b/packages/http-content-encoding/__tests__/index.js @@ -10,7 +10,7 @@ const context = { const compressibleBody = JSON.stringify(new Array(100).fill(0)) -test('It should encode using br', async (t) => { +test('It should encode string using br', async (t) => { const body = compressibleBody const handler = middy((event, context) => ({ statusCode: 200, body })).use( httpContentEncoding() @@ -30,7 +30,7 @@ test('It should encode using br', async (t) => { }) }) -test('It should encode using gzip', async (t) => { +test('It should encode string using gzip', async (t) => { const body = compressibleBody const handler = middy((event, context) => ({ statusCode: 200, body })).use( httpContentEncoding() @@ -50,7 +50,7 @@ test('It should encode using gzip', async (t) => { }) }) -test('It should encode using deflate', async (t) => { +test('It should encode string using deflate', async (t) => { const body = compressibleBody const handler = middy((event, context) => ({ statusCode: 200, body })) handler.use(httpContentEncoding()) diff --git a/packages/http-content-encoding/index.js b/packages/http-content-encoding/index.js index 48e62d826..b818da026 100644 --- a/packages/http-content-encoding/index.js +++ b/packages/http-content-encoding/index.js @@ -63,6 +63,13 @@ const httpContentEncodingMiddleware = (opts) => { break } + // Support streamifyResponse + if (response.body?._readableState) { + request.response.headers['Content-Encoding'] = contentEncoding + request.response.body.pipe(contentEncodingStream) + return + } + const stream = Readable.from(response.body).pipe(contentEncodingStream) const chunks = [] diff --git a/website/docs/events/function-url.md b/website/docs/events/function-url.md index 7e7f2c026..e684b91f6 100644 --- a/website/docs/events/function-url.md +++ b/website/docs/events/function-url.md @@ -1,7 +1,8 @@ --- title: Function URL --- -Same as API Gateway (HTTP) + +Same as API Gateway (HTTP), but with support for response streams. ## AWS Documentation @@ -38,7 +39,8 @@ export const handler = middy({ return { statusCode: 408 } - } + }, + streamifyResponse: true }) .use(warmupMiddleware()) .use(httpEventNormalizerMiddleware()) diff --git a/website/docs/intro/06-streamify-response.md b/website/docs/intro/06-streamify-response.md new file mode 100644 index 000000000..e3709c3d4 --- /dev/null +++ b/website/docs/intro/06-streamify-response.md @@ -0,0 +1,30 @@ +--- +title: Streamify Response +position: 5 +--- + +Middy also supports streamed responses. + +> You can progressively stream response payloads through Lambda function URLs, including as an Amazon CloudFront origin, along with using the AWS SDK or using Lambda’s invoke API. You can not use Amazon API Gateway and Application Load Balancer to progressively stream response payloads, but you can use the functionality to return larger payloads. (https://aws.amazon.com/blogs/compute/introducing-aws-lambda-response-streaming/) + +1. Set `streamifyResponse: true` into middy options +2. Return using an HTTP event response with the body as a string or ReadableStream. +3. - API Gateway: If you're getting a `500` status code. Be sure to set your integration to `HTTP_PROXY` over `LAMBDA_PROXY` and enable Function URL on the lambda. + - Function URLs: If receiving no content and non-200 status code are being converted to `200`. Be sure to set `Invoke Mode` to `RESPONSE_STREAM` over `BUFFERED`. + +```javascript +import middy from '@middy/core' +import { createReadableStream } from '@datastream/core' + +export const handler = middy({ streamifyResponse: true }).handler( + (event, context) => { + return { + statusCode: 200, + headers: { + 'Content-Type': 'text/csv' + }, + body: createReadableStream('...') // or string + } + } +) +``` diff --git a/website/docs/middlewares/http-content-encoding.md b/website/docs/middlewares/http-content-encoding.md index f25bbbf49..11f42b4a1 100644 --- a/website/docs/middlewares/http-content-encoding.md +++ b/website/docs/middlewares/http-content-encoding.md @@ -13,12 +13,14 @@ npm install --save @middy/http-content-encoding ``` ## Options + - `br` (object) (default `{}`): `zlib.createBrotliCompress` [brotliOptions](https://nodejs.org/api/zlib.html#zlib_class_brotlioptions) - `gzip` (object) (default `{}`): `zlib.createGzip` [gzipOptions](https://nodejs.org/api/zlib.html#zlib_class_options) - `deflate` (object) (default `{}`): `zlib.createDeflate` [deflateOptions](https://nodejs.org/api/zlib.html#zlib_class_options) - `overridePreferredEncoding` (array[string]) (optional): Override the preferred encoding order, most browsers prefer `gzip` over `br`, even though `br` has higher compression. Default: `[]` NOTES: + - **Important** For `br` encoding NodeJS defaults to `11`. Levels `10` & `11` have been shown to have lower performance for the level of compression they apply. Testing is recommended to ensure the right balance of compression & performance. ## Sample usage @@ -27,26 +29,51 @@ NOTES: import middy from '@middy/core' import httpContentNegotiation from '@middy/http-content-negotiation' import httpContentEncoding from '@middy/http-content-encoding' -import { constants } from 'zlib' - -const handler = middy((event, context) => { - return { - statusCode: 200, - body: '{...}' - } -}) +import { constants } from 'node:zlib' -handler +export const handler = middy() .use(httpContentNegotiation()) .use(httpCompressMiddleware({ br: { params: { - [constants.BROTLI_PARAM_MODE]: constants.BROTLI_MODE_TEXT, // adjusted for UTF-8 text + [constants.BROTLI_PARAM_MODE]: constants.BROTLI_MODE_TEXT, // adjusted for UTF-8 text [constants.BROTLI_PARAM_QUALITY]: 7 } }, overridePreferredEncoding: ['br', 'gzip', 'deflate'] - })) + }) + .handler((event, context) => { + return { + statusCode: 200, + body: '{...}' + } + }) +``` + +### Using streams + +```javascript +import middy from '@middy/core' +import httpContentNegotiation from '@middy/http-content-negotiation' +import httpContentEncoding from '@middy/http-content-encoding' +import { constants } from 'node:zlib' +import { createReadableStream } from '@datastream/core' -export default { handler } +export const handler = middy({ streamifyResponse: true }) + .use(httpContentNegotiation()) + .use(httpCompressMiddleware({ + br: { + params: { + [constants.BROTLI_PARAM_MODE]: constants.BROTLI_MODE_TEXT, // adjusted for UTF-8 text + [constants.BROTLI_PARAM_QUALITY]: 7 + } + }, + overridePreferredEncoding: ['br', 'gzip', 'deflate'] + }) + .handler((event, context) => { + return { + statusCode: 200, + body: createReadableStream('{...}') + } + }) ```