diff --git a/etl/src/test_utils/table.rs b/etl/src/test_utils/table.rs
index a2c6d46a..15e8b05d 100644
--- a/etl/src/test_utils/table.rs
+++ b/etl/src/test_utils/table.rs
@@ -18,7 +18,6 @@ pub fn assert_table_schema(
     expected_columns: &[ColumnSchema],
 ) {
     let table_schema = table_schemas.get(&table_id).unwrap();
-    assert_eq!(table_schema.id, table_id);
 
     assert_eq!(table_schema.name, expected_table_name);
 
diff --git a/etl/tests/pipeline.rs b/etl/tests/pipeline.rs
index cb8d975a..ff6605e9 100644
--- a/etl/tests/pipeline.rs
+++ b/etl/tests/pipeline.rs
@@ -13,7 +13,7 @@ use etl::test_utils::test_schema::{
     build_expected_users_inserts, get_n_integers_sum, get_users_age_sum_from_rows,
     insert_mock_data, insert_users_data, setup_test_database_schema,
 };
-use etl::types::{EventType, PipelineId};
+use etl::types::{Event, EventType, InsertEvent, PipelineId};
 use etl_config::shared::BatchConfig;
 use etl_postgres::below_version;
 use etl_postgres::replication::slots::EtlReplicationSlot;
@@ -987,3 +987,109 @@ async fn table_without_primary_key_is_errored() {
     let events = destination.get_events().await;
     assert!(events.is_empty());
 }
+
+#[tokio::test(flavor = "multi_thread")]
+async fn pipeline_respects_column_level_publication() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    // Column filters in publication are only available from Postgres 15+.
+    if below_version!(database.server_version(), POSTGRES_15) {
+        eprintln!("Skipping test: PostgreSQL 15+ required for column filters");
+        return;
+    }
+
+    // Create a table with multiple columns including a sensitive 'email' column.
+    let table_name = test_table_name("users");
+    let table_id = database
+        .create_table(
+            table_name.clone(),
+            true,
+            &[
+                ("name", "text not null"),
+                ("age", "integer not null"),
+                ("email", "text not null"),
+            ],
+        )
+        .await
+        .unwrap();
+
+    // Create publication with only a subset of columns (excluding 'email').
+    let publication_name = "test_pub".to_string();
+    database
+        .run_sql(&format!(
+            "create publication {publication_name} for table {} (id, name, age)",
+            table_name.as_quoted_identifier()
+        ))
+        .await
+        .expect("Failed to create publication with column filter");
+
+    let state_store = NotifyingStore::new();
+    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+    let pipeline_id: PipelineId = random();
+    let mut pipeline = create_pipeline(
+        &database.config,
+        pipeline_id,
+        publication_name.clone(),
+        state_store.clone(),
+        destination.clone(),
+    );
+
+    // Wait for the table to finish syncing.
+    let sync_done_notify = state_store
+        .notify_on_table_state_type(table_id, TableReplicationPhaseType::SyncDone)
+        .await;
+
+    pipeline.start().await.unwrap();
+
+    sync_done_notify.notified().await;
+
+    // Wait for two insert events to be processed.
+    let insert_events_notify = destination
+        .wait_for_events_count(vec![(EventType::Insert, 2)])
+        .await;
+
+    // Insert test data with all columns (including email).
+    database
+        .run_sql(&format!(
+            "insert into {} (name, age, email) values ('Alice', 25, 'alice@example.com'), ('Bob', 30, 'bob@example.com')",
+            table_name.as_quoted_identifier()
+        ))
+        .await
+        .unwrap();
+
+    insert_events_notify.notified().await;
+
+    pipeline.shutdown_and_wait().await.unwrap();
+
+    // Verify the events and check that only published columns are included.
+    let events = destination.get_events().await;
+    let grouped_events = group_events_by_type_and_table_id(&events);
+    let insert_events = grouped_events.get(&(EventType::Insert, table_id)).unwrap();
+    assert_eq!(insert_events.len(), 2);
+
+    // Check that each insert event contains only the published columns (id, name, age).
+    // Since Cell values don't include column names, we verify by checking the count.
+    for event in insert_events {
+        if let Event::Insert(InsertEvent { table_row, .. }) = event {
+            // Verify exactly 3 columns (id, name, age).
+            // If email was included, there would be 4 values.
+            assert_eq!(table_row.values.len(), 3);
+        }
+    }
+
+    // Also verify the stored table schema only includes published columns.
+    let table_schemas = state_store.get_table_schemas().await;
+    let stored_schema = table_schemas.get(&table_id).unwrap();
+    let column_names: Vec<&str> = stored_schema
+        .column_schemas
+        .iter()
+        .map(|c| c.name.as_str())
+        .collect();
+    assert!(column_names.contains(&"id"));
+    assert!(column_names.contains(&"name"));
+    assert!(column_names.contains(&"age"));
+    assert!(!column_names.contains(&"email"));
+    assert_eq!(stored_schema.column_schemas.len(), 3);
+}
diff --git a/etl/tests/replication.rs b/etl/tests/replication.rs
index 00694bc1..806ca818 100644
--- a/etl/tests/replication.rs
+++ b/etl/tests/replication.rs
@@ -443,6 +443,115 @@ async fn test_table_copy_stream_respects_row_filter() {
     assert_eq!(rows_count, expected_rows_count as u64);
 }
 
