diff --git a/src/output-plugins/pgoutput/pgoutput-plugin.ts b/src/output-plugins/pgoutput/pgoutput-plugin.ts index 74505cc..5d5459b 100644 --- a/src/output-plugins/pgoutput/pgoutput-plugin.ts +++ b/src/output-plugins/pgoutput/pgoutput-plugin.ts @@ -25,6 +25,7 @@ export class PgoutputPlugin extends AbstractPlugin { const options = [ `proto_version '${this.options.protoVersion}'`, `publication_names '${this.options.publicationNames.join(',')}'`, + `messages '${this.options.messages ?? false}'`, ]; const sql = `START_REPLICATION SLOT "${slotName}" LOGICAL ${lastLsn} (${options.join(', ')})`; diff --git a/src/output-plugins/pgoutput/pgoutput.types.ts b/src/output-plugins/pgoutput/pgoutput.types.ts index d49d951..d99466e 100644 --- a/src/output-plugins/pgoutput/pgoutput.types.ts +++ b/src/output-plugins/pgoutput/pgoutput.types.ts @@ -2,6 +2,7 @@ export interface Options { protoVersion: 1 | 2 publicationNames: string[] + messages?: boolean } export type Message = diff --git a/src/test/decoder-pgoutput.spec.ts b/src/test/decoder-pgoutput.spec.ts index dab737b..b954fe6 100644 --- a/src/test/decoder-pgoutput.spec.ts +++ b/src/test/decoder-pgoutput.spec.ts @@ -267,6 +267,55 @@ describe('pgoutput', () => { await service.stop(); }); + it('Message', async () => { + const service = new LogicalReplicationService(TestClientConfig); + const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publicationName], messages: true }); + const messages: Pgoutput.Message[] = []; + + service.on('data', (lsn: string, log: Pgoutput.Message) => { + messages.push(log); + }); + + service.subscribe(plugin, slotName).catch((e) => { + console.error('Error from .subscribe', e); + }); + + await sleep(100); + + await client.query( + //language=sql + `SELECT pg_logical_emit_message(true, 'test_prefix', 'test_content')` + ); + await client.query( + //language=sql + `SELECT pg_logical_emit_message(true, 'test_prefix2', 'test_content2')` + ); + + await sleep(1000); + + const msgs = messages.filter((msg) => msg.tag === 'message'); + expect(msgs.length).toBe(2); + + expect(msgs[0]).toStrictEqual({ + content: Buffer.from('test_content'), + flags: 1, + messageLsn: expect.stringMatching(lsnRe), + prefix: 'test_prefix', + tag: 'message', + transactional: true, + }); + expect(msgs[1]).toStrictEqual({ + content: Buffer.from('test_content2'), + flags: 1, + messageLsn: expect.stringMatching(lsnRe), + prefix: 'test_prefix2', + tag: 'message', + transactional: true, + }); + + await service.stop(); + }); + it('Huge transaction', async () => { const service = new LogicalReplicationService(TestClientConfig); const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publicationName] });