Skip to content

Commit f3f7479

Browse files
committed
allow sync all columns
1 parent 2580ac5 commit f3f7479

File tree

1 file changed

+107
-12
lines changed

1 file changed

+107
-12
lines changed

dbt/include/spark/macros/adapters.sql

+107-12
Original file line numberDiff line numberDiff line change
@@ -398,35 +398,130 @@
398398
{% endcall %}
399399
{% endmacro %}
400400

401+
{% macro check_table_properties(relation, expected_properties) %}
402+
{# Fetching current properties and populate a dict to easily compare #}
403+
{% set properties_table = fetch_tbl_properties(relation) %}
404+
405+
{% set current_properties = {} %}
406+
407+
{% for row in properties_table.rows %}
408+
{% set current_properties = current_properties.update({ row['key']: row['value'] }) %}
409+
{% endfor %}
410+
411+
{# Control variable for monitoring validation status #}
412+
{% set missing_properties = {} %}
413+
414+
{# Iterated through expected properties #}
415+
416+
{% for key, expected_value in expected_properties.items() %}
417+
{% set current_value = current_properties.get(key) %}
418+
419+
{# Check for known numeric values to be >= #}
420+
{% if key in ['delta.minReaderVersion', 'delta.minWriterVersion'] %}
421+
{% if current_value is not none and current_value | int >= expected_value %}
422+
{{ log("Property '" ~ key ~ "' is valid : " ~ current_value ~ " >= " ~ expected_value) }}
423+
{% else %}
424+
{{ log("Property '" ~ key ~ "' is not valid. Found : " ~ current_value ~ ", expected : " ~ expected_value) }}
425+
{% do missing_properties.update({key : expected_value}) %}
426+
{% endif %}
427+
{% else %}
428+
{# Check for other properties to be = #}
429+
{% if current_value == expected_value %}
430+
{{ log("Property '" ~ key ~ "' valid : " ~ current_value) }}
431+
{% else %}
432+
{{ log("Property '" ~ key ~ "' is not valid. Found : " ~ current_value ~ ", expected : " ~ expected_value) }}
433+
{% do missing_properties.update({key : expected_value}) %}
434+
{% endif %}
435+
{% endif %}
436+
{% endfor %}
437+
438+
{{ return(missing_properties) }}
439+
{% endmacro %}
440+
401441

402442
{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
403443

404-
{% if remove_columns %}
405-
{% if relation.is_delta %}
406-
{% set platform_name = 'Delta Lake' %}
407-
{% elif relation.is_iceberg %}
444+
{% if remove_columns and not relation.is_delta %}
445+
{% if relation.is_iceberg %}
408446
{% set platform_name = 'Iceberg' %}
409447
{% else %}
410448
{% set platform_name = 'Apache Spark' %}
411449
{% endif %}
412450
{{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
451+
{% elif remove_columns and relation.is_delta %}
452+
{# Checking Delta table properties to see if we can drop columns #}
453+
{# It must have the following properties #}
454+
455+
{% set expected_properties = {
456+
'delta.minReaderVersion': 2,
457+
'delta.minWriterVersion': 5,
458+
'delta.columnMapping.mode': 'name'
459+
} %}
460+
461+
{% set missing_properties = check_table_properties(relation, expected_properties) %}
462+
{% if missing_properties %}
463+
{% set msg %}
464+
Delta table properties do not allow dropping columns. Dropping is available with the following properties:
465+
{{ expected_properties }}
466+
Either run the following command :
467+
468+
ALTER TABLE {{ relation }}
469+
SET TBLPROPERTIES (
470+
{% for key, value in missing_properties.items() %}
471+
'{{ key }}' = '{{ value }}'{{ ',' if not loop.last }}
472+
{% endfor %}
473+
)
474+
475+
Or add the following to your model condfig and rebuild it :
476+
table_properties={
477+
'delta.minReaderVersion': '2',
478+
'delta.minWriterVersion': '5',
479+
'delta.columnMapping.mode': 'name'
480+
}
481+
{% endset %}
482+
483+
{{ exceptions.raise_compiler_error(msg) }}
484+
{% endif %}
413485
{% endif %}
414486

415487
{% if add_columns is none %}
416488
{% set add_columns = [] %}
417489
{% endif %}
418490

419-
{% set sql -%}
491+
{% if remove_columns is none %}
492+
{% set remove_columns = [] %}
493+
{% endif %}
420494

421-
alter {{ relation.type }} {{ relation }}
495+
{% if add_columns %}
496+
{% set sql -%}
497+
alter {{ relation.type }} {{ relation }}
498+
add columns
499+
{% for column in add_columns %}
500+
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
501+
{% endfor %}
502+
{%- endset -%}
503+
504+
{% call statement("run_query_statement", fetch_result=false, auto_begin=false) %}
505+
{{ sql }}
506+
{% endcall %}
507+
{% endif %}
422508

423-
{% if add_columns %} add columns {% endif %}
424-
{% for column in add_columns %}
425-
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
426-
{% endfor %}
509+
{# Will run only if using Delta format with appropriated table properties #}
510+
{% if remove_columns %}
511+
{% set sql -%}
512+
alter {{ relation.type }} {{ relation }}
513+
drop columns
514+
{% for column in remove_columns %}
515+
{{ column.name }}{{ ',' if not loop.last }}
516+
{% endfor %}
517+
{%- endset -%}
518+
519+
{% call statement("run_query_statement", fetch_result=false, auto_begin=false) %}
520+
{{ sql }}
521+
{% endcall %}
522+
{% endif %}
427523

428-
{%- endset -%}
429524

430-
{% do run_query(sql) %}
525+
431526

432527
{% endmacro %}

0 commit comments

Comments
 (0)