1 change: 0 additions & 1 deletion etl/src/test_utils/table.rs
@@ -18,7 +18,6 @@ pub fn assert_table_schema(
expected_columns: &[ColumnSchema],
) {
let table_schema = table_schemas.get(&table_id).unwrap();

assert_eq!(table_schema.id, table_id);
assert_eq!(table_schema.name, expected_table_name);

108 changes: 107 additions & 1 deletion etl/tests/pipeline.rs
@@ -13,7 +13,7 @@ use etl::test_utils::test_schema::{
build_expected_users_inserts, get_n_integers_sum, get_users_age_sum_from_rows,
insert_mock_data, insert_users_data, setup_test_database_schema,
};
use etl::types::{EventType, PipelineId};
use etl::types::{Event, EventType, InsertEvent, PipelineId};
use etl_config::shared::BatchConfig;
use etl_postgres::below_version;
use etl_postgres::replication::slots::EtlReplicationSlot;
@@ -987,3 +987,109 @@ async fn table_without_primary_key_is_errored() {
let events = destination.get_events().await;
assert!(events.is_empty());
}

#[tokio::test(flavor = "multi_thread")]
async fn pipeline_respects_column_level_publication() {
init_test_tracing();
let database = spawn_source_database().await;

    // Column filters in publications are only available from Postgres 15+.
if below_version!(database.server_version(), POSTGRES_15) {
eprintln!("Skipping test: PostgreSQL 15+ required for column filters");
return;
}

    // Create a table with multiple columns, including a sensitive 'email' column.
let table_name = test_table_name("users");
let table_id = database
.create_table(
table_name.clone(),
true,
&[
("name", "text not null"),
("age", "integer not null"),
("email", "text not null"),
],
)
.await
.unwrap();

    // Create a publication with only a subset of columns (excluding 'email').
let publication_name = "test_pub".to_string();
database
.run_sql(&format!(
"create publication {publication_name} for table {} (id, name, age)",
table_name.as_quoted_identifier()
))
.await
.expect("Failed to create publication with column filter");

let state_store = NotifyingStore::new();
let destination = TestDestinationWrapper::wrap(MemoryDestination::new());

let pipeline_id: PipelineId = random();
let mut pipeline = create_pipeline(
&database.config,
pipeline_id,
publication_name.clone(),
state_store.clone(),
destination.clone(),
);

// Wait for the table to finish syncing.
let sync_done_notify = state_store
.notify_on_table_state_type(table_id, TableReplicationPhaseType::SyncDone)
.await;

pipeline.start().await.unwrap();

sync_done_notify.notified().await;

// Wait for two insert events to be processed.
let insert_events_notify = destination
.wait_for_events_count(vec![(EventType::Insert, 2)])
.await;

// Insert test data with all columns (including email).
database
.run_sql(&format!(
"insert into {} (name, age, email) values ('Alice', 25, '[email protected]'), ('Bob', 30, '[email protected]')",
table_name.as_quoted_identifier()
))
.await
.unwrap();

insert_events_notify.notified().await;

pipeline.shutdown_and_wait().await.unwrap();

// Verify the events and check that only published columns are included.
let events = destination.get_events().await;
let grouped_events = group_events_by_type_and_table_id(&events);
let insert_events = grouped_events.get(&(EventType::Insert, table_id)).unwrap();
assert_eq!(insert_events.len(), 2);

// Check that each insert event contains only the published columns (id, name, age).
// Since Cell values don't include column names, we verify by checking the count.
for event in insert_events {
if let Event::Insert(InsertEvent { table_row, .. }) = event {
// Verify exactly 3 columns (id, name, age).
// If email was included, there would be 4 values.
assert_eq!(table_row.values.len(), 3);
}
}

// Also verify the stored table schema only includes published columns.
let table_schemas = state_store.get_table_schemas().await;
let stored_schema = table_schemas.get(&table_id).unwrap();
let column_names: Vec<&str> = stored_schema
.column_schemas
.iter()
.map(|c| c.name.as_str())
.collect();
assert!(column_names.contains(&"id"));
assert!(column_names.contains(&"name"));
assert!(column_names.contains(&"age"));
assert!(!column_names.contains(&"email"));
assert_eq!(stored_schema.column_schemas.len(), 3);
}
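
For reference, PostgreSQL 15+ records a publication's column list in the system catalogs, so the filter this test relies on can also be inspected directly. A minimal sketch against the source database, assuming the test_pub publication created above (the attnames column of pg_publication_tables is new in Postgres 15):

-- illustrative query, not part of the test suite
select schemaname, tablename, attnames
from pg_publication_tables
where pubname = 'test_pub';
-- expected: one row for the users table whose attnames lists only {id,name,age}, with 'email' absent
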
109 changes: 109 additions & 0 deletions etl/tests/replication.rs
@@ -443,6 +443,115 @@ async fn test_table_copy_stream_respects_row_filter() {
assert_eq!(rows_count, expected_rows_count as u64);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_table_copy_stream_respects_column_filter() {
init_test_tracing();
let database = spawn_source_database().await;

    // Column filters in publications are only available from Postgres 15+.
if below_version!(database.server_version(), POSTGRES_15) {
eprintln!("Skipping test: PostgreSQL 15+ required for column filters");
return;
}

// We create a table with multiple columns.
let test_table_name = test_table_name("table_1");
let test_table_id = database
.create_table(
test_table_name.clone(),
true,
&[("name", "text"), ("age", "integer"), ("email", "text")],
)
.await
.unwrap();

database
.run_sql(&format!(
"alter table {test_table_name} replica identity full"
))
.await
.unwrap();

    // Create a publication with only a subset of columns (excluding 'email').
let publication_name = "test_pub";
database
.run_sql(&format!(
"create publication {publication_name} for table {test_table_name} (id, name, age)"
))
.await
.unwrap();

let parent_client = PgReplicationClient::connect(database.config.clone())
.await
.unwrap();

// Insert test data with all columns.
database
.run_sql(&format!(
"insert into {test_table_name} (name, age, email) values ('Alice', 25, '[email protected]')"
))
.await
.unwrap();
database
.run_sql(&format!(
"insert into {test_table_name} (name, age, email) values ('Bob', 30, '[email protected]')"
))
.await
.unwrap();

    // Create the slot now that the table contains the test data, so the copy stream includes it.
let (transaction, _) = parent_client
.create_slot_with_transaction(&test_slot_name("my_slot"))
.await
.unwrap();

// Get table schema with the publication - should only include published columns.
let table_schemas = transaction
.get_table_schemas(&[test_table_id], Some(publication_name))
.await
.unwrap();
assert_table_schema(
&table_schemas,
test_table_id,
test_table_name,
&[
id_column_schema(),
ColumnSchema {
name: "name".to_string(),
typ: Type::TEXT,
modifier: -1,
nullable: true,
primary: false,
},
ColumnSchema {
name: "age".to_string(),
typ: Type::INT4,
modifier: -1,
nullable: true,
primary: false,
},
],
);

// Get table copy stream with the publication.
let stream = transaction
.get_table_copy_stream(
test_table_id,
&table_schemas[&test_table_id].column_schemas,
Some("test_pub"),
)
.await
.unwrap();

let rows_count = count_stream_rows(stream).await;

// Transaction should be committed after the copy stream is exhausted.
transaction.commit().await.unwrap();

// We expect to have 2 rows (the ones we inserted).
assert_eq!(rows_count, 2);
}
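
The column-filtered initial copy asserted above can be reproduced by hand with a COPY statement that names only the published columns; a sketch of the equivalent SQL, assuming the table created in this test with schema qualification omitted (not necessarily the exact statement the replication client issues):

-- illustrative manual equivalent of the filtered table copy
copy table_1 (id, name, age) to stdout;
-- each emitted row carries three fields; the unpublished 'email' column never leaves the server
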

#[tokio::test(flavor = "multi_thread")]
async fn test_table_copy_stream_no_row_filter() {
init_test_tracing();