diff --git a/activitysim/abm/models/__init__.py b/activitysim/abm/models/__init__.py index 44534fa53..aea244b31 100644 --- a/activitysim/abm/models/__init__.py +++ b/activitysim/abm/models/__init__.py @@ -29,6 +29,7 @@ non_mandatory_scheduling, non_mandatory_tour_frequency, parking_location_choice, + park_and_ride_lot_choice, school_escorting, stop_frequency, summarize, diff --git a/activitysim/abm/models/atwork_subtour_mode_choice.py b/activitysim/abm/models/atwork_subtour_mode_choice.py index b9889aee7..9db50c8df 100644 --- a/activitysim/abm/models/atwork_subtour_mode_choice.py +++ b/activitysim/abm/models/atwork_subtour_mode_choice.py @@ -8,6 +8,8 @@ import pandas as pd from activitysim.abm.models.util.mode import run_tour_mode_choice_simulate +from activitysim.abm.models.util.logsums import setup_skims +from activitysim.abm.models.park_and_ride_lot_choice import run_park_and_ride_lot_choice from activitysim.core import config, estimation, expressions, los, tracing, workflow from activitysim.core.configuration.logit import TourModeComponentSettings from activitysim.core.util import assign_in_place @@ -64,66 +66,35 @@ def atwork_subtour_mode_choice( constants = {} constants.update(model_settings.CONSTANTS) - skim_dict = network_los.get_default_skim_dict() + if "pnr_zone_id" in subtours_merged.columns: + if model_settings.run_atwork_pnr_lot_choice: + subtours_merged["pnr_zone_id"] = run_park_and_ride_lot_choice( + state, + choosers=subtours_merged.copy(), + land_use=state.get_dataframe("land_use"), + network_los=network_los, + model_settings=None, + choosers_dest_col_name="destination", + choosers_origin_col_name="workplace_zone_id", + estimator=None, + pnr_capacity_cls=None, + trace_label=tracing.extend_trace_label(trace_label, "pnr_lot_choice"), + ) + else: + subtours_merged["pnr_zone_id"].fillna(-1, inplace=True) # setup skim keys - orig_col_name = "workplace_zone_id" - dest_col_name = "destination" - out_time_col_name = "start" - in_time_col_name = "end" - odt_skim_stack_wrapper = skim_dict.wrap_3d( - orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="out_period" - ) - dot_skim_stack_wrapper = skim_dict.wrap_3d( - orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="in_period" - ) - odr_skim_stack_wrapper = skim_dict.wrap_3d( - orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="in_period" - ) - dor_skim_stack_wrapper = skim_dict.wrap_3d( - orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="out_period" + skims = setup_skims( + network_los, + subtours_merged, + add_periods=False, + include_pnr_skims=model_settings.run_atwork_pnr_lot_choice, + orig_col_name="workplace_zone_id", + dest_col_name="destination", + trace_label=trace_label, ) - od_skim_stack_wrapper = skim_dict.wrap(orig_col_name, dest_col_name) - - skims = { - "odt_skims": odt_skim_stack_wrapper, - "dot_skims": dot_skim_stack_wrapper, - "odr_skims": odr_skim_stack_wrapper, - "dor_skims": dor_skim_stack_wrapper, - "od_skims": od_skim_stack_wrapper, - "orig_col_name": orig_col_name, - "dest_col_name": dest_col_name, - "out_time_col_name": out_time_col_name, - "in_time_col_name": in_time_col_name, - } if network_los.zone_system == los.THREE_ZONE: - # fixme - is this a lightweight object? 
-        tvpb = network_los.tvpb
-
-        tvpb_logsum_odt = tvpb.wrap_logsum(
-            orig_key=orig_col_name,
-            dest_key=dest_col_name,
-            tod_key="out_period",
-            segment_key="demographic_segment",
-            cache_choices=True,
-            trace_label=trace_label,
-            tag="tvpb_logsum_odt",
-        )
-        tvpb_logsum_dot = tvpb.wrap_logsum(
-            orig_key=dest_col_name,
-            dest_key=orig_col_name,
-            tod_key="in_period",
-            segment_key="demographic_segment",
-            cache_choices=True,
-            trace_label=trace_label,
-            tag="tvpb_logsum_dot",
-        )
-
-        skims.update(
-            {"tvpb_logsum_odt": tvpb_logsum_odt, "tvpb_logsum_dot": tvpb_logsum_dot}
-        )
-
         # TVPB constants can appear in expressions
         constants.update(
             network_los.setting("TVPB_SETTINGS.tour_mode_choice.CONSTANTS")
@@ -157,7 +128,7 @@ def atwork_subtour_mode_choice(
         tvpb_mode_path_types = model_settings.tvpb_mode_path_types
         for mode, path_types in tvpb_mode_path_types.items():
             for direction, skim in zip(
-                ["od", "do"], [tvpb_logsum_odt, tvpb_logsum_dot]
+                ["od", "do"], [skims["tvpb_logsum_odt"], skims["tvpb_logsum_dot"]]
             ):
                 path_type = path_types[direction]
                 skim_cache = skim.cache[path_type]
diff --git a/activitysim/abm/models/disaggregate_accessibility.py b/activitysim/abm/models/disaggregate_accessibility.py
index df229c6ad..ebae8c7e4 100644
--- a/activitysim/abm/models/disaggregate_accessibility.py
+++ b/activitysim/abm/models/disaggregate_accessibility.py
@@ -183,6 +183,8 @@ class DisaggregateAccessibilitySettings(PydanticReadable, extra="forbid"):
     If less than 1, use this fraction of the total number of rows.
     If not supplied or None, will default to the chunk size in the location choice model settings.
     """
+    NEAREST_ZONE_SKIM: str = "DIST"
+    """The skim to use for finding the nearest zone when distributing logsums to un-sampled zones."""
 
 
 def read_disaggregate_accessibility_yaml(
diff --git a/activitysim/abm/models/park_and_ride_lot_choice.py b/activitysim/abm/models/park_and_ride_lot_choice.py
new file mode 100644
index 000000000..caacef362
--- /dev/null
+++ b/activitysim/abm/models/park_and_ride_lot_choice.py
@@ -0,0 +1,379 @@
+# ActivitySim
+# See full license in LICENSE.txt.
+import logging
+
+import numpy as np
+import pandas as pd
+from typing import Literal
+
+from activitysim.core import (
+    config,
+    expressions,
+    los,
+    estimation,
+    simulate,
+    tracing,
+    workflow,
+)
+from activitysim.core.configuration.logit import (
+    LogitComponentSettings,
+    PreprocessorSettings,
+)
+from activitysim.core.interaction_simulate import interaction_simulate
+from activitysim.abm.models.util import logsums
+from activitysim.abm.models.util.park_and_ride_capacity import ParkAndRideCapacity
+
+logger = logging.getLogger(__name__)
+
+
+class ParkAndRideLotChoiceSettings(LogitComponentSettings, extra="forbid"):
+    """
+    Settings for the `park_and_ride_lot_choice` component.
+    """
+
+    LANDUSE_PNR_SPACES_COLUMN: str
+    """Column name in the land use table that contains the number of park-and-ride spaces available in the zone."""
+
+    TRANSIT_SKIMS_FOR_ELIGIBILITY: list[str] | None = None
+    """A list of skim names to use for filtering choosers to only those with destinations that have transit access.
+    If None, all tours are considered eligible for park-and-ride lot choice."""
+
+    LANDUSE_COL_FOR_PNR_ELIGIBLE_DEST: str | None = None
+    """The column name in the land use table that indicates whether a destination is eligible for park-and-ride.
+    If supplied, then TRANSIT_SKIMS_FOR_ELIGIBILITY is not used.
+    """
+
+    explicit_chunk: float = 0
+    """
+    If > 0, use this chunk size instead of adaptive chunking.
+    If less than 1, use this fraction of the total number of rows.
+    """
+
+    alts_preprocessor: PreprocessorSettings | None = None
+    """Preprocessor settings for park-and-ride lot alternatives."""
+
+    ITERATE_WITH_TOUR_MODE_CHOICE: bool = False
+    """If True, iterate with tour mode choice to find un-capacitated pnr lots."""
+
+    MAX_ITERATIONS: int = 5
+    """Maximum number of iterations to iterate park-and-ride lot choice with tour mode choice."""
+
+    ACCEPTED_TOLERANCE: float = 0.95
+    """Lot is considered full if the lot is at least this percentage full."""
+
+    RESAMPLE_STRATEGY: Literal["latest", "random"] = "latest"
+    """Strategy to use when selecting tours for resampling park-and-ride lot choices.
+    - latest: tours arriving the latest are selected for resampling.
+    - random: randomly resample from all tours at the over-capacitated lot.
+    """
+
+    PARK_AND_RIDE_MODES: list[str] | None = None
+    """List of modes that are considered park-and-ride modes.
+    Needed for filtering choices to calculate park-and-ride lot capacities.
+    Should correspond to the columns in the tour mode choice specification file.
+    """
+
+
+def filter_chooser_to_transit_accessible_destinations(
+    state: workflow.State,
+    choosers: pd.DataFrame,
+    pnr_alts: pd.DataFrame,
+    network_los: los.Network_LOS,
+    model_settings: ParkAndRideLotChoiceSettings,
+    choosers_dest_col_name: str,
+) -> pd.DataFrame:
+    """
+    Filter choosers to only those with destinations that have transit access.
+    If a precomputed land use column is supplied, use that.
+    Otherwise look at the skims and check that the destination has any non-zero terms for transit access.
+    If neither a land use column nor skim cores are supplied, return all choosers.
+    """
+
+    if model_settings.LANDUSE_COL_FOR_PNR_ELIGIBLE_DEST is not None:
+        col = model_settings.LANDUSE_COL_FOR_PNR_ELIGIBLE_DEST
+        assert (
+            col in pnr_alts.columns
+        ), f"{col} not in landuse table, check LANDUSE_COL_FOR_PNR_ELIGIBLE_DEST setting in park_and_ride_lot_choice.yaml"
+        available_dests = pnr_alts[pnr_alts[col]].index
+        filtered_choosers = choosers[
+            choosers[choosers_dest_col_name].isin(available_dests)
+        ]
+
+    elif model_settings.TRANSIT_SKIMS_FOR_ELIGIBILITY is not None:
+        skim_dict = network_los.get_default_skim_dict()
+        unique_destinations = choosers[choosers_dest_col_name].unique()
+        unique_lot_locations = pnr_alts.index.values
+        transit_accessible = np.full(
+            shape=len(unique_destinations), fill_value=False, dtype=bool
+        )
+
+        for skim_name in model_settings.TRANSIT_SKIMS_FOR_ELIGIBILITY:
+            if "__" in skim_name:
+                # If the skim name contains '__', it is a 3D skim
+                # we need to pass the skim name as a tuple to the lookup method, e.g. ('WALK_TRANSIT_IVTT', 'MD')
+                skim_name = tuple(skim_name.split("__"))
+            if skim_name not in skim_dict.skim_info.omx_keys.keys():
+                raise ValueError(
+                    f"Skim '{skim_name}' not found in the skim dictionary. "
+                    "Please update the model setting TRANSIT_SKIMS_FOR_ELIGIBILITY with valid skim names."
+                )
+            # Filter choosers to only those with destinations that have transit access:
+            # for each destination, check whether ANY of the lot locations has transit access to it
+            transit_accessible_i = [
+                (
+                    skim_dict.lookup(
+                        unique_lot_locations,
+                        np.full(shape=len(unique_lot_locations), fill_value=dest),
+                        skim_name,
+                    )
+                    > 0
+                ).any()
+                for dest in unique_destinations
+            ]
+            transit_accessible = np.logical_or(transit_accessible, transit_accessible_i)
+
+        eligible_destinations = unique_destinations[transit_accessible]
+        filtered_choosers = choosers[
+            choosers[choosers_dest_col_name].isin(eligible_destinations)
+        ]
+    else:
+        filtered_choosers = choosers
+
+    logger.info(
+        f"Preparing choosers for park-and-ride lot choice model:\n"
+        f"    Filtered tours to {len(filtered_choosers)} with transit access to their destination.\n"
+        f"    Total number of tours: {len(choosers)}.\n"
+        f"    Percentage of tours with transit access at destination: "
+        f"{len(filtered_choosers) / len(choosers) * 100:.2f}%"
+    )
+
+    return filtered_choosers
+
+
+def run_park_and_ride_lot_choice(
+    state: workflow.State,
+    choosers: pd.DataFrame,
+    land_use: pd.DataFrame,
+    network_los: los.Network_LOS,
+    model_settings: ParkAndRideLotChoiceSettings | None = None,
+    choosers_dest_col_name: str = "destination",
+    choosers_origin_col_name: str = "home_zone_id",
+    estimator=None,
+    model_settings_file_name: str = "park_and_ride_lot_choice.yaml",
+    pnr_capacity_cls: ParkAndRideCapacity | None = None,
+    trace_label: str = "park_and_ride_lot_choice",
+) -> pd.Series:
+    """
+    Run the park-and-ride lot choice model.
+
+    Provides another entry point for the model which is useful for getting pnr locations while creating logsums.
+    """
+
+    if model_settings is None:
+        model_settings = ParkAndRideLotChoiceSettings.read_settings_file(
+            state.filesystem,
+            model_settings_file_name,
+        )
+
+    spec = state.filesystem.read_model_spec(file_name=model_settings.SPEC)
+    coefficients = state.filesystem.read_model_coefficients(model_settings)
+    model_spec = simulate.eval_coefficients(state, spec, coefficients, estimator)
+    # copy constants so the skims added below don't mutate the settings object
+    locals_dict = dict(model_settings.CONSTANTS)
+
+    pnr_alts = land_use[land_use[model_settings.LANDUSE_PNR_SPACES_COLUMN] > 0].copy()
+    pnr_alts["pnr_zone_id"] = pnr_alts.index.values
+
+    # if we are running with capacitated pnr lots, we need to flag the lots that are over-capacitated
+    if pnr_capacity_cls is not None:
+        pnr_alts["pnr_lot_full"] = pnr_capacity_cls.flag_capacitated_pnr_zones(pnr_alts)
+        # if there are no available pnr lots left, we return a series of -1
+        if (pnr_alts["pnr_lot_full"] == 1).all():
+            logger.info(
+                "All park-and-ride lots are full. Returning -1 as park-and-ride lot choice."
+            )
+            return pd.Series(data=-1, index=choosers.index)
+    else:
+        pnr_alts["pnr_lot_full"] = 0
+
+    original_index = None
+    if not choosers.index.is_unique:
+        # a non-unique index will crash interaction_simulate,
+        # so we need to reset the index and add it to ActivitySim's rng
+        # this happens while the disaggregate accessibility model is running pnr lot choice
+        original_index = choosers.index
+        oi_name = original_index.name
+        oi_name = oi_name if oi_name else "index"
+        choosers = choosers.reset_index(drop=False)
+        idx_multiplier = choosers.groupby(oi_name).size().max()
+        # round up to the nearest 10's place
+        idx_multiplier = int(np.ceil(idx_multiplier / 10.0) * 10)
+        choosers.index = (
+            original_index * idx_multiplier + choosers.groupby(oi_name).cumcount()
+        )
+        choosers.index.name = oi_name + "_pnr_lot_choice"
+        state.get_rn_generator().add_channel("pnr_lot_choice", choosers)
+        assert (
+            choosers.index.is_unique
+        ), "The index of the choosers DataFrame must be unique after resetting the index."
+        assert len(choosers.index) == len(original_index), (
+            f"The length of the choosers DataFrame must be equal to the original index length:"
+            f" {len(choosers.index)} != {len(original_index)}"
+        )
+
+    trn_accessible_choosers = filter_chooser_to_transit_accessible_destinations(
+        state,
+        choosers,
+        pnr_alts,
+        network_los,
+        model_settings,
+        choosers_dest_col_name,
+    )
+
+    if trn_accessible_choosers.empty:
+        logger.debug(
+            "No choosers with transit accessible destinations found. Returning -1 as park-and-ride lot choice."
+        )
+        if original_index is not None:
+            # drop the rng channel we created above for the non-unique index case
+            state.get_rn_generator().drop_channel("pnr_lot_choice")
+            return pd.Series(data=-1, index=original_index)
+        return pd.Series(data=-1, index=choosers.index)
+
+    add_periods = "in_period" not in trn_accessible_choosers.columns
+    skims = logsums.setup_skims(
+        network_los,
+        trn_accessible_choosers,
+        add_periods=add_periods,
+        include_pnr_skims=True,
+        orig_col_name=choosers_origin_col_name,
+        dest_col_name=choosers_dest_col_name,
+    )
+    locals_dict.update(skims)
+
+    if model_settings.preprocessor:
+        # Need to check whether the table exists in the state.
+        # This can happen if you have preprocessor settings that reference tours
+        # but the tours table doesn't exist yet because you are calculating logsums.
+        # Expressions using these tables need to be written with short-circuiting conditionals
+        # to avoid errors when the table is not present.
+        tables = model_settings.preprocessor.TABLES.copy()
+        for table_name in model_settings.preprocessor.TABLES:
+            if table_name not in state.existing_table_names:
+                logger.debug(
+                    f"Table '{table_name}' not found in state. "
+                    "Removing table from preprocessor list."
+ ) + tables.remove(table_name) + model_settings.preprocessor.TABLES = tables + + # preprocess choosers + expressions.annotate_preprocessors( + state, + df=trn_accessible_choosers, + locals_dict=locals_dict, + # not including skims because lot alt destination not in chooser table + # they are included through the locals_dict instead + skims=None, + model_settings=model_settings, + trace_label=trace_label, + ) + + # preprocess alternatives + expressions.annotate_preprocessors( + state, + df=pnr_alts, + locals_dict=locals_dict, + skims=None, + model_settings=model_settings, + trace_label=trace_label, + preprocessor_setting_name="alts_preprocessor", + ) + + if estimator: + estimator.write_coefficients(model_settings=model_settings) + estimator.write_coefficients_template(model_settings=model_settings) + estimator.write_spec(model_settings) + estimator.write_model_settings(model_settings, model_settings_file_name) + # in production, all choosers with a transit accessible destination are selected as choosers + # but in estimation, it would only be those who actually reported a pnr lot + # unclear exactly how to handle this, but for now, we will write all choosers + estimator.write_choosers(trn_accessible_choosers) + estimator.write_alternatives(pnr_alts) + + choices = interaction_simulate( + state, + choosers=trn_accessible_choosers, + alternatives=pnr_alts, + spec=model_spec, + skims=skims, + log_alt_losers=state.settings.log_alt_losers, + locals_d=locals_dict, + trace_label=trace_label, + trace_choice_name=trace_label, + estimator=estimator, + explicit_chunk_size=model_settings.explicit_chunk, + compute_settings=model_settings.compute_settings, + ) + + choices = choices.reindex(choosers.index, fill_value=-1) + + if original_index is not None: + # set the choices index back to the original index + choices.index = original_index + state.get_rn_generator().drop_channel("pnr_lot_choice") + + if estimator: + # careful -- there could be some tours in estimation data that are not transit accessible + # but still reported a pnr location + # warning! untested code! + estimator.write_choices(choices) + choices = estimator.get_survey_values(choices, "tours", "pnr_zone_id") + estimator.write_override_choices(choices) + estimator.end_estimation() + + return choices + + +@workflow.step +def park_and_ride_lot_choice( + state: workflow.State, + tours: pd.DataFrame, + tours_merged: pd.DataFrame, + land_use: pd.DataFrame, + network_los: los.Network_LOS, + model_settings: ParkAndRideLotChoiceSettings | None = None, + model_settings_file_name: str = "park_and_ride_lot_choice.yaml", + trace_label: str = "park_and_ride_lot_choice", + trace_hh_id: bool = False, +) -> None: + """ + This model predicts which lot location would be used for a park-and-ride tour. 
+ """ + if model_settings is None: + model_settings = ParkAndRideLotChoiceSettings.read_settings_file( + state.filesystem, + model_settings_file_name, + ) + + estimator = estimation.manager.begin_estimation(state, "park_and_ride_lot_choice") + + choices = run_park_and_ride_lot_choice( + state, + choosers=tours_merged, + land_use=land_use, + network_los=network_los, + model_settings=model_settings, + choosers_dest_col_name="destination", + choosers_origin_col_name="home_zone_id", + estimator=estimator, + pnr_capacity_cls=None, + trace_label=trace_label, + ) + + choices = choices.reindex(tours.index, fill_value=-1) + + tours["pnr_zone_id"] = choices + + state.add_table("tours", tours) + + if trace_hh_id: + state.tracing.trace_df(tours, label=trace_label, warn_if_empty=True) diff --git a/activitysim/abm/models/tour_mode_choice.py b/activitysim/abm/models/tour_mode_choice.py index d2052f099..4163d06b5 100644 --- a/activitysim/abm/models/tour_mode_choice.py +++ b/activitysim/abm/models/tour_mode_choice.py @@ -7,7 +7,12 @@ import numpy as np import pandas as pd -from activitysim.abm.models.util import school_escort_tours_trips, trip +from activitysim.abm.models.util import ( + school_escort_tours_trips, + trip, + logsums, + park_and_ride_capacity, +) from activitysim.abm.models.util.mode import run_tour_mode_choice_simulate from activitysim.core import ( config, @@ -20,6 +25,10 @@ expressions, ) from activitysim.core.configuration.logit import TourModeComponentSettings +from activitysim.abm.models.park_and_ride_lot_choice import ( + ParkAndRideLotChoiceSettings, + run_park_and_ride_lot_choice, +) from activitysim.core.util import assign_in_place, reindex logger = logging.getLogger(__name__) @@ -241,76 +250,21 @@ def tour_mode_choice_simulate( # model_constants can appear in expressions constants.update(model_settings.CONSTANTS) - skim_dict = network_los.get_default_skim_dict() - # setup skim keys - orig_col_name = "home_zone_id" - dest_col_name = "destination" - - out_time_col_name = "start" - in_time_col_name = "end" - odt_skim_stack_wrapper = skim_dict.wrap_3d( - orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="out_period" - ) - dot_skim_stack_wrapper = skim_dict.wrap_3d( - orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="in_period" - ) - odr_skim_stack_wrapper = skim_dict.wrap_3d( - orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="in_period" - ) - dor_skim_stack_wrapper = skim_dict.wrap_3d( - orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="out_period" + skims = logsums.setup_skims( + network_los, + primary_tours_merged, + add_periods=False, + include_pnr_skims=("pnr_zone_id" in primary_tours_merged.columns), + trace_label=trace_label, ) - od_skim_stack_wrapper = skim_dict.wrap(orig_col_name, dest_col_name) - do_skim_stack_wrapper = skim_dict.wrap(dest_col_name, orig_col_name) - - skims = { - "odt_skims": odt_skim_stack_wrapper, - "dot_skims": dot_skim_stack_wrapper, - "odr_skims": odr_skim_stack_wrapper, # dot return skims for e.g. TNC bridge return fare - "dor_skims": dor_skim_stack_wrapper, # odt return skims for e.g. TNC bridge return fare - "od_skims": od_skim_stack_wrapper, - "do_skims": do_skim_stack_wrapper, - "orig_col_name": orig_col_name, - "dest_col_name": dest_col_name, - "out_time_col_name": out_time_col_name, - "in_time_col_name": in_time_col_name, - } - if network_los.zone_system == los.THREE_ZONE: - # fixme - is this a lightweight object? 
- - tvpb = network_los.tvpb - - tvpb_logsum_odt = tvpb.wrap_logsum( - orig_key=orig_col_name, - dest_key=dest_col_name, - tod_key="out_period", - segment_key="demographic_segment", - cache_choices=True, - trace_label=trace_label, - tag="tvpb_logsum_odt", - ) - tvpb_logsum_dot = tvpb.wrap_logsum( - orig_key=dest_col_name, - dest_key=orig_col_name, - tod_key="in_period", - segment_key="demographic_segment", - cache_choices=True, - trace_label=trace_label, - tag="tvpb_logsum_dot", - ) - - skims.update( - {"tvpb_logsum_odt": tvpb_logsum_odt, "tvpb_logsum_dot": tvpb_logsum_dot} + # TVPB constants can appear in expressions + if (network_los.zone_system == los.THREE_ZONE) & model_settings.use_TVPB_constants: + constants.update( + network_los.setting("TVPB_SETTINGS.tour_mode_choice.CONSTANTS") ) - # TVPB constants can appear in expressions - if model_settings.use_TVPB_constants: - constants.update( - network_los.setting("TVPB_SETTINGS.tour_mode_choice.CONSTANTS") - ) - # don't create estimation data bundle if trip mode choice is being called # from another model step (i.e. tour mode choice logsum creation) if state.get_rn_generator().step_name != "tour_mode_choice_simulate": @@ -346,49 +300,123 @@ def tour_mode_choice_simulate( trace_label, ) - choices_list = [] - for tour_purpose, tours_segment in primary_tours_merged.groupby( - segment_column_name, observed=True - ): - logger.info( - "tour_mode_choice_simulate tour_type '%s' (%s tours)" - % ( - tour_purpose, - len(tours_segment.index), - ) + max_iterations = 1 # default number of times to run tour mode choice if park-and-ride is not capacitated + + # if park-and-ride is included, need to check whether we iterate and how many times + if "pnr_zone_id" in primary_tours_merged.columns: + # read the park-and-ride lot choice model settings + pnr_model_settings = ParkAndRideLotChoiceSettings.read_settings_file( + state.filesystem, + "park_and_ride_lot_choice.yaml", ) + # only iterate if enabled and not in estimation mode + if ( + pnr_model_settings.ITERATE_WITH_TOUR_MODE_CHOICE + and (pnr_model_settings.MAX_ITERATIONS > 0) + and (estimator is None) + ): + max_iterations = pnr_model_settings.MAX_ITERATIONS + + # create pnr capacity helper class + pnr_capacity_cls = park_and_ride_capacity.ParkAndRideCapacity( + state, pnr_model_settings + ) - if network_los.zone_system == los.THREE_ZONE: - tvpb_logsum_odt.extend_trace_label(tour_purpose) - tvpb_logsum_dot.extend_trace_label(tour_purpose) + choosers = primary_tours_merged + final_choices = None + + # iterating tour mode choice with park-and-ride lot choice + # the first iteration includes all tours + # subsequent iterations only includes tours that selected a capacitated park-and-ride lot + for i in range(max_iterations): + if max_iterations > 1: + trace_label = tracing.extend_trace_label(trace_label, f"i{i}") + + choices_list = [] + for tour_purpose, tours_segment in choosers.groupby( + segment_column_name, observed=True + ): + logger.info( + "tour_mode_choice_simulate tour_type '%s' (%s tours)" + % ( + tour_purpose, + len(tours_segment.index), + ) + ) - # name index so tracing knows how to slice - assert tours_segment.index.name == "tour_id" + if network_los.zone_system == los.THREE_ZONE: + skims["tvpb_logsum_odt"].extend_trace_label(tour_purpose) + skims["tvpb_logsum_dot"].extend_trace_label(tour_purpose) - choices_df = run_tour_mode_choice_simulate( - state, - tours_segment, - tour_purpose, - model_settings, - mode_column_name=mode_column_name, - logsum_column_name=logsum_column_name, - 
network_los=network_los, - skims=skims, - constants=constants, - estimator=estimator, - trace_label=tracing.extend_trace_label(trace_label, tour_purpose), - trace_choice_name="tour_mode_choice", - ) + # name index so tracing knows how to slice + assert tours_segment.index.name == "tour_id" - tracing.print_summary( - "tour_mode_choice_simulate %s choices_df" % tour_purpose, - choices_df.tour_mode, - value_counts=True, - ) + choices_df = run_tour_mode_choice_simulate( + state, + tours_segment, + tour_purpose, + model_settings, + mode_column_name=mode_column_name, + logsum_column_name=logsum_column_name, + network_los=network_los, + skims=skims, + constants=constants, + estimator=estimator, + trace_label=tracing.extend_trace_label(trace_label, tour_purpose), + trace_choice_name="tour_mode_choice", + ) - choices_list.append(choices_df) + tracing.print_summary( + "tour_mode_choice_simulate %s choices_df" % tour_purpose, + choices_df.tour_mode, + value_counts=True, + ) - choices_df = pd.concat(choices_list) + choices_list.append(choices_df) + + choices_i = pd.concat(choices_list) + if final_choices is None: + final_choices = choices_i.copy() + else: + # need to just update the existing choices with the new ones decided during this iteration + final_choices.loc[choices_i.index] = choices_i + + if (max_iterations > 1) and (i < max_iterations - 1): + # need to update the park-and-ride lot capacities and select new choosers + pnr_capacity_cls.iteration = i + # grabbing pnr_zone_id to calculate capacities + choices_i["pnr_zone_id"] = choosers["pnr_zone_id"].reindex(choices_i.index) + # grabbing start time to help determine which tours need to get resimulated + choices_i["start"] = choosers["start"].reindex(choices_i.index) + pnr_capacity_cls.set_choices(choices_i) + choosers = pnr_capacity_cls.select_new_choosers(state, choosers) + if choosers.empty: + logger.info( + f"finished tour mode choice iterations at iteration {i} because all park-and-ride demand was met" + ) + if pnr_capacity_cls.num_processes > 1: + # need to have this subprocess check-in still to satisfy barrier in synchronize_choices + for j in range(i, max_iterations): + dummy_choices = pd.DataFrame(columns=choices_i.columns) + pnr_capacity_cls.set_choices(dummy_choices) + break + choosers["pnr_zone_id"] = run_park_and_ride_lot_choice( + state, + choosers=choosers, + land_use=state.get_dataframe("land_use"), + network_los=network_los, + model_settings=pnr_capacity_cls.model_settings, + choosers_dest_col_name="destination", + choosers_origin_col_name="home_zone_id", + estimator=estimator, + pnr_capacity_cls=pnr_capacity_cls, + trace_label=trace_label, + ) + # drop out_period and in_period from choosers since they get generated in run_tour_mode_choice_simulate + # they are added since we are not sending a copy of the choosers to the above pnr lot choice call + choosers.drop( + columns=["out_period", "in_period"], errors="ignore", inplace=True + ) # add cached tvpb_logsum tap choices for modes specified in tvpb_mode_path_types if network_los.zone_system == los.THREE_ZONE: @@ -396,7 +424,7 @@ def tour_mode_choice_simulate( if tvpb_mode_path_types is not None: for mode, path_types in tvpb_mode_path_types.items(): for direction, skim in zip( - ["od", "do"], [tvpb_logsum_odt, tvpb_logsum_dot] + ["od", "do"], [skims["tvpb_logsum_odt"], skims["tvpb_logsum_dot"]] ): path_type = path_types[direction] skim_cache = skim.cache[path_type] @@ -406,34 +434,34 @@ def tour_mode_choice_simulate( for c in skim_cache: dest_col = f"{direction}_{c}" - if 
dest_col not in choices_df: - choices_df[dest_col] = ( + if dest_col not in final_choices: + final_choices[dest_col] = ( np.nan if pd.api.types.is_numeric_dtype(skim_cache[c]) else "" ) - choices_df[dest_col].where( - choices_df.tour_mode != mode, skim_cache[c], inplace=True + final_choices[dest_col].where( + final_choices.tour_mode != mode, skim_cache[c], inplace=True ) if estimator: - estimator.write_choices(choices_df.tour_mode) - choices_df.tour_mode = estimator.get_survey_values( - choices_df.tour_mode, "tours", "tour_mode" + estimator.write_choices(final_choices.tour_mode) + final_choices.tour_mode = estimator.get_survey_values( + final_choices.tour_mode, "tours", "tour_mode" ) - estimator.write_override_choices(choices_df.tour_mode) + estimator.write_override_choices(final_choices.tour_mode) estimator.end_estimation() tracing.print_summary( "tour_mode_choice_simulate all tour type choices", - choices_df.tour_mode, + final_choices.tour_mode, value_counts=True, ) # so we can trace with annotations assign_in_place( primary_tours, - choices_df, + final_choices, state.settings.downcast_int, state.settings.downcast_float, ) @@ -442,7 +470,7 @@ def tour_mode_choice_simulate( all_tours = tours assign_in_place( all_tours, - choices_df, + final_choices, state.settings.downcast_int, state.settings.downcast_float, ) diff --git a/activitysim/abm/models/trip_matrices.py b/activitysim/abm/models/trip_matrices.py index 5552e3b00..0a79fe632 100644 --- a/activitysim/abm/models/trip_matrices.py +++ b/activitysim/abm/models/trip_matrices.py @@ -20,7 +20,13 @@ class MatrixTableSettings(PydanticReadable): name: str + """Name of the core in the omx output file""" data_field: str + """Column in the trips table to aggregate""" + origin: str = "origin" + """Column in the trips table representing the from zone""" + destination: str = "destination" + """Column in the trips table representing the to zone""" class MatrixSettings(PydanticReadable): @@ -127,113 +133,59 @@ def write_trip_matrices( # write matrices by zone system type if network_los.zone_system == los.ONE_ZONE: # taz trips written to taz matrices logger.info("aggregating trips one zone...") - aggregate_trips = trips_df.groupby(["origin", "destination"], sort=False).sum( - numeric_only=True - ) - - # use the average household weight for all trips in the origin destination pair - hh_weight_col = model_settings.HH_EXPANSION_WEIGHT_COL - aggregate_weight = ( - trips_df[["origin", "destination", hh_weight_col]] - .groupby(["origin", "destination"], sort=False) - .mean() - ) - aggregate_trips[hh_weight_col] = aggregate_weight[hh_weight_col] - - orig_vals = aggregate_trips.index.get_level_values("origin") - dest_vals = aggregate_trips.index.get_level_values("destination") # use the land use table for the set of possible tazs land_use = state.get_dataframe("land_use") - zone_index = land_use.index - assert all(zone in zone_index for zone in orig_vals) - assert all(zone in zone_index for zone in dest_vals) - - _, orig_index = zone_index.reindex(orig_vals) - _, dest_index = zone_index.reindex(dest_vals) - try: zone_labels = land_use[f"_original_{land_use.index.name}"] except KeyError: zone_labels = land_use.index write_matrices( - state, aggregate_trips, zone_labels, orig_index, dest_index, model_settings + state=state, + trips_df=trips_df, + zone_index=zone_labels, # Series or Index; used for mapping and shape + model_settings=model_settings, + is_tap=False, ) elif network_los.zone_system == los.TWO_ZONE: # maz trips written to taz matrices 
logger.info("aggregating trips two zone...") - trips_df["otaz"] = ( - state.get_dataframe("land_use").reindex(trips_df["origin"]).TAZ.tolist() - ) - trips_df["dtaz"] = ( - state.get_dataframe("land_use") - .reindex(trips_df["destination"]) - .TAZ.tolist() - ) - aggregate_trips = trips_df.groupby(["otaz", "dtaz"], sort=False).sum( - numeric_only=True - ) - - # use the average household weight for all trips in the origin destination pair - hh_weight_col = model_settings.HH_EXPANSION_WEIGHT_COL - aggregate_weight = ( - trips_df[["otaz", "dtaz", hh_weight_col]] - .groupby(["otaz", "dtaz"], sort=False) - .mean() - ) - aggregate_trips[hh_weight_col] = aggregate_weight[hh_weight_col] - orig_vals = aggregate_trips.index.get_level_values("otaz") - dest_vals = aggregate_trips.index.get_level_values("dtaz") + # TAZ domain for output + zone_index = pd.Index(network_los.get_tazs(state), name="TAZ") + # If original TAZ labels are provided, use them in the OMX mapping try: land_use_taz = state.get_dataframe("land_use_taz") except (KeyError, RuntimeError): pass # table missing, ignore else: if "_original_TAZ" in land_use_taz.columns: - orig_vals = orig_vals.map(land_use_taz["_original_TAZ"]) - dest_vals = dest_vals.map(land_use_taz["_original_TAZ"]) - - zone_index = pd.Index(network_los.get_tazs(state), name="TAZ") - assert all(zone in zone_index for zone in orig_vals) - assert all(zone in zone_index for zone in dest_vals) - - _, orig_index = zone_index.reindex(orig_vals) - _, dest_index = zone_index.reindex(dest_vals) + zone_index = pd.Series( + land_use_taz["_original_TAZ"] + .reindex(zone_index) + .fillna(zone_index) + .values, + index=pd.Index(zone_index, name="TAZ"), + name="TAZ", + ) write_matrices( - state, aggregate_trips, zone_index, orig_index, dest_index, model_settings + state=state, + trips_df=trips_df, + zone_index=zone_index, + model_settings=model_settings, + is_tap=False, ) elif ( network_los.zone_system == los.THREE_ZONE ): # maz trips written to taz and tap matrices logger.info("aggregating trips three zone taz...") - trips_df["otaz"] = ( - state.get_dataframe("land_use").reindex(trips_df["origin"]).TAZ.tolist() - ) - trips_df["dtaz"] = ( - state.get_dataframe("land_use") - .reindex(trips_df["destination"]) - .TAZ.tolist() - ) - aggregate_trips = trips_df.groupby(["otaz", "dtaz"], sort=False).sum( - numeric_only=True - ) - # use the average household weight for all trips in the origin destination pair - hh_weight_col = model_settings.HH_EXPANSION_WEIGHT_COL - aggregate_weight = ( - trips_df[["otaz", "dtaz", hh_weight_col]] - .groupby(["otaz", "dtaz"], sort=False) - .mean() - ) - aggregate_trips[hh_weight_col] = aggregate_weight[hh_weight_col] - - orig_vals = aggregate_trips.index.get_level_values("otaz") - dest_vals = aggregate_trips.index.get_level_values("dtaz") + # TAZ domain for output + zone_index = pd.Index(network_los.get_tazs(state), name="TAZ") try: land_use_taz = state.get_dataframe("land_use_taz") @@ -241,52 +193,33 @@ def write_trip_matrices( pass # table missing, ignore else: if "_original_TAZ" in land_use_taz.columns: - orig_vals = orig_vals.map(land_use_taz["_original_TAZ"]) - dest_vals = dest_vals.map(land_use_taz["_original_TAZ"]) - - zone_index = pd.Index(network_los.get_tazs(state), name="TAZ") - assert all(zone in zone_index for zone in orig_vals) - assert all(zone in zone_index for zone in dest_vals) - - _, orig_index = zone_index.reindex(orig_vals) - _, dest_index = zone_index.reindex(dest_vals) + zone_index = pd.Series( + land_use_taz["_original_TAZ"] + 
.reindex(zone_index) + .fillna(zone_index) + .values, + index=pd.Index(zone_index, name="TAZ"), + name="TAZ", + ) write_matrices( - state, aggregate_trips, zone_index, orig_index, dest_index, model_settings + state=state, + trips_df=trips_df, + zone_index=zone_index, + model_settings=model_settings, + is_tap=False, ) logger.info("aggregating trips three zone tap...") - aggregate_trips = trips_df.groupby(["btap", "atap"], sort=False).sum( - numeric_only=True - ) - - # use the average household weight for all trips in the origin destination pair - hh_weight_col = model_settings.HH_EXPANSION_WEIGHT_COL - aggregate_weight = ( - trips_df[["btap", "atap", hh_weight_col]] - .groupby(["btap", "atap"], sort=False) - .mean() - ) - aggregate_trips[hh_weight_col] = aggregate_weight[hh_weight_col] - - orig_vals = aggregate_trips.index.get_level_values("btap") - dest_vals = aggregate_trips.index.get_level_values("atap") - zone_index = pd.Index(network_los.get_taps(), name="TAP") - assert all(zone in zone_index for zone in orig_vals) - assert all(zone in zone_index for zone in dest_vals) - - _, orig_index = zone_index.reindex(orig_vals) - _, dest_index = zone_index.reindex(dest_vals) + tap_index = pd.Index(network_los.get_taps(), name="TAP") write_matrices( - state, - aggregate_trips, - zone_index, - orig_index, - dest_index, - model_settings, - True, + state=state, + trips_df=trips_df, + zone_index=tap_index, + model_settings=model_settings, + is_tap=True, ) if "parking_location" in state.settings.models: @@ -369,67 +302,147 @@ def annotate_trips( def write_matrices( state: workflow.State, - aggregate_trips, - zone_index, - orig_index, - dest_index, + trips_df: pd.DataFrame, + zone_index: pd.Index | pd.Series, model_settings: WriteTripMatricesSettings, - is_tap=False, + is_tap: bool = False, ): """ - Write aggregated trips to OMX format. - - The MATRICES setting lists the new OMX files to write. - Each file can contain any number of 'tables', each specified by a - table key ('name') and a trips table column ('data_field') to use - for aggregated counts. - - Any data type may be used for columns added in the annotation phase, - but the table 'data_field's must be summable types: ints, floats, bools. + Write aggregated trips to OMX format per table using table-specific origin/destination + columns and allow repeated table names to accumulate into the same matrix. 
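+
+    For example, a MATRICES setting along these lines (file, table, and column
+    names here are illustrative only) writes two tables into one file, the
+    second aggregated over the drive leg to a park-and-ride lot::
+
+        MATRICES:
+          - file_name: trips_taz.omx
+            tables:
+              - name: TOTAL_TRIPS
+                data_field: trips
+              - name: PNR_DRIVE_LEG
+                data_field: trips
+                origin: origin
+                destination: pnr_zone_id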
""" matrix_settings = model_settings.MATRICES if not matrix_settings: logger.error("Missing MATRICES setting in write_trip_matrices.yaml") + return + + # Normalize zone index used for placement (pos_index) and mapping labels (map_labels) + if isinstance(zone_index, pd.Series): + pos_index = zone_index.index + map_labels = zone_index + else: + pos_index = zone_index + map_labels = zone_index + + n_zones = len(pos_index) + pos_index = pd.Index(pos_index, name=pos_index.name or "ZONE") + hh_weight_col = model_settings.HH_EXPANSION_WEIGHT_COL + + # For each output file, accumulate table_name -> matrix for matrix in matrix_settings: - matrix_is_tap = matrix.is_tap - - if matrix_is_tap == is_tap: # only write tap matrices to tap matrix files - filename = str(matrix.file_name) - filepath = state.get_output_file_path(filename) - logger.info("opening %s" % filepath) - file = omx.open_file(str(filepath), "w") # possibly overwrite existing file - table_settings = matrix.tables - - for table in table_settings: - table_name = table.name - col = table.data_field - - if col not in aggregate_trips: - logger.error(f"missing {col} column in aggregate_trips DataFrame") - return - - hh_weight_col = model_settings.HH_EXPANSION_WEIGHT_COL - if hh_weight_col: - aggregate_trips[col] = ( - aggregate_trips[col] / aggregate_trips[hh_weight_col] - ) - - data = np.zeros((len(zone_index), len(zone_index))) - data[orig_index, dest_index] = aggregate_trips[col] - logger.debug( - "writing %s sum %0.2f" % (table_name, aggregate_trips[col].sum()) + if matrix.is_tap != is_tap: + continue + + filename = str(matrix.file_name) + filepath = state.get_output_file_path(filename) + logger.info(f"opening {filepath}") + f = omx.open_file(str(filepath), "w") # overwrite file each run + + # Accumulator for datasets in this file + datasets: dict[str, np.ndarray] = {} + + for table in matrix.tables: + table_name = table.name + col = table.data_field + + if col not in trips_df.columns: + logger.warning( + f"Skipping table {table_name}: missing column {col} in trips" ) - file[table_name] = data # write to file + continue - # include the index-to-zone map in the file - logger.info( - "adding %s mapping for %s zones to %s" - % (zone_index.name, zone_index.size, filename) - ) - file.create_mapping(zone_index.name, zone_index.to_numpy()) + # Effective origin/destination columns + ocol = table.origin or ("btap" if is_tap else "origin") + dcol = table.destination or ("atap" if is_tap else "destination") + + if ocol not in trips_df.columns or dcol not in trips_df.columns: + logger.warning( + f"Skipping table {table_name}: missing origin/destination columns " + f"{ocol}/{dcol} in trips" + ) + continue + + # Build a working frame with needed columns + work = trips_df[[ocol, dcol, col]].copy() + if hh_weight_col and hh_weight_col in trips_df.columns: + work[hh_weight_col] = trips_df[hh_weight_col] + + # Map to zone domain if needed (TAZ/TAP domain is pos_index) + # if values are already in domain, keep; else try MAZ->TAZ mapping for TAZ outputs + def to_domain_vals(series: pd.Series) -> pd.Series: + # already in domain? 
+ in_domain = pd.Series(series.isin(pos_index).values, index=series.index) + if in_domain.all() or is_tap: + return series + # try MAZ -> TAZ using land_use while preserving the original index + try: + lu = state.get_dataframe("land_use") + if "TAZ" in lu.columns: + mapped = series.map(lu["TAZ"]) + # if mapping produced any non-nulls, use it + if mapped.notna().any(): + return mapped + except Exception: + pass + return series # fallback; may drop later if not in domain + + work["_o"] = to_domain_vals(work[ocol]) + work["_d"] = to_domain_vals(work[dcol]) + + # Drop rows with either missing origin/destination + work = work.dropna(subset=["_o", "_d"]) + + # Aggregate by OD + if hh_weight_col and hh_weight_col in work.columns: + grouped_sum = work.groupby(["_o", "_d"], sort=False)[col].sum() + mean_w = work.groupby(["_o", "_d"], sort=False)[hh_weight_col].mean() + vals = (grouped_sum / mean_w.replace(0, np.nan)).fillna(0.0) + else: + vals = work.groupby(["_o", "_d"], sort=False)[col].sum() + + if vals.empty: + continue + + # Map OD labels to positional indices + o_vals = vals.index.get_level_values(0) + d_vals = vals.index.get_level_values(1) + + oi = pos_index.get_indexer(o_vals) + di = pos_index.get_indexer(d_vals) + + mask = (oi != -1) & (di != -1) + if not np.any(mask): + logger.warning( + f"No valid OD pairs for table {table_name} in domain; skipping." + ) + continue + + oi = oi[mask] + di = di[mask] + v = vals.to_numpy()[mask] + + # Accumulate into dataset matrix + data = datasets.get(table_name) + if data is None: + data = np.zeros((n_zones, n_zones), dtype=float) + datasets[table_name] = data + data[oi, di] += v + + logger.debug(f"accumulated {table_name} sum {v.sum():0.2f}") + + # Write all accumulated datasets for this file + for table_name, data in datasets.items(): + f[table_name] = data + + # include the index-to-zone map in the file + logger.info( + "adding %s mapping for %s zones to %s" + % (map_labels.name or "ZONE", len(map_labels), filename) + ) + f.create_mapping(map_labels.name or "ZONE", pd.Index(map_labels).to_numpy()) - logger.info("closing %s" % filepath) - file.close() + logger.info("closing %s" % filepath) + f.close() diff --git a/activitysim/abm/models/util/logsums.py b/activitysim/abm/models/util/logsums.py index f9e4208f0..e3f3a383a 100644 --- a/activitysim/abm/models/util/logsums.py +++ b/activitysim/abm/models/util/logsums.py @@ -12,10 +12,163 @@ TourLocationComponentSettings, TourModeComponentSettings, ) +from activitysim.abm.models.park_and_ride_lot_choice import run_park_and_ride_lot_choice logger = logging.getLogger(__name__) +def setup_skims( + network_los: los.Network_LOS, + choosers: pd.DataFrame, + add_periods: bool = True, + include_pnr_skims: bool = False, + orig_col_name: str = "home_zone_id", + dest_col_name: str = "destination", + pnr_lot_dest_col_name: str = "pnr_zone_id", + trace_label: str | None = None, +): + """ + Setup skims wrappers for use in utility calculations. + + skim wrapper labels represent the following: + o = tour origin + d = tour destination + l = parking lot location + t = tour start time + r = tour return time + + Parameters + ---------- + network_los : los.Network_LOS + choosers : pd.DataFrame + DataFrame of choosers with columns for origin and destination zone ids. + add_periods : bool + If True, adds 'out_period' and 'in_period' columns to choosers + include_pnr_skims : bool + If True, adds skims for PNR lots to the skims dictionary. + orig_col_name : str + Name of the column in choosers representing the origin zone id. 
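+        Typically "home_zone_id" for home-based tours; at-work subtours pass
+        "workplace_zone_id".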
+ dest_col_name : str + Name of the column in choosers representing the destination zone id. + pnr_lot_dest_col_name : str + Name of the column in choosers representing the PNR lot destination zone id. + trace_label : str | None + Optional label for tracing when creating tvpb skim wrappers for 3-zone models. + + Returns + ------- + skims : dict + Dictionary of skim wrappers and related information. + Contains keys for origin and destination columns, time period columns, + and various skim wrappers for different combinations of origin, destination, + and time periods. + """ + skim_dict = network_los.get_default_skim_dict() + + # setup skim keys + out_time_col_name = "start" + in_time_col_name = "end" + + # creating skim wrappers + odt_skim_stack_wrapper = skim_dict.wrap_3d( + orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="out_period" + ) + + dot_skim_stack_wrapper = skim_dict.wrap_3d( + orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="in_period" + ) + odr_skim_stack_wrapper = skim_dict.wrap_3d( + orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="in_period" + ) + dor_skim_stack_wrapper = skim_dict.wrap_3d( + orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="out_period" + ) + od_skim_stack_wrapper = skim_dict.wrap(orig_col_name, dest_col_name) + do_skim_stack_wrapper = skim_dict.wrap(dest_col_name, orig_col_name) + + skims = { + "odt_skims": odt_skim_stack_wrapper, + "dot_skims": dot_skim_stack_wrapper, + "odr_skims": odr_skim_stack_wrapper, # dot return skims for e.g. TNC bridge return fare + "dor_skims": dor_skim_stack_wrapper, # odt return skims for e.g. TNC bridge return fare + "od_skims": od_skim_stack_wrapper, + "do_skims": do_skim_stack_wrapper, + "orig_col_name": orig_col_name, + "dest_col_name": dest_col_name, + "out_time_col_name": out_time_col_name, + "in_time_col_name": in_time_col_name, + } + + # add pnr skims if pnr lot choice model is being run + if include_pnr_skims: + olt_skim_stack_wrapper = skim_dict.wrap_3d( + orig_key=orig_col_name, + dest_key=pnr_lot_dest_col_name, + dim3_key="out_period", + ) + ldt_skim_stack_wrapper = skim_dict.wrap_3d( + orig_key=pnr_lot_dest_col_name, + dest_key=dest_col_name, + dim3_key="out_period", + ) + dlt_skim_stack_wrapper = skim_dict.wrap_3d( + orig_key=dest_col_name, dest_key=pnr_lot_dest_col_name, dim3_key="in_period" + ) + lot_skim_stack_wrapper = skim_dict.wrap_3d( + orig_key=pnr_lot_dest_col_name, dest_key=orig_col_name, dim3_key="in_period" + ) + ol_skim_stack_wrapper = skim_dict.wrap(orig_col_name, pnr_lot_dest_col_name) + ld_skim_stack_wrapper = skim_dict.wrap(pnr_lot_dest_col_name, dest_col_name) + + skims.update( + { + "olt_skims": olt_skim_stack_wrapper, + "ldt_skims": ldt_skim_stack_wrapper, + "dlt_skims": dlt_skim_stack_wrapper, + "lot_skims": lot_skim_stack_wrapper, + "ol_skims": ol_skim_stack_wrapper, + "ld_skims": ld_skim_stack_wrapper, + "pnr_lot_dest_col_name": pnr_lot_dest_col_name, + } + ) + + if network_los.zone_system == los.THREE_ZONE: + # fixme - is this a lightweight object? 
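+        # transit virtual path builder (TVPB) wrappers for three-zone models;
+        # tap choices are cached so callers (e.g. tour mode choice) can read
+        # the chosen paths back out of skim.cache after simulation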
+        tvpb = network_los.tvpb
+
+        tvpb_logsum_odt = tvpb.wrap_logsum(
+            orig_key=orig_col_name,
+            dest_key=dest_col_name,
+            tod_key="out_period",
+            segment_key="demographic_segment",
+            cache_choices=True,
+            trace_label=trace_label,
+            tag="tvpb_logsum_odt",
+        )
+        tvpb_logsum_dot = tvpb.wrap_logsum(
+            orig_key=dest_col_name,
+            dest_key=orig_col_name,
+            tod_key="in_period",
+            segment_key="demographic_segment",
+            cache_choices=True,
+            trace_label=trace_label,
+            tag="tvpb_logsum_dot",
+        )
+
+        skims.update(
+            {"tvpb_logsum_odt": tvpb_logsum_odt, "tvpb_logsum_dot": tvpb_logsum_dot}
+        )
+
+    # add time periods to choosers if requested
+    if add_periods:
+        choosers["out_period"] = network_los.skim_time_period_label(
+            choosers[out_time_col_name]
+        )
+        choosers["in_period"] = network_los.skim_time_period_label(
+            choosers[in_time_col_name]
+        )
+
+    return skims
+
+
 def filter_chooser_columns(
     choosers, logsum_settings: dict | PydanticBase, model_settings: dict | PydanticBase
 ):
@@ -157,6 +310,22 @@ def compute_location_choice_logsums(
     else:
         logger.error("Choosers table already has column 'duration'.")
 
+    if logsum_settings.include_pnr_for_logsums:
+        # if the logsum settings include explicit PNR, then we need to add the
+        # PNR lot destination column to the choosers table by running PNR lot choice
+        choosers["pnr_zone_id"] = run_park_and_ride_lot_choice(
+            state,
+            choosers=choosers,
+            land_use=state.get_dataframe("land_use"),
+            network_los=network_los,
+            model_settings=None,
+            choosers_dest_col_name=dest_col_name,
+            choosers_origin_col_name=orig_col_name,
+            estimator=None,
+            pnr_capacity_cls=None,
+            trace_label=tracing.extend_trace_label(trace_label, "pnr_lot_choice"),
+        )
+
     logsum_spec = state.filesystem.read_model_spec(file_name=logsum_settings.SPEC)
     coefficients = state.filesystem.get_segment_coefficients(
         logsum_settings, tour_purpose
@@ -178,66 +347,24 @@ def compute_location_choice_logsums(
     # constrained coefficients can appear in expressions
     locals_dict.update(coefficients)
 
-    # setup skim keys
-    skim_dict = network_los.get_default_skim_dict()
-
-    odt_skim_stack_wrapper = skim_dict.wrap_3d(
-        orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="out_period"
-    )
-    dot_skim_stack_wrapper = skim_dict.wrap_3d(
-        orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="in_period"
-    )
-    odr_skim_stack_wrapper = skim_dict.wrap_3d(
-        orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="in_period"
-    )
-    dor_skim_stack_wrapper = skim_dict.wrap_3d(
-        orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="out_period"
+    skims = setup_skims(
+        network_los,
+        choosers,
+        add_periods=False,  # periods were already added to choosers above
+        orig_col_name=orig_col_name,
+        dest_col_name=dest_col_name,
+        pnr_lot_dest_col_name="pnr_zone_id",
+        include_pnr_skims=logsum_settings.include_pnr_for_logsums,
+        trace_label=trace_label,
     )
-    od_skim_stack_wrapper = skim_dict.wrap(orig_col_name, dest_col_name)
-
-    skims = {
-        "odt_skims": odt_skim_stack_wrapper,
-        "dot_skims": dot_skim_stack_wrapper,
-        "odr_skims": odr_skim_stack_wrapper,
-        "dor_skims": dor_skim_stack_wrapper,
-        "od_skims": od_skim_stack_wrapper,
-        "orig_col_name": orig_col_name,
-        "dest_col_name": dest_col_name,
-    }
-
-    if network_los.zone_system == los.THREE_ZONE:
-        # fixme - is this a lightweight object?
- tvpb = network_los.tvpb - - tvpb_logsum_odt = tvpb.wrap_logsum( - orig_key=orig_col_name, - dest_key=dest_col_name, - tod_key="out_period", - segment_key="demographic_segment", - trace_label=trace_label, - tag="tvpb_logsum_odt", - ) - tvpb_logsum_dot = tvpb.wrap_logsum( - orig_key=dest_col_name, - dest_key=orig_col_name, - tod_key="in_period", - segment_key="demographic_segment", - trace_label=trace_label, - tag="tvpb_logsum_dot", - ) + locals_dict.update(skims) - skims.update( - {"tvpb_logsum_odt": tvpb_logsum_odt, "tvpb_logsum_dot": tvpb_logsum_dot} + # TVPB constants can appear in expressions + if (network_los.zone_system == los.THREE_ZONE) & logsum_settings.use_TVPB_constants: + locals_dict.update( + network_los.setting("TVPB_SETTINGS.tour_mode_choice.CONSTANTS") ) - # TVPB constants can appear in expressions - if logsum_settings.use_TVPB_constants: - locals_dict.update( - network_los.setting("TVPB_SETTINGS.tour_mode_choice.CONSTANTS") - ) - - locals_dict.update(skims) - # - run preprocessor to annotate choosers # allow specification of alternate preprocessor for nontour choosers preprocessor = model_settings.LOGSUM_PREPROCESSOR diff --git a/activitysim/abm/models/util/park_and_ride_capacity.py b/activitysim/abm/models/util/park_and_ride_capacity.py new file mode 100644 index 000000000..3c232eccd --- /dev/null +++ b/activitysim/abm/models/util/park_and_ride_capacity.py @@ -0,0 +1,469 @@ +import pandas as pd +import numpy as np +import ctypes +import logging +import multiprocessing as mp +import time + +from activitysim.core import util, logit, tracing + +logger = logging.getLogger(__name__) + + +class ParkAndRideCapacity: + """ + Class to handle park-and-ride lot capacity calculations. + + This class is used to get and set park-and-ride lot choices for tours + and handles sending the choices across other multiprocess steps. 
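+    Lot occupancy accumulates across tour mode choice iterations, and lot
+    capacities are scaled by the household sample rate before being compared
+    against occupancy.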
+    It works very similarly to shadow pricing -- see `activitysim/abm/tables/shadow_pricing.py`
+    """
+
+    def __init__(self, state, model_settings):
+        self.model_settings = model_settings
+        self.num_processes = state.get_injectable("num_processes", 1)
+
+        data_buffers = state.get_injectable("data_buffers", None)
+        self.iteration = 0
+
+        if data_buffers is not None:
+            # grab shared data and locks created for multiprocessing
+            self.shared_pnr_choice = data_buffers["shared_pnr_choice"]
+            self.shared_pnr_choice_idx = data_buffers["shared_pnr_choice_idx"]
+            self.shared_pnr_choice_start = data_buffers["shared_pnr_choice_start"]
+            self.pnr_mp_tally = data_buffers["pnr_mp_tally"]
+        else:
+            assert (
+                self.num_processes == 1
+            ), "data_buffers must be provided for multiprocessing"
+            self.shared_pnr_choice = None
+            self.shared_pnr_choice_idx = None
+            self.shared_pnr_choice_start = None
+            self.pnr_mp_tally = None
+
+        # occupancy counts for pnr zones are populated from choices synced across processes
+        self.shared_pnr_occupancy_df = pd.DataFrame(
+            columns=["pnr_occupancy"], index=state.get_dataframe("land_use").index
+        )
+        self.shared_pnr_occupancy_df["pnr_occupancy"] = 0
+
+        self.scale_pnr_capacity(state)
+
+    def set_choices(self, choices):
+        """
+        aggregate individual park-and-ride lot choices into lot occupancy counts by zone
+
+        Parameters
+        ----------
+        choices : pandas.DataFrame
+            tour mode choice results indexed by tour_id, including
+            tour_mode, pnr_zone_id, and start columns
+
+        Returns
+        -------
+        None -- updates self.shared_pnr_occupancy_df
+        """
+        # first remove non-park-and-ride choices
+        pnr_modes = self.model_settings.PARK_AND_RIDE_MODES
+        assert (
+            pnr_modes is not None
+        ), "PARK_AND_RIDE_MODES must be set in model settings"
+        choices = choices[choices.tour_mode.isin(pnr_modes)]
+
+        if self.num_processes == 1:
+            # - not multiprocessing
+            self.choices_synced = choices[["pnr_zone_id", "start"]]
+
+        else:
+            # - multiprocessing
+            self.choices_synced = self.synchronize_choices(choices)
+
+        # tally up the counts by zone
+        # index of shared_pnr_occupancy_df is zone_id in the landuse table across all processes
+        # choices_synced object also now contains all choices across all processes
+        pnr_counts = self.choices_synced.pnr_zone_id.value_counts().reindex(
+            self.shared_pnr_occupancy_df.index
+        )
+        pnr_counts = pnr_counts.fillna(0).astype(int)
+
+        # new occupancy is what was already at the lots after the last iteration + new pnr choices
+        # (those selected for resimulation are removed in the select_new_choosers function)
+        self.shared_pnr_occupancy_df["pnr_occupancy"].values[:] += pnr_counts.values
+
+        # logging summary of this aggregation
+        lots_with_demand = int((pnr_counts > 0).sum())
+        logger.info(
+            f"PNR iter {self.iteration}: aggregated {len(self.choices_synced)} choices across "
+            f"{lots_with_demand} lots."
+        )
+
+    def synchronize_choices(self, choices):
+        """
+        Synchronize the choices across all processes.
+
+        Parameters
+        ----------
+        choices : pandas.DataFrame
+            park-and-ride choices (pnr_zone_id, start columns) indexed by tour_id
+
+        Returns
+        -------
+        pandas.DataFrame
+            Synchronized choices across all processes.
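+
+        Notes
+        -----
+        Each process appends its choices to the shared arrays while holding
+        the lock, then waits at a barrier so that every write lands before
+        any process reads the merged result. A second barrier lets the last
+        process out zero the shared arrays for the next iteration.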
+ """ + assert self.shared_pnr_choice is not None, "shared_pnr_choice is not set" + assert ( + self.shared_pnr_choice_idx is not None + ), "shared_pnr_choice_idx is not set" + assert self.num_processes > 1, "num_processes must be greater than 1" + + # barrier implemented with arrival count (idx 0) and generation (idx 1) + # cannot just use mp.barrier() because we do not know how many processes + # there will be when the tour mode choice iteration with pnr begins + def barrier(reset_callback=None): + while True: + with self.pnr_mp_tally.get_lock(): + gen = self.pnr_mp_tally[1] + self.pnr_mp_tally[0] += 1 # arrived + if self.pnr_mp_tally[0] == self.num_processes: + # last to arrive + if reset_callback is not None: + reset_callback() + # release all waiters by advancing generation and resetting arrival count + self.pnr_mp_tally[0] = 0 + self.pnr_mp_tally[1] = gen + 1 + return + # not last; remember current generation to wait on + wait_gen = gen + # spin until generation changes + while True: + with self.pnr_mp_tally.get_lock(): + if self.pnr_mp_tally[1] != wait_gen: + break + time.sleep(1) + return + + # can send in empty chocies to ensure all subprocesses will hit the barrier + if not choices.empty: + with self.shared_pnr_choice.get_lock(): + # first_in = self.pnr_mp_tally[0] == 0 + # create a dataframe of the already existing choices + mp_choices_df = pd.DataFrame( + data={ + "pnr_zone_id": np.frombuffer( + self.shared_pnr_choice.get_obj(), dtype=np.int64 + ), + "start": np.frombuffer( + self.shared_pnr_choice_start.get_obj(), dtype=np.int64 + ), + }, + index=np.frombuffer( + self.shared_pnr_choice_idx.get_obj(), dtype=np.int64 + ), + ) + mp_choices_df.index.name = "tour_id" + # discard zero entries + mp_choices_df = mp_choices_df[mp_choices_df.pnr_zone_id > 0] + # append the new choices + synced_choices = pd.concat( + [mp_choices_df, choices[["pnr_zone_id", "start"]]], + axis=0, + ignore_index=False, + ) + # sort by index (tour_id) + synced_choices = synced_choices.sort_index() + + # now append any additional rows need to get size back to original length + pad = len(self.shared_pnr_choice) - len(synced_choices) + new_arr_values = np.concatenate( + [ + synced_choices["pnr_zone_id"].to_numpy(np.int64), + np.zeros(pad, dtype=np.int64), + ] + ) + new_arr_idx = np.concatenate( + [ + synced_choices.index.to_numpy(np.int64), + np.zeros(pad, dtype=np.int64), + ] + ) + new_arr_start = np.concatenate( + [ + synced_choices["start"].to_numpy(np.int64), + np.zeros(pad, dtype=np.int64), + ] + ) + + # write the updated arrays back to the shared memory + self.shared_pnr_choice_idx[:] = new_arr_idx.tolist() + self.shared_pnr_choice[:] = new_arr_values.tolist() + self.shared_pnr_choice_start[:] = new_arr_start.tolist() + + # Wait for all processes to finish writing + barrier() + + # need to create the final synced_choices again since other processes may have written to the shared memory + # don't need the lock since we are only reading at this stage + synced_choices = pd.DataFrame( + data={ + "pnr_zone_id": np.frombuffer( + self.shared_pnr_choice.get_obj(), dtype=np.int64 + ), + "start": np.frombuffer( + self.shared_pnr_choice_start.get_obj(), dtype=np.int64 + ), + }, + index=np.frombuffer(self.shared_pnr_choice_idx.get_obj(), dtype=np.int64), + ) + synced_choices.index.name = "tour_id" + synced_choices = synced_choices[synced_choices.pnr_zone_id > 0].copy() + + # barrier 2: last-out resets arrays for next iteration + def reset_arrays(): + self.shared_pnr_choice[:] = np.zeros( + len(self.shared_pnr_choice), 
+
+    def scale_pnr_capacity(self, state):
+        """
+        Scale the pnr capacity based on the simulation sample rate.
+
+        Parameters
+        ----------
+        state : workflow.State
+            The current state of the simulation.
+
+        Returns
+        -------
+        None -- class variable self.scaled_pnr_capacity_df is updated
+        """
+
+        # scale the capacities based on simulation sample rate
+        capacities = state.get_dataframe("land_use")[
+            self.model_settings.LANDUSE_PNR_SPACES_COLUMN
+        ]
+        sample_rate = state.get_dataframe("households")["sample_rate"].mode()[0]
+        scaled_capacities = np.ceil(capacities * sample_rate)
+        scaled_capacities = scaled_capacities.clip(lower=0).fillna(0).astype(int)
+
+        # update the shared DataFrame with the new capacities
+        self.scaled_pnr_capacity_df = scaled_capacities.to_frame(name="pnr_capacity")
+
+    def select_new_choosers(self, state, choosers):
+        """
+        Select new choosers for park-and-ride lots based on the current choices.
+
+        Parameters
+        ----------
+        state : workflow.State
+            The current state of the simulation.
+        choosers : pandas.DataFrame
+            DataFrame containing the current choosers.
+
+        Returns
+        -------
+        pandas.DataFrame
+            Updated DataFrame with the new choosers in this process.
+        """
+        # select tours from capacitated zones
+        # note that this dataframe contains tours across all processes but choosers is only from the current process
+        self.determine_capacitated_pnr_zones(state)
+        tours_in_cap_zones = self.choices_synced[
+            self.choices_synced.pnr_zone_id.isin(self.capacitated_zones)
+        ]
+
+        # if no tours in capacitated zones, return empty DataFrame
+        if tours_in_cap_zones.empty:
+            return pd.DataFrame(columns=choosers.columns)
+
+        if self.model_settings.RESAMPLE_STRATEGY == "latest":
+            # determine how many tours to select from each over-capacitated zone
+            num_over_limit = (
+                self.shared_pnr_occupancy_df["pnr_occupancy"]
+                - self.scaled_pnr_capacity_df["pnr_capacity"]
+            )
+            num_over_limit = num_over_limit[num_over_limit > 0].astype(int)
+
+            tours_in_cap_zones = pd.merge(
+                tours_in_cap_zones,
+                num_over_limit.to_frame(name="num_over_limit"),
+                left_on="pnr_zone_id",
+                right_index=True,
+                how="left",
+            )
+
+            # sort tours by arrival order at each pnr zone
+            tours_in_cap_zones.sort_values(
+                by=["pnr_zone_id", "start"], ascending=[True, False], inplace=True
+            )
+            # number the tours in each pnr zone by reverse arrival order
+            tours_in_cap_zones["arrival_num_latest"] = (
+                tours_in_cap_zones.groupby("pnr_zone_id").cumcount() + 1
+            )
+
+            # tours_in_cap_zones
+            # tour_id | pnr_zone_id | num_over_limit | start | arrival_num_latest
+            #     123 |           5 |              2 |    10 |                  1
+            #     456 |           5 |              2 |     9 |                  2
+            #     789 |           5 |              2 |     8 |                  3
+
+            # selecting tours that are over the capacity limit
+            over_capacitated_tours = tours_in_cap_zones[
+                tours_in_cap_zones["arrival_num_latest"]
+                <= tours_in_cap_zones["num_over_limit"]
+            ]
+
+            choosers = choosers[choosers.index.isin(over_capacitated_tours.index)]
+
+        elif self.model_settings.RESAMPLE_STRATEGY == "random":
+            # first determine sample rate for each zone
+            zonal_sample_rate = (
+                self.shared_pnr_occupancy_df["pnr_occupancy"]
+                / self.scaled_pnr_capacity_df["pnr_capacity"]
+            )
+            zonal_sample_rate = zonal_sample_rate[zonal_sample_rate > 1]
+            zonal_sample_rate = (zonal_sample_rate - 1).clip(lower=0, upper=1)
+
+            # a tour's probability of being selected for re-simulation comes from the zonal sample rate
+            sample_rates = tours_in_cap_zones.pnr_zone_id.map(
+                zonal_sample_rate.to_dict()
+            )
+            probs = pd.DataFrame(
+                data={"0": 1 - sample_rates, "1": sample_rates},
+                index=tours_in_cap_zones.index,
+            )
+            # using ActivitySim's RNG to make choices for repeatability
+            current_sample, rands = logit.make_choices(state, probs)
+            current_sample = current_sample[current_sample == 1]
+
+            choosers = choosers[choosers.index.isin(current_sample.index)]
+
+        # subtract the number of selected tours from the occupancy counts since they are getting resimulated
+        pnr_counts = (
+            choosers.pnr_zone_id.value_counts()
+            .reindex(self.shared_pnr_occupancy_df.index)
+            .fillna(0)
+            .astype(int)
+        )
+        self.shared_pnr_occupancy_df["pnr_occupancy"].values[:] -= pnr_counts.values
+
+        return choosers
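+
+    # Worked example of the "random" strategy above: a lot with capacity 100 and
+    # occupancy 120 has zonal_sample_rate = (120 / 100) - 1 = 0.2, so every tour
+    # parked there is re-simulated with probability 0.2 (about 24 of the 120
+    # tours), whereas the "latest" strategy would bump exactly the 20
+    # latest-arriving tours.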
+
+    def determine_capacitated_pnr_zones(self, state):
+        """
+        Determine which park-and-ride zones are at or over capacity.
+
+        Returns
+        -------
+        None -- class variable self.capacitated_zones is updated
+        """
+        tol = self.model_settings.ACCEPTED_TOLERANCE
+
+        cap = self.scaled_pnr_capacity_df["pnr_capacity"]
+        occ = self.shared_pnr_occupancy_df["pnr_occupancy"]
+        assert cap.index.equals(
+            occ.index
+        ), "capacity and occupancy indices do not match"
+
+        valid = cap > 0  # ignore zero-capacity zones
+        # capacitated if at least tol fraction full (or over capacity)
+        capacitated_zones_mask = valid & (occ >= np.ceil(cap * tol))
+
+        self.capacitated_zones = cap.index[capacitated_zones_mask]
+
+        # generate a df summarizing zonal capacity
+        df = pd.DataFrame({"pnr_capacity": cap, "pnr_occupancy": occ})
+        with np.errstate(divide="ignore", invalid="ignore"):
+            df["pct_utilized"] = np.where(
+                df.pnr_capacity > 0, df.pnr_occupancy / df.pnr_capacity * 100, np.nan
+            )
+        df["over_by"] = (df.pnr_occupancy - df.pnr_capacity).clip(lower=0)
+        df["capacitated"] = capacitated_zones_mask
+        self.capacity_snapshot = df
+
+        # write snapshot to output trace folder
+        if state.settings.trace_hh_id:
+            state.tracing.trace_df(
+                df=self.capacity_snapshot,
+                label=f"pnr_capacity_snapshot_i{self.iteration}",
+                transpose=False,
+            )
+
+    def flag_capacitated_pnr_zones(self, pnr_alts):
+        """
+        Flag park-and-ride lots that are at capacity.
+        The lots are not removed from the alternatives; a flag is set for use in
+        utility calculations. Keeping the number of alternatives constant between
+        iterations makes tracing easier.
+
+        Parameters
+        ----------
+        pnr_alts : pandas.DataFrame
+            landuse table with park-and-ride alternatives
+
+        Returns
+        -------
+        numpy.ndarray
+            1 for alternatives in capacitated zones, 0 otherwise
+        """
+
+        return np.where(pnr_alts.index.isin(self.capacitated_zones), 1, 0)
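+
+# Worked example of the tolerance test in determine_capacitated_pnr_zones above:
+# with ACCEPTED_TOLERANCE = 0.95, a 100-space lot is flagged once occupancy
+# reaches ceil(100 * 0.95) = 95, so re-simulation can begin slightly before the
+# lot is completely full.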
+
+
+def create_park_and_ride_capacity_data_buffers(state):
+    """
+    Sets up multiprocessing buffers for park-and-ride lot choice.
+
+    One buffer tallies the chosen park-and-ride zone ids so occupancy can be compared to capacity.
+    Another buffer keeps track of the choices for each tour so we can choose which ones to resimulate.
+    This function is called before the subprocesses are kicked off in activitysim/core/mp_tasks.py.
+    """
+
+    # get the persons table to determine the size of the buffers
+    persons = state.get_dataframe("persons")
+
+    # we don't know a priori how many park-and-ride tours there are at the start of the model run
+    # giving the buffer a size equal to the number of persons should be sufficient
+    n = len(persons)
+
+    # creating one interprocess lock to be shared by all arrays
+    shared_lock = mp.RLock()
+
+    # need three arrays -- the choices, the IDs of the tours making the choice,
+    # and the tour start times
+    choice_arr = mp.Array(ctypes.c_int64, n, lock=shared_lock)
+    choice_arr_idx = mp.Array(ctypes.c_int64, n, lock=shared_lock)
+    choice_arr_start = mp.Array(ctypes.c_int64, n, lock=shared_lock)
+
+    # init the arrays to 0
+    choice_arr[:] = np.zeros(n, dtype=np.int64).tolist()
+    choice_arr_idx[:] = np.zeros(n, dtype=np.int64).tolist()
+    choice_arr_start[:] = np.zeros(n, dtype=np.int64).tolist()
+
+    # one more shared array of length two to count processes as they check in and out
+    pnr_mp_tally = mp.Array(ctypes.c_int64, 2, lock=shared_lock)
+    pnr_mp_tally[:] = [0, 0]
+
+    # recording memory size
+    total_bytes = n * np.dtype(np.int64).itemsize * 3
+    logger.info(
+        f"allocating shared park-and-ride lot choice buffers with buffer_size {total_bytes} bytes ({util.GB(total_bytes)})"
+    )
+
+    data_buffers = {
+        "shared_pnr_choice": choice_arr,
+        "shared_pnr_choice_idx": choice_arr_idx,
+        "shared_pnr_choice_start": choice_arr_start,
+        "pnr_mp_tally": pnr_mp_tally,
+    }
+
+    return data_buffers
diff --git a/activitysim/abm/models/util/test/test_write_matrices.py b/activitysim/abm/models/util/test/test_write_matrices.py
new file mode 100644
index 000000000..81cc8adc5
--- /dev/null
+++ b/activitysim/abm/models/util/test/test_write_matrices.py
@@ -0,0 +1,230 @@
+import numpy as np
+import pandas as pd
+import openmatrix as omx
+import os
+from pathlib import Path
+
+from activitysim.abm.models.trip_matrices import (
+    write_matrices,
+    WriteTripMatricesSettings,
+    MatrixSettings,
+    MatrixTableSettings,
+)
+from activitysim.core import workflow
+
+
+def get_settings(omx_file_name):
+    # Settings: same table name twice, second time with custom destination column
+    settings = WriteTripMatricesSettings(
+        MATRICES=[
+            MatrixSettings(
+                file_name=Path(omx_file_name),
+                is_tap=False,
+                tables=[
+                    MatrixTableSettings(
+                        name="DRIVEALONE_VOT1",
+                        data_field="DRIVEALONE_VOT1",
+                    ),
+                    MatrixTableSettings(
+                        name="DRIVEALONE_VOT1",
+                        data_field="PNR_DRIVEALONE_OUT",
+                        origin="origin",
+                        destination="pnr_zone_id",
+                    ),
+                    MatrixTableSettings(
+                        name="WALK_TRN_OUT",
+                        data_field="WALK_TRN_OUT",
+                        origin="origin",
+                        destination="destination",
+                    ),
+                    MatrixTableSettings(
+                        name="WALK_TRN_OUT",
+                        data_field="PNR_TRN_OUT",
+                        origin="pnr_zone_id",
+                        destination="destination",
+                    ),
+                ],
+            )
+        ]
+    )
+    return settings
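+
+# Note that get_settings lists two MatrixTableSettings entries per matrix name:
+# write_matrices accumulates same-named tables into a single OMX matrix, which is
+# how the PNR legs (keyed on pnr_zone_id) and the standard trips end up summed in
+# DRIVEALONE_VOT1 and WALK_TRN_OUT in the tests below.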
+
+
+def test_write_matrices_one_zone():
+    state = workflow.State.make_default(__file__)
+
+    # Zones domain (TAZ)
+    zone_index = pd.Index([1, 2, 3, 4], name="TAZ")
+
+    # Trips with both standard drive and PNR drive legs
+    trips_df = pd.DataFrame(
+        {
+            "origin": [1, 1, 2],
+            "destination": [2, 2, 4],
+            "pnr_zone_id": [3, -1, 4],
+            # we have more flags below than trips, but that's ok for testing
+            "DRIVEALONE_VOT1": [0, 1, 1],
+            "PNR_DRIVEALONE_OUT": [1, 0, 1],
+            "WALK_TRN_OUT": [1, 0, 1],
+            "PNR_TRN_OUT": [1, 0, 1],
+            "sample_rate": [0.5, 0.5, 0.5],
+        }
+    )
+
+    # Run writer
+    write_matrices(
+        state=state,
+        trips_df=trips_df,
+        zone_index=zone_index,
+        model_settings=get_settings("trips_one_zone.omx"),
+        is_tap=False,
+    )
+
+    # Validate output
+    omx_path = os.path.join(os.path.dirname(__file__), "output", "trips_one_zone.omx")
+    assert os.path.exists(omx_path), "OMX file not created"
+
+    with omx.open_file(omx_path, mode="r") as f:
+        # Mapping should reflect zone_index order [1,2,3,4]
+        mapping = f.mapping("TAZ")
+        assert list(mapping) == [1, 2, 3, 4]
+
+        # ---- checking drive output
+        assert "DRIVEALONE_VOT1" in f.list_matrices(), "Expected matrix not found"
+        data = f["DRIVEALONE_VOT1"][:]
+
+        # Expected totals:
+        # - zones (1->2): 1 drive = 1
+        # - zones (1->3): 1 PNR drive leg = 1
+        # - zones (2->4): 1 PNR drive leg + 1 drive = 2
+        # all doubled by the 0.5 sample rate expansion
+        assert data.shape == (4, 4)
+        # indices are zero-based positions for labels [1,2,3,4]
+        assert data[0, 1] == 2  # 1->2
+        assert data[0, 2] == 2  # 1->3
+        assert data[1, 3] == 4  # 2->4
+        # everything else remains zero
+        zero_mask = np.ones_like(data, dtype=bool)
+        zero_mask[0, 1] = False
+        zero_mask[0, 2] = False
+        zero_mask[1, 3] = False
+        assert np.all(data[zero_mask] == 0.0)
+
+        # ---- checking transit output
+        assert "WALK_TRN_OUT" in f.list_matrices(), "Expected matrix not found"
+        data = f["WALK_TRN_OUT"][:]
+
+        # Expected totals:
+        # - zones (1->2): 1 wlk trn
+        # - zones (2->4): 1 wlk trn
+        # - zones (3->2): 1 PNR trn leg
+        # - zones (4->4): 1 PNR trn leg
+        # all doubled by the 0.5 sample rate expansion
+        assert data.shape == (4, 4)
+        assert data[0, 1] == 2  # 1->2
+        assert data[1, 3] == 2  # 2->4
+        assert data[2, 1] == 2  # 3->2
+        assert data[3, 3] == 2  # 4->4
+        # everything else remains zero
+        zero_mask = np.ones_like(data, dtype=bool)
+        zero_mask[0, 1] = False
+        zero_mask[1, 3] = False
+        zero_mask[2, 1] = False
+        zero_mask[3, 3] = False
+        assert np.all(data[zero_mask] == 0.0)
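+
+
+# A minimal sketch of the expansion weighting the assertions above rely on:
+# write_matrices weights each trip by 1 / sample_rate, so with sample_rate == 0.5
+# every flagged trip contributes 2.0 to its OD cell. The helper below is
+# hypothetical -- an illustration only, not part of trip_matrices and not used by
+# the tests.
+def _expected_od_totals(trips, orig_col, dest_col, flag_col):
+    # map each (origin, destination) pair to its expansion-weighted total,
+    # skipping legs whose zone id is not a valid zone (e.g. pnr_zone_id <= 0)
+    totals = {}
+    for _, trip in trips.iterrows():
+        if trip[flag_col] and trip[orig_col] > 0 and trip[dest_col] > 0:
+            key = (trip[orig_col], trip[dest_col])
+            totals[key] = totals.get(key, 0.0) + 1.0 / trip["sample_rate"]
+    return totals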
+
+
+def test_write_matrices_two_zone():
+    state = workflow.State.make_default(__file__)
+
+    # land_use table for MAZ -> TAZ mapping
+    # MAZ: 101,102 -> TAZ 1; 103,104 -> TAZ 2
+    land_use = pd.DataFrame(
+        {"TAZ": [1, 1, 2, 2]},
+        index=pd.Index([101, 102, 103, 104], name="MAZ"),
+    )
+    state.add_table("land_use", land_use)
+
+    # TAZ domain for output
+    zone_index = pd.Index([1, 2], name="TAZ")
+
+    # Trips in MAZ space, with both standard drive and PNR drive legs
+    trips_df = pd.DataFrame(
+        {
+            "origin": [101, 102, 103],
+            "destination": [102, 104, 104],
+            "pnr_zone_id": [103, 0, 104],
+            # we have more flags below than trips, but that's ok for testing
+            "DRIVEALONE_VOT1": [0, 1, 1],
+            "PNR_DRIVEALONE_OUT": [1, 0, 1],
+            "WALK_TRN_OUT": [1, 0, 1],
+            "PNR_TRN_OUT": [1, 0, 1],
+            "sample_rate": [0.5, 0.5, 0.5],
+        }
+    )
+
+    # Run writer
+    write_matrices(
+        state=state,
+        trips_df=trips_df,
+        zone_index=zone_index,
+        model_settings=get_settings("trips_two_zone.omx"),
+        is_tap=False,
+    )
+
+    # Validate output
+    omx_path = os.path.join(os.path.dirname(__file__), "output", "trips_two_zone.omx")
+    assert os.path.exists(omx_path), "OMX file not created"
+
+    with omx.open_file(omx_path, mode="r") as f:
+        mapping = f.mapping("TAZ")
+        assert list(mapping) == [1, 2]
+
+        # ---- checking drive output
+        assert "DRIVEALONE_VOT1" in f.list_matrices(), "Expected matrix not found"
+        data = f["DRIVEALONE_VOT1"][:]
+
+        # Expected before expansion weighting:
+        # Standard drive (MAZ->TAZ):
+        #   102->104 => 1->2 : 1
+        #   103->104 => 2->2 : 1
+        # PNR drive legs (origin->pnr_zone_id):
+        #   101->103 => 1->2 : +1
+        #   103->104 => 2->2 : +1
+        # Totals: (1,2)=2, (2,2)=2
+        # With sample_rate=0.5, values are doubled:
+        assert data.shape == (2, 2)
+        assert data[0, 1] == 4  # 1->2
+        assert data[1, 1] == 4  # 2->2
+
+        # All other cells zero
+        zero_mask = np.ones_like(data, dtype=bool)
+        zero_mask[0, 1] = False
+        zero_mask[1, 1] = False
+        assert np.all(data[zero_mask] == 0.0)
+
+        # ---- checking transit output
+        assert "WALK_TRN_OUT" in f.list_matrices(), "Expected matrix not found"
+        data = f["WALK_TRN_OUT"][:]
+
+        # Expected before expansion weighting:
+        # regular transit
+        #   101->102 => 1->1 : 1
+        #   103->104 => 2->2 : 1
+        # pnr transit legs
+        #   103->102 => 2->1 : +1
+        #   104->104 => 2->2 : +1
+        # Totals: (1,1)=1, (2,1)=1, (2,2)=2
+        # With sample_rate=0.5, values are doubled:
+        assert data.shape == (2, 2)
+        assert data[0, 0] == 2  # 1->1
+        assert data[1, 0] == 2  # 2->1
+        assert data[1, 1] == 4  # 2->2
+
+        # All other cells zero
+        zero_mask = np.ones_like(data, dtype=bool)
+        zero_mask[0, 0] = False
+        zero_mask[1, 0] = False
+        zero_mask[1, 1] = False
+        assert np.all(data[zero_mask] == 0.0)
diff --git a/activitysim/abm/models/util/vectorize_tour_scheduling.py b/activitysim/abm/models/util/vectorize_tour_scheduling.py
index d4593c21f..e338918cb 100644
--- a/activitysim/abm/models/util/vectorize_tour_scheduling.py
+++ b/activitysim/abm/models/util/vectorize_tour_scheduling.py
@@ -18,6 +18,8 @@ from activitysim.core.configuration.logit import LogitComponentSettings
 from activitysim.core.interaction_sample_simulate import interaction_sample_simulate
 from activitysim.core.util import reindex
 
+from activitysim.abm.models.util.logsums import setup_skims
+from activitysim.abm.models.park_and_ride_lot_choice import run_park_and_ride_lot_choice
+
 logger = logging.getLogger(__name__)
 
 
@@ -64,80 +66,6 @@ class TourSchedulingSettings(LogitComponentSettings, extra="forbid"):
     """
 
 
-def skims_for_logsums(
-    state: workflow.State,
-    tour_purpose,
-    model_settings: TourSchedulingSettings,
-    trace_label: str,
-):
-    network_los = state.get_injectable("network_los")
-
-    skim_dict = network_los.get_default_skim_dict()
-
-    orig_col_name = "home_zone_id"
-
-    destination_for_tour_purpose = model_settings.DESTINATION_FOR_TOUR_PURPOSE
-    if isinstance(destination_for_tour_purpose, str):
-        dest_col_name = destination_for_tour_purpose
-    elif isinstance(destination_for_tour_purpose, dict):
-        dest_col_name = destination_for_tour_purpose.get(tour_purpose)
-    else:
-        raise RuntimeError(
-            f"expected string or dict DESTINATION_FOR_TOUR_PURPOSE model_setting for {tour_purpose}"
-        )
-
-    odt_skim_stack_wrapper = skim_dict.wrap_3d(
-        orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="out_period"
-    )
-    dot_skim_stack_wrapper = skim_dict.wrap_3d(
-        orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="in_period"
-    )
-    odr_skim_stack_wrapper = skim_dict.wrap_3d(
-        orig_key=orig_col_name, dest_key=dest_col_name, dim3_key="in_period"
-    )
-    dor_skim_stack_wrapper = skim_dict.wrap_3d(
-        orig_key=dest_col_name, dest_key=orig_col_name, dim3_key="out_period"
-    )
-    od_skim_stack_wrapper = skim_dict.wrap(orig_col_name, dest_col_name)
-
-    skims = {
-        "odt_skims": odt_skim_stack_wrapper,
-        "dot_skims": dot_skim_stack_wrapper,
-        "odr_skims": odr_skim_stack_wrapper,
-        "dor_skims": dor_skim_stack_wrapper,
-        "od_skims": od_skim_stack_wrapper,
-        "orig_col_name": orig_col_name,
-        "dest_col_name": dest_col_name,
-    }
-
-    if network_los.zone_system == los.THREE_ZONE:
-        # fixme - is this a lightweight object?
-        tvpb = network_los.tvpb
-
-        tvpb_logsum_odt = tvpb.wrap_logsum(
-            orig_key=orig_col_name,
-            dest_key=dest_col_name,
-            tod_key="out_period",
-            segment_key="demographic_segment",
-            trace_label=trace_label,
-            tag="tvpb_logsum_odt",
-        )
-        tvpb_logsum_dot = tvpb.wrap_logsum(
-            orig_key=dest_col_name,
-            dest_key=orig_col_name,
-            tod_key="in_period",
-            segment_key="demographic_segment",
-            trace_label=trace_label,
-            tag="tvpb_logsum_dot",
-        )
-
-        skims.update(
-            {"tvpb_logsum_odt": tvpb_logsum_odt, "tvpb_logsum_dot": tvpb_logsum_dot}
-        )
-
-    return skims
-
-
 def _compute_logsums(
     state: workflow.State,
     alt_tdd,
@@ -145,7 +73,6 @@ def _compute_logsums(
     tour_purpose,
     model_settings: TourSchedulingSettings,
     network_los,
-    skims,
     trace_label,
 ):
     """
@@ -165,6 +92,42 @@ def _compute_logsums(
         f"{trace_label} compute_logsums for {choosers.shape[0]} choosers {alt_tdd.shape[0]} alts"
     )
 
+    if logsum_settings.include_pnr_for_logsums:
+        # if the logsum settings include explicit PNR, then we need to add the
+        # PNR lot destination column to the choosers table by running PNR lot choice
+        choosers["pnr_zone_id"] = run_park_and_ride_lot_choice(
+            state,
+            choosers=choosers,
+            land_use=state.get_dataframe("land_use"),
+            network_los=state.get_injectable("network_los"),
+            model_settings=None,
+            choosers_dest_col_name="destination",
+            choosers_origin_col_name="home_zone_id",
+            estimator=None,
+            pnr_capacity_cls=None,
+            trace_label=tracing.extend_trace_label(trace_label, "pnr_lot_choice"),
+        )
+
+    # set destination column name for skims used in logsums
+    destination_for_tour_purpose = model_settings.DESTINATION_FOR_TOUR_PURPOSE
+    if isinstance(destination_for_tour_purpose, str):
+        dest_col_name = destination_for_tour_purpose
+    elif isinstance(destination_for_tour_purpose, dict):
+        dest_col_name = destination_for_tour_purpose.get(tour_purpose)
+    else:
+        raise RuntimeError(
+            f"expected string or dict DESTINATION_FOR_TOUR_PURPOSE model_setting for {tour_purpose}"
+        )
+
+    skims = setup_skims(
+        state.get_injectable("network_los"),
+        choosers,
+        add_periods=False,
+        include_pnr_skims=logsum_settings.include_pnr_for_logsums,
+        orig_col_name="home_zone_id",
+        dest_col_name=dest_col_name,
+    )
+
     # - locals_dict
     constants = config.get_model_constants(logsum_settings)
     locals_dict = {}
state.get_injectable("network_los"), + choosers, + add_periods=False, + include_pnr_skims=logsum_settings.include_pnr_for_logsums, + orig_col_name="home_zone_id", + dest_col_name=dest_col_name, + ) + # - locals_dict constants = config.get_model_constants(logsum_settings) locals_dict = {} @@ -369,7 +332,6 @@ def compute_tour_scheduling_logsums( tours_merged, tour_purpose, model_settings: TourSchedulingSettings, - skims, trace_label, *, chunk_sizer: chunk.ChunkSizer, @@ -415,7 +377,6 @@ def compute_tour_scheduling_logsums( tour_purpose, model_settings, network_los, - skims, trace_label, ) return logsums @@ -443,7 +404,6 @@ def compute_tour_scheduling_logsums( tour_purpose, model_settings, network_los, - skims, trace_label, ) @@ -487,7 +447,6 @@ def compute_tour_scheduling_logsums( tour_purpose, model_settings, network_los, - skims, trace_label, ) state.tracing.trace_df( @@ -703,7 +662,6 @@ def _schedule_tours( spec, logsum_tour_purpose, model_settings: TourSchedulingSettings, - skims, timetable, window_id_col, previous_tour, @@ -802,7 +760,6 @@ def _schedule_tours( tours, logsum_tour_purpose, model_settings, - skims, tour_trace_label, chunk_sizer=chunk_sizer, ) @@ -918,14 +875,6 @@ def schedule_tours( else: assert not tours[timetable_window_id_col].duplicated().any() - if model_settings.LOGSUM_SETTINGS: - # we need skims to calculate tvpb skim overhead in 3_ZONE systems for use by calc_rows_per_chunk - skims = skims_for_logsums( - state, logsum_tour_purpose, model_settings, tour_trace_label - ) - else: - skims = None - result_list = [] for ( _i, @@ -947,7 +896,6 @@ def schedule_tours( spec, logsum_tour_purpose, model_settings, - skims, timetable, timetable_window_id_col, previous_tour, diff --git a/activitysim/abm/tables/disaggregate_accessibility.py b/activitysim/abm/tables/disaggregate_accessibility.py index 7828e1c4c..e21db54b7 100644 --- a/activitysim/abm/tables/disaggregate_accessibility.py +++ b/activitysim/abm/tables/disaggregate_accessibility.py @@ -16,7 +16,7 @@ def find_nearest_accessibility_zone( - state: workflow.State, choosers, accessibility_df, method="skims" + state: workflow.State, model_settings, choosers, accessibility_df, method="skims" ): """ Matches choosers zone to the nearest accessibility zones. @@ -30,9 +30,16 @@ def weighted_average(df, values, weights): def nearest_skim(oz, zones): # need to pass equal # of origins and destinations to skim_dict orig_zones = np.full(shape=len(zones), fill_value=oz, dtype=int) + + skim_name = model_settings.NEAREST_ZONE_SKIM + if "__" in skim_name: + # If the skim name contains '__', it is a 3D skim + # we need to pass the skim name as a tuple to the lookup method, e.g. 
diff --git a/activitysim/core/configuration/logit.py b/activitysim/core/configuration/logit.py
index a97143f2d..9dae78444 100644
--- a/activitysim/core/configuration/logit.py
+++ b/activitysim/core/configuration/logit.py
@@ -282,3 +282,13 @@ class TourModeComponentSettings(TemplatedLogitComponentSettings, extra="forbid")
         PreprocessorSettings
     ] | None = None
     LOGSUM_CHOOSER_COLUMNS: list[str] = []
+
+    run_atwork_pnr_lot_choice: bool = False
+    """
+    Flag to determine whether to include explicit park-and-ride lot locations for atwork subtours.
+    """
+    include_pnr_for_logsums: bool = False
+    """
+    Flag to determine whether to include park-and-ride lot locations in the logsum calculations.
+    This means that every OD pair for which a logsum is created will also run the park-and-ride lot choice model.
+    """
diff --git a/activitysim/core/mp_tasks.py b/activitysim/core/mp_tasks.py
index 7d31134bc..62a362c02 100644
--- a/activitysim/core/mp_tasks.py
+++ b/activitysim/core/mp_tasks.py
@@ -1274,6 +1274,43 @@ def allocate_shared_shadow_pricing_buffers_choice(state):
     return shadow_pricing_buffers_choice
 
 
+def allocate_park_and_ride_lot_choice_buffers(state: workflow.State, run_list: dict):
+    """
+    This is called by the main process to allocate memory buffers to share with subprocs.
+
+    Only allocates memory if the park-and-ride lot choice model is included in the
+    run list and iterated with tour mode choice.
+
+    Returns
+    -------
+    dict
+        maps buffer names to multiprocessing.Array buffers (empty if the model is not run)
+    """
+
+    info(state, "allocate_park_and_ride_lot_choice_buffers")
+
+    park_and_ride_lot_choice_buffers = {}
+
+    # only allocate buffers if park_and_ride_lot_choice is in run_list
+    # and if it is iterated with tour mode choice
+    if "park_and_ride_lot_choice" in run_list["models"]:
+        from activitysim.abm.models import park_and_ride_lot_choice
+
+        model_settings = (
+            park_and_ride_lot_choice.ParkAndRideLotChoiceSettings.read_settings_file(
+                state.filesystem,
+                "park_and_ride_lot_choice.yaml",
+            )
+        )
+
+        if model_settings.ITERATE_WITH_TOUR_MODE_CHOICE:
+            from activitysim.abm.models.util import park_and_ride_capacity
+
+            park_and_ride_lot_choice_buffers = (
+                park_and_ride_capacity.create_park_and_ride_capacity_data_buffers(state)
+            )
+
+    return park_and_ride_lot_choice_buffers
+
+
 def run_sub_simulations(
     state: workflow.State,
     injectables,
@@ -1641,6 +1678,14 @@ def find_breadcrumb(crumb, default=None):
     t0 = tracing.print_elapsed_time("allocate shared shadow_pricing choice buffer", t0)
     state.trace_memory_info("allocate_shared_shadow_pricing_buffers_choice.completed")
 
+    # add park_and_ride_lot_choice buffers to shared_data_buffers
+    t0 = tracing.print_elapsed_time()
+    shared_data_buffers.update(
+        allocate_park_and_ride_lot_choice_buffers(state, run_list)
+    )
+    t0 = tracing.print_elapsed_time("allocate park_and_ride_lot_choice buffer", t0)
+    state.trace_memory_info("allocate_park_and_ride_lot_choice_buffers.completed")
+
     start_time = time.time()
     if sharrow_enabled:
         shared_data_buffers["skim_dataset"] = "sh.Dataset:skim_dataset"
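The buffers returned by allocate_park_and_ride_lot_choice_buffers are plain multiprocessing.Array objects sharing a single RLock, and the model code reads them zero-copy through np.frombuffer. A minimal standalone sketch of that pattern (array size and values are illustrative only):

import ctypes
import multiprocessing as mp

import numpy as np

# allocate a small shared buffer the way create_park_and_ride_capacity_data_buffers does
lock = mp.RLock()
shared_pnr_choice = mp.Array(ctypes.c_int64, 4, lock=lock)
shared_pnr_choice[:] = np.zeros(4, dtype=np.int64).tolist()

# writers hold the lock while mutating the backing store
with shared_pnr_choice.get_lock():
    shared_pnr_choice[0] = 7

# np.frombuffer wraps the same memory without copying, so readers see the update
view = np.frombuffer(shared_pnr_choice.get_obj(), dtype=np.int64)
assert view[0] == 7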