Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 69 additions & 83 deletions docker/db.py → docker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
import sys
import subprocess
import time
from typing import cast
import psycopg
from psycopg.abc import Query
from psycopg import sql
import sh

from urllib import parse

import qgis_styles, helpers
Expand Down Expand Up @@ -67,6 +71,13 @@ def connection_string(admin: bool=False) -> str:

return conn_string

def set_db_env_vars():
    """Set the Postgres connection string environment variables.

    Sets two variables derived from `connection_string()`:

    * ``PGOSM_CONN`` — standard connection string; required to be set for
      the Lua styles used by osm2pgsql.
    * ``PGOSM_CONN_PG`` — admin connection (``connection_string(admin=True)``)
      used for drop/create of the main database.
    """
    # PGOSM_CONN is required to be set by the Lua styles used by osm2pgsql
    os.environ['PGOSM_CONN'] = connection_string()
    # Connection to DB for admin purposes, e.g. drop/create main database
    os.environ['PGOSM_CONN_PG'] = connection_string(admin=True)


def pg_conn_parts() -> dict:
"""Returns dictionary of connection parts based on environment variables
Expand Down Expand Up @@ -203,10 +214,10 @@ def prepare_pgosm_db(
, import_mode: helpers.ImportMode
, schema_name: str
):
"""Runs through steps to prepare the target database for PgOSM Flex.
"""Prepares the target database for PgOSM Flex.

Includes additional preparation for using --replication and --updated=append
modes.
Includes additional preparation for using `--replication`
and `--updated=append` modes.
"""
if pg_conn_parts()['pg_host'] == 'localhost':
drop_it = True
Expand All @@ -222,15 +233,21 @@ def prepare_pgosm_db(
LOGGER.debug('Dropping local database if exists')
drop_pgosm_db()
else:
LOGGER.debug('Not dropping local DB. This is expected with subsequent import via --replication OR --update=append.')
msg = 'Not dropping local DB. '
msg += 'This is expected with subsequent import via --replication OR --update=append.'
LOGGER.debug(msg)

create_pgosm_db()

else:
LOGGER.info('Using external database. Ensure the target database is setup properly with proper permissions.')

prepare_osm_schema(db_path=db_path, skip_qgis_style=skip_qgis_style,
schema_name=schema_name)
msg = 'Using external database. '
msg += 'Ensure the target database is setup properly with proper permissions.'
LOGGER.info(msg)

prepare_osm_schema(
db_path=db_path
, skip_qgis_style=skip_qgis_style
, schema_name=schema_name)
run_insert_pgosm_road(db_path=db_path, schema_name=schema_name)

if import_mode.replication_update or import_mode.update == 'append':
Expand All @@ -249,12 +266,7 @@ def start_import(
, schema_name: str
, input_file: str
) -> int:
"""Creates record in osm.pgosm_flex table.

Returns
----------------------------
import_id : int
Value from the `id` column in `osm.pgosm_flex`.
"""Creates record in `osm.pgosm_flex` table and returns `id` from `osm.pgosm_flex`.
"""
params = {'pgosm_region': pgosm_region
, 'pgosm_date': pgosm_date
Expand All @@ -281,7 +293,8 @@ def start_import(
;
"""
sql_raw = sql_raw.format(schema_name=schema_name)
with get_db_conn(conn_string=os.environ['PGOSM_CONN']) as conn:
# FIXME: Why os environ here instead of get conn string???
with get_db_conn(conn_string=connection_string()) as conn:
cur = conn.cursor()
cur.execute(sql_raw, params=params)
import_id = cur.fetchone()[0]
Expand Down Expand Up @@ -318,7 +331,7 @@ def pg_version_check() -> int:
def drop_pgosm_db() -> bool:
"""Drops the pgosm database if it exists.

Intentionally hard coded to `pgosm` database for in-Docker use only.
This is for in-Docker use only. Intentionally hard coded to `pgosm` database.
"""
if not pg_conn_parts()['pg_host'] == 'localhost':
LOGGER.error('Attempted to drop database external from Docker. Not doing that')
Expand All @@ -327,7 +340,7 @@ def drop_pgosm_db() -> bool:
sql_raw = 'DROP DATABASE IF EXISTS pgosm;'
conn = get_db_conn(conn_string=os.environ['PGOSM_CONN_PG'])

LOGGER.debug('Setting Pg conn to enable autocommit - required for drop/create DB')
# Setting Pg conn to enable autocommit - required for drop/create DB'
conn.autocommit = True
conn.execute(sql_raw)
conn.close()
Expand Down Expand Up @@ -396,11 +409,15 @@ def prepare_osm_schema(db_path: str, skip_qgis_style: bool, schema_name: str):


def run_insert_pgosm_road(db_path: str, schema_name: str):
    """Runs script to load data to `pgosm.road` table.

    Parameters
    ---------------------------
    db_path : str
        Path to folder with SQL scripts.
    schema_name : str
        Schema name for OpenStreetMap data.
    """
    # The roads data ships with the project under the `data` subfolder,
    # not the default `deploy` subfolder used by run_deploy_file().
    sql_filename = 'roads-us.sql'
    run_deploy_file(
        db_path=db_path
        , sql_filename=sql_filename
        , schema_name=schema_name
        , subfolder='data'
    )


def run_deploy_file(
Expand All @@ -410,42 +427,32 @@ def run_deploy_file(
, subfolder: str='deploy'
):
"""Run a SQL script under the deploy path. Used to setup PgOSM Flex DB.

Parameters
---------------------------
db_path : str
Path to folder with SQL scripts.
sql_filename : sql_filename
subfolder : str
Set subfolder under `db_path`.
Default: deploy
schema_name : str
Schema name for OpenStreetMap data
"""
full_path = os.path.join(db_path, subfolder, sql_filename)
LOGGER.info(f'Deploying {full_path}')

with open(full_path) as f:
deploy_sql = f.read()
deploy_sql_raw = f.read()

deploy_sql = deploy_sql.format(schema_name=schema_name)
deploy_sql = deploy_sql_raw.format(schema_name=schema_name)
query = cast(Query, deploy_sql)

with get_db_conn(conn_string=os.environ['PGOSM_CONN']) as conn:
with get_db_conn(conn_string=connection_string()) as conn:
cur = conn.cursor()
cur.execute(deploy_sql)
cur.execute(query)
LOGGER.debug(f'Ran SQL in {sql_filename}')


def get_db_conn(conn_string: str) -> psycopg.Connection:
    """Return a `psycopg` database connection.

    Parameters
    ---------------------------
    conn_string : str
        Postgres connection string passed straight to `psycopg.connect()`.

    Raises
    ---------------------------
    psycopg.OperationalError
        Re-raised after logging when the connection cannot be established.
        (Previously this returned ``False``; raising keeps the declared
        return type honest and lets callers handle the failure explicitly.)
    """
    try:
        conn = psycopg.connect(conn_string)
        LOGGER.debug('Connection to Postgres established')
    except psycopg.OperationalError as err:
        err_msg = 'Database connection error. Error: {}'.format(err)
        LOGGER.error(err_msg)
        raise

    return conn

Expand Down Expand Up @@ -476,47 +483,36 @@ def pgosm_after_import(flex_path: str) -> bool:
return True


def pgosm_nested_admin_polygons(schema_name: str):
    """Runs two stored procedures to calculate nested admin polygons.

    Runs `populate_place_polygon_nested()` to populate the base data and
    then `build_nested_admin_polygons()` to build the nested polygons.
    Both procedures are executed through psycopg with the schema name
    safely quoted via `sql.Identifier`.

    Parameters
    ---------------------------
    schema_name : str
        Schema name for OpenStreetMap data, e.g. `osm`.
    """
    # Populate the table
    sql_raw_1 = 'CALL {schema_name}.populate_place_polygon_nested();'
    query_1 = sql.SQL(sql_raw_1).format(
        schema_name=sql.Identifier(schema_name)
    )

    # Prepare the base data
    LOGGER.info('Populating place_polygon_nested table (osm.populate_place_polygon_nested() )')
    with get_db_conn(conn_string=connection_string()) as conn:
        # Setting autocommit to avoid issues with the explicit transaction control
        # inside the SQL procedures.
        conn.autocommit = True
        cur = conn.cursor()
        cur.execute(query_1)

    # Build the data
    sql_raw_2 = ' CALL {schema_name}.build_nested_admin_polygons();'
    query_2 = sql.SQL(sql_raw_2).format(
        schema_name=sql.Identifier(schema_name)
    )

    # Build the nested data
    LOGGER.info('Building nested polygons... (this can take a while)')
    with get_db_conn(conn_string=connection_string()) as conn:
        # Setting autocommit to avoid issues with the explicit transaction control
        # inside the SQL procedures.
        conn.autocommit = True
        cur = conn.cursor()
        cur.execute(query_2)


def osm2pgsql_replication_start():
Expand Down Expand Up @@ -559,13 +555,7 @@ def osm2pgsql_replication_finish(skip_nested: bool):


def run_pg_dump(export_path: str, skip_qgis_style: bool):
"""Runs pg_dump to save processed data to load into other PostGIS DBs.

Parameters
---------------------------
export_path : str
Absolute path to output .sql file
skip_qgis_style : bool
"""Runs `pg_dump` to save processed data to load into other PostGIS DBs.
"""
logger = logging.getLogger('pgosm-flex')
conn_string = os.environ['PGOSM_CONN']
Expand Down Expand Up @@ -607,8 +597,6 @@ def fix_pg_dump_create_public(export_path: str):

def log_import_message(import_id: int, msg: str, schema_name: str):
"""Logs msg to database in `osm.pgosm_flex` for `import_id`.

Overwrites `osm_date` if `pbf_timestamp` is set.
"""
sql_raw = """
UPDATE {schema_name}.pgosm_flex pf
Expand Down Expand Up @@ -651,5 +639,3 @@ def get_prior_import(schema_name: str) -> dict:
results = {}

return results


Loading
Loading