Support writing to dynamic table if stream source is table. Otherwise, use UDTF to stream to table.

Signed-off-by: sfc-gh-mvashishtha <[email protected]>
sfc-gh-mvashishtha committed Dec 13, 2024
1 parent e60adfe commit f926024
Showing 5 changed files with 284 additions and 109 deletions.
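
As a rough illustration of the behavior described in the commit message, the sink selection could be sketched as below. This is a minimal, hypothetical sketch, not the actual Snowpark implementation: the function plan_stream_write, its parameters, and the exact SQL strings are assumptions modeled on the demo code in this diff (the real UDTF call passes the projected columns explicitly rather than *).

# Hypothetical sketch of the sink dispatch described in the commit message;
# not the real Snowpark code. The function name, parameters, and SQL shapes
# are illustrative assumptions.
def plan_stream_write(stream_source: str, select_sql: str, table_name: str,
                      lag: str = "60 seconds") -> str:
    """Return SQL that materializes a streaming query into `table_name`."""
    if stream_source == "table":
        # Every upstream source is a table, so the result can be kept fresh
        # with an incremental Dynamic Table.
        return (
            f"CREATE OR REPLACE DYNAMIC TABLE {table_name} "
            f"LAG = '{lag}' REFRESH_MODE = 'incremental' AS {select_sql}"
        )
    # Otherwise (e.g. a Kafka UDTF source), push rows through a writer UDTF.
    return f"SELECT write_stream_udf('{table_name}', *) FROM ({select_sql})"

# The two paths exercised by the demo below:
print(plan_stream_write("kafka", "SELECT id, timestamp, name FROM source", "dynamic_join_result"))
print(plan_stream_write("table", "SELECT name, count(1) FROM source GROUP BY name", "final_table"))
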
142 changes: 113 additions & 29 deletions snowpark_streaming_demo.py
@@ -23,16 +23,32 @@ def generate_json_data():

 session = Session.builder.create()

+STATIC_TABLE_NAME = "static_df"

+SIMULATED_STREAM_DATA_NAME = "static_df2"

 # Create static dataframe
 session.create_dataframe(
     pd.DataFrame(
         {
-            'KEY': [str(i) for i in range(10)],
-            'STATIC_VALUE': [generate_json_data() for _ in range(10)]
+            'ID': [str(i) for i in range(1000)],
+            'STATIC_VALUE': [generate_json_data() for _ in range(1000)]
         }
     )
+).write.save_as_table(table_name=STATIC_TABLE_NAME, mode="overwrite")
+static_df = session.table(STATIC_TABLE_NAME)

+# Create static dataframe 2
+data = [generate_json_data() for _ in range(10)]
+session.create_dataframe(
+    pd.DataFrame(
+        {
+            "ID": [row["id"] for row in data],
+            "TIMESTAMP": [row["timestamp"] for row in data],
+            "NAME": [row["name"] for row in data],
+        }
+    )
-).write.save_as_table(table_name="static_df", mode="overwrite")
-static_df = session.table("static_df")
+).write.save_as_table(table_name=SIMULATED_STREAM_DATA_NAME, mode="overwrite")


 kafka_event_schema = StructType(
@@ -57,32 +73,48 @@ def generate_json_data():
     .load()
 )

-LANDING_TABLE_NAME = "dynamic_join_result";
+LANDING_TABLE_NAME = "dynamic_join_result"

 transformed_df = kafka_ingest_df \
     .select(col("id"), col("timestamp"), col("name")) \
     .filter(col("price") > 100.0)


-"""
-This query looks like
-SELECT write_stream_udf('dynamic_join_result', "id", "timestamp", "name")
-FROM (SELECT id,
-             name,
-             price,
-             timestamp
-      FROM ( TABLE (my_streaming_udtf('host1:port1,host2:port2', 'topic1', 1 :: INT) )))
-WHERE ( "price" > 100.0 )
-"""
+assert transformed_df._stream_source == "kafka"

 streaming_query: AsyncJob = transformed_df \
     .writeStream \
     .toTable(LANDING_TABLE_NAME)


+# The source table is from a kafka stream, so we write the result via UDTF.
+
+"""
+This query looks like
+SELECT
+    write_stream_udf('dynamic_join_result', "id", "timestamp", "name")
+FROM
+    (
+        SELECT
+            id,
+            name,
+            price,
+            timestamp
+        FROM
+            (
+                TABLE (
+                    my_streaming_udtf(
+                        'host1:port1,host2:port2',
+                        'topic1',
+                        1::INT
+                    )
+                )
+            )
+    )
+WHERE
+    ("price" > 100.0)
+"""


 streaming_query.cancel()

@@ -92,16 +124,68 @@ def generate_json_data():
     session
     .readStream
     .format("table")
-    .option("table_name", LANDING_TABLE_NAME)
-)
+    # TODO: Temporarily reading from another static table here because the UDTF
+    # currently doesn't produce an output table.
+    .option("table_name", SIMULATED_STREAM_DATA_NAME)
+    # .option("table_name", LANDING_TABLE_NAME)
+).load()


+complex_df = df_streamed_from_table.join(static_df, on="ID").groupBy("NAME").count()

-# # Write streaming dataframe to output data sink
-# sink_query = (
-#     source_df
-#     .writeStream
-#     .format("snowflake")
-#     .option("table", "<table>")
-# )
+FINAL_TABLE_NAME = "final_table"


+assert complex_df._stream_source == "table"

+# One source is a Snowflake table, and the other source is a static table, so
+# we write the result as a dynamic table instead of using a UDTF.

+"""
+The query here is:
+CREATE OR REPLACE DYNAMIC TABLE final_table
+    LAG = '60 seconds'
+    WAREHOUSE = NEW_WAREHOUSE
+    REFRESH_MODE = 'incremental'
+AS
+SELECT
+    *
+FROM
+    (
+        SELECT
+            "NAME",
+            count(1) AS "COUNT"
+        FROM
+            (
+                SELECT
+                    *
+                FROM
+                    (
+                        (
+                            SELECT
+                                "ID" AS "ID",
+                                "TIMESTAMP" AS "TIMESTAMP",
+                                "NAME" AS "NAME"
+                            FROM
+                                static_df2
+                        ) AS SNOWPARK_LEFT
+                        INNER JOIN (
+                            SELECT
+                                "ID" AS "ID",
+                                "STATIC_VALUE" AS "STATIC_VALUE"
+                            FROM
+                                static_df
+                        ) AS SNOWPARK_RIGHT USING (ID)
+                    )
+            )
+        GROUP BY
+            "NAME"
+    )
+"""

-# sink_query.start()
+(
+    complex_df
+    .writeStream
+    .outputMode("append")
+    # Dynamic Tables do not support lag values under 60 second(s).
+    .trigger(processingTime='60 seconds')
+    .toTable(FINAL_TABLE_NAME)
+)
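
As a hypothetical follow-up (not part of this commit), the dynamic-table sink could be observed by re-reading the table after each lag interval. The sketch below assumes the demo above has already run and reuses its session and FINAL_TABLE_NAME; Session.table and DataFrame.show are existing Snowpark APIs, but the polling loop itself is only illustrative.

import time

# Hypothetical check, assuming the demo above has run: poll the dynamic table
# once per LAG interval to watch the aggregated counts refresh.
for _ in range(3):
    session.table(FINAL_TABLE_NAME).show()  # current NAME / COUNT rows
    time.sleep(60)  # the dynamic table's lag is 60 seconds
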