Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added macros for data type optimization #2

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ To add this package into your dbt project you need to make an entry in the packa

- [generate_schema_name](#generate_schema_name) [(source)](./macros/generate_schema_name.sql)
- [smart_source](#smart_source) [(source)](./macros/smart_source.sql)

- [optimized_data_loader](#optimized_data_loader) [(source)](./macros/optimized_dataload/optimized_data_loader.sql)
- [data_type_optimizor_v2](#data_type_optimizor_v2) [(source)](./macros/optimized_dataload/data_type_optimizor.sql) (to be used as internal macro)
- [find_proposed_column_for_numbers](#find_proposed_column_for_numbers) [(source)](./macros/optimized_dataload/data_type_optimization_helper.sql) (to be used as internal macro)
- [find_proposed_column_for_boolean](#find_proposed_column_for_boolean) [(source)](./macros/optimized_dataload/data_type_optimization_helper.sql) (to be used as internal macro)

### Usage
#### generate_schema_name
Expand Down Expand Up @@ -64,4 +67,24 @@ renamed as (
),

select * from renamed
```

#### optimized_data_loader

This macro starts the optimized data load for a table. It is designed so that it also calls `data_type_optimizor_v2` on the initial load, when required.

Executing using the scratchpad/statement tab in the dbt Cloud IDE:

```sql
select * from ({{ optimized_data_loader (source('<source_name>', '<table_name>')) }})
```

#### data_type_optimizor_v2

This macro helps create the optimized table.

Executing using the scratchpad/statement tab in the dbt Cloud IDE:

```sql
select * from ({{ data_type_optimizor_v2 (source('<source_name>', '<table_name>'), 'false' ) }})
```
108 changes: 108 additions & 0 deletions macros/optimized_dataload/data_type_optimization_helper.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
{% macro find_proposed_column_for_numbers(column_name, table_fqn) %}
{%set find_proposed_column_qry%}
{% set project_id = table_fqn.database %}
{% set dataset_id = table_fqn.schema %}
{% set table_id = table_fqn.identifier %}

SELECT
'{{column_name}}' as `Column_Name`,
(CASE
WHEN {{column_name}}_supports_boolean_fg = 1
AND {{column_name}}_min_value IN (0, 1)
AND {{column_name}}_max_value IN (0, 1) THEN 'BOOLEAN'
WHEN {{column_name}}_supports_int64_fg = {{column_name}}_not_missing_cnt
OR
{{column_name}}_unique_values_cnt = 0
THEN 'INT64'
WHEN {{column_name}}_supports_float64_fg = {{column_name}}_not_missing_cnt
THEN 'FLOAT64'
WHEN {{column_name}}_supports_numeric_fg = {{column_name}}_not_missing_cnt
THEN 'NUMERIC'
END
) AS proposed_data_type
FROM (
SELECT
'{{column_name}}' AS column_name_to_be_updated,
sum(CASE
WHEN {{column_name}} IS NOT NULL THEN 1
ELSE 0 END
) AS {{column_name}}_not_missing_cnt,
count(DISTINCT {{column_name}}) AS {{column_name}}_unique_values_cnt,
min({{column_name}}) AS {{column_name}}_min_value,
max({{column_name}}) AS {{column_name}}_max_value,
(CASE
WHEN (count(distinct {{column_name}}) = 2)
AND CONTAINS_SUBSTR('{{column_name}}', 'flg') = true THEN 1
ELSE 0 END
) AS {{column_name}}_supports_boolean_fg,
sum(CASE
WHEN safe_cast({{column_name}} AS NUMERIC) = {{column_name}} THEN 1
ELSE 0 END
) AS {{column_name}}_supports_numeric_fg,
sum(CASE
WHEN safe_cast({{column_name}} AS FLOAT64) = {{column_name}} THEN 1
ELSE 0 END
) AS {{column_name}}_supports_float64_fg,
sum(CASE
WHEN safe_cast({{column_name}} AS INT64) = {{column_name}} THEN 1
ELSE 0 END
) AS {{column_name}}_supports_int64_fg
FROM {{project_id}}.{{dataset_id}}.{{table_id}}
)
{% endset %}
{% set proposed_columns = run_query(find_proposed_column_qry) %}
{{ return (proposed_columns)}}
{% endmacro %}

{% macro find_proposed_column_for_boolean(column_name, table_fqn)%}
{%set find_proposed_column_qry%}
{% set project_id = table_fqn.database %}
{% set dataset_id = table_fqn.schema %}
{% set table_id = table_fqn.identifier %}

SELECT
'{{column_name}}' as `Column_Name`,
(CASE
WHEN supports_boolean_fg = 1
AND upper(min_value) IN ('', 'X')
AND upper(max_value) IN ('', 'X') THEN 'BOOLEAN'
WHEN supports_boolean_fg = 1
AND upper(min_value) IN ('', 'Y')
AND upper(max_value) IN ('', 'Y') THEN 'BOOLEAN'
WHEN supports_boolean_fg = 1
AND upper(min_value) IN ('Y', 'N')
AND upper(max_value) IN ('Y', 'N') THEN 'BOOLEAN'
WHEN supports_boolean_fg = 1
AND upper(min_value) IN ('0', '1')
AND upper(max_value) IN ('0', '1') THEN 'BOOLEAN'
WHEN supports_boolean_fg = 1
AND upper(min_value) IN ('YES', 'NO')
AND upper(max_value) IN ('NO', 'YES') THEN 'BOOLEAN'
WHEN supports_boolean_fg = 1
AND upper(min_value) IN ('TRUE', 'FALES')
AND upper(max_value) IN ('FALSE', 'TRUE') THEN 'BOOLEAN'
ELSE 'STRING'
END
) AS proposed_data_type
, min_value
, max_value
FROM (
SELECT
'{{column_name}}' AS column_name_to_be_updated,
sum(CASE
WHEN {{column_name}} IS NOT NULL THEN 1
ELSE 0 END
) AS not_missing_cnt,
count(DISTINCT {{column_name}}) AS unique_values_cnt,
min({{column_name}}) AS min_value,
max({{column_name}}) AS max_value,
(CASE
WHEN count(distinct {{column_name}}) = 2 THEN 1
ELSE 0 END
) AS supports_boolean_fg
FROM {{project_id}}.{{dataset_id}}.{{table_id}}
)
{% endset %}
{% set proposed_columns = run_query(find_proposed_column_qry) %}
{{ return (proposed_columns)}}
{% endmacro %}
120 changes: 120 additions & 0 deletions macros/optimized_dataload/data_type_optimizor.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
{% macro data_type_optimizor_v2(table_fqn, optimize_datatypes) %}
{% do log(table_fqn, info=true) %}
{{ log('not executed') }}
{% set project_id = table_fqn.database %}
{% set dataset_id = table_fqn.schema %}
{% set table_id = table_fqn.identifier %}

{% set get_columns_query %}
SELECT
table_catalog, table_schema, table_name, column_name, data_type
FROM
{{project_id}}.{{dataset_id}}.INFORMATION_SCHEMA.COLUMNS
WHERE
table_name = "{{table_id}}" AND
(--data_type LIKE 'BOOL%' OR
data_type LIKE 'FLOAT%' OR
data_type LIKE 'INT%' OR
data_type = 'NUMERIC' )
{% endset %}
SELECT
{% if execute %}
{% set results = run_query(get_columns_query) %}
{% for cols in results %}
{%- set col_names = cols[3] -%}
{% set pcols = find_proposed_column_for_numbers(col_names, table_fqn ) %}
{{log('here for numeric')}}
{% for c in pcols%}
{% if '{{ c[1] }}' != '{{ cols[4] }}' %}
{% if '{{ c[1] }}' == 'BOOL' or '{{ c[1] }}' == 'BOOLEAN' %}
{% if '{{ cols[4] }}' == 'NUMERIC' or '{{ cols[4] }}' == 'FLOAT64' or '{{ cols[4] }}' == 'INT64' %}
safe_cast(case when safe_cast(`{{ cols['column_name'] }}` as 'INT64') > 0 then 1
when safe_cast(`{{ cols['column_name'] }}` as 'INT64') = 0 then 0
else null end as {{ c[1] }} ) as `{{ cols['column_name'] }}` ,
{% else %}
safe_cast(case when upper(`{{ cols['column_name'] }}`) in ('YES','X','Y') then 1
when upper(`{{ cols['column_name'] }}`) in ('NO', '', 'N') then 0
else null end as {{ c[1] }} ) as `{{ cols['column_name'] }}` ,
{% endif %}
{% elif '{{ c[1] }}' == 'INT64' %}
safe_cast(`{{ cols['column_name'] }}` as INT64) as `{{ cols['column_name']}}` ,
{% elif '{{ c[1] }}' == 'FLOAT64' %}
safe_cast(`{{ cols['column_name'] }}` as FLOAT64) as `{{ cols['column_name']}}` ,
{% else %}
safe_cast(`{{ cols['column_name'] }}` as {{ c[1] }} ) as `{{ cols['column_name']}}` ,
{% endif %}
{% else %}
`{{ cols['column_name']}}` ,
{% endif %}
{% endfor %}
{% endfor %}
{% endif %}

{% set get_bool_columns_query %}
SELECT
table_catalog, table_schema, table_name, column_name, data_type
FROM
{{project_id}}.{{dataset_id}}.INFORMATION_SCHEMA.COLUMNS
WHERE
table_name = "{{table_id}}" AND
(data_type = 'STRING' and CONTAINS_SUBSTR(column_name, 'flg') = true)
{% endset %}

{% if execute %}
{% set results = run_query(get_bool_columns_query) %}
{% for cols in results %}
{%- set col_names = cols[3] -%}
{% set pcols = find_proposed_column_for_boolean(col_names, table_fqn ) %}
{{log('here for bool')}}
{% for c in pcols%}
{% if '{{ c[1] }}' != '{{ cols[4] }}' %}
{% if '{{ c[1] }}' == 'BOOL' or '{{ c[1] }}' == 'BOOLEAN' %}
{% if '{{ cols[4] }}' == 'NUMERIC' or '{{ cols[4] }}' == 'FLOAT64' or '{{ cols[4] }}' == 'INT64' %}
safe_cast(case when safe_cast(`{{ cols['column_name'] }}` as 'INT64') > 0 then 1
when safe_cast(`{{ cols['column_name'] }}` as 'INT64') = 0 then 0
else null end as {{ c[1] }} ) as `{{ cols['column_name'] }}` ,
{% else %}
safe_cast(case when upper(`{{ cols['column_name'] }}`) in ('YES','X','Y') then 1
when upper(`{{ cols['column_name'] }}`) in ('NO', '', 'N') then 0
else null end as {{ c[1] }} ) as `{{ cols['column_name'] }}` ,
{% endif %}
{% elif '{{ c[1] }}' == 'INT64' %}
safe_cast(`{{ cols['column_name'] }}` as INT64) as `{{ cols['column_name']}}` ,
{% elif '{{ c[1] }}' == 'FLOAT64' %}
safe_cast(`{{ cols['column_name'] }}` as FLOAT64) as `{{ cols['column_name']}}` ,
{% else %}
safe_cast(`{{ cols['column_name'] }}` as {{ c[1] }} ) as `{{ cols['column_name']}}` ,
{% endif %}
{% else %}
`{{ cols['column_name']}}` ,
{% endif %}
{% endfor %}
{% endfor %}
{% endif %}

{% set get_remaining_columns_query %}
SELECT
table_catalog, table_schema, table_name, column_name, data_type
FROM
{{project_id}}.{{dataset_id}}.INFORMATION_SCHEMA.COLUMNS
WHERE
table_name = "{{table_id}}" AND
(data_type LIKE 'DATE%' OR
data_type LIKE 'BOOL%' OR
(data_type = 'STRING' and CONTAINS_SUBSTR(column_name, 'flg') = false))

{% endset %}

{% if execute %}
{% set results = run_query(get_remaining_columns_query) %}
{%- for cols in results -%}
{{log('here for bool')}}
`{{ cols['column_name']}}` ,
{%- endfor -%}
{% endif %}
FROM
{{ table_fqn }}
{% if optimize_datatypes == 'true' %}
WHERE false
{% endif %}
{% endmacro %}
86 changes: 86 additions & 0 deletions macros/optimized_dataload/optimized_data_loader.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
{% macro optimized_data_loader(table_fqn) %}
{% do log(table_fqn, info=true) %}
{{ log('not executed') }}
{% set project_id = table_fqn.database %}
{% set dataset_id = table_fqn.schema %}
{% set table_id = table_fqn.identifier %}
{% set list1 = project_id.split('-') %}
{% set source_project = list1[0] %}
{% set sproject_id = target.database %}
{% set sdataset_id = generate_schema_name() + 'input' %}
{% set stable_id = 'dbt_' + source_project + '_' + dataset_id + '_' + table_id %}
{% set get_columns_query %}
WITH
src AS (
SELECT
1 AS id,
src.*
FROM
`{{ project_id }}.{{ dataset_id }}..INFORMATION_SCHEMA.COLUMNS` src
WHERE
src.table_name = '{{ table_id}}' ),
tgt AS (
SELECT
1 AS id,
tgt.column_name AS tgt_column_name,
tgt.data_type AS tgt_data_type
FROM
`{{ sproject_id }}.{{ sdataset_id }}..INFORMATION_SCHEMA.COLUMNS` tgt
WHERE
tgt.table_name = '{{ stable_id }}' )
SELECT
src.table_catalog,
src.table_schema,
src.column_name,
tgt.tgt_data_type proposed_data_type,
src.data_type original_data_type
FROM
src
INNER JOIN
tgt
ON
src.id = tgt.id
AND src.column_name = tgt.tgt_column_name
{% endset %}

{% if execute %}
{% set results = run_query(get_columns_query) %}
{%set cmdresultscount %}
SELECT count(1) from ({{ get_columns_query }})
{% endset %}
{% set results_count = run_query(cmdresultscount) %}

{% if results_count[0][0] > 1 %}
SELECT
{% for cols in results %}
{% set p_data_type = cols['proposed_data_type'] %}
{% set o_data_type = cols['original_data_type'] %}
{% if p_data_type != o_data_type %}
{% if p_data_type == 'BOOL' or p_data_type == 'BOOLEAN' %}
{% if o_data_type == 'NUMERIC' or o_data_type == 'FLOAT64' or o_data_type == 'INT64' %}
safe_cast(case when safe_cast(`{{ cols['column_name'] }}` as 'INT64') > 0 then 1
when safe_cast(`{{ cols['column_name'] }}` as 'INT64') = 0 then 0
else null end as {{ p_data_type }} ) as `{{ cols['column_name'] }}` ,
{% else %}
safe_cast(case when upper(`{{ cols['column_name'] }}`) in ('YES','X','Y') then 1
when upper(`{{ cols['column_name'] }}`) in ('NO', '', 'N') then 0
else null end as {{ p_data_type }} ) as `{{ cols['column_name'] }}` ,
{% endif %}
{% elif p_data_type == 'INT64' %}
safe_cast(`{{ cols['column_name'] }}` as INT64) as `{{ cols['column_name']}}` ,
{% elif p_data_type == 'FLOAT64' %}
safe_cast(`{{ cols['column_name'] }}` as FLOAT64) as `{{ cols['column_name']}}` ,
{% else %}
safe_cast(`{{ cols['column_name'] }}` as {{ p_data_type}}) as `{{ cols['column_name']}}` ,
{% endif %}
{% else %}
{{ cols['column_name'] }} ,
{% endif %}
{% endfor %}
FROM
{{ table_fqn }}
{% else %}
{{ data_type_optimizor_v2 (table_fqn, 'false' ) }}
{% endif %}
{% endif %}
{% endmacro %}