Skip to content

Commit e3ba89c

Browse files
authored
feat(version): Improve Postgres version handling (#413)
1 parent 17b2e1d commit e3ba89c

File tree

9 files changed

+190
-42
lines changed

9 files changed

+190
-42
lines changed

README.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,22 @@ ETL is a Rust framework by [Supabase](https://supabase.com) for building high‑
4242

4343
## Highlights
4444

45-
- 🚀 Real‑time replication: stream changes as they happen
46-
- ⚡ High performance: batching and parallel workers
47-
- 🛡️ Fault tolerant: retries and recovery built in
48-
- 🔧 Extensible: implement custom stores and destinations
49-
- 🧭 Typed, ergonomic Rust API
45+
- **Real‑time replication**: stream changes in real time to your own destinations.
46+
- **High performance**: configurable batching and parallelism to maximize throughput.
47+
- **Fault-tolerant**: robust error handling and retry logic built-in.
48+
- **Extensible**: implement your own custom destinations and state/schema stores.
49+
- **Rust native**: typed and ergonomic Rust API.
50+
51+
## Requirements
52+
53+
**PostgreSQL Version:** ETL officially supports and tests against **PostgreSQL 14, 15, 16, and 17**.
54+
55+
- **PostgreSQL 15+** is recommended for access to advanced publication features including:
56+
- Column-level filtering
57+
- Row-level filtering with `WHERE` clauses
58+
- `FOR ALL TABLES IN SCHEMA` syntax
59+
60+
For detailed configuration instructions, see the [Configure Postgres documentation](https://supabase.github.io/etl/how-to/configure-postgres/).
5061

5162
## Get Started
5263

docs/how-to/configure-postgres.md

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ This guide covers the essential Postgres concepts and configuration needed for l
66

77
## Prerequisites
88

9-
- Postgres 10 or later
9+
- **PostgreSQL 14, 15, 16, or 17** (officially supported and tested versions)
10+
- PostgreSQL 15+ is recommended for advanced publication filtering features (column-level and row-level filters, `FOR ALL TABLES IN SCHEMA` syntax)
11+
- PostgreSQL 14 is supported but has limited publication filtering capabilities
1012
- Superuser access to the Postgres server
1113
- Ability to restart Postgres server (for configuration changes)
1214

@@ -157,6 +159,44 @@ ALTER PUBLICATION my_publication DROP TABLE products;
157159
DROP PUBLICATION my_publication;
158160
```
159161

162+
## Version-Specific Features
163+
164+
ETL supports PostgreSQL versions 14 through 17, with enhanced features available in newer versions:
165+
166+
### PostgreSQL 15+ Features
167+
168+
**Column-Level Filtering:**
169+
```sql
170+
-- Replicate only specific columns from a table
171+
CREATE PUBLICATION user_basics FOR TABLE users (id, email, created_at);
172+
```
173+
174+
**Row-Level Filtering:**
175+
```sql
176+
-- Replicate only rows that match a condition
177+
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');
178+
```
179+
180+
**Schema-Level Publications:**
181+
```sql
182+
-- Replicate all tables in a schema
183+
CREATE PUBLICATION schema_pub FOR ALL TABLES IN SCHEMA public;
184+
```
185+
186+
### PostgreSQL 14 Limitations
187+
188+
PostgreSQL 14 supports table-level publication filtering only. Column-level and row-level filters are not available. When using PostgreSQL 14, you'll need to filter data at the application level if selective replication is required.
189+
190+
### Feature Compatibility Matrix
191+
192+
| Feature | PostgreSQL 14 | PostgreSQL 15+ |
193+
|---------|--------------|----------------|
194+
| Table-level publication |||
195+
| Column-level filtering |||
196+
| Row-level filtering |||
197+
| `FOR ALL TABLES IN SCHEMA` |||
198+
| Partitioned table support |||
199+
160200
## Complete Configuration Example
161201

162202
Here's a minimal `Postgres.conf` setup:

etl-postgres/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ pub mod sqlx;
1010
#[cfg(feature = "tokio")]
1111
pub mod tokio;
1212
pub mod types;
13+
pub mod version;

etl-postgres/src/tokio/test_utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use tokio_postgres::{Client, GenericClient, NoTls, Transaction};
77
use tracing::info;
88

99
use crate::replication::extract_server_version;
10+
use crate::requires_version;
1011
use crate::types::{ColumnSchema, TableId, TableName};
12+
use crate::version::POSTGRES_15;
1113

1214
/// Table modification operations for ALTER TABLE statements.
1315
pub enum TableModification<'a> {
@@ -94,9 +96,7 @@ impl<G: GenericClient> PgDatabase<G> {
9496
) -> Result<(), tokio_postgres::Error> {
9597
let client = self.client.as_ref().unwrap();
9698

97-
if let Some(server_version) = self.server_version
98-
&& server_version.get() >= 150000
99-
{
99+
if requires_version!(self.server_version, POSTGRES_15) {
100100
// PostgreSQL 15+ supports FOR ALL TABLES IN SCHEMA syntax
101101
let create_publication_query = match schema {
102102
Some(schema_name) => format!(

etl-postgres/src/version.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
//! PostgreSQL version constants and utilities.
2+
//!
3+
//! This module provides version constants for supported PostgreSQL versions and macros
4+
//! for ergonomic version comparison. Version numbers follow PostgreSQL's internal format:
5+
//! `MAJOR * 10000 + MINOR * 100 + PATCH`.
6+
//!
7+
//! # Supported Versions
8+
//!
9+
//! ETL officially supports PostgreSQL versions 14, 15, 16, and 17.
10+
11+
use std::num::NonZeroI32;
12+
13+
pub const POSTGRES_14: i32 = 140000;
14+
pub const POSTGRES_15: i32 = 150000;
15+
pub const POSTGRES_16: i32 = 160000;
16+
pub const POSTGRES_17: i32 = 170000;
17+
18+
/// Returns [`true`] if the server version meets or exceeds the required version.
19+
///
20+
/// This function handles [`None`] server versions by returning [`false`], making it
21+
/// safe to use in contexts where version information might not be available.
22+
pub fn meets_version(server_version: Option<NonZeroI32>, required_version: i32) -> bool {
23+
server_version.is_some_and(|v| v.get() >= required_version)
24+
}
25+
26+
/// Checks if the server version meets or exceeds the required version.
27+
///
28+
/// This macro provides ergonomic version checking by accepting various input types
29+
/// for the server version (Option<NonZeroI32>, NonZeroI32, i32) and comparing against
30+
/// version constants.
31+
#[macro_export]
32+
macro_rules! requires_version {
33+
($server_version:expr, $required:expr) => {
34+
$crate::version::meets_version($server_version, $required)
35+
};
36+
}
37+
38+
/// Checks if the server version is below the specified version.
39+
///
40+
/// This macro is useful for conditional logic when features are not available
41+
/// in older PostgreSQL versions.
42+
#[macro_export]
43+
macro_rules! below_version {
44+
($server_version:expr, $required:expr) => {
45+
!$crate::version::meets_version($server_version, $required)
46+
};
47+
}
48+
49+
#[cfg(test)]
50+
mod tests {
51+
use super::*;
52+
53+
#[test]
54+
fn test_version_constants() {
55+
assert_eq!(POSTGRES_14, 140000);
56+
assert_eq!(POSTGRES_15, 150000);
57+
assert_eq!(POSTGRES_16, 160000);
58+
assert_eq!(POSTGRES_17, 170000);
59+
}
60+
61+
#[test]
62+
fn test_meets_version_with_some() {
63+
let version = NonZeroI32::new(150500);
64+
assert!(meets_version(version, POSTGRES_14));
65+
assert!(meets_version(version, POSTGRES_15));
66+
assert!(!meets_version(version, POSTGRES_16));
67+
assert!(!meets_version(version, POSTGRES_17));
68+
}
69+
70+
#[test]
71+
fn test_meets_version_with_none() {
72+
assert!(!meets_version(None, POSTGRES_14));
73+
assert!(!meets_version(None, POSTGRES_15));
74+
assert!(!meets_version(None, POSTGRES_16));
75+
}
76+
77+
#[test]
78+
fn test_meets_version_exact_match() {
79+
let version = NonZeroI32::new(POSTGRES_15);
80+
assert!(meets_version(version, POSTGRES_15));
81+
}
82+
83+
#[test]
84+
fn test_requires_version_macro() {
85+
let version = NonZeroI32::new(160200);
86+
assert!(requires_version!(version, POSTGRES_14));
87+
assert!(requires_version!(version, POSTGRES_15));
88+
assert!(requires_version!(version, POSTGRES_16));
89+
assert!(!requires_version!(version, POSTGRES_17));
90+
}
91+
92+
#[test]
93+
fn test_below_version_macro() {
94+
let version = NonZeroI32::new(140800);
95+
assert!(!below_version!(version, POSTGRES_14));
96+
assert!(below_version!(version, POSTGRES_15));
97+
assert!(below_version!(version, POSTGRES_16));
98+
assert!(below_version!(version, POSTGRES_17));
99+
}
100+
101+
#[test]
102+
fn test_requires_version_with_none() {
103+
let version: Option<NonZeroI32> = None;
104+
assert!(!requires_version!(version, POSTGRES_14));
105+
}
106+
}

etl/src/replication/client.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use etl_config::shared::{IntoConnectOptions, PgConnectionConfig};
55
use etl_postgres::replication::extract_server_version;
66
use etl_postgres::types::convert_type_oid_to_type;
77
use etl_postgres::types::{ColumnSchema, TableId, TableName, TableSchema};
8+
use etl_postgres::version::POSTGRES_15;
9+
use etl_postgres::{below_version, requires_version};
810
use pg_escape::{quote_identifier, quote_literal};
911
use postgres_replication::LogicalReplicationStream;
1012
use rustls::ClientConfig;
@@ -740,9 +742,7 @@ impl PgReplicationClient {
740742
};
741743

742744
// Postgres 15+ supports column-level filtering via prattrs
743-
if let Some(server_version) = self.server_version
744-
&& server_version.get() >= 150000
745-
{
745+
if requires_version!(self.server_version, POSTGRES_15) {
746746
return PublicationFilter {
747747
ctes: format!(
748748
"pub_info as (
@@ -899,9 +899,7 @@ impl PgReplicationClient {
899899
publication_name: Option<&str>,
900900
) -> EtlResult<Option<String>> {
901901
// Row filters on publications were added in Postgres 15. For any earlier versions we know that there is no row filter
902-
if let Some(server_version) = self.server_version
903-
&& server_version.get() < 150000
904-
{
902+
if below_version!(self.server_version, POSTGRES_15) {
905903
return Ok(None);
906904
}
907905
// If we don't have a publication the row filter is implicitly non-existent

etl/tests/pipeline.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ use etl::test_utils::test_schema::{
1515
};
1616
use etl::types::{EventType, PipelineId};
1717
use etl_config::shared::BatchConfig;
18+
use etl_postgres::below_version;
1819
use etl_postgres::replication::slots::EtlReplicationSlot;
1920
use etl_postgres::tokio::test_utils::TableModification;
21+
use etl_postgres::version::POSTGRES_15;
2022
use etl_telemetry::tracing::init_test_tracing;
2123
use rand::random;
2224
use std::time::Duration;
@@ -148,12 +150,8 @@ async fn publication_changes_are_correctly_handled() {
148150

149151
let database = spawn_source_database().await;
150152

151-
if let Some(server_version) = database.server_version()
152-
&& server_version.get() <= 150000
153-
{
154-
println!(
155-
"Skipping test for PostgreSQL version <= 15, CREATE PUBLICATION FOR TABLES IN SCHEMA is not supported"
156-
);
153+
if below_version!(database.server_version(), POSTGRES_15) {
154+
eprintln!("Skipping test: PostgreSQL 15+ required for FOR TABLES IN SCHEMA");
157155
return;
158156
}
159157

@@ -309,12 +307,8 @@ async fn publication_for_all_tables_in_schema_ignores_new_tables_until_restart()
309307

310308
let database = spawn_source_database().await;
311309

312-
if let Some(server_version) = database.server_version()
313-
&& server_version.get() <= 150000
314-
{
315-
println!(
316-
"Skipping test for PostgreSQL version <= 15, CREATE PUBLICATION FOR TABLES IN SCHEMA is not supported"
317-
);
310+
if below_version!(database.server_version(), POSTGRES_15) {
311+
eprintln!("Skipping test: PostgreSQL 15+ required for FOR TABLES IN SCHEMA");
318312
return;
319313
}
320314

etl/tests/pipeline_with_partitioned_table.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use etl::test_utils::test_schema::create_partitioned_table;
1212
use etl::types::EventType;
1313
use etl::types::PipelineId;
1414
use etl::types::TableId;
15+
use etl_postgres::below_version;
16+
use etl_postgres::version::POSTGRES_15;
1517
use etl_telemetry::tracing::init_test_tracing;
1618
use rand::random;
1719
use tokio_postgres::types::Type;
@@ -861,11 +863,9 @@ async fn partition_detach_with_schema_publication_does_not_replicate_detached_in
861863
let database = spawn_source_database().await;
862864

863865
// Skip test if PostgreSQL version is < 15 (FOR TABLES IN SCHEMA requires 15+).
864-
if let Some(version) = database.server_version() {
865-
if version.get() < 150000 {
866-
eprintln!("Skipping test: PostgreSQL 15+ required for FOR TABLES IN SCHEMA");
867-
return;
868-
}
866+
if below_version!(database.server_version(), POSTGRES_15) {
867+
eprintln!("Skipping test: PostgreSQL 15+ required for FOR TABLES IN SCHEMA");
868+
return;
869869
}
870870

871871
let table_name = test_table_name("partitioned_events_schema_detach");
@@ -1003,11 +1003,9 @@ async fn partition_detach_with_schema_publication_does_replicate_detached_insert
10031003
let database = spawn_source_database().await;
10041004

10051005
// Skip test if PostgreSQL version is < 15 (FOR TABLES IN SCHEMA requires 15+).
1006-
if let Some(version) = database.server_version() {
1007-
if version.get() < 150000 {
1008-
eprintln!("Skipping test: PostgreSQL 15+ required for FOR TABLES IN SCHEMA");
1009-
return;
1010-
}
1006+
if below_version!(database.server_version(), POSTGRES_15) {
1007+
eprintln!("Skipping test: PostgreSQL 15+ required for FOR TABLES IN SCHEMA");
1008+
return;
10111009
}
10121010

10131011
let table_name = test_table_name("partitioned_events_schema_restart");

etl/tests/replication.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ use etl::test_utils::database::{spawn_source_database, test_table_name};
88
use etl::test_utils::pipeline::test_slot_name;
99
use etl::test_utils::table::assert_table_schema;
1010
use etl::test_utils::test_schema::create_partitioned_table;
11+
use etl_postgres::below_version;
1112
use etl_postgres::tokio::test_utils::{TableModification, id_column_schema};
1213
use etl_postgres::types::ColumnSchema;
14+
use etl_postgres::version::POSTGRES_15;
1315
use etl_telemetry::tracing::init_test_tracing;
1416
use futures::StreamExt;
1517
use postgres_replication::LogicalReplicationStream;
@@ -370,11 +372,9 @@ async fn test_table_copy_stream_respects_row_filter() {
370372
init_test_tracing();
371373
let database = spawn_source_database().await;
372374

373-
// Row filters in publication are only available from Postgres 15 and onwards.
374-
// As such, we skip the test for versions earlier than 15.
375-
if let Some(server_version) = database.server_version()
376-
&& server_version.get() < 150000
377-
{
375+
// Row filters in publication are only available from Postgres 15+;
376+
if below_version!(database.server_version(), POSTGRES_15) {
377+
eprintln!("Skipping test: PostgreSQL 15+ required for row filters");
378378
return;
379379
}
380380
// We create a table and insert one row.

0 commit comments

Comments
 (0)