Skip to content

Commit

Permalink
fix: duckdb timestamp config
Browse files Browse the repository at this point in the history
  • Loading branch information
donotpush committed Aug 13, 2024
1 parent 8872b1f commit 3c09dbb
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 26 deletions.
55 changes: 33 additions & 22 deletions dlt/destinations/impl/duckdb/duck.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ class DuckDbTypeMapper(TypeMapper):
"TIMESTAMP_NS": "timestamp",
}

def to_db_integer_type(
self, precision: Optional[int], table_format: TTableFormat = None
) -> str:
def to_db_integer_type(self, column: TColumnSchema = None, table: TTableSchema = None) -> str:
precision = column.get("precision")
if precision is None:
return "BIGINT"
# Precision is number of bits
Expand All @@ -88,28 +87,40 @@ def to_db_integer_type(

def to_db_datetime_type(
    self,
    column: TColumnSchema = None,
    table: TTableSchema = None,
) -> str:
    """Map a timestamp column's hints to a DuckDB timestamp type name.

    Reads the ``timezone`` and ``precision`` hints from ``column``; ``table``
    is only used to name the table in error messages.

    Returns:
        ``"TIMESTAMP WITH TIME ZONE"`` when ``timezone`` is truthy, otherwise
        ``"TIMESTAMP"``, ``"TIMESTAMP_S"``, ``"TIMESTAMP_MS"`` or
        ``"TIMESTAMP_NS"`` depending on ``precision``.

    Raises:
        TerminalValueError: if ``timezone`` is combined with a precision other
            than microseconds, or if ``precision`` has no matching type.
    """
    # Both parameters default to None, so never call `.get` on them unguarded:
    # doing so would raise AttributeError instead of the intended error below.
    hints = column if column is not None else {}
    column_name = hints.get("name")
    table_name = table.get("name") if table is not None else None
    timezone = hints.get("timezone")
    precision = hints.get("precision")

    if timezone:
        # TIMESTAMP and TIMESTAMPTZ (TIMESTAMP WITH TIME ZONE) support microsecond
        # precision, so an unset precision or an explicit 6 is compatible with
        # the timezone hint; any other precision is not.
        if precision not in (None, 6):
            raise TerminalValueError(
                f"DuckDB does not support both timezone and precision for column '{column_name}' in"
                f" table '{table_name}'. To resolve this issue, either set timezone to False or"
                " None, or use the default precision."
            )
        return "TIMESTAMP WITH TIME ZONE"

    # map precision (decimals after seconds) to the appropriate TIMESTAMP type
    precision_map = {
        None: "TIMESTAMP",
        6: "TIMESTAMP",
        0: "TIMESTAMP_S",
        3: "TIMESTAMP_MS",
        9: "TIMESTAMP_NS",
    }
    if precision in precision_map:
        return precision_map[precision]

    raise TerminalValueError(
        f"Precision '{precision}' decimals after seconds cannot be mapped to a DuckDB TIMESTAMP"
        " type."
    )

def from_db_type(
self, db_type: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
Expand Down Expand Up @@ -182,15 +193,15 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
job = DuckDbCopyJob(table["name"], file_path, self.sql_client)
return job

def _get_column_def_sql(self, c: TColumnSchema, table: TTableSchema = None) -> str:
    """Build the SQL fragment that defines column ``c`` in a DDL statement.

    The fragment is the escaped column name, the destination type produced by
    the type mapper, the SQL of every active hint that is set to ``True`` on
    the column, and the nullability clause, separated by single spaces.
    """
    # only hints whose value on the column is exactly True contribute SQL
    enabled_hints = [
        hint_sql
        for hint, hint_sql in self.active_hints.items()
        if c.get(hint, False) is True
    ]
    hints_str = " ".join(enabled_hints)
    column_name = self.sql_client.escape_column_name(c["name"])
    db_type = self.type_mapper.to_db_type(c, table)
    not_null = self._gen_not_null(c.get("nullable", True))
    return f"{column_name} {db_type} {hints_str} {not_null}"

def _from_db_type(
Expand Down
38 changes: 34 additions & 4 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
DestinationTerminalException,
UnknownDestinationModule,
)
from dlt.common.exceptions import PipelineStateNotAvailable
from dlt.common.exceptions import PipelineStateNotAvailable, TerminalValueError
from dlt.common.pipeline import LoadInfo, PipelineContext
from dlt.common.runtime.collector import LogCollector
from dlt.common.schema.exceptions import TableIdentifiersFrozen
Expand Down Expand Up @@ -2651,8 +2651,38 @@ def assert_imported_file(
)


def test_duckdb_column_invalid_timestamp() -> None:
    """Loading a timestamp column hinted with both timezone and precision must fail."""
    # DuckDB does not have timestamps with timezone and precision
    tstamp_hints = {"data_type": "timestamp", "timezone": True, "precision": 3}

    @dlt.resource(columns={"event_tstamp": tstamp_hints}, primary_key="event_id")
    def events():
        row = {"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123+00:00"}
        yield [row]

    duck_pipeline = dlt.pipeline(destination="duckdb")

    # the error may surface directly or wrapped by the failing pipeline step
    expected_errors = (TerminalValueError, PipelineStepFailed)
    with pytest.raises(expected_errors):
        duck_pipeline.run(events())


def test_duckdb_column_invalid_timestamp_precision() -> None:
    """Loading a timestamp column hinted with an unmappable precision must fail."""
    # DuckDB does not support precision higher than 9
    tstamp_hints = {"data_type": "timestamp", "precision": 10}

    @dlt.resource(columns={"event_tstamp": tstamp_hints}, primary_key="event_id")
    def events():
        row = {"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123+00:00"}
        yield [row]

    duck_pipeline = dlt.pipeline(destination="duckdb")

    # the error may surface directly or wrapped by the failing pipeline step
    expected_errors = (TerminalValueError, PipelineStepFailed)
    with pytest.raises(expected_errors):
        duck_pipeline.run(events())


def test_duckdb_column_hint_timezone() -> None:
# talbe: events_timezone_off
# table: events_timezone_off
@dlt.resource(
columns={"event_tstamp": {"data_type": "timestamp", "timezone": False}},
primary_key="event_id",
Expand All @@ -2664,7 +2694,7 @@ def events_timezone_off():
{"event_id": 3, "event_tstamp": "2024-07-30T10:00:00.123456"},
]

# talbe: events_timezone_on
# table: events_timezone_on
@dlt.resource(
columns={"event_tstamp": {"data_type": "timestamp", "timezone": True}},
primary_key="event_id",
Expand All @@ -2676,7 +2706,7 @@ def events_timezone_on():
{"event_id": 3, "event_tstamp": "2024-07-30T10:00:00.123456"},
]

# talbe: events_timezone_unset
# table: events_timezone_unset
@dlt.resource(
primary_key="event_id",
)
Expand Down

0 comments on commit 3c09dbb

Please sign in to comment.