Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fct_vehicle_locations_grouped - calculate direction from the prior position #3771

Merged
merged 17 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions warehouse/models/mart/gtfs/_mart_gtfs_fcts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ x-common-fields:
# occurs prior to that
error_if: ">20"
- &gtfs_rt_dataset_key
name: gtfs_dataset_key
name: &gtfs_rt_dataset_key_desc | #gtfs_dataset_key
description: |
Foreign key to the associated GTFS dataset record.
tests:
Expand Down Expand Up @@ -2525,27 +2525,16 @@ models:
Synthetic primary key constructed from `service_date`, `base64_url`,
`location_timestamp`, `vehicle_id`, `vehicle_label`,
`trip_id`, and `trip_start_time`.
tests: *almost_unique_rt_key_tests
- *gtfs_rt_dt
- *rt_service_date
- name: gtfs_dataset_key
description: *gtfs_dataset_key_desc
tests:
- dbt_utils.relationships_where:
to: ref('dim_gtfs_datasets')
field: key
to_condition: "type = 'vehicle_positions'"
description: *gtfs_rt_dataset_key_desc
- *base64_url
- *gtfs_rt_name
- *gtfs_rt_schedule_dataset_key
- name: gtfs_rt_schedule_dataset_key
description: '{{ doc("column_rt_schedule_dataset_key") }}'
- <<: *trip_instance_key
tests:
- not_null
- unique_proportion:
at_least: 0.9999
- relationships:
to: ref('fct_observed_trips')
field: trip_instance_key
- *rt_location_timestamp
- name: moving_timestamp
description: |
Expand Down
137 changes: 64 additions & 73 deletions warehouse/models/mart/gtfs/fct_vehicle_locations_grouped.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
}}

WITH fct_vehicle_locations AS (
SELECT
SELECT
key,
dt,
gtfs_dataset_key,
base64_url,
gtfs_dataset_name,
Expand All @@ -24,113 +23,105 @@ WITH fct_vehicle_locations AS (
trip_instance_key,
location_timestamp,
location,
next_location_key,
-- rather than using next_location_key, use lag to calculate direction from previous
FROM {{ ref('fct_vehicle_locations') }}
WHERE {{ incremental_where(default_start_var='PROD_GTFS_RT_START') }}
ORDER by service_date, trip_instance_key, location_timestamp
),

),

