3
3
from functools import partial
4
4
5
5
import pytz
6
- from dagster import asset , Output , Field
6
+ from dagster import asset , Output , Field , get_dagster_logger
7
7
from pgutils import PostgresTableIdentifier
8
8
from psycopg .sql import Literal , SQL
9
9
from psycopg .types .json import Jsonb , set_json_dumps
@@ -198,11 +198,12 @@ def compute_load_metadata(
198
198
Returns:
199
199
None
200
200
"""
201
+ logger = get_dagster_logger ()
201
202
tile_id = context .partition_key
202
203
conn = context .resources .db_connection .connect
203
204
if not laz_files_ahn .new :
204
205
if not context .op_execution_context .op_execution_context .op_config ["force" ]:
205
- context . log .info (
206
+ logger .info (
206
207
f"Metadata for this LAZ tile { tile_id } already exists, "
207
208
f"skipping computation."
208
209
)
@@ -241,7 +242,7 @@ def compute_load_metadata(
241
242
ST_SetSRID(ST_GeomFromGeoJSON({boundary}), 28992)
242
243
);
243
244
""" ).format (** query_params )
244
- context . log .debug (conn .print_query (query ))
245
+ logger .debug (conn .print_query (query ))
245
246
conn .send_query (query )
246
247
# Cannot index the table here, because this is a partitioned assed. This means that
247
248
# this function is called for each partition, which would index the table after
@@ -250,11 +251,12 @@ def compute_load_metadata(
250
251
251
252
252
253
def metadata_table_ahn (context , ahn_version : int ) -> PostgresTableIdentifier :
254
+ logger = get_dagster_logger ()
253
255
conn = context .resources .db_connection .connect
254
256
new_schema = "ahn"
255
257
create_schema (context , new_schema )
256
258
new_table = PostgresTableIdentifier (new_schema , f"metadata_ahn{ ahn_version } " )
257
259
query = load_sql (query_params = {"new_table" : new_table })
258
- context . log .info (conn .print_query (query ))
260
+ logger .info (conn .print_query (query ))
259
261
conn .send_query (query )
260
262
return new_table
0 commit comments