diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index 2b83b8e..062cd2c 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -86,4 +86,5 @@ jobs: tests/test_batch_pipeline_dry_run_quarantine.py \ tests/test_batch_e1_hygiene.py \ tests/test_batch_e2_hygiene.py \ - tests/test_skymodel_phase_dir.py + tests/test_skymodel_phase_dir.py \ + tests/test_epoch_gaincal_field_shape.py diff --git a/dsa110_continuum/calibration/bandpass_diagnostics.py b/dsa110_continuum/calibration/bandpass_diagnostics.py index 8b91b4a..c70076b 100644 --- a/dsa110_continuum/calibration/bandpass_diagnostics.py +++ b/dsa110_continuum/calibration/bandpass_diagnostics.py @@ -346,10 +346,12 @@ def check_geometric_setup( field_indices = list(range(24)) # Default: all fields # Check field phase centers + from dsa110_continuum.calibration.runner import _extract_field_ra_dec + with table(f"{ms_path}::FIELD", readonly=True) as tb: - phase_dir = tb.getcol("PHASE_DIR") # Shape: (nfields, 1, 2) - ra_rad = phase_dir[:, 0, 0] - dec_rad = phase_dir[:, 0, 1] + phase_dir = tb.getcol("PHASE_DIR") + # Shape-tolerant: handles both (nfields, 1, 2) and (nfields, 2, 1) + ra_rad, dec_rad = _extract_field_ra_dec(phase_dir) # Filter to selected fields field_ra = np.degrees(ra_rad[field_indices]) diff --git a/dsa110_continuum/calibration/beam_docker.py b/dsa110_continuum/calibration/beam_docker.py index 8e9c3a6..4035d61 100644 --- a/dsa110_continuum/calibration/beam_docker.py +++ b/dsa110_continuum/calibration/beam_docker.py @@ -98,13 +98,16 @@ def evaluate_beam_docker( # Extract pointing center from MS FIELD table try: from dsa110_continuum.adapters.casa_tables import table + from dsa110_continuum.calibration.runner import _extract_field_ra_dec with table(ms_path + "::FIELD") as tf: phase_dir = tf.getcol("PHASE_DIR") if field_id >= phase_dir.shape[0]: raise ValueError(f"Field ID {field_id} out of range (max={phase_dir.shape[0] - 1})") - pointing_ra_deg = float(np.degrees(phase_dir[field_id, 0, 0])) - pointing_dec_deg = float(np.degrees(phase_dir[field_id, 0, 1])) + # Shape-tolerant extraction: handles (nfields, 1, 2) and (nfields, 2, 1) + ra_all, dec_all = _extract_field_ra_dec(phase_dir) + pointing_ra_deg = float(np.degrees(ra_all[field_id])) + pointing_dec_deg = float(np.degrees(dec_all[field_id])) except Exception as e: logger.warning( f"Could not extract pointing from MS: {e}. Using source position as pointing." diff --git a/dsa110_continuum/calibration/calibration.py b/dsa110_continuum/calibration/calibration.py index faf6581..648f2ad 100644 --- a/dsa110_continuum/calibration/calibration.py +++ b/dsa110_continuum/calibration/calibration.py @@ -1624,14 +1624,18 @@ def _check_coherent_phasing( field_indices = list(range(int(parts[0]), int(parts[1]) + 1)) # Read PHASE_DIR from FIELD table + from dsa110_continuum.calibration.runner import _extract_field_ra_dec + with table(f"{ms}::FIELD", readonly=True, ack=False) as field_tb: if "PHASE_DIR" not in field_tb.colnames(): logger.warning("PHASE_DIR column not found - skipping coherence check") return - phase_dir = field_tb.getcol("PHASE_DIR") # Shape: (nfields, 1, 2) + phase_dir = field_tb.getcol("PHASE_DIR") - # Get RA values for selected fields (in radians) - ra_values = np.array([phase_dir[i, 0, 0] for i in field_indices if i < len(phase_dir)]) + # Get RA values for selected fields (in radians); helper handles both + # rows-first (nfields, 1, 2) and CASA column-major (nfields, 2, 1) shapes. + ra_all, _ = _extract_field_ra_dec(phase_dir) + ra_values = np.array([ra_all[i] for i in field_indices if i < len(phase_dir)]) if len(ra_values) < 2: return diff --git a/dsa110_continuum/calibration/dec_utils.py b/dsa110_continuum/calibration/dec_utils.py index 38ed2f9..f9b1c6c 100644 --- a/dsa110_continuum/calibration/dec_utils.py +++ b/dsa110_continuum/calibration/dec_utils.py @@ -37,9 +37,13 @@ def read_ms_dec(ms_path: str | Path, fits_fallback: str | Path | None = None) -> log.debug("read_ms_dec: casacore not available, trying FITS fallback") else: try: + from dsa110_continuum.calibration.runner import _extract_field_ra_dec + with ct.table(str(ms_path) + "::FIELD", readonly=True, ack=False) as t: - phase_dir = t.getcol("PHASE_DIR") # (n_fields, 1, 2) - dec_rad = np.median(phase_dir[:, 0, 1]) + phase_dir = t.getcol("PHASE_DIR") + # Shape-tolerant: handles (nfields, 1, 2) and (nfields, 2, 1). + _, dec_rad_all = _extract_field_ra_dec(phase_dir) + dec_rad = np.median(dec_rad_all) return float(np.degrees(dec_rad)) except Exception as e: log.warning("read_ms_dec: MS read failed (%s), trying FITS fallback", e) diff --git a/dsa110_continuum/calibration/epoch_gaincal.py b/dsa110_continuum/calibration/epoch_gaincal.py index 4757a31..3b7cdda 100644 --- a/dsa110_continuum/calibration/epoch_gaincal.py +++ b/dsa110_continuum/calibration/epoch_gaincal.py @@ -25,7 +25,7 @@ SKYMODEL_MIN_FLUX_MJY, SOURCE_QUERY_RADIUS_DEG, ) -from dsa110_continuum.calibration.runner import phaseshift_ms +from dsa110_continuum.calibration.runner import _extract_field_ra_dec, phaseshift_ms from dsa110_continuum.calibration.skymodels import ( make_unified_skymodel, predict_from_skymodel_wsclean, @@ -52,9 +52,12 @@ def _read_ms_phase_center(ms_path: str) -> tuple[float, float]: from dsa110_continuum.adapters import casa_tables as ct with ct.table(f"{ms_path}::FIELD", readonly=True, ack=False) as t: - phase_dir = t.getcol("PHASE_DIR") # shape (nfields, 1, 2) radians - ra_rad = phase_dir[:, 0, 0] - dec_rad = phase_dir[:, 0, 1] + phase_dir = t.getcol("PHASE_DIR") + # Shape-tolerant: PHASE_DIR is (nfields, 1, 2) on rows-first table backends + # and (nfields, 2, 1) when CASA returns column-major. _extract_field_ra_dec + # handles both; raw [:, 0, 1] indexing on the column-major shape raises + # IndexError on axis-2 size 1 (the original epoch_gaincal failure mode). + ra_rad, dec_rad = _extract_field_ra_dec(phase_dir) # Circular mean for RA to handle 0/360 wrap median_ra = float(np.degrees(np.angle(np.mean(np.exp(1j * ra_rad)))) % 360) median_dec = float(np.degrees(np.median(dec_rad))) diff --git a/dsa110_continuum/calibration/model.py b/dsa110_continuum/calibration/model.py index 5fa6337..fd2ab64 100644 --- a/dsa110_continuum/calibration/model.py +++ b/dsa110_continuum/calibration/model.py @@ -373,11 +373,11 @@ def _calculate_manual_model_data( logger.debug(f"Reading MS metadata directly from tables for {ms_path}") with casa_table(f"{ms_path}::FIELD", readonly=True) as field_tb: if "PHASE_DIR" in field_tb.colnames(): - phase_dir = field_tb.getcol("PHASE_DIR") # Shape: (nfields, 1, 2) + phase_dir = field_tb.getcol("PHASE_DIR") logger.debug("Using PHASE_DIR for phase centers") else: # Fallback to REFERENCE_DIR if PHASE_DIR not available - phase_dir = field_tb.getcol("REFERENCE_DIR") # Shape: (nfields, 1, 2) + phase_dir = field_tb.getcol("REFERENCE_DIR") logger.debug("PHASE_DIR not available, using REFERENCE_DIR") nfields = len(phase_dir) @@ -386,6 +386,12 @@ def _calculate_manual_model_data( chan_freq = spw_tb.getcol("CHAN_FREQ") # Shape: (nspw, nchan) nspw = len(chan_freq) + # Shape-tolerant per-field RA/Dec extraction. Handles cached metadata + # (any supported shape) AND direct getcol output, which can be rows-first + # (nfields, 1, 2) or CASA column-major (nfields, 2, 1). + from dsa110_continuum.calibration.runner import _extract_field_ra_dec + phase_ra_rad_all, phase_dec_rad_all = _extract_field_ra_dec(phase_dir) + # Log field selection if field_indices is not None: logger.debug(f"Field selection: {field_indices} ({len(field_indices)} fields)") @@ -520,9 +526,9 @@ def _calculate_manual_model_data( chunk_u = selected_u[chunk_start:chunk_end] chunk_v = selected_v[chunk_start:chunk_end] - # Get phase centers for chunk rows - phase_centers_ra_rad = phase_dir[chunk_field_id, 0, 0] - phase_centers_dec_rad = phase_dir[chunk_field_id, 0, 1] + # Get phase centers for chunk rows (using shape-tolerant arrays) + phase_centers_ra_rad = phase_ra_rad_all[chunk_field_id] + phase_centers_dec_rad = phase_dec_rad_all[chunk_field_id] # Convert to degrees phase_centers_ra_deg = np.degrees(phase_centers_ra_rad) @@ -1225,20 +1231,23 @@ def populate_model_from_catalog( # Let's assume the phaseshift has put the source at the phase center. # So we can use the phase center of the first field. phase_dir = t.getcol("PHASE_DIR") - # field argument might be "0~23" or "0". - # We'll just take field 0 of the MS (since it's likely phaseshifted) - # Or parse field. - # Let's just use the first field in the MS for center. - ra_rad = phase_dir[0, 0, 0] - dec_rad = phase_dir[0, 0, 1] + from dsa110_continuum.calibration.runner import _extract_field_ra_dec + # Shape-tolerant: handles (nfields, 1, 2) and (nfields, 2, 1). + # field argument might be "0~23" or "0"; use field 0 (assumed phaseshifted). + ra_all, dec_all = _extract_field_ra_dec(phase_dir) + ra_rad = ra_all[0] + dec_rad = dec_all[0] center_ra = np.degrees(ra_rad) center_dec = np.degrees(dec_rad) else: # No name, no coords -> use MS field center with tb.table(f"{ms_path}::FIELD", readonly=True) as t: phase_dir = t.getcol("PHASE_DIR") - ra_rad = phase_dir[0, 0, 0] - dec_rad = phase_dir[0, 0, 1] + from dsa110_continuum.calibration.runner import _extract_field_ra_dec + # Shape-tolerant: handles (nfields, 1, 2) and (nfields, 2, 1). + ra_all, dec_all = _extract_field_ra_dec(phase_dir) + ra_rad = ra_all[0] + dec_rad = dec_all[0] center_ra = np.degrees(ra_rad) center_dec = np.degrees(dec_rad) diff --git a/dsa110_continuum/calibration/preconditions.py b/dsa110_continuum/calibration/preconditions.py index ae49b24..c86128d 100644 --- a/dsa110_continuum/calibration/preconditions.py +++ b/dsa110_continuum/calibration/preconditions.py @@ -167,9 +167,13 @@ def _check_calibrator_transit( t_end = Time(t_end_mjd, format="mjd") # Get phase center RA + from dsa110_continuum.calibration.runner import _extract_field_ra_dec + with ct.table(f"{ms_path}::FIELD", readonly=True, ack=False) as tb: phase_dir = tb.getcol("PHASE_DIR") - phase_ra_rad = phase_dir[0, 0, 0] + # Shape-tolerant: handles (nfields, 1, 2) and (nfields, 2, 1). + ra_rad_all, _ = _extract_field_ra_dec(phase_dir) + phase_ra_rad = ra_rad_all[0] phase_ra_deg = float(np.degrees(phase_ra_rad)) % 360.0 if phase_ra_deg < 0: phase_ra_deg += 360.0 @@ -270,16 +274,19 @@ def _check_coherent_phasing( else: field_indices = None # All fields + from dsa110_continuum.calibration.runner import _extract_field_ra_dec + with ct.table(f"{ms_path}::FIELD", readonly=True, ack=False) as tb: - phase_dir = tb.getcol("PHASE_DIR") # Shape: (nfields, 1, 2) + phase_dir = tb.getcol("PHASE_DIR") n_fields = tb.nrows() if field_indices is None: field_indices = list(range(n_fields)) - # Extract RA/Dec for selected fields - ra_rad = phase_dir[field_indices, 0, 0] - dec_rad = phase_dir[field_indices, 0, 1] + # Shape-tolerant extraction; then index by field_indices. + ra_all, dec_all = _extract_field_ra_dec(phase_dir) + ra_rad = ra_all[field_indices] + dec_rad = dec_all[field_indices] # Compute RA scatter using circular statistics (handles wrap-around) ra_complex = np.exp(1j * ra_rad) diff --git a/dsa110_continuum/calibration/selfcal.py b/dsa110_continuum/calibration/selfcal.py index 9fcf069..05e3d18 100644 --- a/dsa110_continuum/calibration/selfcal.py +++ b/dsa110_continuum/calibration/selfcal.py @@ -612,14 +612,20 @@ def _check_beam_response( # Get field pointing direction field_id = int(field) if field.isdigit() else 0 + from dsa110_continuum.calibration.runner import _extract_field_ra_dec + with tb.table(f"{ms_path}/FIELD", readonly=True) as field_tab: if field_id >= field_tab.nrows(): logger.warning(f"Field {field_id} not found in MS") return True, 1.0 - phase_dir = field_tab.getcol("PHASE_DIR")[field_id] - field_ra = phase_dir[0, 0] # radians - field_dec = phase_dir[0, 1] # radians + # Shape-tolerant: call helper on the full PHASE_DIR array, then + # index by field_id. Avoids the (1, 2) vs (2, 1) ambiguity that a + # pre-slice would create. + phase_dir = field_tab.getcol("PHASE_DIR") + ra_all, dec_all = _extract_field_ra_dec(phase_dir) + field_ra = ra_all[field_id] # radians + field_dec = dec_all[field_id] # radians # Get pointing direction (if available) pointing_path = f"{ms_path}/POINTING" diff --git a/tests/test_epoch_gaincal_field_shape.py b/tests/test_epoch_gaincal_field_shape.py new file mode 100644 index 0000000..bc4b158 --- /dev/null +++ b/tests/test_epoch_gaincal_field_shape.py @@ -0,0 +1,119 @@ +"""Regression tests for the FIELD-shape fix in epoch_gaincal._read_ms_phase_center. + +Pre-fix bug: `_read_ms_phase_center` did `phase_dir[:, 0, 1]` on the result of +`getcol("PHASE_DIR")`, assuming rows-first shape `(nfields, 1, 2)`. When CASA's +table backend returned the column-major shape `(nfields, 2, 1)`, the index `1` +on axis 2 (size 1) raised ``IndexError: index 1 is out of bounds for axis 2 +with size 1``. That cascaded into the orchestrator's epoch-gaincal fallback +path on real DSA-110 MS files freshly converted by the CASA adapter. + +Post-fix: `_read_ms_phase_center` calls +`dsa110_continuum.calibration.runner._extract_field_ra_dec`, which is shape- +tolerant (handles both rows-first and column-major + the 2-D fallback). +""" + +from __future__ import annotations + +import numpy as np + + +def _make_fake_table(phase_dir: np.ndarray): + """Return a fake casa_tables.table replacement that returns *phase_dir*. + + Mirrors the mock pattern in tests/test_skymodel_phase_dir.py — cloud-safe, + no real CASA needed. + """ + class FakeTable: + def __init__(self, path, *_, **__): + self.path = path + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def colnames(self): + return ["PHASE_DIR"] + + def getcol(self, name): + assert name == "PHASE_DIR" + return phase_dir + + return FakeTable + + +def test_read_ms_phase_center_rows_first_shape(monkeypatch): + """`(nfields, 1, 2)` — the historical rows-first shape, must keep working.""" + from dsa110_continuum.adapters import casa_tables + from dsa110_continuum.calibration import epoch_gaincal + + phase_dir = np.array([ + [[np.radians(180.0), np.radians(22.0)]], + [[np.radians(181.0), np.radians(22.5)]], + ]) # shape (2, 1, 2) + monkeypatch.setattr(casa_tables, "table", _make_fake_table(phase_dir)) + + ra_deg, dec_deg = epoch_gaincal._read_ms_phase_center("/fake/ms") + + assert ra_deg == np.degrees(np.angle(np.mean(np.exp(1j * np.radians([180.0, 181.0]))))) % 360 + np.testing.assert_allclose(dec_deg, np.degrees(np.median([np.radians(22.0), np.radians(22.5)]))) + + +def test_read_ms_phase_center_casa_column_major_shape(monkeypatch): + """`(nfields, 2, 1)` — the CASA column-major shape that broke pre-fix. + + This is the canonical regression test for the smoke-test failure on + 2026-01-25 (run_2026-04-29T18_44_03Z.log line 256: + `Epoch gaincal: FAILED (index 1 is out of bounds for axis 2 with size 1)`). + """ + from dsa110_continuum.adapters import casa_tables + from dsa110_continuum.calibration import epoch_gaincal + + phase_dir = np.array([ + [[np.radians(180.0)], [np.radians(22.0)]], + [[np.radians(181.0)], [np.radians(22.5)]], + ]) # shape (2, 2, 1) + monkeypatch.setattr(casa_tables, "table", _make_fake_table(phase_dir)) + + # Pre-fix this raised IndexError. Post-fix it returns plausible values. + ra_deg, dec_deg = epoch_gaincal._read_ms_phase_center("/fake/ms") + + expected_ra = np.degrees( + np.angle(np.mean(np.exp(1j * np.radians([180.0, 181.0])))) + ) % 360 + np.testing.assert_allclose(ra_deg, expected_ra) + np.testing.assert_allclose(dec_deg, np.degrees(np.median([np.radians(22.0), np.radians(22.5)]))) + + +def test_read_ms_phase_center_2d_fallback_shape(monkeypatch): + """`(nfields, 2)` — the helper's third supported shape.""" + from dsa110_continuum.adapters import casa_tables + from dsa110_continuum.calibration import epoch_gaincal + + phase_dir = np.array([ + [np.radians(180.0), np.radians(22.0)], + [np.radians(181.0), np.radians(22.5)], + ]) # shape (2, 2) + monkeypatch.setattr(casa_tables, "table", _make_fake_table(phase_dir)) + + ra_deg, dec_deg = epoch_gaincal._read_ms_phase_center("/fake/ms") + + expected_ra = np.degrees( + np.angle(np.mean(np.exp(1j * np.radians([180.0, 181.0])))) + ) % 360 + np.testing.assert_allclose(ra_deg, expected_ra) + np.testing.assert_allclose(dec_deg, np.degrees(np.median([np.radians(22.0), np.radians(22.5)]))) + + +def test_read_ms_phase_center_unsupported_shape_raises(monkeypatch): + """Helper's defensive ValueError surfaces cleanly for unrecognized shapes.""" + import pytest + from dsa110_continuum.adapters import casa_tables + from dsa110_continuum.calibration import epoch_gaincal + + phase_dir = np.zeros((2, 3, 4)) # not a recognized PHASE_DIR shape + monkeypatch.setattr(casa_tables, "table", _make_fake_table(phase_dir)) + + with pytest.raises(ValueError, match="Unsupported FIELD direction column shape"): + epoch_gaincal._read_ms_phase_center("/fake/ms")