Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UK country filters don't work with constituency outputs #2209

Merged
merged 4 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- bump: patch
changes:
fixed:
- Bug causing UK country filters to fail with constituency outputs
added:
- Unit tests for UK country filter functionality with constituencies
78 changes: 59 additions & 19 deletions policyengine_api/jobs/calculate_economy_simulation_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
from typing import Type
import pandas as pd
import numpy as np

from policyengine_api.jobs import BaseJob
from policyengine_api.jobs.tasks import compute_general_economy
Expand Down Expand Up @@ -264,17 +265,19 @@ def _create_simulation_uk(
)
simulation.default_calculation_period = time_period
if region != "uk":
constituency_weights_path = download_huggingface_dataset(
repo="policyengine/policyengine-uk-data",
repo_filename="parliamentary_constituency_weights.h5",
)
constituency_names_path = download_huggingface_dataset(
repo="policyengine/policyengine-uk-data",
repo_filename="constituencies_2024.csv",
)
constituency_names = pd.read_csv(constituency_names_path)
with h5py.File(constituency_weights_path, "r") as f:
weights = f["2025"][...]
if "constituency/" in region:
constituency = region.split("/")[1]
constituency_weights_path = download_huggingface_dataset(
repo="policyengine/policyengine-uk-data",
repo_filename="parliamentary_constituency_weights.h5",
)
constituency_names_path = download_huggingface_dataset(
repo="policyengine/policyengine-uk-data",
repo_filename="constituencies_2024.csv",
)
constituency_names = pd.read_csv(constituency_names_path)
if constituency in constituency_names.code.values:
constituency_id = constituency_names[
constituency_names.code == constituency
Expand All @@ -288,23 +291,15 @@ def _create_simulation_uk(
f"Constituency {constituency} not found. See {constituency_names_path} for the list of available constituencies."
)
simulation.calculate("household_net_income", 2025)
with h5py.File(constituency_weights_path, "r") as f:
weights = f["2025"][...]

weights = weights[constituency_id]

simulation.set_input("household_weight", 2025, weights)
simulation.get_holder("person_weight").delete_arrays()
simulation.get_holder("benunit_weight").delete_arrays()
elif "country/" in region:
country_region = region.split("/")[1]
region_values = simulation.calculate(
"country", map_to="person"
).values
df = simulation.to_input_dataframe()
simulation = Microsimulation(
dataset=df[region_values == country_region],
reform=reform,
self._apply_uk_country_filter(
region, weights, constituency_names, simulation
)

return simulation
Expand Down Expand Up @@ -364,6 +359,51 @@ def _create_simulation_us(
# Return completed simulation
return Microsimulation(**sim_options)

def _apply_uk_country_filter(
    self, region, weights, constituency_names, simulation
):
    """
    Reweight a UK simulation so that it represents a single country.

    No households are removed. The per-constituency household weights of
    every constituency whose code starts with the selected country's letter
    are summed, and the result is written to the simulation as its
    ``household_weight`` input for 2025. The simulation is modified in
    place and nothing is returned.

    Parameters:
    -----------
    region : str
        The region string in format 'country/{country}' where country can be
        england, scotland, wales, or ni.
    weights : np.array
        The constituency weights array from h5py file, with one row per
        constituency and one column per household.
    constituency_names : pd.DataFrame
        Dataframe containing constituency codes and names. Its rows must be
        in the same order as the rows of ``weights``.
    simulation : Microsimulation
        The microsimulation object to apply the filter to.

    Raises:
    -------
    KeyError
        If the country named in ``region`` is not one of the supported
        countries.
    """
    country_region = region.split("/")[1]

    # Map country region to prefix codes in constituency data
    country_prefixes = {
        "england": "E",
        "scotland": "S",
        "wales": "W",
        "ni": "N",
    }
    # Validate before doing any simulation work so a bad region fails fast
    # and with a message that lists the accepted values.
    try:
        country_region_code = country_prefixes[country_region]
    except KeyError:
        raise KeyError(
            f"Unknown UK country '{country_region}'. "
            f"Expected one of: {', '.join(country_prefixes)}."
        ) from None

    # NOTE(review): mirrors the constituency branch, which also calculates
    # household net income before overriding weights - presumably to
    # populate the simulation first; confirm.
    simulation.calculate("household_net_income", 2025)

    # Boolean mask over constituencies in the selected country. na=False
    # keeps the mask strictly boolean even if a code is missing.
    in_country = constituency_names.code.str.startswith(
        country_region_code, na=False
    ).to_numpy(dtype=bool)

    # Sum the selected rows directly instead of first copying them into a
    # zero-filled matrix of the full shape (e.g. 650 x 100180), which would
    # cost hundreds of megabytes. Accumulating in float64 matches the
    # dtype the zero-filled copy produced.
    weights_ = np.asarray(weights)[in_country].sum(axis=0, dtype=np.float64)

    # Update the simulation with filtered weights
    simulation.set_input("household_weight", 2025, weights_)
    # Drop stored person/benefit-unit weights - presumably so they are
    # re-derived from the new household weights; confirm.
    simulation.get_holder("person_weight").delete_arrays()
    simulation.get_holder("benunit_weight").delete_arrays()

def _compute_cliff_impacts(self, simulation: Microsimulation) -> Dict:
cliff_gap = simulation.calculate("cliff_gap")
is_on_cliff = simulation.calculate("is_on_cliff")
Expand Down
1 change: 1 addition & 0 deletions tests/unit/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Unit tests directory
Empty file.
107 changes: 107 additions & 0 deletions tests/unit/fixtures/jobs/test_calculate_economy_simulation_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import pytest
import unittest.mock as mock
import numpy as np
import pandas as pd
import h5py


@pytest.fixture
def mock_huggingface_downloads(monkeypatch):
    """Swap the job module's Hugging Face downloader for a local stub."""

    # (substring of the requested filename, fake local path handed back),
    # checked in this order.
    stub_paths = (
        ("constituency_weights", "mock_weights.h5"),
        ("constituencies_2024.csv", "mock_constituencies.csv"),
    )

    def fake_download(repo, repo_filename):
        for marker, fake_path in stub_paths:
            if marker in repo_filename:
                return fake_path
        # Any other file is echoed back unchanged.
        # NOTE: Codecov reported this fallback line as not covered by tests.
        return repo_filename

    monkeypatch.setattr(
        "policyengine_api.jobs.calculate_economy_simulation_job.download_huggingface_dataset",
        fake_download,
    )


@pytest.fixture
def mock_country():
    """Provide a MagicMock standing in for the UK country object."""
    country = mock.MagicMock()
    # Assigned after construction: MagicMock(name=...) would name the mock
    # itself rather than set a ``name`` attribute.
    country.name = "uk"
    return country


@pytest.fixture
def mock_h5py_weights(monkeypatch):
    """Patch ``h5py.File`` so weight reads yield an all-ones matrix.

    Returns the matrix (650 constituencies x 100 households) so tests can
    compare against it.
    """
    ones_matrix = np.ones((650, 100))

    # Innermost object: indexing it (e.g. with ``[...]``) gives the matrix.
    dataset = mock.MagicMock()
    dataset.__getitem__.return_value = ones_matrix

    # Object bound by the ``with`` statement: indexing it gives the dataset.
    group = mock.MagicMock()
    group.__getitem__.return_value = dataset

    # Context manager returned by the patched ``h5py.File``.
    fake_file = mock.MagicMock()
    fake_file.__enter__.return_value = group
    fake_file.__exit__.return_value = None

    def open_fake_file(path, mode):
        return fake_file

    monkeypatch.setattr(h5py, "File", open_fake_file)
    return ones_matrix


@pytest.fixture
def mock_constituency_names(monkeypatch):
    """Patch ``pd.read_csv`` to return a synthetic constituency table.

    The table has 650 rows, matching the row count of the mocked weights
    array, and is also returned to the test.
    """
    # (code prefix, label used in the name, number of constituencies),
    # in the order the rows are emitted.
    layout = (
        ("E", "English", 400),
        ("S", "Scottish", 150),
        ("W", "Welsh", 50),
        ("N", "Northern Irish", 50),
    )

    codes = []
    names = []
    for prefix, label, count in layout:
        codes.extend(f"{prefix}{i:07d}" for i in range(count))
        names.extend(f"{label} Constituency {i}" for i in range(count))

    mock_df = pd.DataFrame({"code": codes, "name": names})

    def read_fake_csv(path):
        return mock_df

    monkeypatch.setattr(pd, "read_csv", read_fake_csv)
    return mock_df


@pytest.fixture
def mock_simulation():
    """Build a MagicMock simulation with stubbed weight holders."""
    sim = mock.MagicMock()
    sim.calculate.return_value = None
    sim.set_input.return_value = None

    # One holder mock per weight variable; the same object is handed back on
    # every lookup, and any other variable name resolves to None.
    holders = {
        "person_weight": mock.MagicMock(),
        "benunit_weight": mock.MagicMock(),
    }

    def lookup_holder(name):
        return holders.get(name)

    sim.get_holder.side_effect = lookup_holder

    return sim
1 change: 1 addition & 0 deletions tests/unit/jobs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Jobs tests directory
Loading
Loading