diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index abcf4d0b8..e2233c140 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -48,6 +48,22 @@ You can also define your own custom checks (see [Creating custom checks](#creati | `sql_expression` | Checks whether the values meet the condition provided as an SQL expression, e.g. `a = 'str1' and a > b`. SQL expressions are evaluated at runtime, so ensure that the expression is safe and that functions used within it (e.g. h3_ischildof, division) do not throw exceptions. You can achieve this by validating input arguments or columns beforehand using guards such as CASE WHEN, IS NOT NULL, RLIKE, or type try casts. | `expression`: sql expression to check on a DataFrame (fail the check if expression evaluates to True, pass if it evaluates to False); `msg`: optional message to output; `name`: optional name of the resulting column (it can be overwritten by `name` specified at the check level); `negate`: if the condition should be negated; `columns`: optional list of columns to be used for reporting and as name prefix if name not provided, unused in the actual logic | | `is_data_fresh` | Checks whether the values in the input timestamp column are not older than the specified number of minutes from the base timestamp column. This is useful for identifying stale data due to delayed pipelines and helps catch upstream issues early. | `column`: column of type timestamp/date to check (can be a string column name or a column expression); `max_age_minutes`: maximum age in minutes before data is considered stale; `base_timestamp`: optional base timestamp column from which the stale check is calculated. This can be a string, column expression, datetime value or literal value ex:F.lit(datetime(2024,1,1)). If not provided current_timestamp() function is used | | `does_not_contain_pii` | Checks whether the values in the input column contain Personally Identifiable Information (PII). Uses Microsoft Presidio to detect various named entities (e.g. PERSON, ADDRESS, EMAIL_ADDRESS). Requires installation of PII detection extras: `pip install 'databricks-labs-dqx[pii-detection]'`. See more details [here](#detecting-personally-identifiable-information-pii). | `column`: column to check (can be a string column name or a column expression); `threshold`: confidence threshold for PII detection (0.0 to 1.0, default: 0.7); `language`: optional language of the text (default: 'en'); `entities`: optional list of entities to detect; `nlp_engine_config`: optional dictionary configuring the NLP engine used for PII detection, see the [Presidio documentation](https://microsoft.github.io/presidio/analyzer/customizing_nlp_models/) for more information | +| `is_latitude` | Checks whether the values in the input column are valid latitude values (i.e. between -90 and 90 degrees). | `column`: column to check (can be a string column name or a column expression) | +| `is_longitude` | Checks whether the values in the input column are valid longitude values (i.e. between -180 and 180 degrees). | `column`: column to check (can be a string column name or a column expression) | +| `is_geometry` | Checks whether the values in the input column are valid geometries. This function requires Databricks serverless compute or runtime >= 17.1. 
| `column`: column to check (can be a string column name or a column expression) |
+| `is_geography` | Checks whether the values in the input column are valid geographies. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `is_point` | Checks whether the values in the input column are point geometries/geographies. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `is_linestring` | Checks whether the values in the input column are linestring geometries/geographies. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `is_polygon` | Checks whether the values in the input column are polygon geometries/geographies. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `is_multipoint` | Checks whether the values in the input column are multipoint geometries/geographies. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `is_multilinestring` | Checks whether the values in the input column are multilinestring geometries/geographies. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `is_multipolygon` | Checks whether the values in the input column are multipolygon geometries/geographies. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `is_geometrycollection` | Checks whether the values in the input column are geometrycollection geometries/geographies. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `is_ogc_valid` | Checks whether the values in the input column are valid geometries in the OGC sense. For example, a bowtie polygon is invalid because it has a self-intersection. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `is_non_empty_geometry` | Checks whether the values in the input column are non-empty geometries. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression) |
+| `has_dimension` | Checks whether the values in the input column are geometries of the specified dimension (2D projected dimension). This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression); `dimension`: dimension to check |
+| `has_x_coordinate_between` | Checks whether the values in the input column are geometries with x coordinates between the provided boundaries. This function requires Databricks serverless compute or runtime >= 17.1.
| `column`: column to check (can be a string column name or a column expression); `min_value`: minimum value; `max_value`: maximum value |
+| `has_y_coordinate_between` | Checks whether the values in the input column are geometries with y coordinates between the provided boundaries. This function requires Databricks serverless compute or runtime >= 17.1. | `column`: column to check (can be a string column name or a column expression); `min_value`: minimum value; `max_value`: maximum value |
@@ -471,6 +487,123 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen
     for_each_column: # apply the check for each column in the list
     - col3
     - col5
+
+# is_latitude check
+- criticality: error
+  check:
+    function: is_latitude
+    arguments:
+      column: col2
+
+# is_longitude check
+- criticality: error
+  check:
+    function: is_longitude
+    arguments:
+      column: col2
+
+# is_geometry check
+- criticality: error
+  check:
+    function: is_geometry
+    arguments:
+      column: point_geom
+
+# is_geography check
+- criticality: error
+  check:
+    function: is_geography
+    arguments:
+      column: point_geom
+
+# is_point check
+- criticality: error
+  check:
+    function: is_point
+    arguments:
+      column: point_geom
+
+# is_linestring check
+- criticality: error
+  check:
+    function: is_linestring
+    arguments:
+      column: linestring_geom
+
+# is_polygon check
+- criticality: error
+  check:
+    function: is_polygon
+    arguments:
+      column: polygon_geom
+
+# is_multipoint check
+- criticality: error
+  check:
+    function: is_multipoint
+    arguments:
+      column: multipoint_geom
+
+# is_multilinestring check
+- criticality: error
+  check:
+    function: is_multilinestring
+    arguments:
+      column: multilinestring_geom
+
+# is_multipolygon check
+- criticality: error
+  check:
+    function: is_multipolygon
+    arguments:
+      column: multipolygon_geom
+
+# is_geometrycollection check
+- criticality: error
+  check:
+    function: is_geometrycollection
+    arguments:
+      column: geometrycollection_geom
+
+# is_ogc_valid check
+- criticality: error
+  check:
+    function: is_ogc_valid
+    arguments:
+      column: point_geom
+
+# is_non_empty_geometry check
+- criticality: error
+  check:
+    function: is_non_empty_geometry
+    arguments:
+      column: point_geom
+
+# has_dimension check
+- criticality: error
+  check:
+    function: has_dimension
+    arguments:
+      column: polygon_geom
+      dimension: 2
+
+# has_x_coordinate_between check
+- criticality: error
+  check:
+    function: has_x_coordinate_between
+    arguments:
+      column: polygon_geom
+      min_value: 0.0
+      max_value: 10.0
+
+# has_y_coordinate_between check
+- criticality: error
+  check:
+    function: has_y_coordinate_between
+    arguments:
+      column: polygon_geom
+      min_value: 0.0
+      max_value: 10.0
 ```
@@ -479,6 +612,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen
 ```python
 from databricks.labs.dqx.rule import DQRowRule, DQForEachColRule
 from databricks.labs.dqx import check_funcs
