Feat/1492 extend timestamp config (#1669)
* feat: add timezone flag to configure timestamp data

* fix: delete timezone init

* test: add duckdb timestamps with timezone

* test: fix resource hints for timestamp

* test: correct duckdb timestamps

* test: timezone tests for parquet files

* exp: add notebook with timestamp exploration

* test: refactor timestamp tests

* test: simplified tests and extended experiments

* exp: timestamp exp for duckdb and parquet

* fix: add pyarrow reflection for timezone flag

* fix lint errors

* fix: CI/CD move tests pyarrow module

* fix: pyarrow timezone defaults true

* refactor: typemapper signatures

* fix: duckdb timestamp config

* docs: updated duckdb.md timestamps

* fix: revert duckdb timestamp defaults

* fix: restore duckdb timestamp default

* fix: duckdb timestamp mapper

* fix: delete notebook

* docs: added timestamp and timezone section

* refactor: duckdb precision exception message

* feat: postgres timestamp timezone config

* fix: postgres timestamp precision

* fix: postgres timezone false case

* feat: add snowflake timezone and precision flag

* test: postgres invalid timestamp precision

* test: unified timestamp invalid precision

* test: unified column flag timezone

* chore: add warn log for unsupported timezone or precision flag

* docs: timezone and precision flags for timestamps

* fix: none case error

* docs: add duckdb default precision

* fix: typing errors

* rebase: formatted files from upstream devel

* fix: warning message and reference TODO

* test: delete duplicated input_data array

* docs: moved timestamp config to data types section

* fix: lint and format

* fix: lint local errors
donotpush authored and willi-mueller committed Sep 2, 2024
1 parent 729fe55 commit 73ed515
Showing 21 changed files with 536 additions and 96 deletions.
9 changes: 8 additions & 1 deletion dlt/common/libs/pyarrow.py
@@ -54,7 +54,10 @@ def get_py_arrow_datatype(
elif column_type == "bool":
return pyarrow.bool_()
elif column_type == "timestamp":
return get_py_arrow_timestamp(column.get("precision") or caps.timestamp_precision, tz)
# sets timezone to None when timezone hint is false
timezone = tz if column.get("timezone", True) else None
precision = column.get("precision") or caps.timestamp_precision
return get_py_arrow_timestamp(precision, timezone)
elif column_type == "bigint":
return get_pyarrow_int(column.get("precision"))
elif column_type == "binary":
@@ -139,6 +142,10 @@ def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
precision = 6
else:
precision = 9

if dtype.tz is None:
return dict(data_type="timestamp", precision=precision, timezone=False)

return dict(data_type="timestamp", precision=precision)
elif pyarrow.types.is_date(dtype):
return dict(data_type="date")
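A minimal sketch of the two mappings above, assuming pyarrow is installed. `PRECISION_TO_UNIT` and `arrow_timestamp_for_column` are illustrative stand-ins, not dlt's actual helpers:

```python
import pyarrow as pa

# assumed precision-to-unit table; dlt keeps an equivalent mapping internally
PRECISION_TO_UNIT = {0: "s", 3: "ms", 6: "us", 9: "ns"}

def arrow_timestamp_for_column(column: dict, default_precision: int = 6) -> pa.DataType:
    # timezone hint False -> naive timestamp; True or absent -> UTC-aware (first hunk)
    tz = "UTC" if column.get("timezone", True) else None
    precision = column.get("precision") or default_precision
    return pa.timestamp(PRECISION_TO_UNIT[precision], tz)

assert arrow_timestamp_for_column({"timezone": False}) == pa.timestamp("us")
assert arrow_timestamp_for_column({"precision": 9}) == pa.timestamp("ns", "UTC")
```

The second hunk is the reverse direction: a naive arrow type (`dtype.tz is None`) reflects back into the column schema as `timezone=False`, so the flag round-trips through parquet.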
1 change: 1 addition & 0 deletions dlt/common/schema/typing.py
@@ -94,6 +94,7 @@ class TColumnType(TypedDict, total=False):
data_type: Optional[TDataType]
precision: Optional[int]
scale: Optional[int]
timezone: Optional[bool]


class TColumnSchemaBase(TColumnType, total=False):
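With `timezone` now part of `TColumnType`, the flag can travel through ordinary column hints. A sketch of assumed usage (resource and column names are illustrative):

```python
import dlt

@dlt.resource(
    columns={"event_ts": {"data_type": "timestamp", "precision": 3, "timezone": False}}
)
def events():
    # loads as a naive, millisecond-precision timestamp on destinations that support it
    yield {"id": 1, "event_ts": "2024-09-02T10:00:00"}
```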
14 changes: 7 additions & 7 deletions dlt/destinations/impl/athena/athena.py
@@ -104,9 +104,9 @@ class AthenaTypeMapper(TypeMapper):
def __init__(self, capabilities: DestinationCapabilitiesContext):
super().__init__(capabilities)

def to_db_integer_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> str:
def to_db_integer_type(self, column: TColumnSchema, table: TTableSchema = None) -> str:
precision = column.get("precision")
table_format = table.get("table_format")
if precision is None:
return "bigint"
if precision <= 8:
@@ -403,9 +403,9 @@ def _from_db_type(
) -> TColumnType:
return self.type_mapper.from_db_type(hive_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
return (
f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table)}"
)

def _iceberg_partition_clause(self, partition_hints: Optional[Dict[str, str]]) -> str:
@@ -429,9 +429,9 @@ def _get_table_update_sql(
# for the system tables we need to create empty iceberg tables to be able to run, DELETE and UPDATE queries
# or if we are in iceberg mode, we create iceberg tables for all tables
table = self.prepare_load_table(table_name, self.in_staging_mode)
table_format = table.get("table_format")

is_iceberg = self._is_iceberg_table(table) or table.get("write_disposition", None) == "skip"
columns = ", ".join([self._get_column_def_sql(c, table_format) for c in new_columns])
columns = ", ".join([self._get_column_def_sql(c, table) for c in new_columns])

# create unique tag for iceberg table so it is never recreated in the same folder
# athena requires some kind of special cleaning (or that is a bug) so we cannot refresh
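The signature change here is the pattern repeated across every mapper in this commit: instead of bare `precision`/`table_format` arguments, the mapper receives the whole column and table schemas and unpacks what it needs. A standalone sketch of the integer branch; the cutoffs past 8 bits are assumed from the usual type ladder, since the hunk is truncated here:

```python
from typing import Optional

def to_db_integer_type(column: dict, table: Optional[dict] = None) -> str:
    precision = column.get("precision")  # number of bits
    if precision is None:
        return "bigint"
    if precision <= 8:
        return "tinyint"
    if precision <= 16:  # assumed cutoff
        return "smallint"
    if precision <= 32:  # assumed cutoff
        return "int"
    return "bigint"
```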
8 changes: 4 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -90,9 +90,9 @@ class BigQueryTypeMapper(TypeMapper):
"TIME": "time",
}

def to_db_decimal_type(self, precision: Optional[int], scale: Optional[int]) -> str:
def to_db_decimal_type(self, column: TColumnSchema) -> str:
# Use BigQuery's BIGNUMERIC for large precision decimals
precision, scale = self.decimal_precision(precision, scale)
precision, scale = self.decimal_precision(column.get("precision"), column.get("scale"))
if precision > 38 or scale > 9:
return "BIGNUMERIC(%i,%i)" % (precision, scale)
return "NUMERIC(%i,%i)" % (precision, scale)
@@ -417,10 +417,10 @@ def _get_info_schema_columns_query(

return query, folded_table_names

def _get_column_def_sql(self, column: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, column: TColumnSchema, table: TTableSchema = None) -> str:
name = self.sql_client.escape_column_name(column["name"])
column_def_sql = (
f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}"
f"{name} {self.type_mapper.to_db_type(column, table)} {self._gen_not_null(column.get('nullable', True))}"
)
if column.get(ROUND_HALF_EVEN_HINT, False):
column_def_sql += " OPTIONS (rounding_mode='ROUND_HALF_EVEN')"
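The decimal branch reduces to a threshold check against NUMERIC's ceiling of 38 digits of precision and 9 of scale; a sketch with assumed defaults:

```python
def bigquery_decimal_type(precision: int = 38, scale: int = 9) -> str:
    # beyond NUMERIC(38,9), BigQuery needs BIGNUMERIC (up to 76+ digits of precision)
    if precision > 38 or scale > 9:
        return "BIGNUMERIC(%i,%i)" % (precision, scale)
    return "NUMERIC(%i,%i)" % (precision, scale)

assert bigquery_decimal_type() == "NUMERIC(38,9)"
assert bigquery_decimal_type(76, 38) == "BIGNUMERIC(76,38)"
```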
6 changes: 3 additions & 3 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -293,7 +293,7 @@ def _create_merge_followup_jobs(
) -> List[FollowupJobRequest]:
return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)]

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
# Build column definition.
# The primary key and sort order definition is defined outside column specification.
hints_ = " ".join(
@@ -307,9 +307,9 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
# Alter table statements only accept `Nullable` modifiers.
# JSON type isn't nullable in ClickHouse.
type_with_nullability_modifier = (
f"Nullable({self.type_mapper.to_db_type(c)})"
f"Nullable({self.type_mapper.to_db_type(c,table)})"
if c.get("nullable", True)
else self.type_mapper.to_db_type(c)
else self.type_mapper.to_db_type(c, table)
)

return (
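The ClickHouse change only threads `table` through; the surrounding logic is the nullability wrapper, which boils down to (a sketch, with the JSON caveat noted in the comments above):

```python
def clickhouse_column_type(db_type: str, nullable: bool = True) -> str:
    # ALTER TABLE accepts only Nullable(...) modifiers; JSON cannot be wrapped
    return f"Nullable({db_type})" if nullable else db_type
```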
15 changes: 8 additions & 7 deletions dlt/destinations/impl/databricks/databricks.py
@@ -68,9 +68,8 @@ class DatabricksTypeMapper(TypeMapper):
"wei": "DECIMAL(%i,%i)",
}

def to_db_integer_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> str:
def to_db_integer_type(self, column: TColumnSchema, table: TTableSchema = None) -> str:
precision = column.get("precision")
if precision is None:
return "BIGINT"
if precision <= 8:
@@ -323,10 +322,12 @@ def _create_merge_followup_jobs(
return [DatabricksMergeJob.from_table_chain(table_chain, self.sql_client)]

def _make_add_column_sql(
self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None
self, new_columns: Sequence[TColumnSchema], table: TTableSchema = None
) -> List[str]:
# Override because databricks requires multiple columns in a single ADD COLUMN clause
return ["ADD COLUMN\n" + ",\n".join(self._get_column_def_sql(c) for c in new_columns)]
return [
"ADD COLUMN\n" + ",\n".join(self._get_column_def_sql(c, table) for c in new_columns)
]

def _get_table_update_sql(
self,
@@ -351,10 +352,10 @@ def _from_db_type(
) -> TColumnType:
return self.type_mapper.from_db_type(bq_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
name = self.sql_client.escape_column_name(c["name"])
return (
f"{name} {self.type_mapper.to_db_type(c)} {self._gen_not_null(c.get('nullable', True))}"
f"{name} {self.type_mapper.to_db_type(c,table)} {self._gen_not_null(c.get('nullable', True))}"
)

def _get_storage_table_query_columns(self) -> List[str]:
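The `_make_add_column_sql` override exists because Databricks wants every new column in a single ADD COLUMN clause rather than one ALTER per column. Roughly the fragment it builds (column definitions are illustrative):

```python
defs = ["`created_at` TIMESTAMP", "`user_id` BIGINT NOT NULL"]
print("ADD COLUMN\n" + ",\n".join(defs))
# ADD COLUMN
# `created_at` TIMESTAMP,
# `user_id` BIGINT NOT NULL
```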
12 changes: 8 additions & 4 deletions dlt/destinations/impl/dremio/dremio.py
@@ -195,10 +195,10 @@ def _from_db_type(
) -> TColumnType:
return self.type_mapper.from_db_type(bq_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
name = self.sql_client.escape_column_name(c["name"])
return (
f"{name} {self.type_mapper.to_db_type(c)} {self._gen_not_null(c.get('nullable', True))}"
f"{name} {self.type_mapper.to_db_type(c,table)} {self._gen_not_null(c.get('nullable', True))}"
)

def _create_merge_followup_jobs(
@@ -207,9 +207,13 @@ def _create_merge_followup_jobs(
return [DremioMergeJob.from_table_chain(table_chain, self.sql_client)]

def _make_add_column_sql(
self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None
self, new_columns: Sequence[TColumnSchema], table: TTableSchema = None
) -> List[str]:
return ["ADD COLUMNS (" + ", ".join(self._get_column_def_sql(c) for c in new_columns) + ")"]
return [
"ADD COLUMNS ("
+ ", ".join(self._get_column_def_sql(c, table) for c in new_columns)
+ ")"
]

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load
43 changes: 31 additions & 12 deletions dlt/destinations/impl/duckdb/duck.py
@@ -62,9 +62,8 @@ class DuckDbTypeMapper(TypeMapper):
"TIMESTAMP_NS": "timestamp",
}

def to_db_integer_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> str:
def to_db_integer_type(self, column: TColumnSchema, table: TTableSchema = None) -> str:
precision = column.get("precision")
if precision is None:
return "BIGINT"
# Precision is number of bits
@@ -83,19 +82,39 @@ def to_db_integer_type(
)

def to_db_datetime_type(
self, precision: Optional[int], table_format: TTableFormat = None
self,
column: TColumnSchema,
table: TTableSchema = None,
) -> str:
column_name = column.get("name")
table_name = table.get("name")
timezone = column.get("timezone")
precision = column.get("precision")

if timezone and precision is not None:
raise TerminalValueError(
f"DuckDB does not support both timezone and precision for column '{column_name}' in"
f" table '{table_name}'. To resolve this issue, either set timezone to False or"
" None, or use the default precision."
)

if timezone:
return "TIMESTAMP WITH TIME ZONE"
elif timezone is not None:  # timezone was explicitly set to False (None is also falsy, hence this check)
return "TIMESTAMP"

if precision is None or precision == 6:
return super().to_db_datetime_type(precision, table_format)
if precision == 0:
return None
elif precision == 0:
return "TIMESTAMP_S"
if precision == 3:
elif precision == 3:
return "TIMESTAMP_MS"
if precision == 9:
elif precision == 9:
return "TIMESTAMP_NS"

raise TerminalValueError(
f"timestamp with {precision} decimals after seconds cannot be mapped into duckdb"
" TIMESTAMP type"
f"DuckDB does not support precision '{precision}' for '{column_name}' in table"
f" '{table_name}'"
)

def from_db_type(
@@ -162,15 +181,15 @@ def create_load_job(
job = DuckDbCopyJob(file_path)
return job

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
hints_str = " ".join(
self.active_hints.get(h, "")
for h in self.active_hints.keys()
if c.get(h, False) is True
)
column_name = self.sql_client.escape_column_name(c["name"])
return (
f"{column_name} {self.type_mapper.to_db_type(c)} {hints_str} {self._gen_not_null(c.get('nullable', True))}"
f"{column_name} {self.type_mapper.to_db_type(c,table)} {hints_str} {self._gen_not_null(c.get('nullable', True))}"
)

def _from_db_type(
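The new `to_db_datetime_type` is the heart of this commit for DuckDB. Restated as a standalone sketch (`ValueError` stands in for dlt's `TerminalValueError`; returning `None` defers to the destination's default timestamp type):

```python
from typing import Optional

def duckdb_timestamp_type(timezone: Optional[bool], precision: Optional[int]) -> Optional[str]:
    if timezone and precision is not None:
        # DuckDB's tz-aware type exists only at the default microsecond precision
        raise ValueError("set timezone to False or None, or use the default precision")
    if timezone:
        return "TIMESTAMP WITH TIME ZONE"
    elif timezone is not None:  # timezone was explicitly False
        return "TIMESTAMP"
    if precision is None or precision == 6:
        return None  # fall back to the capability default
    elif precision == 0:
        return "TIMESTAMP_S"
    elif precision == 3:
        return "TIMESTAMP_MS"
    elif precision == 9:
        return "TIMESTAMP_NS"
    raise ValueError(f"unsupported timestamp precision {precision}")
```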
24 changes: 15 additions & 9 deletions dlt/destinations/impl/lancedb/lancedb_client.py
@@ -41,7 +41,7 @@
LoadJob,
)
from dlt.common.pendulum import timedelta
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema import Schema, TTableSchema, TSchemaTables, TColumnSchema
from dlt.common.schema.typing import (
TColumnType,
TTableFormat,
@@ -105,21 +105,27 @@ class LanceDBTypeMapper(TypeMapper):
pa.date32(): "date",
}

def to_db_decimal_type(
self, precision: Optional[int], scale: Optional[int]
) -> pa.Decimal128Type:
precision, scale = self.decimal_precision(precision, scale)
def to_db_decimal_type(self, column: TColumnSchema) -> pa.Decimal128Type:
precision, scale = self.decimal_precision(column.get("precision"), column.get("scale"))
return pa.decimal128(precision, scale)

def to_db_datetime_type(
self, precision: Optional[int], table_format: TTableFormat = None
self,
column: TColumnSchema,
table: TTableSchema = None,
) -> pa.TimestampType:
column_name = column.get("name")
timezone = column.get("timezone")
precision = column.get("precision")
if timezone is not None or precision is not None:
logger.warning(
"LanceDB does not currently support column flags for timezone or precision."
f" These flags were used in column '{column_name}'."
)
unit: str = TIMESTAMP_PRECISION_TO_UNIT[self.capabilities.timestamp_precision]
return pa.timestamp(unit, "UTC")

def to_db_time_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> pa.Time64Type:
def to_db_time_type(self, column: TColumnSchema, table: TTableSchema = None) -> pa.Time64Type:
unit: str = TIMESTAMP_PRECISION_TO_UNIT[self.capabilities.timestamp_precision]
return pa.time64(unit)

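LanceDB takes the opposite route: the hints are accepted but only warned about, and every timestamp lands tz-aware in UTC at the destination's native precision. A sketch, with the unit table's contents assumed:

```python
import pyarrow as pa

TIMESTAMP_PRECISION_TO_UNIT = {0: "s", 3: "ms", 6: "us", 9: "ns"}  # assumed contents

def lancedb_timestamp(caps_precision: int = 6) -> pa.TimestampType:
    # column-level timezone/precision hints are ignored on this destination
    return pa.timestamp(TIMESTAMP_PRECISION_TO_UNIT[caps_precision], "UTC")
```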
15 changes: 6 additions & 9 deletions dlt/destinations/impl/mssql/mssql.py
@@ -59,9 +59,8 @@ class MsSqlTypeMapper(TypeMapper):
"int": "bigint",
}

def to_db_integer_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> str:
def to_db_integer_type(self, column: TColumnSchema, table: TTableSchema = None) -> str:
precision = column.get("precision")
if precision is None:
return "bigint"
if precision <= 8:
@@ -166,20 +165,18 @@ def _create_merge_followup_jobs(
return [MsSqlMergeJob.from_table_chain(table_chain, self.sql_client)]

def _make_add_column_sql(
self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None
self, new_columns: Sequence[TColumnSchema], table: TTableSchema = None
) -> List[str]:
# Override because mssql requires multiple columns in a single ADD COLUMN clause
return [
"ADD \n" + ",\n".join(self._get_column_def_sql(c, table_format) for c in new_columns)
]
return ["ADD \n" + ",\n".join(self._get_column_def_sql(c, table) for c in new_columns)]

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
sc_type = c["data_type"]
if sc_type == "text" and c.get("unique"):
# MSSQL does not allow index on large TEXT columns
db_type = "nvarchar(%i)" % (c.get("precision") or 900)
else:
db_type = self.type_mapper.to_db_type(c)
db_type = self.type_mapper.to_db_type(c, table)

hints_str = " ".join(
self.active_hints.get(h, "")
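The mssql special case exists because SQL Server caps index keys (900 bytes for a clustered key), so a unique text column must be bounded. A sketch of the branch; the unbounded default is an assumption, not necessarily dlt's exact mapping:

```python
def mssql_text_type(column: dict) -> str:
    if column.get("unique"):
        # must fit in an index key, hence the 900 cap
        return "nvarchar(%i)" % (column.get("precision") or 900)
    return "nvarchar(max)"  # assumed default for unbounded text
```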