+#[tokio::test(flavor = "multi_thread")]
+async fn test_table_copy_stream_respects_column_filter() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    // Column filters in publication are only available from Postgres 15+.
+    if below_version!(database.server_version(), POSTGRES_15) {
+        eprintln!("Skipping test: PostgreSQL 15+ required for column filters");
+        return;
+    }
+
+    // We create a table with multiple columns.
+    let test_table_name = test_table_name("table_1");
+    let test_table_id = database
+        .create_table(
+            test_table_name.clone(),
+            true,
+            &[("name", "text"), ("age", "integer"), ("email", "text")],
+        )
+        .await
+        .unwrap();
+
+    database
+        .run_sql(&format!(
+            "alter table {test_table_name} replica identity full"
+        ))
+        .await
+        .unwrap();
+
+    // Create publication with only a subset of columns (excluding 'email').
+    let publication_name = "test_pub";
+    database
+        .run_sql(&format!(
+            "create publication {publication_name} for table {test_table_name} (id, name, age)"
+        ))
+        .await
+        .unwrap();
+
+    let parent_client = PgReplicationClient::connect(database.config.clone())
+        .await
+        .unwrap();
+
+    // Insert test data with all columns.
+    database
+        .run_sql(&format!(
+            "insert into {test_table_name} (name, age, email) values ('Alice', 25, 'alice@example.com')"
+        ))
+        .await
+        .unwrap();
+    database
+        .run_sql(&format!(
+            "insert into {test_table_name} (name, age, email) values ('Bob', 30, 'bob@example.com')"
+        ))
+        .await
+        .unwrap();
+
+    // Create the slot when the database schema contains the test data.
+    let (transaction, _) = parent_client
+        .create_slot_with_transaction(&test_slot_name("my_slot"))
+        .await
+        .unwrap();
+
+    // Get table schema with the publication - should only include published columns.
+    let table_schemas = transaction
+        .get_table_schemas(&[test_table_id], Some(publication_name))
+        .await
+        .unwrap();
+    assert_table_schema(
+        &table_schemas,
+        test_table_id,
+        test_table_name,
+        &[
+            id_column_schema(),
+            ColumnSchema {
+                name: "name".to_string(),
+                typ: Type::TEXT,
+                modifier: -1,
+                nullable: true,
+                primary: false,
+            },
+            ColumnSchema {
+                name: "age".to_string(),
+                typ: Type::INT4,
+                modifier: -1,
+                nullable: true,
+                primary: false,
+            },
+        ],
+    );
+
+    // Get table copy stream with the publication.
+    let stream = transaction
+        .get_table_copy_stream(
+            test_table_id,
+            &table_schemas[&test_table_id].column_schemas,
+            Some("test_pub"),
+        )
+        .await
+        .unwrap();
+
+    let rows_count = count_stream_rows(stream).await;
+
+    // Transaction should be committed after the copy stream is exhausted.
+    transaction.commit().await.unwrap();
+
+    // We expect to have 2 rows (the ones we inserted).
+    assert_eq!(rows_count, 2);
+}
+
 #[tokio::test(flavor = "multi_thread")]
 async fn test_table_copy_stream_no_row_filter() {
     init_test_tracing();