Skip to content

Commit 17b2e1d

Browse files
authored
feat(core): Add partitioned table support (#410)
1 parent fd52ba9 commit 17b2e1d

File tree

10 files changed

+1862
-83
lines changed

10 files changed

+1862
-83
lines changed

docs/how-to/configure-postgres.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,28 @@ CREATE PUBLICATION all_tables FOR ALL TABLES;
116116
CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert');
117117
```
118118

119+
#### Partitioned Tables
120+
121+
If you want to replicate partitioned tables, you must use `publish_via_partition_root = true` when creating your publication. This option tells Postgres to treat the [partitioned table as a single table](https://www.postgresql.org/docs/current/sql-createpublication.html#SQL-CREATEPUBLICATION-PARAMS-WITH-PUBLISH-VIA-PARTITION-ROOT) from the replication perspective, rather than replicating each partition individually. All changes to any partition will be published as changes to the parent table:
122+
123+
```sql
124+
-- Create publication with partitioned table support
125+
CREATE PUBLICATION my_publication FOR TABLE users, orders WITH (publish_via_partition_root = true);
126+
127+
-- For all tables including partitioned tables
128+
CREATE PUBLICATION all_tables FOR ALL TABLES WITH (publish_via_partition_root = true);
129+
```
130+
131+
**Limitation:** If this option is enabled, `TRUNCATE` operations performed directly on individual partitions are not replicated. To replicate a truncate operation, you must execute it on the parent table instead:
132+
133+
```sql
134+
-- This will NOT be replicated
135+
TRUNCATE TABLE orders_2024_q1;
136+
137+
-- This WILL be replicated
138+
TRUNCATE TABLE orders;
139+
```
140+
119141
### Managing Publications
120142

121143
```sql

etl-api/src/db/publications.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ pub async fn create_publication(
4343
}
4444
}
4545

46+
// Ensure partitioned tables publish via ancestor/root schema for logical replication
47+
query.push_str(" with (publish_via_partition_root = true)");
48+
4649
pool.execute(query.as_str()).await?;
4750
Ok(())
4851
}

etl-postgres/src/tokio/test_utils.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,24 +46,27 @@ impl<G: GenericClient> PgDatabase<G> {
4646
self.server_version
4747
}
4848

49-
/// Creates a Postgres publication for the specified tables.
49+
/// Creates a Postgres publication for the specified tables with an explicit
50+
/// `publish_via_partition_root` setting.
5051
///
51-
/// Sets up logical replication by creating a publication that includes
52-
/// the given tables for change data capture.
53-
pub async fn create_publication(
52+
/// This method exists for the small subset of tests that need to override the default
53+
/// `publish_via_partition_root` setting when creating a publication.
54+
pub async fn create_publication_with_config(
5455
&self,
5556
publication_name: &str,
5657
table_names: &[TableName],
58+
publish_via_partition_root: bool,
5759
) -> Result<(), tokio_postgres::Error> {
5860
let table_names = table_names
5961
.iter()
6062
.map(TableName::as_quoted_identifier)
6163
.collect::<Vec<_>>();
6264

6365
let create_publication_query = format!(
64-
"create publication {} for table {}",
66+
"create publication {} for table {} with (publish_via_partition_root = {})",
6567
publication_name,
66-
table_names.join(", ")
68+
table_names.join(", "),
69+
publish_via_partition_root
6770
);
6871
self.client
6972
.as_ref()
@@ -74,6 +77,16 @@ impl<G: GenericClient> PgDatabase<G> {
7477
Ok(())
7578
}
7679

80+
/// Creates a Postgres publication for the specified tables.
81+
pub async fn create_publication(
82+
&self,
83+
publication_name: &str,
84+
table_names: &[TableName],
85+
) -> Result<(), tokio_postgres::Error> {
86+
self.create_publication_with_config(publication_name, table_names, true)
87+
.await
88+
}
89+
7790
pub async fn create_publication_for_all(
7891
&self,
7992
publication_name: &str,
@@ -87,9 +100,11 @@ impl<G: GenericClient> PgDatabase<G> {
87100
// PostgreSQL 15+ supports the FOR TABLES IN SCHEMA syntax
88101
let create_publication_query = match schema {
89102
Some(schema_name) => format!(
90-
"create publication {publication_name} for tables in schema {schema_name}"
103+
"create publication {publication_name} for tables in schema {schema_name} with (publish_via_partition_root = true)"
104+
),
105+
None => format!(
106+
"create publication {publication_name} for all tables with (publish_via_partition_root = true)"
91107
),
92-
None => format!("create publication {publication_name} for all tables"),
93108
};
94109

95110
client.execute(&create_publication_query, &[]).await?;
@@ -115,8 +130,9 @@ impl<G: GenericClient> PgDatabase<G> {
115130
}
116131
}
117132
None => {
118-
let create_publication_query =
119-
format!("create publication {publication_name} for all tables");
133+
let create_publication_query = format!(
134+
"create publication {publication_name} for all tables with (publish_via_partition_root = true)"
135+
);
120136
client.execute(&create_publication_query, &[]).await?;
121137
}
122138
}

etl/src/pipeline.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,41 @@ where
300300
publication_table_ids.len()
301301
);
302302

303+
// Validate that the publication is configured correctly for partitioned tables.
304+
//
305+
// When `publish_via_partition_root = false`, logical replication messages contain
306+
// child partition OIDs instead of parent table OIDs. Since our schema cache only
307+
// contains parent table IDs (from `get_publication_table_ids`), relation messages
308+
// with child OIDs would cause pipeline failures.
309+
let publish_via_partition_root = replication_client
310+
.get_publish_via_partition_root(&self.config.publication_name)
311+
.await?;
312+
313+
if !publish_via_partition_root {
314+
let has_partitioned_tables = replication_client
315+
.has_partitioned_tables(&publication_table_ids)
316+
.await?;
317+
318+
if has_partitioned_tables {
319+
error!(
320+
"publication '{}' has publish_via_partition_root=false but contains partitioned table(s)",
321+
self.config.publication_name
322+
);
323+
324+
bail!(
325+
ErrorKind::ConfigError,
326+
"Invalid publication configuration for partitioned tables",
327+
format!(
328+
"The publication '{}' contains partitioned tables but has publish_via_partition_root=false. \
329+
This configuration causes replication messages to use child partition OIDs, which are not \
330+
tracked by the pipeline and will cause failures. Please recreate the publication with \
331+
publish_via_partition_root=true or use: ALTER PUBLICATION {} SET (publish_via_partition_root = true);",
332+
self.config.publication_name, self.config.publication_name
333+
)
334+
);
335+
}
336+
}
337+
303338
self.store.load_table_replication_states().await?;
304339
let table_replication_states = self.store.get_table_replication_states().await?;
305340

0 commit comments

Comments
 (0)