Skip to content

Commit

Permalink
PUB-90 Add support for gzip compression/decompression (#107)
Browse files Browse the repository at this point in the history
* Add pako gzip package

* topic compression options

* Message decompression

* Added examples

* docs(changeset): PUB-90 Add support for gzip compression/decompression

* Update src/interface/publishOptions.ts

* fix tests

* review fix

* Update src/message/gzip.ts

* reduce coverage threshold
  • Loading branch information
rndD authored May 15, 2023
1 parent 7ba94a0 commit 30efd65
Show file tree
Hide file tree
Showing 22 changed files with 258 additions and 26 deletions.
5 changes: 5 additions & 0 deletions .changeset/sweet-buses-sip.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@honestfoodcompany/pubsub': minor
---

PUB-90 Add support for gzip compression/decompression
21 changes: 20 additions & 1 deletion __tests__/helpers/generateMockMessage.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { PreciseDate } from '@google-cloud/precise-date';
import Pako from 'pako';

/**
* Generates a mock message for use in testing
* @param {object} message
*/
export default function generateMockMessage(message: any): any {
export function generateMockMessage(message: any): any {
return {
ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjA',
attributes: { key: 'value' },
Expand All @@ -19,3 +20,21 @@ export default function generateMockMessage(message: any): any {
},
};
}

export function generateMockCompressedMessage(message: any): any {
const data = Buffer.from(Pako.gzip(JSON.stringify(message)));

return {
ackId: 'RUFeQBJMJAxESVMrQwsqWBFOBCEhPjB',
attributes: { key: 'value' },
data,
id: '1551297743043',
orderingKey: 'ordering-key',
publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'),
received: 1551297743043,
length: data.byteLength,
ack: () => {
// we no-op this to fit the parameters of the expected function
},
};
}
19 changes: 18 additions & 1 deletion __tests__/message.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { Message } from '@honestfoodcompany/pubsub';
import generateMockMessage from './helpers/generateMockMessage';
import {
generateMockMessage,
generateMockCompressedMessage,
} from './helpers/generateMockMessage';

describe('@Message', () => {
const message = new Message();

it('should be a class', () => {
expect(message).toBeInstanceOf(Message);
});
Expand All @@ -20,4 +24,17 @@ describe('@Message', () => {
expect(newGCloudMessage).toHaveProperty('data');
expect(newGCloudMessage).toHaveProperty('gCloudMessage');
});

it('should decompress data if it is compressed with gzip', async () => {
const msg = 'compressed test data';
const newGCloudMessage = await Message.fromGCloud(
generateMockCompressedMessage(msg),
);

const decompressSpy = jest.spyOn(newGCloudMessage, 'decompress');
const json = newGCloudMessage.toJSON();
expect(decompressSpy).toHaveBeenCalledTimes(1);

expect(json).toEqual(msg);
});
});
2 changes: 1 addition & 1 deletion __tests__/subscription.auto-load.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { SubscriptionService } from '@honestfoodcompany/pubsub';
import { SubscriberTuple } from '../src/subscriber';
import generateMockMessage from './helpers/generateMockMessage';
import { generateMockMessage } from './helpers/generateMockMessage';

const mockPubSub = jest.fn();

Expand Down
2 changes: 1 addition & 1 deletion __tests__/topic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ describe('topics', (): void => {
expect.not.objectContaining({
_timestamp: expect.stringContaining(':'),
}),
undefined,
{},
);
});

Expand Down
9 changes: 8 additions & 1 deletion docs/publishing/Topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export interface Payload extends BasePayload {
export default class SimpleTopic extends Topic<Payload> {
static readonly topicName = 'simple.topic.name';
}

```

:::tip
Expand All @@ -35,3 +34,11 @@ import SimpleTopic from 'PUBSUB_ROOT_DIR/topics/simple.topic.name';

new SimpleTopic().publish({ id: 1, data: 'My first message' });
```

### Compression

Framework supports gzip compression on publish. It can be enabled in Topic options.

With option enabled, message data will compressed before publish to PubSub.

See [Compression Topic example](https://github.com/deliveryhero/hfc-pubsub/tree/main/examples/typescript/topics/compression.topic.ts), [Messages decompression](../subscribing/Messages_compression#decompression).
40 changes: 40 additions & 0 deletions docs/subscribing/Messages Compression.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
---
id: Messages_compression
title: Messages compression
sidebar_position: 1
---

Framework supports gzip compression/decompression for messages.

It uses [zlib port to js](https://github.com/nodeca/pako) for gzip and ungzip messages.

## Compression

[Topic compression option](../publishing/topics#compression)

## Decompression

`message.toJSON()` automatically checks if payload is gzip compressed and decompress it.

Also, framework exports `isGzipCompressed(data: Buffer)` function to check if data payload is compressed.

## Examples

```ts title="/pubsub/subscriptions/compression.topic.example.sub.ts"
import { SubscriberObject, isGzipCompressed } from '@honestfoodcompany/pubsub';

type Payload = any;

const subscriber: SubscriberObject<Payload> = {
topicName: 'compression.topic',
subscriptionName: 'compression.topic.console-log.subscription',

handleMessage: (message) => {
console.log('is compressed', isGzipCompressed(message.data)); // true if payload is compressed
console.log(message.toJSON()); // automatically decompress payload if it's compressed
message.ack();
},
};

export default subscriber;
```
2 changes: 1 addition & 1 deletion docs/subscribing/Subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Files ending in `.sub.js` or `.sub.ts` in `PUBSUB_ROOT_DIR/subscriptions` will b
### Typescript subscription example

```ts title="/pubsub/subscriptions/simple.topic.name.console-log.sub.ts"
import { SubscriberObject } from "@honestfoodcompany/pubsub";
import { SubscriberObject } from "@honestfoodcompany/pubsub";
import { Payload } from '../topics/test-topic';

export default: SubscriberObject<Payload> = {
Expand Down
13 changes: 13 additions & 0 deletions examples/typescript/scripts/compress-publish.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import CompressionTopic from '../topics/compression.topic';

// Run with: npx ts-node -r tsconfig-paths/register examples/typescript/scripts/compress-publish.ts

const main = async () => {
const topic = new CompressionTopic();
const messageId = await topic.publish({ testPayload: 'test' });
console.log('messageId', messageId);
};

if (require.main === module) {
void main();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { SubscriberObject, isGzipCompressed } from '@honestfoodcompany/pubsub';
interface Payload {
testPayload: string;
}

const subscriber: SubscriberObject<Payload> = {
topicName: 'compression.topic',
subscriptionName: 'compression.topic.console-log.subscription',
description: 'Will console log messages published on compression.topic',

handleMessage: (message) => {
console.log('is compressed', isGzipCompressed(message.data));

console.log(message.toJSON(), typeof message.toJSON());
const x = message.toJSON();
console.log(x?.testPayload);
message.ack();
},
};

export default subscriber;
20 changes: 20 additions & 0 deletions examples/typescript/topics/compression.topic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {
Topic,
Payload as BasePayload,
TopicOptions,
} from '@honestfoodcompany/pubsub';

/**
* Defining a topic and payload:
*/
export interface Payload extends BasePayload {
testPayload: any;
}

export default class CompressionTopic extends Topic<Payload> {
static readonly topicName = 'compression.topic';

public options: TopicOptions = {
enableGZipCompression: true,
};
}
2 changes: 1 addition & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module.exports = {
coverageThreshold: {
global: {
statements: 60,
branches: 55,
branches: 50,
functions: 60,
lines: 60,
},
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"cli-table": "0.3.6",
"dotenv": "10.0.0",
"find-config": "1.0.0",
"pako": "2.1.0",
"wrap-ansi": "7.0.0",
"yargs": "17.2.1"
},
Expand All @@ -55,6 +56,7 @@
"@types/cli-table": "^0.3.0",
"@types/jest": "^27.0.2",
"@types/node": "^16",
"@types/pako": "^2",
"@types/react": "^17.0.34",
"@types/wrap-ansi": "^8.0.1",
"@types/yargs": "^17.0.5",
Expand Down Expand Up @@ -108,4 +110,4 @@
}
},
"packageManager": "[email protected]"
}
}
14 changes: 13 additions & 1 deletion src/client/eventBus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import EventEmitter from 'events';
import Pako from 'pako';
import { PublishOptions } from '../interface/publishOptions';
import {
AllSubscriptions,
IsOpenTuple,
Expand Down Expand Up @@ -27,8 +29,14 @@ export default class EventBus extends EventEmitter implements PubSubClientV2 {
public async publish<T extends TopicProperties>(
topic: T,
message: Record<string, unknown>,
options?: PublishOptions,
): Promise<string> {
EventBus.getInstance().emit(topic.topicName, message);
let msg: Record<string, unknown> | Uint8Array = message;
if (options?.enableGZipCompression) {
msg = this.compressMessage(message);
}

EventBus.getInstance().emit(topic.topicName, msg);
return 'done';
}

Expand All @@ -53,4 +61,8 @@ export default class EventBus extends EventEmitter implements PubSubClientV2 {
'This feature is not available with the synchronous driver',
);
}

public compressMessage(message: Record<string, unknown>): Uint8Array {
return Pako.gzip(JSON.stringify(message));
}
}
14 changes: 14 additions & 0 deletions src/client/googlePubSub/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
import Bluebird from 'bluebird';
import chalk from 'chalk';

import Pako from 'pako';
import { TopicProperties } from '../../topic';
import { PublishOptions } from '../../interface/publishOptions';
import {
Expand Down Expand Up @@ -78,6 +79,15 @@ export default class GooglePubSubAdapter implements PubSubClientV2 {
const pubSubTopic = await this.createOrGetTopic(topic.topicName, {
project: topic.project,
});

if (options?.enableGZipCompression) {
const compressedMsg = Buffer.from(this.compressMessage(message));
const [messageId] = await pubSubTopic.publishMessage({
data: compressedMsg,
attributes: options?.attributes,
});
return messageId;
}
// FIXME: PUB-49 retryConfig not being considered, see https://github.com/googleapis/nodejs-pubsub/blob/master/samples/publishWithRetrySettings.js for how to use it
const messageId = await pubSubTopic.publishJSON(
message,
Expand Down Expand Up @@ -460,4 +470,8 @@ export default class GooglePubSubAdapter implements PubSubClientV2 {

return subscriptions.flat();
}

public compressMessage(message: Record<string, unknown>): Uint8Array {
return Pako.gzip(JSON.stringify(message));
}
}
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import PubSubService from './service/pubsub';
import { setLogger } from './service/logger';
import Message from './message';

import { isGzipCompressed } from './message/gzip';

export {
SubscriberObject,
SubscriberMetadata,
Expand All @@ -21,4 +23,5 @@ export {
PubSubService,
SubscriptionService,
setLogger,
isGzipCompressed,
};
4 changes: 3 additions & 1 deletion src/interface/pubSubClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ export type IsOpenTuple = [string, boolean];
export interface PubSubClientV2 {
publish<T extends TopicProperties>(
topic: T,
message: Record<string, unknown>,
message: Record<string, unknown> | string,
options?: PublishOptions,
): Promise<string>;
subscribe(subscriber: SubscriberTuple): void;
close(subscriber: SubscriberTuple): void;
getAllSubscriptions(): Promise<AllSubscriptions[]>;
getAllSubscriptionsState(): IsOpenTuple[];

compressMessage(message: Record<string, unknown>): Uint8Array;
//@todo: getAllTopics(): Promise<string[]>;
}
10 changes: 10 additions & 0 deletions src/interface/publishOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ export interface RetryConfig {
export interface TopicPublishOptions {
attributes?: Attributes;
retryConfig?: RecursivePartial<RetryConfig>;

/**
* If true msg will be gzip compressed before being published
*/
enableGZipCompression?: boolean;
}

/**
Expand All @@ -58,6 +63,11 @@ export interface TopicPublishOptions {
export interface PublishOptions {
retryConfig?: RetryConfig;
attributes?: Attributes;

/**
* If true msg will be gzip compressed before being published
*/
enableGZipCompression?: boolean;
}

export interface BackoffSettings {
Expand Down
11 changes: 11 additions & 0 deletions src/message/gzip.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* gzip files contain a 10-byte header, containing a:
* - magic number (1f 8b)
* - the compression method (08 for DEFLATE)
* - 1-byte of header flags, a 4-byte timestamp, compression flags and the operating system ID.
*
* @see https://en.wikipedia.org/wiki/Gzip#File_format
*/
export const isGzipCompressed = (buf: Buffer): boolean => {
return buf[0] === 0x1f && buf[1] === 0x8b && buf[2] === 0x08;
};
Loading

0 comments on commit 30efd65

Please sign in to comment.