
Commit 66175c6

feat(replication): Add support for column filtering in publication (#414)
1 parent e3ba89c commit 66175c6
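
For context: this commit exercises PostgreSQL 15's column lists, which let a publication replicate only a subset of a table's columns. A minimal standalone sketch of the underlying Postgres feature (the table and publication names here are illustrative, not taken from this repo):

    -- Publish only id, name, and age; 'email' is never sent to subscribers.
    create table app_users (
        id bigint generated always as identity primary key,
        name text not null,
        age integer not null,
        email text not null
    );

    create publication user_pub for table app_users (id, name, age);

    -- Note: if the publication replicates UPDATE or DELETE, the column list
    -- must include the table's replica identity columns (here, the primary key 'id').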

3 files changed (+216, −2 lines)

etl/src/test_utils/table.rs

Lines changed: 0 additions & 1 deletion

@@ -18,7 +18,6 @@ pub fn assert_table_schema(
     expected_columns: &[ColumnSchema],
 ) {
     let table_schema = table_schemas.get(&table_id).unwrap();
-
     assert_eq!(table_schema.id, table_id);
     assert_eq!(table_schema.name, expected_table_name);
 
etl/tests/pipeline.rs

Lines changed: 107 additions & 1 deletion

@@ -13,7 +13,7 @@ use etl::test_utils::test_schema::{
     build_expected_users_inserts, get_n_integers_sum, get_users_age_sum_from_rows,
     insert_mock_data, insert_users_data, setup_test_database_schema,
 };
-use etl::types::{EventType, PipelineId};
+use etl::types::{Event, EventType, InsertEvent, PipelineId};
 use etl_config::shared::BatchConfig;
 use etl_postgres::below_version;
 use etl_postgres::replication::slots::EtlReplicationSlot;
@@ -987,3 +987,109 @@ async fn table_without_primary_key_is_errored() {
     let events = destination.get_events().await;
     assert!(events.is_empty());
 }
+
+#[tokio::test(flavor = "multi_thread")]
+async fn pipeline_respects_column_level_publication() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    // Column filters in publications are only available from Postgres 15+.
+    if below_version!(database.server_version(), POSTGRES_15) {
+        eprintln!("Skipping test: PostgreSQL 15+ required for column filters");
+        return;
+    }
+
+    // Create a table with multiple columns, including a sensitive 'email' column.
+    let table_name = test_table_name("users");
+    let table_id = database
+        .create_table(
+            table_name.clone(),
+            true,
+            &[
+                ("name", "text not null"),
+                ("age", "integer not null"),
+                ("email", "text not null"),
+            ],
+        )
+        .await
+        .unwrap();
+
+    // Create a publication with only a subset of columns (excluding 'email').
+    let publication_name = "test_pub".to_string();
+    database
+        .run_sql(&format!(
+            "create publication {publication_name} for table {} (id, name, age)",
+            table_name.as_quoted_identifier()
+        ))
+        .await
+        .expect("Failed to create publication with column filter");
+
+    let state_store = NotifyingStore::new();
+    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+    let pipeline_id: PipelineId = random();
+    let mut pipeline = create_pipeline(
+        &database.config,
+        pipeline_id,
+        publication_name.clone(),
+        state_store.clone(),
+        destination.clone(),
+    );
+
+    // Wait for the table to finish syncing.
+    let sync_done_notify = state_store
+        .notify_on_table_state_type(table_id, TableReplicationPhaseType::SyncDone)
+        .await;
+
+    pipeline.start().await.unwrap();
+
+    sync_done_notify.notified().await;
+
+    // Wait for two insert events to be processed.
+    let insert_events_notify = destination
+        .wait_for_events_count(vec![(EventType::Insert, 2)])
+        .await;
+
+    // Insert test data with all columns (including 'email').
+    database
+        .run_sql(&format!(
+            "insert into {} (name, age, email) values ('Alice', 25, '[email protected]'), ('Bob', 30, '[email protected]')",
+            table_name.as_quoted_identifier()
+        ))
+        .await
+        .unwrap();
+
+    insert_events_notify.notified().await;
+
+    pipeline.shutdown_and_wait().await.unwrap();
+
+    // Verify the events and check that only published columns are included.
+    let events = destination.get_events().await;
+    let grouped_events = group_events_by_type_and_table_id(&events);
+    let insert_events = grouped_events.get(&(EventType::Insert, table_id)).unwrap();
+    assert_eq!(insert_events.len(), 2);
+
+    // Check that each insert event contains only the published columns (id, name, age).
+    // Since Cell values don't include column names, we verify by checking the count.
+    for event in insert_events {
+        if let Event::Insert(InsertEvent { table_row, .. }) = event {
+            // Verify exactly 3 columns (id, name, age).
+            // If 'email' were included, there would be 4 values.
+            assert_eq!(table_row.values.len(), 3);
+        }
+    }
+
+    // Also verify that the stored table schema only includes published columns.
+    let table_schemas = state_store.get_table_schemas().await;
+    let stored_schema = table_schemas.get(&table_id).unwrap();
+    let column_names: Vec<&str> = stored_schema
+        .column_schemas
+        .iter()
+        .map(|c| c.name.as_str())
+        .collect();
+    assert!(column_names.contains(&"id"));
+    assert!(column_names.contains(&"name"));
+    assert!(column_names.contains(&"age"));
+    assert!(!column_names.contains(&"email"));
+    assert_eq!(stored_schema.column_schemas.len(), 3);
+}
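
As a sanity check outside the test harness, a publication's effective column list can be inspected directly on the source database. On PostgreSQL 15+, pg_publication_tables exposes the published columns through its attnames column (a hedged sketch, assuming the publication created above):

    -- List the columns each table publishes under 'test_pub' (PostgreSQL 15+).
    select pubname, schemaname, tablename, attnames
    from pg_publication_tables
    where pubname = 'test_pub';
    -- For this test we would expect attnames = {id,name,age}, with no 'email'.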

etl/tests/replication.rs

Lines changed: 109 additions & 0 deletions
@@ -443,6 +443,115 @@ async fn test_table_copy_stream_respects_row_filter() {
     assert_eq!(rows_count, expected_rows_count as u64);
 }
 
+#[tokio::test(flavor = "multi_thread")]
+async fn test_table_copy_stream_respects_column_filter() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    // Column filters in publications are only available from Postgres 15+.
+    if below_version!(database.server_version(), POSTGRES_15) {
+        eprintln!("Skipping test: PostgreSQL 15+ required for column filters");
+        return;
+    }
+
+    // We create a table with multiple columns.
+    let test_table_name = test_table_name("table_1");
+    let test_table_id = database
+        .create_table(
+            test_table_name.clone(),
+            true,
+            &[("name", "text"), ("age", "integer"), ("email", "text")],
+        )
+        .await
+        .unwrap();
+
+    database
+        .run_sql(&format!(
+            "alter table {test_table_name} replica identity full"
+        ))
+        .await
+        .unwrap();
+
+    // Create a publication with only a subset of columns (excluding 'email').
+    let publication_name = "test_pub";
+    database
+        .run_sql(&format!(
+            "create publication {publication_name} for table {test_table_name} (id, name, age)"
+        ))
+        .await
+        .unwrap();
+
+    let parent_client = PgReplicationClient::connect(database.config.clone())
+        .await
+        .unwrap();
+
+    // Insert test data with all columns.
+    database
+        .run_sql(&format!(
+            "insert into {test_table_name} (name, age, email) values ('Alice', 25, '[email protected]')"
+        ))
+        .await
+        .unwrap();
+    database
+        .run_sql(&format!(
+            "insert into {test_table_name} (name, age, email) values ('Bob', 30, '[email protected]')"
+        ))
+        .await
+        .unwrap();
+
+    // Create the slot while the database schema contains the test data.
+    let (transaction, _) = parent_client
+        .create_slot_with_transaction(&test_slot_name("my_slot"))
+        .await
+        .unwrap();
+
+    // Get the table schema with the publication; it should only include published columns.
+    let table_schemas = transaction
+        .get_table_schemas(&[test_table_id], Some(publication_name))
+        .await
+        .unwrap();
+    assert_table_schema(
+        &table_schemas,
+        test_table_id,
+        test_table_name,
+        &[
+            id_column_schema(),
+            ColumnSchema {
+                name: "name".to_string(),
+                typ: Type::TEXT,
+                modifier: -1,
+                nullable: true,
+                primary: false,
+            },
+            ColumnSchema {
+                name: "age".to_string(),
+                typ: Type::INT4,
+                modifier: -1,
+                nullable: true,
+                primary: false,
+            },
+        ],
+    );
+
+    // Get the table copy stream with the publication.
+    let stream = transaction
+        .get_table_copy_stream(
+            test_table_id,
+            &table_schemas[&test_table_id].column_schemas,
+            Some("test_pub"),
+        )
+        .await
+        .unwrap();
+
+    let rows_count = count_stream_rows(stream).await;
+
+    // The transaction should be committed after the copy stream is exhausted.
+    transaction.commit().await.unwrap();
+
+    // We expect to have 2 rows (the ones we inserted).
+    assert_eq!(rows_count, 2);
+}
+
 #[tokio::test(flavor = "multi_thread")]
 async fn test_table_copy_stream_no_row_filter() {
     init_test_tracing();