+from databricks.labs.dqx.geo import check_funcs as geo_check_funcs
 from databricks.labs.dqx.pii import pii_detection_funcs
 from datetime import datetime
@@ -817,6 +951,121 @@ checks = [
         }
     ),
+
+    # is_latitude check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_latitude,
+        column="col2"
+    ),
+
+    # is_longitude check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_longitude,
+        column="col2"
+    ),
+
+    # is_geometry check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_geometry,
+        column="point_geom"
+    ),
+
+    # is_geography check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_geography,
+        column="point_geom"
+    ),
+
+    # is_point check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_point,
+        column="point_geom"
+    ),
+
+    # is_linestring check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_linestring,
+        column="linestring_geom"
+    ),
+
+    # is_polygon check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_polygon,
+        column="polygon_geom"
+    ),
+
+    # is_multipoint check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_multipoint,
+        column="multipoint_geom"
+    ),
+
+    # is_multilinestring check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_multilinestring,
+        column="multilinestring_geom"
+    ),
+
+    # is_multipolygon check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_multipolygon,
+        column="multipolygon_geom"
+    ),
+
+    # is_geometrycollection check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_geometrycollection,
+        column="geometrycollection_geom"
+    ),
+
+    # is_ogc_valid check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_ogc_valid,
+        column="point_geom"
+    ),
+
+    # is_non_empty_geometry check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.is_non_empty_geometry,
+        column="point_geom"
+    ),
+
+    # has_dimension check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.has_dimension,
+        column="polygon_geom",
+        check_func_kwargs={"dimension": 2}
+    ),
+
+    # has_x_coordinate_between check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.has_x_coordinate_between,
+        column="polygon_geom",
+        check_func_kwargs={"min_value": 0.0, "max_value": 10.0}
+    ),
+
+    # has_y_coordinate_between check
+    DQRowRule(
+        criticality="error",
+        check_func=geo_check_funcs.has_y_coordinate_between,
+        column="polygon_geom",
+        check_func_kwargs={"min_value": 0.0, "max_value": 10.0}
+    ),
+
     # sql_expression check
     DQRowRule(
         criticality="error",
diff --git a/src/databricks/labs/dqx/checks_resolver.py b/src/databricks/labs/dqx/checks_resolver.py
index d340a59f4..1e08ca544 100644
--- a/src/databricks/labs/dqx/checks_resolver.py
+++ b/src/databricks/labs/dqx/checks_resolver.py
@@ -6,6 +6,7 @@ from contextlib import contextmanager
 from databricks.labs.dqx import check_funcs
+from databricks.labs.dqx.geo import check_funcs as geo_check_funcs
 from databricks.labs.dqx.errors import InvalidCheckError
 logger = logging.getLogger(__name__)
@@ -30,6 +31,8 @@ def resolve_check_function(
     """
     logger.debug(f"Resolving function: {function_name}")
     func = getattr(check_funcs, function_name, None)  # resolve using predefined checks first
+    if not func:
+        func = getattr(geo_check_funcs, function_name, None)  # resolve using predefined geo checks
     if not func and custom_check_functions:
         func = custom_check_functions.get(function_name)  # returns None if not found
     if fail_on_missing and not func:
diff --git a/src/databricks/labs/dqx/geo/__init__.py b/src/databricks/labs/dqx/geo/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/databricks/labs/dqx/geo/check_funcs.py b/src/databricks/labs/dqx/geo/check_funcs.py
new file mode 100644
index 000000000..612f7a717
--- /dev/null
+++ b/src/databricks/labs/dqx/geo/check_funcs.py
@@ -0,0 +1,450 @@
+from pyspark.sql import Column
+import pyspark.sql.functions as F
+from 
databricks.labs.dqx.rule import register_rule +from databricks.labs.dqx.check_funcs import make_condition, _get_normalized_column_and_expr + +POINT_TYPE = "ST_Point" +LINESTRING_TYPE = "ST_LineString" +POLYGON_TYPE = "ST_Polygon" +MULTIPOINT_TYPE = "ST_MultiPoint" +MULTILINESTRING_TYPE = "ST_MultiLineString" +MULTIPOLYGON_TYPE = "ST_MultiPolygon" +GEOMETRYCOLLECTION_TYPE = "ST_GeometryCollection" + + +@register_rule("row") +def is_latitude(column: str | Column) -> Column: + """Checks whether the values in the input column are valid latitudes. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are valid latitudes + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + condition = ~F.when(col_expr.isNull(), F.lit(None)).otherwise( + F.col(col_str_norm).try_cast("double").between(-90.0, 90.0) + ) + condition_str = f"` in column `{col_expr_str}` is not a valid latitude (must be between -90 and 90)" + + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_valid_latitude", + ) + + +@register_rule("row") +def is_longitude(column: str | Column) -> Column: + """Checks whether the values in the input column are valid longitudes. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are valid longitudes + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + condition = ~F.when(col_expr.isNull(), F.lit(None)).otherwise( + F.col(col_str_norm).try_cast("double").between(-180.0, 180.0) + ) + condition_str = f"` in column `{col_expr_str}` is not a valid longitude (must be between -180 and 180)" + + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_valid_longitude", + ) + + +@register_rule("row") +def is_geometry(column: str | Column) -> Column: + """Checks whether the values in the input column are valid geometries. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are valid geometries + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry` function. + geometry_col = F.expr(f"try_to_geometry({col_str_norm})") + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geometry_col.isNull()) + condition_str = f"` in column `{col_expr_str}` is not a geometry" + + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_geometry", + ) + + +@register_rule("row") +def is_geography(column: str | Column) -> Column: + """Checks whether the values in the input column are valid geographies. 
+ + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are valid geographies + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geography` function. + geometry_col = F.expr(f"try_to_geography({col_str_norm})") + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geometry_col.isNull()) + condition_str = f"` in column `{col_expr_str}` is not a geography" + + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_geography", + ) + + +@register_rule("row") +def is_point(column: str | Column) -> Column: + """Checks whether the values in the input column are point geometries. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are point geometries + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry` and `st_geometrytype` functions. + geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL") + geom_type_cond = F.expr(f"st_geometrytype(try_to_geometry({col_str_norm})) <> '{POINT_TYPE}'") + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond) + condition_str = f"` in column `{col_expr_str}` is not a point geometry" + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_point", + ) + + +@register_rule("row") +def is_linestring(column: str | Column) -> Column: + """Checks whether the values in the input column are linestring geometries. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are linestring geometries + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry` and `st_geometrytype` functions. + geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL") + geom_type_cond = F.expr(f"st_geometrytype(try_to_geometry({col_str_norm})) <> '{LINESTRING_TYPE}'") + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond) + condition_str = f"` in column `{col_expr_str}` is not a linestring geometry" + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_linestring", + ) + + +@register_rule("row") +def is_polygon(column: str | Column) -> Column: + """Checks whether the values in the input column are polygon geometries. 
+ + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are polygon geometries + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry` and `st_geometrytype` functions. + geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL") + geom_type_cond = F.expr(f"st_geometrytype(try_to_geometry({col_str_norm})) <> '{POLYGON_TYPE}'") + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond) + condition_str = f"` in column `{col_expr_str}` is not a polygon geometry" + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_polygon", + ) + + +@register_rule("row") +def is_multipoint(column: str | Column) -> Column: + """Checks whether the values in the input column are multipoint geometries. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are multipoint geometries + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry` and `st_geometrytype` functions. + geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL") + geom_type_cond = F.expr(f"st_geometrytype(try_to_geometry({col_str_norm})) <> '{MULTIPOINT_TYPE}'") + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond) + condition_str = f"` in column `{col_expr_str}` is not a multipoint geometry" + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_multipoint", + ) + + +@register_rule("row") +def is_multilinestring(column: str | Column) -> Column: + """Checks whether the values in the input column are multilinestring geometries. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are multilinestring geometries + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry` and `st_geometrytype` functions. 
+ geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL") + geom_type_cond = F.expr(f"st_geometrytype(try_to_geometry({col_str_norm})) <> '{MULTILINESTRING_TYPE}'") + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond) + condition_str = f"` in column `{col_expr_str}` is not a multilinestring geometry" + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_multilinestring", + ) + + +@register_rule("row") +def is_multipolygon(column: str | Column) -> Column: + """Checks whether the values in the input column are multipolygon geometries. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are multipolygon geometries + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry` and `st_geometrytype` functions. + geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL") + geom_type_cond = F.expr(f"st_geometrytype(try_to_geometry({col_str_norm})) <> '{MULTIPOLYGON_TYPE}'") + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond) + condition_str = f"` in column `{col_expr_str}` is not a multipolygon geometry" + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_multipolygon", + ) + + +@register_rule("row") +def is_geometrycollection(column: str | Column) -> Column: + """Checks whether the values in the input column are geometrycollection geometries. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are geometrycollection geometries + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry` and `st_geometrytype` functions. + geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL") + geom_type_cond = F.expr(f"st_geometrytype(try_to_geometry({col_str_norm})) <> '{GEOMETRYCOLLECTION_TYPE}'") + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond) + condition_str = f"` in column `{col_expr_str}` is not a geometrycollection geometry" + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_is_not_geometrycollection", + ) + + +@register_rule("row") +def is_ogc_valid(column: str | Column) -> Column: + """Checks whether the values in the input column are valid geometries in the OGC sense. + + Args: + column: column to check; can be a string column name or a column expression + + Returns: + Column object indicating whether the values in the input column are valid geometries + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. 
+    """
+    col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column)
+    # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in
+    # Databricks SQL, due to the use of the `try_to_geometry` and `st_isvalid` functions.
+    geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL")
+    geom_type_cond = F.expr(f"NOT st_isvalid(try_to_geometry({col_str_norm}))")
+    condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond)
+    condition_str = f"` in column `{col_expr_str}` is not a valid geometry (in the OGC sense)"
+
+    return make_condition(
+        condition,
+        F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)),
+        f"{col_str_norm}_is_not_valid_geometry",
+    )
+
+
+@register_rule("row")
+def is_non_empty_geometry(column: str | Column) -> Column:
+    """Checks whether the values in the input column are non-empty geometries.
+
+    Args:
+        column: column to check; can be a string column name or a column expression
+
+    Returns:
+        Column object indicating whether the values in the input column are non-empty geometries
+
+    Note:
+        This function requires Databricks serverless compute or runtime 17.1 or above.
+    """
+    col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column)
+    # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in
+    # Databricks SQL, due to the use of the `try_to_geometry` and `st_isempty` functions.
+    geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL")
+    geom_type_cond = F.expr(f"st_isempty(try_to_geometry({col_str_norm}))")
+    condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond)
+    condition_str = f"` in column `{col_expr_str}` is an empty geometry"
+
+    return make_condition(
+        condition,
+        F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)),
+        f"{col_str_norm}_is_empty_geometry",
+    )
+
+
+@register_rule("row")
+def has_dimension(column: str | Column, dimension: int) -> Column:
+    """Checks whether the geometries/geographies in the input column have a given dimension.
+
+    Args:
+        column: column to check; can be a string column name or a column expression
+        dimension: required dimension of the geometries/geographies
+
+    Returns:
+        Column object indicating whether the geometries/geographies in the input column have a given dimension
+
+    Note:
+        This function requires Databricks serverless compute or runtime 17.1 or above.
+    """
+    col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column)
+    # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in
+    # Databricks SQL, due to the use of the `try_to_geometry` and `st_dimension` functions.
+    geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL")
+    geom_type_cond = F.expr(f"st_dimension(try_to_geometry({col_str_norm})) <> {dimension}")
+    condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond)
+    condition_str = f"` in column `{col_expr_str}` does not have the required dimension ({dimension})"
+
+    return make_condition(
+        condition,
+        F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)),
+        f"{col_str_norm}_does_not_have_required_geo_dimension",
+    )
+
+
+@register_rule("row")
+def has_x_coordinate_between(column: str | Column, min_value: float, max_value: float) -> Column:
+    """Checks whether the x coordinates of the geometries in the input column are between a given range.
+ + Args: + column: column to check; can be a string column name or a column expression + min_value: minimum value of the x coordinates + max_value: maximum value of the x coordinates + + Returns: + Column object indicating whether the x coordinates of the geometries in the input column are between a given range + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry`, `st_xmax` and `st_xmin` functions. + geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL") + geom_type_cond = F.expr( + f"st_xmax(try_to_geometry({col_str_norm})) > {max_value} OR st_xmin(try_to_geometry({col_str_norm})) < {min_value}" + ) + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond) + condition_str = f"` in column `{col_expr_str}` has x coordinates outside the range [{min_value}, {max_value}]" + + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_has_x_coordinates_outside_range", + ) + + +@register_rule("row") +def has_y_coordinate_between(column: str | Column, min_value: float, max_value: float) -> Column: + """Checks whether the y coordinates of the geometries in the input column are between a given range. + + Args: + column: column to check; can be a string column name or a column expression + min_value: minimum value of the y coordinates + max_value: maximum value of the y coordinates + + Returns: + Column object indicating whether the y coordinates of the geometries in the input column are between a given range + + Note: + This function requires Databricks serverless compute or runtime 17.1 or above. + """ + col_str_norm, col_expr_str, col_expr = _get_normalized_column_and_expr(column) + # NOTE: This function is currently only available in Databricks runtime 17.1 or above or in + # Databricks SQL, due to the use of the `try_to_geometry`, `st_ymax` and `st_ymin` functions. 
+ geom_cond = F.expr(f"try_to_geometry({col_str_norm}) IS NULL") + geom_type_cond = F.expr( + f"st_ymax(try_to_geometry({col_str_norm})) > {max_value} OR st_ymin(try_to_geometry({col_str_norm})) < {min_value}" + ) + condition = F.when(col_expr.isNull(), F.lit(None)).otherwise(geom_cond | geom_type_cond) + condition_str = f"` in column `{col_expr_str}` has y coordinates outside the range [{min_value}, {max_value}]" + + return make_condition( + condition, + F.concat_ws("", F.lit("value `"), col_expr.cast("string"), F.lit(condition_str)), + f"{col_str_norm}_has_y_coordinates_outside_range", + ) diff --git a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml index 38abcabd9..f127b75f2 100644 --- a/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml +++ b/src/databricks/labs/dqx/llm/resources/yaml_checks_examples.yml @@ -320,6 +320,91 @@ for_each_column: - col3 - col5 +- criticality: error + check: + function: is_latitude + arguments: + column: col2 +- criticality: error + check: + function: is_longitude + arguments: + column: col2 +- criticality: error + check: + function: is_geometry + arguments: + column: point_geom +- criticality: error + check: + function: is_geography + arguments: + column: point_geom +- criticality: error + check: + function: is_point + arguments: + column: point_geom +- criticality: error + check: + function: is_linestring + arguments: + column: linestring_geom +- criticality: error + check: + function: is_polygon + arguments: + column: polygon_geom +- criticality: error + check: + function: is_multipoint + arguments: + column: multipoint_geom +- criticality: error + check: + function: is_multilinestring + arguments: + column: multilinestring_geom +- criticality: error + check: + function: is_multipolygon + arguments: + column: multipolygon_geom +- criticality: error + check: + function: is_geometrycollection + arguments: + column: geometrycollection_geom +- criticality: error + check: + function: is_ogc_valid + arguments: + column: point_geom +- criticality: error + check: + function: is_non_empty_geometry + arguments: + column: point_geom +- criticality: error + check: + function: has_dimension + arguments: + column: polygon_geom + dimension: 2 +- criticality: error + check: + function: has_x_coordinate_between + arguments: + column: polygon_geom + min_value: 0.0 + max_value: 10.0 +- criticality: error + check: + function: has_y_coordinate_between + arguments: + column: polygon_geom + min_value: 0.0 + max_value: 10.0 - criticality: error check: function: is_not_null diff --git a/tests/conftest.py b/tests/conftest.py index 98d2ba6f3..422c326eb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import os +import re from collections.abc import Callable, Generator from dataclasses import replace from functools import cached_property @@ -40,6 +41,44 @@ def set_utc_timezone(): os.environ.pop("TZ") +@pytest.fixture +def skip_if_runtime_not_geo_compatible(ws, debug_env): + """ + Skip the test if the cluster runtime does not support the required geo functions, i.e. + * serverless clusters have the required geo functions + * standard clusters require runtime 17.1 or above + + Args: + ws (WorkspaceClient): Workspace client to interact with Databricks. + debug_env (dict): Test environment variables. 
+ """ + if "DATABRICKS_SERVERLESS_COMPUTE_ID" in debug_env: + return # serverless clusters have the required geo functions + + # standard clusters require runtime 17.1 or above + cluster_id = debug_env.get("DATABRICKS_CLUSTER_ID") + if not cluster_id: + raise ValueError("DATABRICKS_CLUSTER_ID is not set in debug_env") + + # Fetch cluster details + cluster_info = ws.clusters.get(cluster_id) + runtime_version = cluster_info.spark_version + + if not runtime_version: + raise ValueError(f"Unable to retrieve runtime version for cluster {cluster_id}") + + # Extract major and minor version numbers + match = re.match(r"(\d+)\.(\d+)", runtime_version) + if not match: + raise ValueError(f"Invalid runtime version format: {runtime_version}") + + major, minor = [int(x) for x in match.groups()] + valid = major > 17 or (major == 17 and minor >= 1) + + if not valid: + pytest.skip("This test requires a cluster with runtime 17.1 or above") + + class CommonUtils: def __init__(self, env_or_skip_fixture: Callable[[str], str], ws: WorkspaceClient): self._env_or_skip = env_or_skip_fixture diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py index 8964df832..bec66c90d 100644 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -22,6 +22,7 @@ ) from databricks.labs.dqx.schema import dq_result_schema from databricks.labs.dqx import check_funcs +import databricks.labs.dqx.geo.check_funcs as geo_check_funcs from tests.integration.conftest import REPORTING_COLUMNS, RUN_TIME, EXTRA_PARAMS @@ -4594,6 +4595,126 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak assert_df_equality(checked_df, expected, ignore_nullable=True) +def test_apply_checks_all_row_geo_checks_as_yaml_with_streaming( + skip_if_runtime_not_geo_compatible, ws, make_schema, make_random, make_volume, spark +): + catalog_name = "main" + schema_name = make_schema(catalog_name=catalog_name).name + input_table_name = f"{catalog_name}.{schema_name}.{make_random(6).lower()}" + output_table_name = f"{catalog_name}.{schema_name}.{make_random(6).lower()}" + volume = make_volume(catalog_name=catalog_name, schema_name=schema_name) + + file_path = Path(__file__).parent.parent / "resources" / "all_row_geo_checks.yaml" + with open(file_path, "r", encoding="utf-8") as f: + checks = yaml.safe_load(f) + + dq_engine = DQEngine(ws) + assert not dq_engine.validate_checks(checks).has_errors + + schema = ( + "col3: int, point_geom: string, linestring_geom: string, " + "polygon_geom: string, multipoint_geom: string, multilinestring_geom: string, " + "multipolygon_geom: string, geometrycollection_geom: string" + ) + test_df = spark.createDataFrame( + [ + [ + 1, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + ], + [ + 2, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + ], + [ + 3, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), 
LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + ], + ], + schema, + ) + test_df.write.saveAsTable(input_table_name) + streaming_test_df = spark.readStream.table(input_table_name) + + streaming_checked_df = dq_engine.apply_checks_by_metadata(streaming_test_df, checks) + dq_engine.save_results_in_table( + output_df=streaming_checked_df, + output_config=OutputConfig( + location=output_table_name, + mode="append", + trigger={"availableNow": True}, + options={ + "checkpointLocation": f"/Volumes/{volume.catalog_name}/{volume.schema_name}/{volume.name}/{make_random(6).lower()}" + }, + ), + ) + + checked_df = spark.table(output_table_name) + + expected_schema = schema + REPORTING_COLUMNS + expected = spark.createDataFrame( + [ + [ + 1, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + None, + None, + ], + [ + 2, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + None, + None, + ], + [ + 3, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + None, + None, + ], + ], + expected_schema, + ) + + assert_df_equality(checked_df, expected, ignore_nullable=True) + + def test_apply_checks_all_checks_as_yaml(ws, spark): """Test applying all checks from a yaml file. 
@@ -4608,6 +4729,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): with open(file_path, "r", encoding="utf-8") as f: checks.extend(yaml.safe_load(f)) + # Geo checks are executed in a separate test as they require specific DBR + dq_engine = DQEngine(ws) status = dq_engine.validate_checks(checks) assert not status.has_errors @@ -4727,6 +4850,107 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): assert_df_equality(checked, expected, ignore_nullable=True) +def test_apply_checks_all_geo_checks_as_yaml(skip_if_runtime_not_geo_compatible, ws, spark): + """Test applying all geo checks from a yaml file.""" + file_path = Path(__file__).parent.parent / "resources" / "all_row_geo_checks.yaml" + with open(file_path, "r", encoding="utf-8") as f: + checks = yaml.safe_load(f) + + dq_engine = DQEngine(ws) + status = dq_engine.validate_checks(checks) + assert not status.has_errors + + schema = ( + "col3: int, point_geom: string, linestring_geom: string, " + "polygon_geom: string, multipoint_geom: string, multilinestring_geom: string, " + "multipolygon_geom: string, geometrycollection_geom: string" + ) + test_df = spark.createDataFrame( + [ + [ + 1, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + ], + [ + 2, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + ], + [ + 3, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + ], + ], + schema, + ) + + ref_df = test_df.withColumnRenamed("col1", "ref_col1").withColumnRenamed("col2", "ref_col2") + ref_dfs = {"ref_df_key": ref_df} + + checked = dq_engine.apply_checks_by_metadata(test_df, checks, ref_dfs=ref_dfs) + + expected_schema = schema + REPORTING_COLUMNS + expected = spark.createDataFrame( + [ + [ + 1, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + None, + None, + ], + [ + 2, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + None, + None, + ], + [ + 3, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + None, + None, + ], + ], + expected_schema, + ) + assert_df_equality(checked, expected, ignore_nullable=True) + + def test_apply_checks_all_checks_using_classes(ws, spark): """Test applying all checks using DQX classes. 
@@ -5445,6 +5669,287 @@ def test_apply_checks_all_checks_using_classes(ws, spark): assert_df_equality(checked, expected, ignore_nullable=True) +def test_apply_checks_all_geo_checks_using_classes(skip_if_runtime_not_geo_compatible, ws, spark): + """Test applying all geo checks using DQX classes. + + The checks used in the test are also showcased in the docs under /docs/reference/quality_checks.mdx + The checks should be kept up to date with the docs to make sure the documentation examples are validated. + """ + checks = [ + # is_latitude check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_latitude, + column="col2", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_latitude, + column=F.col("col2"), + ), + # is_longitude check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_longitude, + column="col2", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_longitude, + column=F.col("col2"), + ), + # is_geometry check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_geometry, + column="point_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_geometry, + column=F.col("point_geom"), + ), + # is_geography check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_geography, + column="point_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_geography, + column=F.col("point_geom"), + ), + # is_point check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_point, + column="point_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_point, + column=F.col("point_geom"), + ), + # is_linestring check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_linestring, + column="linestring_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_linestring, + column=F.col("linestring_geom"), + ), + # is_polygon check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_polygon, + column="polygon_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_polygon, + column=F.col("polygon_geom"), + ), + # is_multipoint check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_multipoint, + column="multipoint_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_multipoint, + column=F.col("multipoint_geom"), + ), + # is_multilinestring check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_multilinestring, + column="multilinestring_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_multilinestring, + column=F.col("multilinestring_geom"), + ), + # is_multipolygon check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_multipolygon, + column="multipolygon_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_multipolygon, + column=F.col("multipolygon_geom"), + ), + # is_geometrycollection check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_geometrycollection, + column="geometrycollection_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_geometrycollection, + column=F.col("geometrycollection_geom"), + ), + # is_ogc_valid check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_ogc_valid, + column="point_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_ogc_valid, + column=F.col("point_geom"), + ), + # 
is_non_empty_geometry check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_non_empty_geometry, + column="point_geom", + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.is_non_empty_geometry, + column=F.col("point_geom"), + ), + # has_dimension check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.has_dimension, + column="polygon_geom", + check_func_kwargs={"dimension": 2}, + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.has_dimension, + column=F.col("polygon_geom"), + check_func_kwargs={"dimension": 2}, + ), + # has_x_coordinate_between check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.has_x_coordinate_between, + column="polygon_geom", + check_func_kwargs={"min_value": 0.0, "max_value": 10.0}, + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.has_x_coordinate_between, + column=F.col("polygon_geom"), + check_func_kwargs={"min_value": 0.0, "max_value": 10.0}, + ), + # has_y_coordinate_between check + DQRowRule( + criticality="error", + check_func=geo_check_funcs.has_y_coordinate_between, + column="polygon_geom", + check_func_kwargs={"min_value": 0.0, "max_value": 10.0}, + ), + DQRowRule( + criticality="error", + check_func=geo_check_funcs.has_y_coordinate_between, + column=F.col("polygon_geom"), + check_func_kwargs={"min_value": 0.0, "max_value": 10.0}, + ), + ] + + dq_engine = DQEngine(ws) + + schema = ( + "col2: int, point_geom: string, linestring_geom: string, " + "polygon_geom: string, multipoint_geom: string, multilinestring_geom: string, " + "multipolygon_geom: string, geometrycollection_geom: string" + ) + test_df = spark.createDataFrame( + [ + [ + 1, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + ], + [ + 2, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + ], + [ + 3, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + ], + ], + schema, + ) + + checked = dq_engine.apply_checks(test_df, checks) + + expected_schema = schema + REPORTING_COLUMNS + expected = spark.createDataFrame( + [ + [ + 1, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + None, + None, + ], + [ + 2, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + None, + None, + ], + [ + 3, + "POINT(1 1)", + "LINESTRING(1 1, 2 2)", + "POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))", + "MULTIPOINT(1 1, 2 2)", + "MULTILINESTRING((1 1, 2 2))", + "MULTIPOLYGON(((1 1, 3 1, 3 3, 1 
3, 1 1)))", + "GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(1 1, 2 2), POLYGON((1 1, 3 1, 3 3, 1 3, 1 1)))", + None, + None, + ], + ], + expected_schema, + ) + assert_df_equality(checked, expected, ignore_nullable=True) + + def test_define_user_metadata_and_extract_dq_results(ws, spark): user_metadata = {"key1": "value1", "key2": "value2"} extra_params = ExtraParams(run_time=RUN_TIME.isoformat(), user_metadata=user_metadata) diff --git a/tests/integration/test_row_checks_geo.py b/tests/integration/test_row_checks_geo.py new file mode 100644 index 000000000..a12ebcebe --- /dev/null +++ b/tests/integration/test_row_checks_geo.py @@ -0,0 +1,400 @@ +from chispa.dataframe_comparer import assert_df_equality # type: ignore +from databricks.labs.dqx.geo.check_funcs import ( + has_dimension, + has_x_coordinate_between, + has_y_coordinate_between, + is_non_empty_geometry, + is_geometry, + is_geography, + is_geometrycollection, + is_latitude, + is_linestring, + is_longitude, + is_multilinestring, + is_multipoint, + is_multipolygon, + is_point, + is_polygon, + is_ogc_valid, +) + + +def test_is_geometry(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom_string: string, geom_binary: binary, geom_int: int" + test_df = spark.createDataFrame( + [ + ["POINT(1 1)", None, None], # valid WKT + ["not-a-geometry", None, None], # invalid (not valid WKT) + [None, bytes.fromhex("01E9030000000000000000F03F00000000000000400000000000005940"), None], # valid WKB + [None, None, 42], # invalid (wrong data type) + ], + input_schema, + ) + + actual = test_df.select(is_geometry("geom_string"), is_geometry("geom_binary"), is_geometry("geom_int")) + + checked_schema = ( + "geom_string_is_not_geometry: string, geom_binary_is_not_geometry: string, geom_int_is_not_geometry: string" + ) + expected = spark.createDataFrame( + [ + [None, None, None], + ["value `not-a-geometry` in column `geom_string` is not a geometry", None, None], + [None, None, None], + [None, None, "value `42` in column `geom_int` is not a geometry"], + ], + checked_schema, + ) + + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_geography(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geography_string: string, geography_binary: binary, geography_int: int" + test_df = spark.createDataFrame( + [ + ["POINT(1 1)", None, None], # valid WKT + ["POINT(181 91)", None, None], # invalid (lat/lon out of range) + ["not-a-geography", None, None], # invalid (not valid WKT) + [None, bytes.fromhex("0101000000000000000000f03f0000000000000040"), None], # valid WKB + [None, None, 42], # invalid (wrong data type) + ], + input_schema, + ) + + actual = test_df.select( + is_geography("geography_string"), is_geography("geography_binary"), is_geography("geography_int") + ) + + checked_schema = "geography_string_is_not_geography: string, geography_binary_is_not_geography: string, geography_int_is_not_geography: string" + expected = spark.createDataFrame( + [ + [None, None, None], + ["value `POINT(181 91)` in column `geography_string` is not a geography", None, None], + ["value `not-a-geography` in column `geography_string` is not a geography", None, None], + [None, None, None], + [None, None, "value `42` in column `geography_int` is not a geography"], + ], + checked_schema, + ) + + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_point(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom: string" + test_df = spark.createDataFrame( + [["POINT(1 1)"], ["nonsense"], ["POLYGON((1 1, 2 2, 3 3, 1 
1))"], [None]], + input_schema, + ) + + actual = test_df.select(is_point("geom")) + + checked_schema = "geom_is_not_point: string" + expected = spark.createDataFrame( + [ + [None], + ["value `nonsense` in column `geom` is not a point geometry"], + ["value `POLYGON((1 1, 2 2, 3 3, 1 1))` in column `geom` is not a point geometry"], + [None], + ], + checked_schema, + ) + + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_linestring(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom: string" + test_df = spark.createDataFrame( + [["LINESTRING(1 1, 2 2)"], ["nonsense"], ["POLYGON((1 1, 2 2, 3 3, 1 1))"], [None]], + input_schema, + ) + + actual = test_df.select(is_linestring("geom")) + + checked_schema = "geom_is_not_linestring: string" + expected = spark.createDataFrame( + [ + [None], + ["value `nonsense` in column `geom` is not a linestring geometry"], + ["value `POLYGON((1 1, 2 2, 3 3, 1 1))` in column `geom` is not a linestring geometry"], + [None], + ], + checked_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_polygon(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom: string" + test_df = spark.createDataFrame( + [["POLYGON((1 1, 2 2, 3 3, 1 1))"], ["nonsense"], ["LINESTRING(1 1, 2 2)"], [None]], + input_schema, + ) + + actual = test_df.select(is_polygon("geom")) + + checked_schema = "geom_is_not_polygon: string" + expected = spark.createDataFrame( + [ + [None], + ["value `nonsense` in column `geom` is not a polygon geometry"], + ["value `LINESTRING(1 1, 2 2)` in column `geom` is not a polygon geometry"], + [None], + ], + checked_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_multipoint(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom: string" + test_df = spark.createDataFrame( + [["MULTIPOINT(1 1, 2 2)"], ["nonsense"], ["LINESTRING(1 1, 2 2)"], [None]], + input_schema, + ) + + actual = test_df.select(is_multipoint("geom")) + + checked_schema = "geom_is_not_multipoint: string" + expected = spark.createDataFrame( + [ + [None], + ["value `nonsense` in column `geom` is not a multipoint geometry"], + ["value `LINESTRING(1 1, 2 2)` in column `geom` is not a multipoint geometry"], + [None], + ], + checked_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_multilinestring(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom: string" + test_df = spark.createDataFrame( + [["MULTILINESTRING((1 1, 2 2), (3 3, 4 4))"], ["nonsense"], ["POLYGON((1 1, 2 2, 3 3, 1 1))"], [None]], + input_schema, + ) + + actual = test_df.select(is_multilinestring("geom")) + + checked_schema = "geom_is_not_multilinestring: string" + expected = spark.createDataFrame( + [ + [None], + ["value `nonsense` in column `geom` is not a multilinestring geometry"], + ["value `POLYGON((1 1, 2 2, 3 3, 1 1))` in column `geom` is not a multilinestring geometry"], + [None], + ], + checked_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_multipolygon(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom: string" + test_df = spark.createDataFrame( + [["MULTIPOLYGON(((1 1, 2 2, 3 3, 1 1)))"], ["nonsense"], ["LINESTRING(1 1, 2 2)"], [None]], + input_schema, + ) + + actual = test_df.select(is_multipolygon("geom")) + + checked_schema = "geom_is_not_multipolygon: string" + expected = spark.createDataFrame( + [ + [None], + ["value `nonsense` in column `geom` is not a multipolygon 
geometry"], + ["value `LINESTRING(1 1, 2 2)` in column `geom` is not a multipolygon geometry"], + [None], + ], + checked_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_geometrycollection(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom: string" + test_df = spark.createDataFrame( + [ + ["GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(2 2, 3 3))"], + ["nonsense"], + ["POLYGON((1 1, 2 2, 3 3, 1 1))"], + [None], + ], + input_schema, + ) + + actual = test_df.select(is_geometrycollection("geom")) + + checked_schema = "geom_is_not_geometrycollection: string" + expected = spark.createDataFrame( + [ + [None], + ["value `nonsense` in column `geom` is not a geometrycollection geometry"], + ["value `POLYGON((1 1, 2 2, 3 3, 1 1))` in column `geom` is not a geometrycollection geometry"], + [None], + ], + checked_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_ogc_valid(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom: string" + test_df = spark.createDataFrame( + [["POLYGON((0 0,10 0,0 10,0 0))"], ["nonsense"], ["POLYGON((0 0,10 10,10 0,0 10,0 0))"], [None]], + input_schema, + ) + + actual = test_df.select(is_ogc_valid("geom")) + + checked_schema = "geom_is_not_valid_geometry: string" + expected = spark.createDataFrame( + [ + [None], + ["value `nonsense` in column `geom` is not a valid geometry (in the OGC sense)"], + ["value `POLYGON((0 0,10 10,10 0,0 10,0 0))` in column `geom` is not a valid geometry (in the OGC sense)"], + [None], + ], + checked_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_longitude(spark): + input_schema = "long_string: string, long_int: int, long_double: double" + test_df = spark.createDataFrame( + [["1", 120, 180.0], ["-181", None, 180.01]], + input_schema, + ) + + actual = test_df.select(is_longitude("long_string"), is_longitude("long_int"), is_longitude("long_double")) + + checked_schema = "long_string_is_not_valid_longitude: string, long_int_is_not_valid_longitude: string, long_double_is_not_valid_longitude: string" + expected = spark.createDataFrame( + [ + [None, None, None], + [ + "value `-181` in column `long_string` is not a valid longitude (must be between -180 and 180)", + None, + "value `180.01` in column `long_double` is not a valid longitude (must be between -180 and 180)", + ], + ], + checked_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_latitude(spark): + input_schema = "lat_string: string, lat_int: int, lat_double: double" + test_df = spark.createDataFrame( + [["1", 60, 90.0], ["-91", None, 90.01]], + input_schema, + ) + + actual = test_df.select(is_latitude("lat_string"), is_latitude("lat_int"), is_latitude("lat_double")) + + checked_schema = "lat_string_is_not_valid_latitude: string, lat_int_is_not_valid_latitude: string, lat_double_is_not_valid_latitude: string" + expected = spark.createDataFrame( + [ + [None, None, None], + [ + "value `-91` in column `lat_string` is not a valid latitude (must be between -90 and 90)", + None, + "value `90.01` in column `lat_double` is not a valid latitude (must be between -90 and 90)", + ], + ], + checked_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_non_empty_geometry(skip_if_runtime_not_geo_compatible, spark): + input_schema = "geom: string" + test_df = spark.createDataFrame( + [["POINT(1 1)"], ["nonsense"], ["POLYGON EMPTY"], [None]], + input_schema, + ) + + actual = 
+
+    checked_schema = "geom_is_empty_geometry: string"
+    expected = spark.createDataFrame(
+        [
+            [None],
+            ["value `nonsense` in column `geom` is an empty geometry"],
+            ["value `POLYGON EMPTY` in column `geom` is an empty geometry"],
+            [None],
+        ],
+        checked_schema,
+    )
+    assert_df_equality(actual, expected, ignore_nullable=True)
+
+
+def test_has_dimension(skip_if_runtime_not_geo_compatible, spark):
+    input_schema = "geom: string"
+    test_df = spark.createDataFrame(
+        [["POINT(1 1)"], ["nonsense"], ["POLYGON((0 0, 2 0, 0 2, 0 0))"], [None]],
+        input_schema,
+    )
+
+    actual = test_df.select(has_dimension("geom", 0))
+    checked_schema = "geom_does_not_have_required_geo_dimension: string"
+    expected = spark.createDataFrame(
+        [
+            [None],
+            ["value `nonsense` in column `geom` does not have the required dimension (0)"],
+            ["value `POLYGON((0 0, 2 0, 0 2, 0 0))` in column `geom` does not have the required dimension (0)"],
+            [None],
+        ],
+        checked_schema,
+    )
+    assert_df_equality(actual, expected, ignore_nullable=True)
+
+
+def test_has_x_coordinate_between(skip_if_runtime_not_geo_compatible, spark):
+    input_schema = "geom: string"
+    test_df = spark.createDataFrame(
+        [["POINT(1 1)"], ["nonsense"], ["POLYGON((0 0, 2 0, 0 2, 0 0))"], [None]],
+        input_schema,
+    )
+
+    actual = test_df.select(has_x_coordinate_between("geom", 0, 1))
+
+    checked_schema = "geom_has_x_coordinates_outside_range: string"
+    expected = spark.createDataFrame(
+        [
+            [None],
+            ["value `nonsense` in column `geom` has x coordinates outside the range [0, 1]"],
+            ["value `POLYGON((0 0, 2 0, 0 2, 0 0))` in column `geom` has x coordinates outside the range [0, 1]"],
+            [None],
+        ],
+        checked_schema,
+    )
+
+    assert_df_equality(actual, expected, ignore_nullable=True)
+
+
+def test_has_y_coordinate_between(skip_if_runtime_not_geo_compatible, spark):
+    input_schema = "geom: string"
+    test_df = spark.createDataFrame(
+        [["POINT(1 1)"], ["nonsense"], ["POLYGON((0 0, 2 0, 0 2, 0 0))"], [None]],
+        input_schema,
+    )
+
+    actual = test_df.select(has_y_coordinate_between("geom", 0, 1))
+
+    checked_schema = "geom_has_y_coordinates_outside_range: string"
+    expected = spark.createDataFrame(
+        [
+            [None],
+            ["value `nonsense` in column `geom` has y coordinates outside the range [0, 1]"],
+            ["value `POLYGON((0 0, 2 0, 0 2, 0 0))` in column `geom` has y coordinates outside the range [0, 1]"],
+            [None],
+        ],
+        checked_schema,
+    )
+
+    assert_df_equality(actual, expected, ignore_nullable=True)
diff --git a/tests/perf/conftest.py b/tests/perf/conftest.py
index 59c160c9b..7a1adff7f 100644
--- a/tests/perf/conftest.py
+++ b/tests/perf/conftest.py
@@ -160,6 +160,32 @@ def generated_ipv6_df(spark):
     return gen.build()
+
+@pytest.fixture
+def generated_geo_df(spark):
+    geo_schema_str = (
+        "num_col: int, point_geom: string, linestring_geom: string, polygon_geom: string, multipoint_geom: string, "
+        "multilinestring_geom: string, multipolygon_geom: string, geometrycollection_geom: string"
+    )
+    schema = _parse_datatype_string(geo_schema_str)
+
+    geo_templates = {
+        "num_col": "int",
+        "point_geom": "POINT(x x)",
+        "linestring_geom": "LINESTRING(x x, x x)",
+        "polygon_geom": "POLYGON((x x, x x, x x, x x))",
+        "multipoint_geom": "MULTIPOINT(x x, x x)",
+        "multilinestring_geom": "MULTILINESTRING((x x, x x))",
+        "multipolygon_geom": "MULTIPOLYGON(((x x, x x, x x, x x)))",
+        "geometrycollection_geom": "GEOMETRYCOLLECTION(POINT(x x), LINESTRING(x x, x x), POLYGON((x x, x x, x x, x x)))",
+    }
+
+    _, gen = make_data_gen(spark, n_rows=DEFAULT_ROWS, n_columns=len(geo_schema_str), partitions=DEFAULT_PARTITIONS)
+    gen = gen.withSchema(schema)
+    for col, template in geo_templates.items():
+        gen = gen.withColumnSpec(col, template=template)
+    return gen.build()
+
+
 @pytest.fixture
 def make_ref_df(spark, n_rows=DEFAULT_ROWS):
     schema = _parse_datatype_string(REF_SCHEMA_STR)
diff --git a/tests/perf/test_apply_checks.py b/tests/perf/test_apply_checks.py
index 6282b5c5a..1f223d476 100644
--- a/tests/perf/test_apply_checks.py
+++ b/tests/perf/test_apply_checks.py
@@ -4,6 +4,7 @@ from databricks.labs.dqx.config import ExtraParams
 import pytest
 
 from databricks.labs.dqx import check_funcs
+from databricks.labs.dqx.geo import check_funcs as geo_check_funcs
 from tests.perf.conftest import DEFAULT_ROWS
 
 RUN_TIME = datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc)
@@ -1327,6 +1328,234 @@ def test_benchmark_is_ipv6_address_in_cidr(benchmark, ws, generated_ipv6_df, col
     assert actual_count == EXPECTED_ROWS
+
+def test_benchmark_is_latitude(benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_latitude,
+            column="point_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_longitude(benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_longitude,
+            column="point_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_geometry(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_geometry,
+            column="point_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_geography(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_geography,
+            column="point_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_point(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_point,
+            column="point_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_linestring(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_linestring,
+            column="linestring_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_polygon(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_polygon,
+            column="polygon_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_multipoint(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_multipoint,
+            column="multipoint_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_multilinestring(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_multilinestring,
+            column="multilinestring_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_multipolygon(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_multipolygon,
+            column="multipolygon_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_geometrycollection(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_geometrycollection,
+            column="geometrycollection_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_ogc_valid(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_ogc_valid,
+            column="point_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_is_non_empty_geometry(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.is_non_empty_geometry,
+            column="point_geom",
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_has_dimension(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.has_dimension,
+            column="polygon_geom",
+            check_func_kwargs={"dimension": 2},
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_has_x_coordinate_between(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.has_x_coordinate_between,
+            column="polygon_geom",
+            check_func_kwargs={"min_value": 0.0, "max_value": 10.0},
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+def test_benchmark_has_y_coordinate_between(skip_if_runtime_not_geo_compatible, benchmark, ws, generated_geo_df):
+    dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
+    checks = [
+        DQRowRule(
+            criticality="error",
+            check_func=geo_check_funcs.has_y_coordinate_between,
+            column="polygon_geom",
+            check_func_kwargs={"min_value": 0.0, "max_value": 10.0},
+        )
+    ]
+    checked = dq_engine.apply_checks(generated_geo_df, checks)
+    actual_count = benchmark(lambda: checked.count())
+    assert actual_count == EXPECTED_ROWS
+
+
+@pytest.mark.benchmark(group="test_benchmark_has_valid_schema")
 def test_benchmark_has_valid_schema(benchmark, ws, generated_df):
     dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS)
     checks = [
diff --git a/tests/resources/all_row_checks.yaml b/tests/resources/all_row_checks.yaml
index 25a12cfb2..a7e8f47d8 100644
--- a/tests/resources/all_row_checks.yaml
+++ b/tests/resources/all_row_checks.yaml
@@ -432,4 +432,3 @@
       column: col5
       max_age_minutes: 18000
       base_timestamp: col6
-
diff --git a/tests/resources/all_row_geo_checks.yaml b/tests/resources/all_row_geo_checks.yaml
new file mode 100644
index 000000000..8cc66ed7c
--- /dev/null
+++ b/tests/resources/all_row_geo_checks.yaml
@@ -0,0 +1,119 @@
+# The checks used in the test are also showcased in the docs under /docs/reference/quality_checks.mdx
+# The checks should be kept up to date with the docs to make sure the documentation examples are validated.
+
+# is_latitude check
+- criticality: error
+  check:
+    function: is_latitude
+    arguments:
+      column: col3
+
+# is_longitude check
+- criticality: error
+  check:
+    function: is_longitude
+    arguments:
+      column: col3
+
+# is_geometry check
+- criticality: error
+  check:
+    function: is_geometry
+    arguments:
+      column: point_geom
+
+# is_geography check
+- criticality: error
+  check:
+    function: is_geography
+    arguments:
+      column: point_geom
+
+# is_point check
+- criticality: error
+  check:
+    function: is_point
+    arguments:
+      column: point_geom
+
+# is_linestring check
+- criticality: error
+  check:
+    function: is_linestring
+    arguments:
+      column: linestring_geom
+
+# is_polygon check
+- criticality: error
+  check:
+    function: is_polygon
+    arguments:
+      column: polygon_geom
+
+# is_multipoint check
+- criticality: error
+  check:
+    function: is_multipoint
+    arguments:
+      column: multipoint_geom
+
+# is_multilinestring check
+- criticality: error
+  check:
+    function: is_multilinestring
+    arguments:
+      column: multilinestring_geom
+
+# is_multipolygon check
+- criticality: error
+  check:
+    function: is_multipolygon
+    arguments:
+      column: multipolygon_geom
+
+# is_geometrycollection check
+- criticality: error
+  check:
+    function: is_geometrycollection
+    arguments:
+      column: geometrycollection_geom
+
+# is_ogc_valid check
+- criticality: error
+  check:
+    function: is_ogc_valid
+    arguments:
+      column: point_geom
+
+# is_non_empty_geometry check
+- criticality: error
+  check:
+    function: is_non_empty_geometry
+    arguments:
+      column: point_geom
+
+# has_dimension check
+- criticality: error
+  check:
+    function: has_dimension
+    arguments:
+      column: polygon_geom
+      dimension: 2
+
+# has_x_coordinate_between check
+- criticality: error
+  check:
+    function: has_x_coordinate_between
+    arguments:
+      column: polygon_geom
+      min_value: 0.0
+      max_value: 10.0
+
+# has_y_coordinate_between check
+- criticality: error
+  check:
+    function: has_y_coordinate_between
+    arguments:
+      column: polygon_geom
+      min_value: 0.0
+      max_value: 10.0