next_location AS (
lat_lon AS (
SELECT
key AS next_location_key,
location AS next_location,
fct_vehicle_locations.trip_instance_key,
fct_vehicle_locations.service_date,
fct_vehicle_locations.key,
fct_vehicle_locations.location_timestamp,
ST_X(fct_vehicle_locations.location) AS longitude,
ST_Y(fct_vehicle_locations.location) AS latitude,
FROM fct_vehicle_locations
),

same_locations AS (
lagged AS (
SELECT
fct_vehicle_locations.key,
fct_vehicle_locations.next_location_key,
ST_X(fct_vehicle_locations.location) AS lon,
ST_Y(fct_vehicle_locations.location) AS lat,
ST_X(next_location.next_location) - ST_X(fct_vehicle_locations.location) AS delta_lon,
ST_Y(next_location.next_location) - ST_Y(fct_vehicle_locations.location) AS delta_lat,
CASE
WHEN ST_EQUALS(fct_vehicle_locations.location, next_location.next_location)
THEN 0
ELSE 1
END AS new_group,
FROM fct_vehicle_locations
INNER JOIN next_location
ON fct_vehicle_locations.next_location_key = next_location.next_location_key
lat_lon.*,
LAG(longitude, 1, NULL)
OVER (PARTITION BY service_date, trip_instance_key
ORDER BY location_timestamp)
AS previous_longitude,
LAG(latitude, 1, NULL)
OVER (PARTITION BY service_date, trip_instance_key
ORDER BY location_timestamp)
AS previous_latitude,
FROM lat_lon
),

deltas AS (
SELECT
lagged.*,
lagged.longitude - lagged.previous_longitude AS delta_lon,
lagged.latitude - lagged.previous_latitude AS delta_lat,
FROM lagged
),

direction AS (
SELECT
same_locations.next_location_key AS key,
same_locations.new_group,
deltas.*,
CASE
WHEN (ABS(delta_lon) > ABS(delta_lat)) AND (delta_lon > 0)
THEN "East"
WHEN (ABS(delta_lon) > ABS(delta_lat)) AND (delta_lon < 0)
THEN "West"
WHEN (ABS(delta_lon) < ABS(delta_lat)) AND (delta_lat > 0)
WHEN (ABS(delta_lon) <= ABS(delta_lat)) AND (delta_lat > 0)
THEN "North"
WHEN (ABS(delta_lon) < ABS(delta_lat)) AND (delta_lat < 0)
WHEN (ABS(delta_lon) <= ABS(delta_lat)) AND (delta_lat < 0)
THEN "South"
WHEN (delta_lon = delta_lat) AND (delta_lat = 0)
THEN "Unknown"
WHEN delta_lon IS NULL OR delta_lat IS NULL
THEN "Unknown"
END AS vp_direction,
FROM same_locations
WHERE same_locations.new_group = 1
-- subset to where new_group is identified so we can fill in unknown
-- direction / dwelling points once we group the vp
),

keys_grouped AS (
SELECT
fct_vehicle_locations.key,
direction.new_group,
direction.vp_direction
FROM fct_vehicle_locations
LEFT JOIN direction
ON fct_vehicle_locations.key = direction.key
CASE
WHEN delta_lon = delta_lat AND delta_lat = 0
THEN 0
WHEN delta_lon IS NULL OR delta_lat IS NULL
THEN 0
ELSE 1
END AS new_group,
FROM deltas
),

vp_grouper AS (
SELECT
fct_vehicle_locations.key,
fct_vehicle_locations.dt,
fct_vehicle_locations.gtfs_dataset_key,
fct_vehicle_locations.base64_url,
fct_vehicle_locations.gtfs_dataset_name,
fct_vehicle_locations.schedule_gtfs_dataset_key,
fct_vehicle_locations.service_date,
fct_vehicle_locations.trip_instance_key,
fct_vehicle_locations.location,
fct_vehicle_locations.location_timestamp,
SUM(keys_grouped.new_group)
direction.trip_instance_key,
direction.service_date,
direction.key,
SUM(direction.new_group)
OVER (
PARTITION BY service_date, trip_instance_key
ORDER BY location_timestamp
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS vp_group,
keys_grouped.vp_direction
FROM fct_vehicle_locations
INNER JOIN keys_grouped
ON fct_vehicle_locations.key = keys_grouped.key
direction.vp_direction,
FROM direction
),

fct_grouped_locations AS (
SELECT
MIN(vp_grouper.key) AS key,
vp_grouper.dt,
vp_grouper.gtfs_dataset_key,
vp_grouper.base64_url,
vp_grouper.gtfs_dataset_name,
vp_grouper.schedule_gtfs_dataset_key,
vp_grouper.service_date,
fct_vehicle_locations.gtfs_dataset_key,
fct_vehicle_locations.base64_url,
fct_vehicle_locations.gtfs_dataset_name,
fct_vehicle_locations.schedule_gtfs_dataset_key,
fct_vehicle_locations.service_date,
vp_grouper.trip_instance_key,
MIN(vp_grouper.location_timestamp) AS location_timestamp,
MAX(vp_grouper.location_timestamp) AS moving_timestamp,
MIN(vp_grouper.key) AS key,
MIN(fct_vehicle_locations.location_timestamp) AS location_timestamp,
MAX(fct_vehicle_locations.location_timestamp) AS moving_timestamp,
ST_GEOGFROMTEXT(MIN(ST_ASTEXT(fct_vehicle_locations.location))) AS location,
COUNT(*) AS n_vp,
ST_GEOGFROMTEXT(MIN(ST_ASTEXT(vp_grouper.location))) AS location,
CASE
WHEN MIN(vp_grouper.vp_direction) IS NULL
THEN "Unknown" -- now that we grabbed a valid direction, any remaining should be unknown
ELSE MIN(vp_grouper.vp_direction)
END AS vp_direction,
MIN(vp_grouper.vp_direction) AS vp_direction,
vp_grouper.vp_group,
FROM vp_grouper
GROUP BY dt, gtfs_dataset_key, base64_url, gtfs_dataset_name, schedule_gtfs_dataset_key, service_date, trip_instance_key, vp_group
INNER JOIN fct_vehicle_locations
ON fct_vehicle_locations.key = vp_grouper.key
GROUP BY gtfs_dataset_key, base64_url, gtfs_dataset_name, schedule_gtfs_dataset_key, service_date, trip_instance_key, vp_group, ST_ASTEXT(fct_vehicle_locations.location)
)

SELECT * FROM fct_grouped_locations
Loading