Skip to content

Commit

Permalink
formatting
Browse files (browse the repository at this point in the history)
  • Loading branch information
jtisbell4 committed May 15, 2024
1 parent 7469a50 commit 0e1c3ef
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions python/tempo/tsdf.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -67,9 +67,11 @@ def __init__(
if isinstance(df.schema[ts_col].dataType, StringType): # pragma: no cover
sample_ts = df.select(ts_col).limit(1).collect()[0][0]
self.__validate_ts_string(sample_ts)
self.df = self.__add_double_ts()\
.drop(self.ts_col)\
.withColumnRenamed("double_ts", self.ts_col)
self.df = (
self.__add_double_ts()
.drop(self.ts_col)
.withColumnRenamed("double_ts", self.ts_col)
)

"""
Make sure DF is ordered by its respective ts_col and partition columns.
Expand All @@ -80,11 +82,13 @@ def __init__(
#

@staticmethod
def parse_nanos_timestamp(df: DataFrame,
str_ts_col: str,
ts_fmt: str = "yyyy-MM-dd HH:mm:ss",
double_ts_col: Optional[str] = None,
parsed_ts_col: Optional[str] = None) -> DataFrame:
def parse_nanos_timestamp(
df: DataFrame,
str_ts_col: str,
ts_fmt: str = "yyyy-MM-dd HH:mm:ss",
double_ts_col: Optional[str] = None,
parsed_ts_col: Optional[str] = None,
) -> DataFrame:
"""
Parse a string timestamp column with nanosecond precision into a double timestamp column.
Expand All @@ -100,23 +104,27 @@ def parse_nanos_timestamp(df: DataFrame,
"""

# add a parsed timestamp column if requested
src_df = df.withColumn(parsed_ts_col,
sfn.to_timestamp(sfn.col(str_ts_col), ts_fmt)) \
if parsed_ts_col else df
src_df = (
df.withColumn(parsed_ts_col, sfn.to_timestamp(sfn.col(str_ts_col), ts_fmt))
if parsed_ts_col
else df
)

return (
src_df.withColumn("nanos",
sfn.when(sfn.col(str_ts_col).contains("."),
sfn.concat(sfn.lit("0."),
sfn.split(sfn.col(str_ts_col),
r"\.")[1])
).otherwise(0).cast("double"))
.withColumn("long_ts",
sfn.unix_timestamp(str_ts_col, ts_fmt))
.withColumn((double_ts_col or str_ts_col),
sfn.col("long_ts") + sfn.col("nanos")))


src_df.withColumn(
"nanos",
sfn.when(
sfn.col(str_ts_col).contains("."),
sfn.concat(sfn.lit("0."), sfn.split(sfn.col(str_ts_col), r"\.")[1]),
)
.otherwise(0)
.cast("double"),
)
.withColumn("long_ts", sfn.unix_timestamp(str_ts_col, ts_fmt))
.withColumn(
(double_ts_col or str_ts_col), sfn.col("long_ts") + sfn.col("nanos")
)
)

def __add_double_ts(self) -> DataFrame:
"""Add a double (epoch) version of the string timestamp out to nanos"""
Expand Down

0 comments on commit 0e1c3ef

Please sign in to comment.