
Conversation

@anuunchin (Contributor) commented on Jul 21, 2025

Description

This PR adds a new pipeline function that syncs the dlt schema with the destination (not vice versa) by removing a column from the schema if that column has been manually deleted in the destination.

The motivation: rather than offering a CLI command that drops columns (which would require separate drop_columns implementations per dialect and thus add maintenance overhead), we delegate the dropping itself to the user and instead let them sync the dlt schema in those scenarios.

Related PRs:

#2754

Further:

This should be extended to table drop syncs as well.

Note:

This essentially solves the problem where the user manually drops objects in the destination and the dlt pipeline breaks.


@anuunchin force-pushed the feat/1153-drop-column-sync branch 2 times, most recently from c280794 to c37c422 (July 21, 2025 07:56)
"An incremental field is being removed from schema."
"You should unset the"
" incremental with `incremental=dlt.sources.incremental.EMPTY`"
)
@anuunchin (Contributor, Author) commented on Jul 21, 2025:

I just realized the incremental setting is also saved outside of the schema, so when the user

  • manually drops a column with an incremental setting from the destination
  • syncs the schema destructively

they will also need to unset the incremental with apply_hints(incremental=dlt.sources.incremental.EMPTY).
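A minimal sketch of that unset, assuming a resource with a cursor column (the resource and column names are illustrative):

```python
# Hypothetical sketch: resetting the incremental hint after the cursor column
# was dropped in the destination and the schema was synced destructively.
import dlt

@dlt.resource
def events():
    yield {"id": 1, "updated_at": "2025-01-01T00:00:00Z"}

# The resource originally tracked "updated_at" incrementally.
events.apply_hints(incremental=dlt.sources.incremental("updated_at"))

# EMPTY unsets the incremental so dlt stops expecting the dropped column.
events.apply_hints(incremental=dlt.sources.incremental.EMPTY)
```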

```python
def _get_actual_columns(self, table_name: str) -> List[str]:
    schema_columns = self.schema.get_table_columns(table_name)
    return list(schema_columns.keys())
```

@anuunchin (Contributor, Author) commented:

We just get the columns from the schema here because this is the dummy destination.

```python
else:
    schema_columns = self.schema.get_table_columns(table_name)
    return list(schema_columns.keys())
```

@anuunchin (Contributor, Author) commented on Jul 21, 2025:

It's impossible to get the actual columns from filesystem files without a table format unless we read the entire files. Also, it's unlikely that the user manually deletes specific columns from filesystem files, I think 👀. Therefore, we should raise NotImplementedError instead of doing basically nothing.
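A minimal sketch of that suggestion, reusing the _get_actual_columns helper from the fragment above (the class stub and message text are assumptions):

```python
from typing import List

class FilesystemClient:  # stub standing in for the real filesystem job client
    # Hypothetical sketch: plain filesystem files cannot be reflected cheaply,
    # so refuse instead of silently returning possibly stale schema columns.
    def _get_actual_columns(self, table_name: str) -> List[str]:
        raise NotImplementedError(
            f"Cannot determine actual columns of '{table_name}': files without"
            " a table format would have to be read in full."
        )
```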

@anuunchin force-pushed the feat/1153-drop-column-sync branch from c37c422 to 598ca5b (July 21, 2025 08:54)
@anuunchin self-assigned this on Jul 21, 2025
@anuunchin requested a review from rudolfix on July 21, 2025 09:58
@rudolfix (Collaborator) left a comment:

This is a very good idea, but we need to approach it in a more systematic way:

  1. (Almost) all of our destinations have

```python
def get_storage_table(self, table_name: str) -> Tuple[str, TTableSchemaColumns]:
```

and/or

```python
def get_storage_tables(
    self, table_names: Iterable[str]
) -> Iterable[Tuple[str, TTableSchemaColumns]]:
```

implemented. This will reflect the storage to get the table schema out of it; you can use it to compare with the pipeline schema.

  2. Let's formalize it: add a mixin class like WithTableReflection, in the same manner WithStateSync is done (see the sketch below). get_storage_tables is the more general method, so you can add only this one to the mixin.
  3. Now add this mixin to all JobClientBase implementations for which you want to support our new schema sync.
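A minimal sketch of what such a mixin could look like, following the WithStateSync pattern (the class body is an assumption; the get_storage_tables signature is copied from the existing clients):

```python
# Hypothetical sketch of the proposed WithTableReflection mixin.
from abc import ABC, abstractmethod
from typing import Iterable, Tuple

from dlt.common.schema.typing import TTableSchemaColumns


class WithTableReflection(ABC):
    """Implemented by job clients that can reflect table schemas from storage."""

    @abstractmethod
    def get_storage_tables(
        self, table_names: Iterable[str]
    ) -> Iterable[Tuple[str, TTableSchemaColumns]]:
        """Yield (table_name, columns) pairs as they actually exist in the destination."""
        ...
```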

When the above is done, we are able to actually compute the schema diff, for example as sketched below.
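For illustration, the per-table diff could then be as simple as (the helper name is hypothetical):

```python
from dlt.common.schema.typing import TTableSchemaColumns

# Hypothetical helper: columns present in the pipeline schema but gone from
# storage are the candidates for removal in the destructive sync.
def columns_to_drop(
    pipeline_columns: TTableSchemaColumns, storage_columns: TTableSchemaColumns
) -> list[str]:
    return [name for name in pipeline_columns if name not in storage_columns]
```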

Top-level interface:

  1. We have sync_schema, which does a regular schema migration (adds missing columns and tables in the destination).
  2. We need another method which is the reverse: it deletes columns and tables from the schema that are not present in the destination, and then does the schema sync above.
  3. The method above should have a dry-run mode, where we neither change the pipeline schema nor sync it.
  4. It should make sure destination_client() implements WithTableReflection before continuing (see the guard sketch after this comment).
  5. It should allow selecting the tables to be affected.

When this is done, we can think about extending the CLI, i.e. a dlt pipeline <name> schema command.
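A sketch of the guard from point 4, assuming the WithTableReflection mixin sketched above (the surrounding call site is hypothetical; pipeline is an existing dlt.Pipeline):

```python
# Hypothetical guard at the start of the new destructive sync method.
with pipeline.destination_client() as client:
    if not isinstance(client, WithTableReflection):
        raise NotImplementedError(
            "This destination does not implement table reflection, so a"
            " destructive schema sync cannot be performed."
        )
```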

"""Get actual column names from files in storage for regular (non-delta/iceberg) tables
or column names from schema"""

if self.is_open_table("iceberg", table_name):
@rudolfix (Collaborator) commented:

This should be merged into get_storage_tables, which is already present in the filesystem destination. It will be really helpful.

Note: both are operating on Arrow schemas, so look at how get_storage_table works in lancedb... you do not need to implement a TypeMapper, just look at the lancedb implementation.

```python
        )
        return expected_update

    def update_stored_schema_destructively(
```
@rudolfix (Collaborator) commented:
This is overall a good idea to add to JobClient, but we should be more selective. Look at my top-level review: we need (1) the mixin class and (2) it needs to be optional.

@anuunchin force-pushed the feat/1153-drop-column-sync branch 3 times, most recently from c256d51 to d4aadb2 (July 28, 2025 07:53)
@anuunchin requested a review from rudolfix on July 28, 2025 09:07
@anuunchin force-pushed the feat/1153-drop-column-sync branch from d4aadb2 to ab715ff (July 28, 2025 09:28)
@anuunchin force-pushed the feat/1153-drop-column-sync branch from ab715ff to 17651e2 (August 5, 2025 07:08)
@rudolfix (Collaborator) left a comment:

Some changes needed.

@anuunchin force-pushed the feat/1153-drop-column-sync branch 3 times, most recently from 354680c to 85448dc (September 2, 2025 07:43)
@anuunchin force-pushed the feat/1153-drop-column-sync branch from 3e0e5ca to b1c3f3f (September 5, 2025 07:22)
@anuunchin force-pushed the feat/1153-drop-column-sync branch from b1c3f3f to d2ad80e (September 5, 2025 11:11)
Comment on lines +511 to +518

```python
logger.warning(
    f"Table '{table_name}' does not use a table format and does not support"
    " true schema reflection. Returning column schemas from the dlt"
    " schema, which may be stale if the underlying files were manually"
    " modified."
)
yield (table_name, self.schema.get_table_columns(table_name))
```

@anuunchin (Contributor, Author) commented:

Just realized that for Parquet files we could also just use pyarrow and read the actual metadata 👀, but I still don't think people drop columns from Parquet files...
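For reference, a minimal sketch of that approach (the file path is illustrative): pyarrow reads a Parquet file's schema from the footer without loading any row data.

```python
# Hedged sketch: get the actual column names stored in a Parquet file.
import pyarrow.parquet as pq

schema = pq.read_schema("my_table/1695378921.1.parquet")  # illustrative path
actual_columns = schema.names
```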

@anuunchin requested a review from rudolfix on September 9, 2025 11:23
@anuunchin mentioned this pull request on Sep 16, 2025