Skip to content

Commit ae13727

Browse files
authored
Consolidate LSST schemas and enhance Alert for Pitt-Google-Broker/pull/263 (#81)
* Add Schema version_id, _init_from_bytes; remove _header_bytes
* Fix #72. Refactor lsst_schema_helper to get version from avro header
* Consolidate lsst schemas
* Patch Alert._prep_for_publish for _ConfluentWireAvroSchema
* Patch tests
* Fix version_id in lsst sample alert headers
* Add test_serialize_without_definition and test_unsupported_version_lsst
* Patch forward reference
* Bugfix path syntax and default schema name
* Add Schema._name_in_bucket. Rename _init_from_bytes -> _init_from_msg.
* Call Schema._init_from_msg once per Alert in from_* methods
* Add Alert.name_in_bucket. Wrapper for Schema._name_in_bucket
* Patch lsst schema init test
* Add test test_name_in_bucket
* Add alert.schema.version to _add_id_attributes
* Add properties Alert.ra and Alert.dec
* Add Schema._clean_for_json to replace NaN with None
* Add random_alerts pytest fixture
* Bugfix lsst schema name in test alerts
* Add schema.version to attributes test
* Change name "default_schema" -> "default"
* Add default schema map
* Patch TEMPLATE.yml
* Add test_get_wrappers
* Update CHANGELOG
1 parent 7c91733 commit ae13727

17 files changed

+279
-182
lines changed

Diff for: CHANGELOG.md

+17-4
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,26 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
1212

1313
### Added
1414

15-
- Testing:
16-
- Add test data for schema "lsst.v7_4.alert" that is randomly generated.
17-
- Add `TestAlertProperties.test_dataframe`. Currently this only tests alerts with schema "lsst.v7_4.alert".
15+
- `Alert`:
16+
- Properties: `ra`, `dec`, `name_in_bucket`.
17+
- `Schema`:
18+
- Method: `_name_in_bucket`.
19+
- Methods to support LSST schema consolidation (e.g., `_init_from_msg`, which lets the schema version be read from the message).
20+
- Schema map for the default schema.
21+
- Unit tests:
22+
- Tests for new `Alert` properties.
23+
- Tests for LSST version and serialization.
24+
- Randomly generated data for schema "lsst.v7_4.alert".
1825

1926
### Changed
2027

21-
- Testing:
28+
- `Alert`:
29+
- `from_*` methods now call `Schema._init_from_msg` so the schema version can be retrieved from the message.
30+
- The schema version (`Alert.schema.version`) is now added to `Alert.attributes` automatically under the key "schema.version", unless that key is already present.
31+
- `Schema`:
32+
- Consolidate LSST schemas into one. Name is "lsst".
33+
- Change default schema name "default_schema" -> "default".
34+
- Unit tests:
2235
- Move `conftest.SampleAlert` -> `load_data.TestAlert`
2336
- Split `TestAlert` -> `TestAlertFrom`, `TestAlertProperties`, and `TestAlertMethods`
2437

Diff for: pittgoogle/alert.py

+48-17
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import google.cloud.pubsub_v1
2020

2121
from . import exceptions, registry, types_
22-
from .schema import (
23-
Schema, # so 'schema' module doesn't clobber 'Alert.schema' attribute
24-
)
22+
23+
# so 'schema' module doesn't clobber 'Alert.schema' attribute
24+
from .schema import Schema
2525

2626
if TYPE_CHECKING:
2727
import astropy.table
@@ -96,7 +96,7 @@ def from_cloud_functions(
9696
describes the service API endpoint pubsub.googleapis.com, the triggering topic's name,
9797
and the triggering event type `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
9898
"""
99-
return cls(
99+
alert = cls(
100100
msg=types_.PubsubMessageLike(
101101
# data is required. the rest should be present in the message, but use get to be lenient
102102
data=base64.b64decode(event["data"]),
@@ -107,6 +107,8 @@ def from_cloud_functions(
107107
context=context,
108108
schema_name=schema_name,
109109
)
110+
alert.schema._init_from_msg(alert)
111+
return alert
110112

111113
@classmethod
112114
def from_cloud_run(cls, envelope: Mapping, schema_name: str | None = None) -> "Alert":
@@ -162,7 +164,7 @@ def index():
162164
if not isinstance(envelope, dict) or "message" not in envelope:
163165
raise exceptions.BadRequest("Bad Request: invalid Pub/Sub message format")
164166

165-
return cls(
167+
alert = cls(
166168
msg=types_.PubsubMessageLike(
167169
# data is required. the rest should be present in the message, but use get to be lenient
168170
data=base64.b64decode(envelope["message"]["data"].encode("utf-8")),
@@ -173,6 +175,8 @@ def index():
173175
),
174176
schema_name=schema_name,
175177
)
178+
alert.schema._init_from_msg(alert)
179+
return alert
176180

177181
@classmethod
178182
def from_dict(
@@ -213,7 +217,9 @@ def from_msg(
213217
Alert:
214218
The created `Alert` object.
215219
"""
216-
return cls(msg=msg, schema_name=schema_name)
220+
alert = cls(msg=msg, schema_name=schema_name)
221+
alert.schema._init_from_msg(alert)
222+
return alert
217223

218224
@classmethod
219225
def from_path(cls, path: str | Path, schema_name: str | None = None) -> "Alert":
@@ -237,9 +243,11 @@ def from_path(cls, path: str | Path, schema_name: str | None = None) -> "Alert":
237243
"""
238244
with open(path, "rb") as f:
239245
bytes_ = f.read()
240-
return cls(
246+
alert = cls(
241247
msg=types_.PubsubMessageLike(data=bytes_), schema_name=schema_name, path=Path(path)
242248
)
249+
alert.schema._init_from_msg(alert)
250+
return alert
243251

244252
def to_mock_input(self, cloud_functions: bool = False):
245253
if not cloud_functions:
@@ -340,6 +348,22 @@ def sourceid(self) -> str | int:
340348
"""
341349
return self.get("sourceid")
342350

351+
@property
352+
def ra(self) -> float:
353+
"""Return the source's right ascension. Convenience wrapper around :attr:`Alert.get`.
354+
355+
The "source" is the detection that triggered the alert.
356+
"""
357+
return self.get("ra")
358+
359+
@property
360+
def dec(self) -> float:
361+
"""Return the source's declination. Convenience wrapper around :attr:`Alert.get`.
362+
363+
The "source" is the detection that triggered the alert.
364+
"""
365+
return self.get("dec")
366+
343367
@property
344368
def schema(self) -> Schema:
345369
"""Return the schema from the :class:`pittgoogle.registry.Schemas` registry.
@@ -412,22 +436,29 @@ def skymap(self) -> Union["astropy.table.QTable", None]:
412436

413437
return self._skymap
414438

439+
@property
440+
def name_in_bucket(self) -> str:
441+
"""Name of the alert object (file) in Google Cloud Storage."""
442+
return self.schema._name_in_bucket(alert=self)
443+
415444
# ---- methods ---- #
416445
def _add_id_attributes(self) -> None:
417-
"""Add the IDs ("alertid", "objectid", "sourceid") to :attr:`Alert.attributes`."""
446+
"""Add the IDs 'alertid', 'objectid', 'sourceid' and 'schema.version' to :attr:`Alert.attributes`."""
447+
# Get the data IDs and corresponding survey-specific field names. If the field is nested, the
448+
# key will be a list. Join list -> string. These are likely to become Pub/Sub message attributes.
418449
ids = ["alertid", "objectid", "sourceid"]
450+
_names = [self.get_key(id) for id in ids]
451+
names = [".".join(id) if isinstance(id, list) else id for id in _names]
419452
values = [self.get(id) for id in ids]
453+
attributes = dict(zip(names, values))
420454

421-
# get the survey-specific field names
422-
survey_names = [self.get_key(id) for id in ids]
423-
# if the field is nested, the key will be a list
424-
# but pubsub message attributes must be strings. join to avoid a future error on publish
425-
names = [".".join(id) if isinstance(id, list) else id for id in survey_names]
455+
# Add the schema version.
456+
attributes["schema.version"] = self.schema.version
426457

427-
# only add to attributes if the survey has defined this field and it's not already in the attributes
428-
for idname, idvalue in zip(names, values):
429-
if idname is not None and idname not in self._attributes:
430-
self._attributes[idname] = idvalue
458+
# Add attributes to self, but only if the survey has defined the field and it's not already there.
459+
for name, value in attributes.items():
460+
if name is not None and name not in self._attributes:
461+
self._attributes[name] = value
431462

432463
def get(self, field: str, default: Any = None) -> Any:
433464
"""Return the value of a field from the alert data.

Diff for: pittgoogle/exceptions.py

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
1010
----
1111
"""
12+
13+
1214
class BadRequest(Exception):
1315
"""Raised when a Flask request json envelope (e.g., from Cloud Run) is invalid."""
1416

Diff for: pittgoogle/registry.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
LOGGER = logging.getLogger(__name__)
2020

2121
# Load the schema manifest as a list of dicts sorted by key.
22-
manifest_yaml = (__package_path__ / "registry_manifests/schemas.yml").read_text()
22+
manifest_yaml = (__package_path__ / "registry_manifests" / "schemas.yml").read_text()
2323
SCHEMA_MANIFEST = sorted(yaml.safe_load(manifest_yaml), key=lambda schema: schema["name"])
2424

2525

@@ -79,9 +79,7 @@ def get(schema_name: str | None) -> schema.Schema:
7979
# If no schema_name provided, return the default.
8080
if schema_name is None:
8181
LOGGER.warning("No schema name provided. Returning a default schema.")
82-
mft_schema = [
83-
schema for schema in SCHEMA_MANIFEST if schema["name"] == "default_schema"
84-
][0]
82+
mft_schema = [schema for schema in SCHEMA_MANIFEST if schema["name"] == "default"][0]
8583
return schema.Schema._from_yaml(schema_dict=mft_schema)
8684

8785
# Return the schema with name == schema_name, if one exists.

Diff for: pittgoogle/registry_manifests/schemas.yml

+4-59
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
# 2: ''
1414
#
1515
# DEFAULT
16-
- name: 'default_schema'
16+
- name: 'default'
1717
description: 'Default schema used when no other schema is specified.'
18-
origin: '(null)'
18+
origin: null
1919
helper: 'default_schema_helper'
2020
#
2121
# ELASTICC alerts
@@ -33,63 +33,8 @@
3333
path: 'schemas/elasticc/elasticc.v0_9_1.brokerClassification.avsc'
3434
#
3535
# LSST alerts
36-
# [FIXME] Hacking in v7.x for now. Need to figure out install lsst-alert-packet to use lsst_auto_schema_helper
37-
# - name: 'lsst.v<MAJOR>_<MINOR>.alert'
38-
# helper: 'lsst_auto_schema_helper'
39-
- name: 'lsst.v7_4.alert'
40-
path: 'schemas/lsst/7/4/lsst.v7_4.alert.avsc'
41-
description: 'Schema for LSST alerts.'
42-
origin: 'https://github.com/lsst/alert_packet/tree/main/python/lsst/alert/packet/schema'
43-
helper: 'lsst_schema_helper'
44-
filter_map:
45-
1: 'u'
46-
2: 'g'
47-
3: 'r'
48-
4: 'i'
49-
5: 'z'
50-
6: 'y'
51-
#
52-
- name: 'lsst.v7_3.alert'
53-
path: 'schemas/lsst/7/3/lsst.v7_3.alert.avsc'
54-
description: 'Schema for LSST alerts.'
55-
origin: 'https://github.com/lsst/alert_packet/tree/main/python/lsst/alert/packet/schema'
56-
helper: 'lsst_schema_helper'
57-
filter_map:
58-
1: 'u'
59-
2: 'g'
60-
3: 'r'
61-
4: 'i'
62-
5: 'z'
63-
6: 'y'
64-
#
65-
- name: 'lsst.v7_2.alert'
66-
path: 'schemas/lsst/7/2/lsst.v7_2.alert.avsc'
67-
description: 'Schema for LSST alerts.'
68-
origin: 'https://github.com/lsst/alert_packet/tree/main/python/lsst/alert/packet/schema'
69-
helper: 'lsst_schema_helper'
70-
filter_map:
71-
1: 'u'
72-
2: 'g'
73-
3: 'r'
74-
4: 'i'
75-
5: 'z'
76-
6: 'y'
77-
#
78-
- name: 'lsst.v7_1.alert'
79-
path: 'schemas/lsst/7/1/lsst.v7_1.alert.avsc'
80-
description: 'Schema for LSST alerts.'
81-
origin: 'https://github.com/lsst/alert_packet/tree/main/python/lsst/alert/packet/schema'
82-
helper: 'lsst_schema_helper'
83-
filter_map:
84-
1: 'u'
85-
2: 'g'
86-
3: 'r'
87-
4: 'i'
88-
5: 'z'
89-
6: 'y'
90-
#
91-
- name: 'lsst.v7_0.alert'
92-
path: 'schemas/lsst/7/0/lsst.v7_0.alert.avsc'
36+
- name: 'lsst'
37+
path: 'schemas/lsst/MAJOR/MINOR/lsst.vMAJOR_MINOR.alert.avsc'
9338
description: 'Schema for LSST alerts.'
9439
origin: 'https://github.com/lsst/alert_packet/tree/main/python/lsst/alert/packet/schema'
9540
helper: 'lsst_schema_helper'

0 commit comments

Comments
 (0)