Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

PostgreSQL Logical Replication CDC Module for Streaming Database Changes with Golang

License

Notifications You must be signed in to change notification settings

redpanda-data/pglogicalstream

Repository files navigation

PostgreSQL Logical Replication CDC Module for Go Go

This Go module builds upon github.com/jackc/pglogrepl to provide an advanced logical replication solution for PostgreSQL. It extends the capabilities of jackc/pglogrep for logical replication by introducing several key features, making it easier to implement Change Data Capture (CDC) in your Go-based applications.

Features

  • Checkpoints Storing: Efficiently manage and store replication checkpoints, facilitating better tracking and management of data changes.

  • Snapshot Streaming: Seamlessly capture and replicate snapshots of your PostgreSQL database, ensuring all data is streamed through the pipeline.

  • Table Filtering: Tailor your CDC process by selectively filtering and replicating specific tables, optimizing resource usage.

Getting Started

Follow these steps to get started with our PostgreSQL Logical Replication CDC Module for Go:

Install the module

import (
    "github.com/usedatabrew/pglogicalstream"
)

Configure your replication stream

Create config.yaml file

db_host: database host
db_password: password12345
db_user: postgres
db_port: 5432
db_name: mocks
db_schema: public
db_tables:
  - rides
replication_slot_name: morning_elephant
tls_verify: require
stream_old_data: true

Basic usage example

By defaultpglogicalstreamwill create replication slot and publication for the tables you provide in Yaml config It immediately starts streaming updates and you can receive them in the OnMessage function

package main

import (
  "fmt"
  "github.com/usedatabrew/pglogicalstream"
  "gopkg.in/yaml.v3"
  "io/ioutil"
  "log"
)

func main() {
  var config pglogicalstream.Config
  yamlFile, err := ioutil.ReadFile("./example/simple/config.yaml")
  if err != nil {
    log.Printf("yamlFile.Get err   #%v ", err)
  }

  err = yaml.Unmarshal(yamlFile, &config)
  if err != nil {
    log.Fatalf("Unmarshal: %v", err)
  }

  pgStream, err := pglogicalstream.NewPgStream(config, log.WithPrefix("pg-cdc"))
  if err != nil {
    panic(err)
  }

  pgStream.OnMessage(func(message messages.Wal2JsonChanges) {
    fmt.Println(message.Changes)
  })
}

Example with checkpointer

In order to recover after the failure, etc you have to store LSN somewhere to continue streaming the data You can implement CheckPointer interface and pass it's instance to NewPgStreamCheckPointer and your LSN will be stored automatically

checkPointer, err := NewPgStreamCheckPointer("redis.com:port", "user", "password")
if err != nil {
    log.Fatalf("Checkpointer error")
}
pgStream, err := pglogicalstream.NewPgStream(config, checkPointer)

Contributing

We welcome contributions from the Go community!