Skip to content

Commit

Permalink
Merge pull request #36 from darkgnotic/messages-option
Browse files Browse the repository at this point in the history
feat: "messages" pgoutput option to receive messages from pg_logical_emit_message()
  • Loading branch information
kibae authored Sep 26, 2024
2 parents 15930f2 + 70ebd43 commit 3b08f66
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/output-plugins/pgoutput/pgoutput-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export class PgoutputPlugin extends AbstractPlugin<Options> {
const options = [
`proto_version '${this.options.protoVersion}'`,
`publication_names '${this.options.publicationNames.join(',')}'`,
`messages '${this.options.messages ?? false}'`,
];

const sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn} (${options.join(', ')})`;
Expand Down
1 change: 1 addition & 0 deletions src/output-plugins/pgoutput/pgoutput.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
export interface Options {
protoVersion: 1 | 2
publicationNames: string[]
messages?: boolean
}

export type Message =
Expand Down
49 changes: 49 additions & 0 deletions src/test/decoder-pgoutput.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,55 @@ describe('pgoutput', () => {
await service.stop();
});

it('Message', async () => {
const service = new LogicalReplicationService(TestClientConfig);
const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publicationName], messages: true });
const messages: Pgoutput.Message[] = [];

service.on('data', (lsn: string, log: Pgoutput.Message) => {
messages.push(log);
});

service.subscribe(plugin, slotName).catch((e) => {
console.error('Error from .subscribe', e);
});

await sleep(100);

await client.query(
//language=sql
`SELECT pg_logical_emit_message(true, 'test_prefix', 'test_content')`
);
await client.query(
//language=sql
`SELECT pg_logical_emit_message(true, 'test_prefix2', 'test_content2')`
);

await sleep(1000);

const msgs = messages.filter((msg) => msg.tag === 'message');
expect(msgs.length).toBe(2);

expect(msgs[0]).toStrictEqual({
content: Buffer.from('test_content'),
flags: 1,
messageLsn: expect.stringMatching(lsnRe),
prefix: 'test_prefix',
tag: 'message',
transactional: true,
});
expect(msgs[1]).toStrictEqual({
content: Buffer.from('test_content2'),
flags: 1,
messageLsn: expect.stringMatching(lsnRe),
prefix: 'test_prefix2',
tag: 'message',
transactional: true,
});

await service.stop();
});

it('Huge transaction', async () => {
const service = new LogicalReplicationService(TestClientConfig);
const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publicationName] });
Expand Down

0 comments on commit 3b08f66

Please sign in to comment.