Feat/1492 extend timestamp config #1669

Merged · 42 commits · Aug 30, 2024
Commits
f35d717
feat: add timezone flag to configure timestamp data
donotpush Jul 29, 2024
e030594
fix: delete timezone init
donotpush Jul 30, 2024
79cdbef
test: add duckdb timestamps with timezone
donotpush Jul 30, 2024
4aed5ef
test: fix resource hints for timestamp
donotpush Jul 30, 2024
e2ee13b
test: correct duckdb timestamps
donotpush Jul 30, 2024
874f1bf
test: timezone tests for parquet files
donotpush Jul 30, 2024
9ca83dc
exp: add notebook with timestamp exploration
donotpush Aug 1, 2024
f4ac0dc
test: refactor timestamp tests
donotpush Aug 5, 2024
3cd9694
test: simplified tests and extended experiments
donotpush Aug 6, 2024
2723777
exp: timestamp exp for duckdb and parquet
donotpush Aug 7, 2024
c7c9f9b
fix: add pyarrow reflection for timezone flag
donotpush Aug 7, 2024
a2e40eb
fix lint errors
donotpush Aug 7, 2024
6bc03d8
fix: CI/CD move tests pyarrow module
donotpush Aug 7, 2024
a19c01f
fix: pyarrow timezone defaults true
donotpush Aug 12, 2024
5e755c8
refactor: typemapper signatures
donotpush Aug 13, 2024
0b32cb7
fix: duckdb timestamp config
donotpush Aug 13, 2024
e0ff92a
docs: updated duckdb.md timestamps
donotpush Aug 13, 2024
c4b6010
fix: revert duckdb timestamp defaults
donotpush Aug 14, 2024
c6d59b7
fix: restore duckdb timestamp default
donotpush Aug 14, 2024
43a17c8
fix: duckdb timestamp mapper
donotpush Aug 14, 2024
f3242be
fix: delete notebook
donotpush Aug 16, 2024
380123c
docs: added timestamp and timezone section
donotpush Aug 19, 2024
91b55dd
refactor: duckdb precision exception message
donotpush Aug 19, 2024
6fc4a0a
feat: postgres timestamp timezone config
donotpush Aug 19, 2024
7b0b14c
fix: postgres timestamp precision
donotpush Aug 20, 2024
e537758
fix: postgres timezone false case
donotpush Aug 20, 2024
386b74b
feat: add snowflake timezone and precision flag
donotpush Aug 20, 2024
7558f48
test: postgres invalid timestamp precision
donotpush Aug 21, 2024
2256133
test: unified timestamp invalid precision
donotpush Aug 21, 2024
628742b
test: unified column flag timezone
donotpush Aug 21, 2024
b0894c3
chore: add warn log for unsupported timezone or precision flag
donotpush Aug 23, 2024
a09f0da
docs: timezone and precision flags for timestamps
donotpush Aug 23, 2024
67b3340
fix: none case error
donotpush Aug 23, 2024
464ba98
docs: add duckdb default precision
donotpush Aug 23, 2024
3a4613c
fix: typing errors
donotpush Aug 23, 2024
2e856dc
rebase: formatted files from upstream devel
donotpush Aug 23, 2024
0a8e35e
fix: warning message and reference TODO
donotpush Aug 23, 2024
35d4628
test: delete duplicated input_data array
donotpush Aug 28, 2024
3208c58
docs: moved timestamp config to data types section
donotpush Aug 28, 2024
ed965c3
merge: resolve conflicts origin/devel
donotpush Aug 28, 2024
7004a35
fix: lint and format
donotpush Aug 28, 2024
6ef862a
fix: lint local errors
donotpush Aug 28, 2024
9 changes: 8 additions & 1 deletion dlt/common/libs/pyarrow.py
@@ -54,7 +54,10 @@ def get_py_arrow_datatype(
elif column_type == "bool":
return pyarrow.bool_()
elif column_type == "timestamp":
return get_py_arrow_timestamp(column.get("precision") or caps.timestamp_precision, tz)
# sets timezone to None when timezone hint is false
timezone = tz if column.get("timezone", True) else None
precision = column.get("precision") or caps.timestamp_precision
return get_py_arrow_timestamp(precision, timezone)
elif column_type == "bigint":
return get_pyarrow_int(column.get("precision"))
elif column_type == "binary":
@@ -139,6 +142,10 @@ def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
precision = 6
else:
precision = 9

if dtype.tz is None:
return dict(data_type="timestamp", precision=precision, timezone=False)

return dict(data_type="timestamp", precision=precision)
elif pyarrow.types.is_date(dtype):
return dict(data_type="date")
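
Net effect of this change: a timezone=False column hint now produces a timezone-naive Arrow timestamp, and a naive Arrow type round-trips back as timezone=False. A minimal standalone sketch of the hint-to-type mapping; the unit mapping and the microsecond capability default are assumptions, since get_py_arrow_timestamp itself is not shown in this diff:

import pyarrow as pa

def timestamp_type_for(column: dict, tz: str = "UTC") -> pa.TimestampType:
    # the timezone hint defaults to True; an explicit False yields a naive type
    timezone = tz if column.get("timezone", True) else None
    precision = column.get("precision") or 6  # assumed capability default
    unit = {0: "s", 3: "ms", 6: "us", 9: "ns"}[precision]  # assumed unit mapping
    return pa.timestamp(unit, tz=timezone)

assert timestamp_type_for({"timezone": False}).tz is None
assert timestamp_type_for({"precision": 9}).tz == "UTC"
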
1 change: 1 addition & 0 deletions dlt/common/schema/typing.py
@@ -94,6 +94,7 @@ class TColumnType(TypedDict, total=False):
data_type: Optional[TDataType]
precision: Optional[int]
scale: Optional[int]
timezone: Optional[bool]


class TColumnSchemaBase(TColumnType, total=False):
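
With timezone now part of TColumnType, the flag can be passed anywhere column hints are accepted. A sketch of pipeline-side usage (the resource and column names are hypothetical):

import dlt

@dlt.resource(
    columns={"event_ts": {"data_type": "timestamp", "precision": 3, "timezone": False}}
)
def events():
    # event_ts will be stored as a timezone-naive timestamp where supported
    yield {"event_ts": "2024-08-30T12:00:00"}
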
14 changes: 7 additions & 7 deletions dlt/destinations/impl/athena/athena.py
@@ -104,9 +104,9 @@ class AthenaTypeMapper(TypeMapper):
def __init__(self, capabilities: DestinationCapabilitiesContext):
super().__init__(capabilities)

def to_db_integer_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> str:
def to_db_integer_type(self, column: TColumnSchema, table: TTableSchema = None) -> str:
precision = column.get("precision")
table_format = table.get("table_format")
if precision is None:
return "bigint"
if precision <= 8:
@@ -403,9 +403,9 @@ def _from_db_type(
) -> TColumnType:
return self.type_mapper.from_db_type(hive_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
return (
f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table)}"
)

def _iceberg_partition_clause(self, partition_hints: Optional[Dict[str, str]]) -> str:
@@ -429,9 +429,9 @@ def _get_table_update_sql(
# for the system tables we need to create empty iceberg tables to be able to run, DELETE and UPDATE queries
# or if we are in iceberg mode, we create iceberg tables for all tables
table = self.prepare_load_table(table_name, self.in_staging_mode)
table_format = table.get("table_format")

is_iceberg = self._is_iceberg_table(table) or table.get("write_disposition", None) == "skip"
columns = ", ".join([self._get_column_def_sql(c, table_format) for c in new_columns])
columns = ", ".join([self._get_column_def_sql(c, table) for c in new_columns])

# create unique tag for iceberg table so it is never recreated in the same folder
# athena requires some kind of special cleaning (or that is a bug) so we cannot refresh
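
This is the signature refactor that repeats across the destinations below (BigQuery, ClickHouse, Databricks, Dremio, DuckDB, LanceDB, MSSQL): type-mapper hooks now receive the full column and table schemas instead of pre-extracted precision/table_format values, so each mapper can read whichever hints it supports. A standalone sketch of the new shape; the thresholds past the visible "precision <= 8" branch are assumptions, since the diff is truncated here:

from typing import Optional

def athena_integer_type(column: dict, table: Optional[dict] = None) -> str:
    # the refactored hook reads its hints from the schema dicts itself
    precision = column.get("precision")
    if precision is None:
        return "bigint"
    if precision <= 8:
        return "tinyint"
    if precision <= 16:  # this and the following thresholds are assumed
        return "smallint"
    if precision <= 32:
        return "int"
    return "bigint"

assert athena_integer_type({"precision": 16}, {"table_format": "iceberg"}) == "smallint"
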
8 changes: 4 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -90,9 +90,9 @@ class BigQueryTypeMapper(TypeMapper):
"TIME": "time",
}

def to_db_decimal_type(self, precision: Optional[int], scale: Optional[int]) -> str:
def to_db_decimal_type(self, column: TColumnSchema) -> str:
# Use BigQuery's BIGNUMERIC for large precision decimals
precision, scale = self.decimal_precision(precision, scale)
precision, scale = self.decimal_precision(column.get("precision"), column.get("scale"))
if precision > 38 or scale > 9:
return "BIGNUMERIC(%i,%i)" % (precision, scale)
return "NUMERIC(%i,%i)" % (precision, scale)
@@ -417,10 +417,10 @@ def _get_info_schema_columns_query(

return query, folded_table_names

def _get_column_def_sql(self, column: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, column: TColumnSchema, table: TTableSchema = None) -> str:
name = self.sql_client.escape_column_name(column["name"])
column_def_sql = (
f"{name} {self.type_mapper.to_db_type(column, table_format)} {self._gen_not_null(column.get('nullable', True))}"
f"{name} {self.type_mapper.to_db_type(column, table)} {self._gen_not_null(column.get('nullable', True))}"
)
if column.get(ROUND_HALF_EVEN_HINT, False):
column_def_sql += " OPTIONS (rounding_mode='ROUND_HALF_EVEN')"
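
The decimal branch is easy to check in isolation: any precision/scale pair exceeding BigQuery's NUMERIC bounds (38, 9) falls through to BIGNUMERIC. A standalone restatement of the logic above, after defaults have been applied:

def bq_decimal_type(precision: int, scale: int) -> str:
    # mirrors BigQueryTypeMapper.to_db_decimal_type
    if precision > 38 or scale > 9:
        return "BIGNUMERIC(%i,%i)" % (precision, scale)
    return "NUMERIC(%i,%i)" % (precision, scale)

assert bq_decimal_type(38, 9) == "NUMERIC(38,9)"
assert bq_decimal_type(76, 38) == "BIGNUMERIC(76,38)"
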
6 changes: 3 additions & 3 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -293,7 +293,7 @@ def _create_merge_followup_jobs(
) -> List[FollowupJobRequest]:
return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)]

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
# Build column definition.
# The primary key and sort order definition is defined outside column specification.
hints_ = " ".join(
@@ -307,9 +307,9 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non
# Alter table statements only accept `Nullable` modifiers.
# JSON type isn't nullable in ClickHouse.
type_with_nullability_modifier = (
f"Nullable({self.type_mapper.to_db_type(c)})"
f"Nullable({self.type_mapper.to_db_type(c,table)})"
if c.get("nullable", True)
else self.type_mapper.to_db_type(c)
else self.type_mapper.to_db_type(c, table)
)

return (
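
The ClickHouse mapper expresses nullability by wrapping the type rather than appending a NOT NULL clause, since its ALTER TABLE statements only accept Nullable() modifiers. A minimal sketch; DateTime64(6) is just an illustrative base type:

def clickhouse_column_type(base_type: str, nullable: bool = True) -> str:
    # ALTER TABLE only accepts Nullable() modifiers; JSON is never nullable
    return f"Nullable({base_type})" if nullable else base_type

assert clickhouse_column_type("DateTime64(6)") == "Nullable(DateTime64(6))"
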
15 changes: 8 additions & 7 deletions dlt/destinations/impl/databricks/databricks.py
@@ -68,9 +68,8 @@ class DatabricksTypeMapper(TypeMapper):
"wei": "DECIMAL(%i,%i)",
}

def to_db_integer_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> str:
def to_db_integer_type(self, column: TColumnSchema, table: TTableSchema = None) -> str:
precision = column.get("precision")
if precision is None:
return "BIGINT"
if precision <= 8:
@@ -323,10 +322,12 @@ def _create_merge_followup_jobs(
return [DatabricksMergeJob.from_table_chain(table_chain, self.sql_client)]

def _make_add_column_sql(
self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None
self, new_columns: Sequence[TColumnSchema], table: TTableSchema = None
) -> List[str]:
# Override because databricks requires multiple columns in a single ADD COLUMN clause
return ["ADD COLUMN\n" + ",\n".join(self._get_column_def_sql(c) for c in new_columns)]
return [
"ADD COLUMN\n" + ",\n".join(self._get_column_def_sql(c, table) for c in new_columns)
]

def _get_table_update_sql(
self,
@@ -351,10 +352,10 @@ def _from_db_type(
) -> TColumnType:
return self.type_mapper.from_db_type(bq_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
name = self.sql_client.escape_column_name(c["name"])
return (
f"{name} {self.type_mapper.to_db_type(c)} {self._gen_not_null(c.get('nullable', True))}"
f"{name} {self.type_mapper.to_db_type(c,table)} {self._gen_not_null(c.get('nullable', True))}"
)

def _get_storage_table_query_columns(self) -> List[str]:
12 changes: 8 additions & 4 deletions dlt/destinations/impl/dremio/dremio.py
@@ -195,10 +195,10 @@ def _from_db_type(
) -> TColumnType:
return self.type_mapper.from_db_type(bq_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
name = self.sql_client.escape_column_name(c["name"])
return (
f"{name} {self.type_mapper.to_db_type(c)} {self._gen_not_null(c.get('nullable', True))}"
f"{name} {self.type_mapper.to_db_type(c,table)} {self._gen_not_null(c.get('nullable', True))}"
)

def _create_merge_followup_jobs(
@@ -207,9 +207,13 @@ def _create_merge_followup_jobs(
return [DremioMergeJob.from_table_chain(table_chain, self.sql_client)]

def _make_add_column_sql(
self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None
self, new_columns: Sequence[TColumnSchema], table: TTableSchema = None
) -> List[str]:
return ["ADD COLUMNS (" + ", ".join(self._get_column_def_sql(c) for c in new_columns) + ")"]
return [
"ADD COLUMNS ("
+ ", ".join(self._get_column_def_sql(c, table) for c in new_columns)
+ ")"
]

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load
43 changes: 31 additions & 12 deletions dlt/destinations/impl/duckdb/duck.py
@@ -62,9 +62,8 @@ class DuckDbTypeMapper(TypeMapper):
"TIMESTAMP_NS": "timestamp",
}

def to_db_integer_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> str:
def to_db_integer_type(self, column: TColumnSchema, table: TTableSchema = None) -> str:
precision = column.get("precision")
if precision is None:
return "BIGINT"
# Precision is number of bits
@@ -83,19 +82,39 @@ def to_db_integer_type(
)

def to_db_datetime_type(
self, precision: Optional[int], table_format: TTableFormat = None
self,
column: TColumnSchema,
table: TTableSchema = None,
) -> str:
column_name = column.get("name")
table_name = table.get("name")
timezone = column.get("timezone")
precision = column.get("precision")

if timezone and precision is not None:
raise TerminalValueError(
f"DuckDB does not support both timezone and precision for column '{column_name}' in"
f" table '{table_name}'. To resolve this issue, either set timezone to False or"
" None, or use the default precision."
)

if timezone:
return "TIMESTAMP WITH TIME ZONE"
elif timezone is not None: # condition for when timezone is False given that none is falsy
return "TIMESTAMP"

if precision is None or precision == 6:
return super().to_db_datetime_type(precision, table_format)
if precision == 0:
return None
elif precision == 0:
return "TIMESTAMP_S"
if precision == 3:
elif precision == 3:
return "TIMESTAMP_MS"
if precision == 9:
elif precision == 9:
return "TIMESTAMP_NS"

raise TerminalValueError(
f"timestamp with {precision} decimals after seconds cannot be mapped into duckdb"
" TIMESTAMP type"
f"DuckDB does not support precision '{precision}' for '{column_name}' in table"
f" '{table_name}'"
)

def from_db_type(
@@ -162,15 +181,15 @@ def create_load_job(
job = DuckDbCopyJob(file_path)
return job

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
hints_str = " ".join(
self.active_hints.get(h, "")
for h in self.active_hints.keys()
if c.get(h, False) is True
)
column_name = self.sql_client.escape_column_name(c["name"])
return (
f"{column_name} {self.type_mapper.to_db_type(c)} {hints_str} {self._gen_not_null(c.get('nullable', True))}"
f"{column_name} {self.type_mapper.to_db_type(c,table)} {hints_str} {self._gen_not_null(c.get('nullable', True))}"
)

def _from_db_type(
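
This is the core of the PR for DuckDB: timezone and a non-default precision are mutually exclusive, an explicit True/False selects between TIMESTAMP WITH TIME ZONE and TIMESTAMP, and sub-second precisions map to DuckDB's dedicated types. A condensed, runnable restatement of the decision logic; the exception class and the capability-default fallback are stand-ins:

class TerminalValueError(ValueError):
    pass  # stand-in for dlt's exception of the same name

def duckdb_timestamp_type(timezone, precision):
    if timezone and precision is not None:
        raise TerminalValueError("timezone and precision cannot be combined")
    if timezone:
        return "TIMESTAMP WITH TIME ZONE"
    if timezone is not None:  # explicitly False (None means "hint not set")
        return "TIMESTAMP"
    if precision is None or precision == 6:
        return None  # defer to the capability default (assumed TIMESTAMP WITH TIME ZONE)
    mapping = {0: "TIMESTAMP_S", 3: "TIMESTAMP_MS", 9: "TIMESTAMP_NS"}
    if precision not in mapping:
        raise TerminalValueError(f"unsupported timestamp precision {precision}")
    return mapping[precision]

assert duckdb_timestamp_type(True, None) == "TIMESTAMP WITH TIME ZONE"
assert duckdb_timestamp_type(False, None) == "TIMESTAMP"
assert duckdb_timestamp_type(None, 3) == "TIMESTAMP_MS"
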
24 changes: 15 additions & 9 deletions dlt/destinations/impl/lancedb/lancedb_client.py
@@ -41,7 +41,7 @@
LoadJob,
)
from dlt.common.pendulum import timedelta
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema import Schema, TTableSchema, TSchemaTables, TColumnSchema
from dlt.common.schema.typing import (
TColumnType,
TTableFormat,
@@ -105,21 +105,27 @@ class LanceDBTypeMapper(TypeMapper):
pa.date32(): "date",
}

def to_db_decimal_type(
self, precision: Optional[int], scale: Optional[int]
) -> pa.Decimal128Type:
precision, scale = self.decimal_precision(precision, scale)
def to_db_decimal_type(self, column: TColumnSchema) -> pa.Decimal128Type:
precision, scale = self.decimal_precision(column.get("precision"), column.get("scale"))
return pa.decimal128(precision, scale)

def to_db_datetime_type(
self, precision: Optional[int], table_format: TTableFormat = None
self,
column: TColumnSchema,
table: TTableSchema = None,
) -> pa.TimestampType:
column_name = column.get("name")
timezone = column.get("timezone")
precision = column.get("precision")
if timezone is not None or precision is not None:
logger.warning(
"LanceDB does not currently support column flags for timezone or precision."
f" These flags were used in column '{column_name}'."
)
unit: str = TIMESTAMP_PRECISION_TO_UNIT[self.capabilities.timestamp_precision]
return pa.timestamp(unit, "UTC")

def to_db_time_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> pa.Time64Type:
def to_db_time_type(self, column: TColumnSchema, table: TTableSchema = None) -> pa.Time64Type:
unit: str = TIMESTAMP_PRECISION_TO_UNIT[self.capabilities.timestamp_precision]
return pa.time64(unit)

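
LanceDB, by contrast, ignores the new hints: the mapper always emits a UTC timestamp at the destination's default unit and only logs a warning when either flag is set. Sketched with an assumed microsecond default:

import logging
import pyarrow as pa

logger = logging.getLogger("dlt")

def lancedb_timestamp_type(column: dict) -> pa.TimestampType:
    # warn, but do not fail, when unsupported hints are present
    if column.get("timezone") is not None or column.get("precision") is not None:
        logger.warning(
            "LanceDB does not currently support column flags for timezone or"
            f" precision. These flags were used in column '{column.get('name')}'."
        )
    return pa.timestamp("us", "UTC")  # "us" assumed as the capability default
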
15 changes: 6 additions & 9 deletions dlt/destinations/impl/mssql/mssql.py
@@ -59,9 +59,8 @@ class MsSqlTypeMapper(TypeMapper):
"int": "bigint",
}

def to_db_integer_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> str:
def to_db_integer_type(self, column: TColumnSchema, table: TTableSchema = None) -> str:
precision = column.get("precision")
if precision is None:
return "bigint"
if precision <= 8:
@@ -166,20 +165,18 @@ def _create_merge_followup_jobs(
return [MsSqlMergeJob.from_table_chain(table_chain, self.sql_client)]

def _make_add_column_sql(
self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None
self, new_columns: Sequence[TColumnSchema], table: TTableSchema = None
) -> List[str]:
# Override because mssql requires multiple columns in a single ADD COLUMN clause
return [
"ADD \n" + ",\n".join(self._get_column_def_sql(c, table_format) for c in new_columns)
]
return ["ADD \n" + ",\n".join(self._get_column_def_sql(c, table) for c in new_columns)]

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
sc_type = c["data_type"]
if sc_type == "text" and c.get("unique"):
# MSSQL does not allow index on large TEXT columns
db_type = "nvarchar(%i)" % (c.get("precision") or 900)
else:
db_type = self.type_mapper.to_db_type(c)
db_type = self.type_mapper.to_db_type(c, table)

hints_str = " ".join(
self.active_hints.get(h, "")
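
One pre-existing wrinkle survives the refactor: MSSQL disallows indexes on large text columns, so a text column marked unique is bounded to nvarchar(precision or 900) before the type mapper is consulted. In isolation (the non-unique fallback type is an assumption):

def mssql_text_column_type(column: dict) -> str:
    if column["data_type"] == "text" and column.get("unique"):
        # indexes are disallowed on large TEXT columns; bound the width
        return "nvarchar(%i)" % (column.get("precision") or 900)
    return "nvarchar(max)"  # assumed default mapping for unbounded text

assert mssql_text_column_type({"data_type": "text", "unique": True}) == "nvarchar(900)"
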