Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/python-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,5 @@ jobs:
tests/test_batch_pipeline_dry_run_quarantine.py \
tests/test_batch_e1_hygiene.py \
tests/test_batch_e2_hygiene.py \
tests/test_skymodel_phase_dir.py
tests/test_skymodel_phase_dir.py \
tests/test_epoch_gaincal_field_shape.py
8 changes: 5 additions & 3 deletions dsa110_continuum/calibration/bandpass_diagnostics.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,12 @@ def check_geometric_setup(
field_indices = list(range(24)) # Default: all fields

# Check field phase centers
from dsa110_continuum.calibration.runner import _extract_field_ra_dec

with table(f"{ms_path}::FIELD", readonly=True) as tb:
phase_dir = tb.getcol("PHASE_DIR") # Shape: (nfields, 1, 2)
ra_rad = phase_dir[:, 0, 0]
dec_rad = phase_dir[:, 0, 1]
phase_dir = tb.getcol("PHASE_DIR")
# Shape-tolerant: handles both (nfields, 1, 2) and (nfields, 2, 1)
ra_rad, dec_rad = _extract_field_ra_dec(phase_dir)

# Filter to selected fields
field_ra = np.degrees(ra_rad[field_indices])
Expand Down
7 changes: 5 additions & 2 deletions dsa110_continuum/calibration/beam_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,16 @@ def evaluate_beam_docker(
# Extract pointing center from MS FIELD table
try:
from dsa110_continuum.adapters.casa_tables import table
from dsa110_continuum.calibration.runner import _extract_field_ra_dec

with table(ms_path + "::FIELD") as tf:
phase_dir = tf.getcol("PHASE_DIR")
if field_id >= phase_dir.shape[0]:
raise ValueError(f"Field ID {field_id} out of range (max={phase_dir.shape[0] - 1})")
pointing_ra_deg = float(np.degrees(phase_dir[field_id, 0, 0]))
pointing_dec_deg = float(np.degrees(phase_dir[field_id, 0, 1]))
# Shape-tolerant extraction: handles (nfields, 1, 2) and (nfields, 2, 1)
ra_all, dec_all = _extract_field_ra_dec(phase_dir)
pointing_ra_deg = float(np.degrees(ra_all[field_id]))
pointing_dec_deg = float(np.degrees(dec_all[field_id]))
except Exception as e:
logger.warning(
f"Could not extract pointing from MS: {e}. Using source position as pointing."
Expand Down
10 changes: 7 additions & 3 deletions dsa110_continuum/calibration/calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1624,14 +1624,18 @@ def _check_coherent_phasing(
field_indices = list(range(int(parts[0]), int(parts[1]) + 1))

# Read PHASE_DIR from FIELD table
from dsa110_continuum.calibration.runner import _extract_field_ra_dec

with table(f"{ms}::FIELD", readonly=True, ack=False) as field_tb:
if "PHASE_DIR" not in field_tb.colnames():
logger.warning("PHASE_DIR column not found - skipping coherence check")
return
phase_dir = field_tb.getcol("PHASE_DIR") # Shape: (nfields, 1, 2)
phase_dir = field_tb.getcol("PHASE_DIR")

# Get RA values for selected fields (in radians)
ra_values = np.array([phase_dir[i, 0, 0] for i in field_indices if i < len(phase_dir)])
# Get RA values for selected fields (in radians); helper handles both
# rows-first (nfields, 1, 2) and CASA column-major (nfields, 2, 1) shapes.
ra_all, _ = _extract_field_ra_dec(phase_dir)
ra_values = np.array([ra_all[i] for i in field_indices if i < len(phase_dir)])
if len(ra_values) < 2:
return

Expand Down
8 changes: 6 additions & 2 deletions dsa110_continuum/calibration/dec_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ def read_ms_dec(ms_path: str | Path, fits_fallback: str | Path | None = None) ->
log.debug("read_ms_dec: casacore not available, trying FITS fallback")
else:
try:
from dsa110_continuum.calibration.runner import _extract_field_ra_dec

with ct.table(str(ms_path) + "::FIELD", readonly=True, ack=False) as t:
phase_dir = t.getcol("PHASE_DIR") # (n_fields, 1, 2)
dec_rad = np.median(phase_dir[:, 0, 1])
phase_dir = t.getcol("PHASE_DIR")
# Shape-tolerant: handles (nfields, 1, 2) and (nfields, 2, 1).
_, dec_rad_all = _extract_field_ra_dec(phase_dir)
dec_rad = np.median(dec_rad_all)
return float(np.degrees(dec_rad))
except Exception as e:
log.warning("read_ms_dec: MS read failed (%s), trying FITS fallback", e)
Expand Down
11 changes: 7 additions & 4 deletions dsa110_continuum/calibration/epoch_gaincal.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
SKYMODEL_MIN_FLUX_MJY,
SOURCE_QUERY_RADIUS_DEG,
)
from dsa110_continuum.calibration.runner import phaseshift_ms
from dsa110_continuum.calibration.runner import _extract_field_ra_dec, phaseshift_ms
from dsa110_continuum.calibration.skymodels import (
make_unified_skymodel,
predict_from_skymodel_wsclean,
Expand All @@ -52,9 +52,12 @@ def _read_ms_phase_center(ms_path: str) -> tuple[float, float]:
from dsa110_continuum.adapters import casa_tables as ct

with ct.table(f"{ms_path}::FIELD", readonly=True, ack=False) as t:
phase_dir = t.getcol("PHASE_DIR") # shape (nfields, 1, 2) radians
ra_rad = phase_dir[:, 0, 0]
dec_rad = phase_dir[:, 0, 1]
phase_dir = t.getcol("PHASE_DIR")
# Shape-tolerant: PHASE_DIR is (nfields, 1, 2) on rows-first table backends
# and (nfields, 2, 1) when CASA returns column-major. _extract_field_ra_dec
# handles both; raw [:, 0, 1] indexing on the column-major shape raises
# IndexError on axis-2 size 1 (the original epoch_gaincal failure mode).
ra_rad, dec_rad = _extract_field_ra_dec(phase_dir)
# Circular mean for RA to handle 0/360 wrap
median_ra = float(np.degrees(np.angle(np.mean(np.exp(1j * ra_rad)))) % 360)
median_dec = float(np.degrees(np.median(dec_rad)))
Expand Down
35 changes: 22 additions & 13 deletions dsa110_continuum/calibration/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ def _calculate_manual_model_data(
logger.debug(f"Reading MS metadata directly from tables for {ms_path}")
with casa_table(f"{ms_path}::FIELD", readonly=True) as field_tb:
if "PHASE_DIR" in field_tb.colnames():
phase_dir = field_tb.getcol("PHASE_DIR") # Shape: (nfields, 1, 2)
phase_dir = field_tb.getcol("PHASE_DIR")
logger.debug("Using PHASE_DIR for phase centers")
else:
# Fallback to REFERENCE_DIR if PHASE_DIR not available
phase_dir = field_tb.getcol("REFERENCE_DIR") # Shape: (nfields, 1, 2)
phase_dir = field_tb.getcol("REFERENCE_DIR")
logger.debug("PHASE_DIR not available, using REFERENCE_DIR")
nfields = len(phase_dir)

Expand All @@ -386,6 +386,12 @@ def _calculate_manual_model_data(
chan_freq = spw_tb.getcol("CHAN_FREQ") # Shape: (nspw, nchan)
nspw = len(chan_freq)

# Shape-tolerant per-field RA/Dec extraction. Handles cached metadata
# (any supported shape) AND direct getcol output, which can be rows-first
# (nfields, 1, 2) or CASA column-major (nfields, 2, 1).
from dsa110_continuum.calibration.runner import _extract_field_ra_dec
phase_ra_rad_all, phase_dec_rad_all = _extract_field_ra_dec(phase_dir)

# Log field selection
if field_indices is not None:
logger.debug(f"Field selection: {field_indices} ({len(field_indices)} fields)")
Expand Down Expand Up @@ -520,9 +526,9 @@ def _calculate_manual_model_data(
chunk_u = selected_u[chunk_start:chunk_end]
chunk_v = selected_v[chunk_start:chunk_end]

# Get phase centers for chunk rows
phase_centers_ra_rad = phase_dir[chunk_field_id, 0, 0]
phase_centers_dec_rad = phase_dir[chunk_field_id, 0, 1]
# Get phase centers for chunk rows (using shape-tolerant arrays)
phase_centers_ra_rad = phase_ra_rad_all[chunk_field_id]
phase_centers_dec_rad = phase_dec_rad_all[chunk_field_id]

# Convert to degrees
phase_centers_ra_deg = np.degrees(phase_centers_ra_rad)
Expand Down Expand Up @@ -1225,20 +1231,23 @@ def populate_model_from_catalog(
# Let's assume the phaseshift has put the source at the phase center.
# So we can use the phase center of the first field.
phase_dir = t.getcol("PHASE_DIR")
# field argument might be "0~23" or "0".
# We'll just take field 0 of the MS (since it's likely phaseshifted)
# Or parse field.
# Let's just use the first field in the MS for center.
ra_rad = phase_dir[0, 0, 0]
dec_rad = phase_dir[0, 0, 1]
from dsa110_continuum.calibration.runner import _extract_field_ra_dec
# Shape-tolerant: handles (nfields, 1, 2) and (nfields, 2, 1).
# field argument might be "0~23" or "0"; use field 0 (assumed phaseshifted).
ra_all, dec_all = _extract_field_ra_dec(phase_dir)
ra_rad = ra_all[0]
dec_rad = dec_all[0]
center_ra = np.degrees(ra_rad)
center_dec = np.degrees(dec_rad)
else:
# No name, no coords -> use MS field center
with tb.table(f"{ms_path}::FIELD", readonly=True) as t:
phase_dir = t.getcol("PHASE_DIR")
ra_rad = phase_dir[0, 0, 0]
dec_rad = phase_dir[0, 0, 1]
from dsa110_continuum.calibration.runner import _extract_field_ra_dec
# Shape-tolerant: handles (nfields, 1, 2) and (nfields, 2, 1).
ra_all, dec_all = _extract_field_ra_dec(phase_dir)
ra_rad = ra_all[0]
dec_rad = dec_all[0]
center_ra = np.degrees(ra_rad)
center_dec = np.degrees(dec_rad)

Expand Down
17 changes: 12 additions & 5 deletions dsa110_continuum/calibration/preconditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,13 @@ def _check_calibrator_transit(
t_end = Time(t_end_mjd, format="mjd")

# Get phase center RA
from dsa110_continuum.calibration.runner import _extract_field_ra_dec

with ct.table(f"{ms_path}::FIELD", readonly=True, ack=False) as tb:
phase_dir = tb.getcol("PHASE_DIR")
phase_ra_rad = phase_dir[0, 0, 0]
# Shape-tolerant: handles (nfields, 1, 2) and (nfields, 2, 1).
ra_rad_all, _ = _extract_field_ra_dec(phase_dir)
phase_ra_rad = ra_rad_all[0]
phase_ra_deg = float(np.degrees(phase_ra_rad)) % 360.0
if phase_ra_deg < 0:
phase_ra_deg += 360.0
Expand Down Expand Up @@ -270,16 +274,19 @@ def _check_coherent_phasing(
else:
field_indices = None # All fields

from dsa110_continuum.calibration.runner import _extract_field_ra_dec

with ct.table(f"{ms_path}::FIELD", readonly=True, ack=False) as tb:
phase_dir = tb.getcol("PHASE_DIR") # Shape: (nfields, 1, 2)
phase_dir = tb.getcol("PHASE_DIR")
n_fields = tb.nrows()

if field_indices is None:
field_indices = list(range(n_fields))

# Extract RA/Dec for selected fields
ra_rad = phase_dir[field_indices, 0, 0]
dec_rad = phase_dir[field_indices, 0, 1]
# Shape-tolerant extraction; then index by field_indices.
ra_all, dec_all = _extract_field_ra_dec(phase_dir)
ra_rad = ra_all[field_indices]
dec_rad = dec_all[field_indices]

# Compute RA scatter using circular statistics (handles wrap-around)
ra_complex = np.exp(1j * ra_rad)
Expand Down
12 changes: 9 additions & 3 deletions dsa110_continuum/calibration/selfcal.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,14 +612,20 @@ def _check_beam_response(
# Get field pointing direction
field_id = int(field) if field.isdigit() else 0

from dsa110_continuum.calibration.runner import _extract_field_ra_dec

with tb.table(f"{ms_path}/FIELD", readonly=True) as field_tab:
if field_id >= field_tab.nrows():
logger.warning(f"Field {field_id} not found in MS")
return True, 1.0

phase_dir = field_tab.getcol("PHASE_DIR")[field_id]
field_ra = phase_dir[0, 0] # radians
field_dec = phase_dir[0, 1] # radians
# Shape-tolerant: call helper on the full PHASE_DIR array, then
# index by field_id. Avoids the (1, 2) vs (2, 1) ambiguity that a
# pre-slice would create.
phase_dir = field_tab.getcol("PHASE_DIR")
ra_all, dec_all = _extract_field_ra_dec(phase_dir)
field_ra = ra_all[field_id] # radians
field_dec = dec_all[field_id] # radians

# Get pointing direction (if available)
pointing_path = f"{ms_path}/POINTING"
Expand Down
119 changes: 119 additions & 0 deletions tests/test_epoch_gaincal_field_shape.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Regression tests for the FIELD-shape fix in epoch_gaincal._read_ms_phase_center.

Pre-fix bug: `_read_ms_phase_center` did `phase_dir[:, 0, 1]` on the result of
`getcol("PHASE_DIR")`, assuming rows-first shape `(nfields, 1, 2)`. When CASA's
table backend returned the column-major shape `(nfields, 2, 1)`, the index `1`
on axis 2 (size 1) raised ``IndexError: index 1 is out of bounds for axis 2
with size 1``. That cascaded into the orchestrator's epoch-gaincal fallback
path on real DSA-110 MS files freshly converted by the CASA adapter.

Post-fix: `_read_ms_phase_center` calls
`dsa110_continuum.calibration.runner._extract_field_ra_dec`, which is shape-
tolerant (handles both rows-first and column-major + the 2-D fallback).
"""

from __future__ import annotations

import numpy as np


def _make_fake_table(phase_dir: np.ndarray):
"""Return a fake casa_tables.table replacement that returns *phase_dir*.

Mirrors the mock pattern in tests/test_skymodel_phase_dir.py — cloud-safe,
no real CASA needed.
"""
class FakeTable:
def __init__(self, path, *_, **__):
self.path = path

def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
return False

def colnames(self):
return ["PHASE_DIR"]

def getcol(self, name):
assert name == "PHASE_DIR"
return phase_dir

return FakeTable


def test_read_ms_phase_center_rows_first_shape(monkeypatch):
"""`(nfields, 1, 2)` — the historical rows-first shape, must keep working."""
from dsa110_continuum.adapters import casa_tables
from dsa110_continuum.calibration import epoch_gaincal

phase_dir = np.array([
[[np.radians(180.0), np.radians(22.0)]],
[[np.radians(181.0), np.radians(22.5)]],
]) # shape (2, 1, 2)
monkeypatch.setattr(casa_tables, "table", _make_fake_table(phase_dir))

ra_deg, dec_deg = epoch_gaincal._read_ms_phase_center("/fake/ms")

assert ra_deg == np.degrees(np.angle(np.mean(np.exp(1j * np.radians([180.0, 181.0]))))) % 360
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In test_read_ms_phase_center_rows_first_shape, the RA assertion uses exact float equality and also applies % 360 inside the np.degrees(...) call. Exact equality can be flaky across platforms, and the modulo should be applied after converting to degrees to match _read_ms_phase_center (otherwise negative angles would produce an incorrect expected value). Consider switching to np.testing.assert_allclose and computing the expected RA with the modulo in degrees.

Suggested change
assert ra_deg == np.degrees(np.angle(np.mean(np.exp(1j * np.radians([180.0, 181.0]))))) % 360
expected_ra = np.degrees(
np.angle(np.mean(np.exp(1j * np.radians([180.0, 181.0]))))
) % 360
np.testing.assert_allclose(ra_deg, expected_ra)

Copilot uses AI. Check for mistakes.
np.testing.assert_allclose(dec_deg, np.degrees(np.median([np.radians(22.0), np.radians(22.5)])))


def test_read_ms_phase_center_casa_column_major_shape(monkeypatch):
"""`(nfields, 2, 1)` — the CASA column-major shape that broke pre-fix.

This is the canonical regression test for the smoke-test failure on
2026-01-25 (run_2026-04-29T18_44_03Z.log line 256:
`Epoch gaincal: FAILED (index 1 is out of bounds for axis 2 with size 1)`).
"""
from dsa110_continuum.adapters import casa_tables
from dsa110_continuum.calibration import epoch_gaincal

phase_dir = np.array([
[[np.radians(180.0)], [np.radians(22.0)]],
[[np.radians(181.0)], [np.radians(22.5)]],
]) # shape (2, 2, 1)
monkeypatch.setattr(casa_tables, "table", _make_fake_table(phase_dir))

# Pre-fix this raised IndexError. Post-fix it returns plausible values.
ra_deg, dec_deg = epoch_gaincal._read_ms_phase_center("/fake/ms")

expected_ra = np.degrees(
np.angle(np.mean(np.exp(1j * np.radians([180.0, 181.0]))))
) % 360
np.testing.assert_allclose(ra_deg, expected_ra)
np.testing.assert_allclose(dec_deg, np.degrees(np.median([np.radians(22.0), np.radians(22.5)])))


def test_read_ms_phase_center_2d_fallback_shape(monkeypatch):
"""`(nfields, 2)` — the helper's third supported shape."""
from dsa110_continuum.adapters import casa_tables
from dsa110_continuum.calibration import epoch_gaincal

phase_dir = np.array([
[np.radians(180.0), np.radians(22.0)],
[np.radians(181.0), np.radians(22.5)],
]) # shape (2, 2)
monkeypatch.setattr(casa_tables, "table", _make_fake_table(phase_dir))

ra_deg, dec_deg = epoch_gaincal._read_ms_phase_center("/fake/ms")

expected_ra = np.degrees(
np.angle(np.mean(np.exp(1j * np.radians([180.0, 181.0]))))
) % 360
np.testing.assert_allclose(ra_deg, expected_ra)
np.testing.assert_allclose(dec_deg, np.degrees(np.median([np.radians(22.0), np.radians(22.5)])))


def test_read_ms_phase_center_unsupported_shape_raises(monkeypatch):
"""Helper's defensive ValueError surfaces cleanly for unrecognized shapes."""
import pytest
from dsa110_continuum.adapters import casa_tables
from dsa110_continuum.calibration import epoch_gaincal

phase_dir = np.zeros((2, 3, 4)) # not a recognized PHASE_DIR shape
monkeypatch.setattr(casa_tables, "table", _make_fake_table(phase_dir))

with pytest.raises(ValueError, match="Unsupported FIELD direction column shape"):
epoch_gaincal._read_ms_phase_center("/fake/ms")
Loading