diff --git a/CHANGELOG.md b/CHANGELOG.md index 37b3dd0..37c78cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,33 +12,40 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Added -- ZTF Figures Tutorial +- `Alert` and `Table` classes. +- Registry for alert schemas and GCP Project IDs. +- Alert schemas (Avro) and schema maps (yaml). +- Exceptions: `BadRequest` and `SchemaNotFoundError`. +- Types: `PubsubMessageLike` and `Schema`. +- ZTF Figures Tutorial ### Changed -- update README.md to point to the new docs -- remove setup and requirements files that are no longer needed after switching away from Read The Docs +- Update PubSub classes. +- update README.md to point to the new docs +- remove setup and requirements files that are no longer needed after switching away from Read The Docs ### Removed -- `figures` module (content moved to tutorial). This allowed the removal of the following explicit +- `figures` module (content moved to tutorial). This allowed the removal of the following explicit dependencies: `aplpy`, `matplotlib`, `numpy`. +- v0.1 BigQuery functions. ## \[0.2.0\] - 2023-07-02 ### Added -- `auth` module supporting authentication via a service account or oauth2 -- `exceptions` module with class `OpenAlertError` -- "Overview" section in docs -- classes in `utils` module: `ProjectIds`, `Cast` -- files: `CHANGELOG.md`, `pittgoogle_env.yml` +- `auth` module supporting authentication via a service account or oauth2 +- `exceptions` module with class `OpenAlertError` +- "Overview" section in docs +- classes in `utils` module: `ProjectIds`, `Cast` +- files: `CHANGELOG.md`, `pittgoogle_env.yml` ### Changed -- Overhaul the `pubsub` module. Add classes `Topic`, `Subscription`, `Consumer`, `Alert`, +- Overhaul the `pubsub` module. Add classes `Topic`, `Subscription`, `Consumer`, `Alert`, `Response`. ### Fixed -- cleanup some issues flagged by Codacy +- cleanup some issues flagged by Codacy diff --git a/pittgoogle/__init__.py b/pittgoogle/__init__.py index 47238f1..2da4e88 100644 --- a/pittgoogle/__init__.py +++ b/pittgoogle/__init__.py @@ -9,7 +9,12 @@ except ImportError: # for Python<3.8 import importlib_metadata as metadata -from . import auth, bigquery, exceptions, pubsub, utils +from . import alert, auth, bigquery, exceptions, pubsub, registry, types_, utils +from .alert import Alert +from .auth import Auth +from .bigquery import Table +from .pubsub import Consumer, Subscription, Topic +from .registry import ProjectIds, Schemas __version__ = metadata.version("pittgoogle-client") diff --git a/pittgoogle/alert.py b/pittgoogle/alert.py new file mode 100644 index 0000000..b6c2908 --- /dev/null +++ b/pittgoogle/alert.py @@ -0,0 +1,408 @@ +# -*- coding: UTF-8 -*- +"""Classes to facilitate working with astronomical alerts. + +.. contents:: + :local: + :depth: 2 + +Usage Examples +--------------- + +Load an alert from disk: + +.. code-block:: python + + import pittgoogle + + path = "path/to/ztf_alert.avro" # point this to a file containing an alert + alert = pittgoogle.Alert.from_path(path, schema_name="ztf") + +API +---- + +""" +import importlib.resources +import io +import logging +from datetime import datetime +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, Optional, Union + +import fastavro +from attrs import define, field + +from . 
import registry, types_, utils +from .exceptions import BadRequest, OpenAlertError, SchemaNotFoundError + +if TYPE_CHECKING: + import google._upb._message + import google.cloud.pubsub_v1 + import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run + +LOGGER = logging.getLogger(__name__) +PACKAGE_DIR = importlib.resources.files(__package__) + + +@define(kw_only=True) +class Alert: + """Pitt-Google container for an astronomical alert. + + Recommended to instantiate using one of the `from_*` methods. + + All parameters are keyword only. + + Parameters + ------------ + bytes : `bytes`, optional + The message payload, as returned by Pub/Sub. It may be Avro or JSON serialized depending + on the topic. + dict : `dict`, optional + The message payload as a dictionary. + metadata : `dict`, optional + The message metadata. + msg : `google.cloud.pubsub_v1.types.PubsubMessage`, optional + The Pub/Sub message object, documented at + ``__. + schema_name : `str` + One of (case insensitive): + - ztf + - ztf.lite + - elasticc.v0_9_1.alert + - elasticc.v0_9_1.brokerClassification + Schema name of the alert. Used for unpacking. If not provided, some properties of the + `Alert` may not be available. + """ + + msg: Optional[ + Union["google.cloud.pubsub_v1.types.PubsubMessage", types_.PubsubMessageLike] + ] = field(default=None) + """Incoming Pub/Sub message object.""" + _attributes: Optional[Union[Dict, "google._upb._message.ScalarMapContainer"]] = field( + default=None + ) + _dict: Optional[Dict] = field(default=None) + _dataframe: Optional["pd.DataFrame"] = field(default=None) + schema_name: Optional[str] = field(default=None) + _schema: Optional[types_.Schema] = field(default=None, init=False) + path: Optional[Path] = field(default=None) + + # ---- class methods ---- # + @classmethod + def from_cloud_run(cls, envelope: Dict, schema_name: Optional[str] = None) -> "Alert": + """Create an `Alert` from an HTTP request envelope containing a Pub/Sub message, as received by a Cloud Run module. + + Example code for a Cloud Run module that uses this method to open a ZTF alert: + + .. 
code-block:: python + + import pittgoogle + # flask is used to work with HTTP requests, which trigger Cloud Run modules + # the request contains the Pub/Sub message, which contains the alert packet + import flask + + app = flask.Flask(__name__) + + # function that receives the request + @app.route("/", methods=["POST"]) + def index(): + + try: + # unpack the alert + # if the request does not contain a valid message, this raises a `BadRequest` + alert = pittgoogle.Alert.from_cloud_run(envelope=flask.request.get_json(), schema_name="ztf") + + except pg.exceptions.BadRequest as exc: + # return the error text and an HTTP 400 Bad Request code + return str(exc), 400 + + # continue processing the alert + # when finished, return an empty string and an HTTP success code + return "", 204 + """ + # check whether received message is valid, as suggested by Cloud Run docs + if not envelope: + raise BadRequest("Bad Request: no Pub/Sub message received") + if not isinstance(envelope, dict) or "message" not in envelope: + raise BadRequest("Bad Request: invalid Pub/Sub message format") + + # convert the message publish_time string -> datetime + # occasionally the string doesn't include microseconds so we need a try/except + publish_time = envelope["message"]["publish_time"].replace("Z", "+00:00") + try: + publish_time = datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S.%f%z") + except ValueError: + publish_time = datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S%z") + + return cls( + msg=types_.PubsubMessageLike( + # this class requires data. the rest should be present in the message, but let's be lenient + data=envelope["message"]["data"], + attributes=envelope["message"].get("attributes"), + message_id=envelope["message"].get("message_id"), + publish_time=publish_time, + ordering_key=envelope["message"].get("ordering_key"), + ), + schema_name=schema_name, + ) + + @classmethod + def from_dict( + cls, + payload: Dict, + attributes: Optional[Union[Dict, "google._upb._message.ScalarMapContainer"]] = None, + schema_name: Optional[str] = None, + ) -> "Alert": # [TODO] update tom_desc to use this + """Create an `Alert` from a dictionary (`payload`).""" + return cls(dict=payload, attributes=attributes, schema_name=schema_name) + + @classmethod + def from_msg( + cls, msg: "google.cloud.pubsub_v1.types.PubsubMessage", schema_name: Optional[str] = None + ) -> "Alert": # [TODO] update tom_desc to use this + """Create an `Alert` from a `google.cloud.pubsub_v1.types.PubsubMessage`.""" + return cls(msg=msg, schema_name=schema_name) + + @classmethod + def from_path(cls, path: Union[str, Path], schema_name: Optional[str] = None) -> "Alert": + """Create an `Alert` from the file at `path`.""" + with open(path, "rb") as f: + bytes_ = f.read() + return cls( + msg=types_.PubsubMessageLike(data=bytes_), schema_name=schema_name, path=Path(path) + ) + + # ---- properties ---- # + @property + def attributes(self) -> Dict: + """Custom metadata for the message. Pub/Sub handles this as a dict-like called "attributes". + + If this was not set when the `Alert` was instantiated, a new dictionary will be created using + the `attributes` field in :attr:`pittgoogle.Alert.msg` the first time it is requested. + Update this dictionary as desired (it will not affect the original `msg`). + When publishing the alert using :attr:`pittgoogle.Topic.publish`, this dictionary will be + sent as the Pub/Sub message attributes. 
+ """ + if self._attributes is None: + self._attributes = dict(self.msg.attributes) + return self._attributes + + @property + def dict(self) -> Dict: + """Alert data as a dictionary. Created from `self.msg.data`, if needed. + + Raises + ------ + :class:`pittgoogle.exceptions.OpenAlertError` + if unable to deserialize the alert bytes. + """ + if self._dict is not None: + return self._dict + + # deserialize self.msg.data (avro or json bytestring) into a dict. + # if self.msg.data is either (1) json; or (2) avro that contains the schema in the header, + # self.schema is not required for deserialization, so we want to be lenient. + # if self.msg.data is schemaless avro, deserialization requires self.schema.avsc to exist. + # currently, there is a clean separation between surveys: + # elasticc always requires self.schema.avsc; ztf never does. + # we'll check the survey name from self.schema.survey; but first we need to check whether + # the schema exists so we can try to continue without one instead of raising an error. + # we may want or need to handle this differently in the future. + try: + self.schema + except SchemaNotFoundError as exc: + LOGGER.warning(f"schema not found. attempting to deserialize without it. {exc}") + avro_schema = None + else: + if self.schema.survey in ["elasticc"]: + avro_schema = self.schema.avsc + else: + avro_schema = None + + # if we have an avro schema, use it to deserialize and return + if avro_schema: + with io.BytesIO(self.msg.data) as fin: + self._dict = fastavro.schemaless_reader(fin, avro_schema) + return self._dict + + # [TODO] this should be rewritten to catch specific errors + # for now, just try avro then json, catching basically all errors in the process + try: + self._dict = utils.Cast.avro_to_dict(self.msg.data) + except Exception: + try: + self._dict = utils.Cast.json_to_dict(self.msg.data) + except Exception: + raise OpenAlertError("failed to deserialize the alert bytes") + return self._dict + + @property + def dataframe(self) -> "pd.DataFrame": + if self._dataframe is not None: + return self._dataframe + + import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run + + # sources and previous sources are expected to have the same fields + sources_df = pd.DataFrame([self.get("source")] + self.get("prv_sources")) + # sources and forced sources may have different fields + forced_df = pd.DataFrame(self.get("prv_forced_sources")) + + # use nullable integer data type to avoid converting ints to floats + # for columns in one dataframe but not the other + sources_ints = [c for c, v in sources_df.dtypes.items() if v == int] + sources_df = sources_df.astype( + {c: "Int64" for c in set(sources_ints) - set(forced_df.columns)} + ) + forced_ints = [c for c, v in forced_df.dtypes.items() if v == int] + forced_df = forced_df.astype( + {c: "Int64" for c in set(forced_ints) - set(sources_df.columns)} + ) + + self._dataframe = pd.concat([sources_df, forced_df], ignore_index=True) + return self._dataframe + + @property + def alertid(self) -> Union[str, int]: + """Convenience property to get the alert ID. + + If the survey does not define an alert ID, this returns the `sourceid`. + """ + return self.get("alertid", self.sourceid) + + @property + def objectid(self) -> Union[str, int]: + """Convenience property to get the object ID. + + The "object" represents a collection of sources, as determined by the survey. 
+ """ + return self.get("objectid") + + @property + def sourceid(self) -> Union[str, int]: + """Convenience property to get the source ID. + + The "source" is the detection that triggered the alert. + """ + return self.get("sourceid") + + @property + def schema(self) -> types_.Schema: + """Loads the schema from the registry :class:`pittgoogle.registry.Schemas`. + + Raises + ------ + :class:`pittgoogle.exceptions.SchemaNotFoundError` + if the `schema_name` is not supplied or a schema with this name is not found + """ + if self._schema is not None: + return self._schema + + # need to load the schema. raise an error if no schema_name given + if self.schema_name is None: + raise SchemaNotFoundError("a schema_name is required") + + # this also may raise SchemaNotFoundError + self._schema = registry.Schemas.get(self.schema_name) + return self._schema + + # ---- methods ---- # + def add_id_attributes(self) -> None: + """Add the IDs to the attributes.""" + ids = ["alertid", "objectid", "sourceid"] + values = [self.get(id) for id in ids] + + # get the survey-specific field names + survey_names = [self.get_key(id) for id in ids] + # if the field is nested, the key will be a list + # but pubsub message attributes must be strings. join to avoid a future error on publish + names = [".".join(id) if isinstance(id, list) else id for id in survey_names] + + # only add to attributes if the survey has defined this field + for idname, idvalue in zip(names, values): + if idname is not None: + self.attributes[idname] = idvalue + + def get(self, field: str, default: Any = None) -> Any: + """Return the value of `field` in this alert. + + The keys in the alert dictionary :attr:`pittgoogle.alert.Alert.dict` are survey-specific field names. + This method allows you to `get` values from the dict using generic names that will work across + surveys. `self.schema.map` is the mapping of generic -> survey-specific names. + To access a field using a survey-specific name, get it directly from the alert `dict`. + + Parameters + ---------- + field : str + Name of a field in the alert's schema. This must be one of the keys in the dict `self.schema.map`. + default : str or None + Default value to be returned if the field is not found. + + Returns + ------- + value : any + Value in the :attr:`pittgoogle.alert.Alert.dict` corresponding to this field. + """ + survey_field = self.schema.map.get(field) # str, list[str], or None + + if survey_field is None: + return default + + if isinstance(survey_field, str): + return self.dict.get(survey_field, default) + + # if survey_field is not one of the expected types, the schema map is malformed + # maybe this was intentional, but we don't know how to handle it here + if not isinstance(survey_field, list): + raise TypeError( + f"field lookup not implemented for a schema-map value of type {type(survey_field)}" + ) + + # the list must have more than 1 item, else it would be a single str + if len(survey_field) == 2: + try: + return self.dict[survey_field[0]][survey_field[1]] + except KeyError: + return default + + if len(survey_field) == 3: + try: + return self.dict[survey_field[0]][survey_field[1]][survey_field[2]] + except KeyError: + return default + + raise NotImplementedError( + f"field lookup not implemented for depth {len(survey_field)} (key = {survey_field})" + ) + + def get_key( + self, field: str, name_only: bool = False, default: Optional[str] = None + ) -> Optional[Union[str, list[str]]]: + """Return the survey-specific field name. 
+ + Parameters + ---------- + field : str + Generic field name whose survey-specific name is to be returned. This must be one of the + keys in the dict `self.schema.map`. + name_only : bool + In case the survey-specific field name is nested below the top level, whether to return + just the single final name as a str (True) or the full path as a list[str] (False). + default : str or None + Default value to be returned if the field is not found. + + Returns + ------- + survey_field : str or list[str] + Survey-specific name for the `field`, or `default` if the field is not found. + list[str] if this is a nested field and `name_only` is False, else str with the + final field name only. + """ + survey_field = self.schema.map.get(field) # str, list[str], or None + + if survey_field is None: + return default + + if name_only and isinstance(survey_field, list): + return survey_field[-1] + + return survey_field diff --git a/pittgoogle/bigquery.py b/pittgoogle/bigquery.py index 9da91b7..d0e3144 100644 --- a/pittgoogle/bigquery.py +++ b/pittgoogle/bigquery.py @@ -1,692 +1,165 @@ # -*- coding: UTF-8 -*- -"""The ``bigquery`` module facilitates querying Pitt-Google Broker's -BigQuery databases and reading the results. -See the tutorial for usage help. -""" -from typing import Generator, List, Optional, Tuple, Union - -import astropy -import pandas as pd -from astropy import coordinates as coord -from google.cloud import bigquery -from tabulate import tabulate - -from .utils import ProjectIds - - -pgb_project_id = ProjectIds.pittgoogle - -# --- BigQuery Client -user_bq_client, user_project_id = None, None # module's global Client, related id - - -def create_client(project_id: str): - """Open a BigQuery Client. - - Args: - project_id: User's Google Cloud Platform project ID - """ - - global user_bq_client - global user_project_id - - # instantiate the client - print(f"\nInstantiating a BigQuery client with project_id: {project_id}\n") - user_bq_client = bigquery.Client(project=project_id) - - # if the user passed a bad project_id, we won't know it yet. Let's check - _create_client_raise_exception_if_not_connected(project_id) - - # client is connected. set the global user_project_id - user_project_id = project_id - - -def _create_client_raise_exception_if_not_connected(project_id: str): - """Checks that the user's client can successfully connect to our tables - by executing a dry run query. - """ - - global user_bq_client - - query = f"SELECT candid FROM `{pgb_project_id}.ztf_alerts.salt2`" - try: - dry_run(query, notify=False) - except: - user_bq_client = None # reset so the user can try again - msg = ( - "You have tried to create a BigQuery Client with the project_id:\n" - f"\t{project_id}\n" - "But the Client cannot connect to the Pitt-Google Broker.\n" - "Check that your project_id is valid " - "(e.g., it should not be wrapped in quotes)." - ) - raise ValueError(msg) - - -def _check_client_isinstance(): - msg = ( - "You must create a BigQuery client first. 
" - "Run `pittgoogle.bigquery.create_client('your_project_id')`" - ) - assert isinstance(user_bq_client, bigquery.client.Client), msg - - -def _create_client_if_needed(): - stop = False # will be set to True if the user chooses to exit - - try: - _check_client_isinstance() - - except AssertionError: - # help the user open a bigquery client - msg = ( - "\nTo run queries, you must first open a BigQuery Client.\n" - "Enter your Google Cloud Platform project ID now " - "or exit (just press Enter) and run\n" - "`pittgoogle.bigquery.create_client(my_project_id)`\n" - "\nProject ID: " - ) - project_id = input(msg) or "" - - if project_id == "": - stop = True # user wants to exit rather than creating a client - else: - create_client(project_id) - - return stop - - -# --- Get information about PGB datasets and tables -def get_table_info(table: Union[str, list] = "all", dataset: str = "ztf_alerts"): - """Retrieves and prints BigQuery table schemas. - - Args: - - table: Name of the BigQuery table or list of the same. - 'all' will print the info for all tables in the dataset. - - dataset: Name of BigQuery dataset that the table(s) belong to. - """ - - # if a bigquery Client does not exist, help the user instantiate one - stop = _create_client_if_needed() - if stop: # the user has chosen to exit rather than create a client - return - - # get the table names in a list - if table == "all": - tables = get_dataset_table_names(dataset=dataset) - elif isinstance(table, str): - tables = [table] - else: - tables = table - - # get and print info about each table - for t in tables: - df = get_table_schema(table=t, dataset=dataset) - - # print the metadata and column info - print(df.table_name) - print(tabulate(df, headers="keys", tablefmt="grid")) # psql - print(f"\n{df.table_name} has {df.num_rows} rows.\n") - - -def get_table_schema(table: str, dataset: str = "ztf_alerts") -> pd.DataFrame: - """Retrieves information about the columns in a BigQuery table and returns - it as a DataFrame. - - Args: - table: Name of the BigQuery table - dataset: Name of BigQuery dataset that the table(s) belong to. - Returns - Column information from the BigQuery table schema. - """ - - # if a bigquery Client does not exist, help the user instantiate one - stop = _create_client_if_needed() - if stop: # the user has chosen to exit rather than create a client - return - - bqtable = user_bq_client.get_table(f"{pgb_project_id}.{dataset}.{table}") - cols = [] - for field in bqtable.schema: - cols.append((field.name, field.description, field.field_type)) - - if field.field_type == "RECORD": - for subfield in field.fields: - cols.append( - ( - f"{field.name}.{subfield.name}", - subfield.description, - subfield.field_type, - ) - ) - - # cols = [(s.name, s.description, s.field_type, s.mode) for s in bqtable.schema] - colnames = ["column_name", "description", "type"] - df = pd.DataFrame(cols, columns=colnames) - - # add some metadata - df.table_name = f"{bqtable.project}.{bqtable.dataset_id}.{bqtable.table_id}" - df.num_rows = bqtable.num_rows - - return df - - -def get_dataset_table_names(dataset: str = "ztf_alerts") -> List[str]: - """ - Args: - dataset: Name of the BigQuery dataset. - - Returns: - List of table names in the dataset. 
- """ - - # if a bigquery Client does not exist, help the user instantiate one - stop = _create_client_if_needed() - if stop: # the user has chosen to exit rather than create a client - return - - print(f"Getting table names for dataset: {dataset}") - - query = "SELECT * " f"FROM {pgb_project_id}.{dataset}.INFORMATION_SCHEMA.TABLES" - query_job = user_bq_client.query(query) - tables = [row["table_name"] for row in query_job] - tables.sort(key=str.lower) - return tables - - -# --- Setup to query for object histories -def get_history_column_names() -> List[str]: - """ - It would be convenient to also return the column descriptions, but - that is more complicated, and this function will be completely - obsolete if we change the database structure to store only the - "candidate" observation and metadata. - - Returns: - Column names appropriate for querying object histories. - """ - - dropcols = ["prv_candidates", "cutoutScience", "cutoutDifference", "cutoutTemplate"] - - sdf = get_table_schema("alerts") - schemacols = list(sdf["column_name"]) - - # drop the prv_candidates and cutout columns - historycols = [c for c in schemacols if c.split(".")[0] not in dropcols] - - # drop the full "candidate" RECORD column - historycols.remove("candidate") - - # drop "candidate.candid" as it is simply a repeat of "candid" - historycols.remove("candidate.candid") - - # strip out "candidate." from nested columns - # query_objects() uses only the base names - historycols = [c.replace("candidate.", "") for c in historycols] - - return historycols - - -def check_history_column_names(columns: List[str]) -> Union[List[str], bool]: - """Make sure user-submitted column names are appropriate to query object histories.""" - - -def _split_good_bad_history_column_names( - columns: List[str], -) -> Tuple[List[str], List[str]]: - """Split columns list into "good" and "bad" according to whether they are - suitable for querying an object's history. - """ - - badcols = list(set(columns) - set(get_history_column_names())) - goodcols = columns.copy() - for bc in badcols: - goodcols.remove(bc) - return (goodcols, badcols) - - -def object_history_sql_statement( - columns: List[str], objectIds: Optional[list] = None, limit: Optional[int] = None -) -> str: - """Convince function that generates the SQL string needed to - query the alerts table and aggregate data by objectId. - When the resulting SQL query is executed, the query job will contain - one row for each objectId, with the object's data aggregated into - arrays (one array per column in columns) ordered by the observation date. - - Note: Arrays may contain duplicated observations; it is the user's - responsibility to clean them. - - Args: - columns: Names of columns to select from the alerts table. - The 'objectId' and 'candid' columns are automatically included - and do not need to be in this list. - objectIds: IDs of ZTF objects to include in the query. - limit: Maximum number of rows to be returned. - - Returns: - SQL statement to query the alerts table and aggregate data by objectId. - """ - - dataset = "ztf_alerts" - table = "alerts" - objectcols = [ - "objectId", - ] # columns unique to an object - # make sure 'candid' is in columns. 
(objectcols handled separately) - columns = list(set(columns).union(set(["candid"]))) - - # SELECT statement - # create a list of strings that will aggregate columns into arrays - aggcols = _list_aggcols_sql_statements(columns) - selects = f'SELECT {", ".join(objectcols + aggcols)}' - - # FROM statement - froms = f"FROM `{pgb_project_id}.{dataset}.{table}`" - # concat the statements into the beginning of a SQL query statement - sqlquery = " ".join([selects, froms]) +"""Classes to facilitate connections to BigQuery datasets and tables. - # WHERE statement - if objectIds is not None: - # wrap each objectId in quotes and join to single string - oids = ",".join([f'"{o}"' for o in objectIds]) - wheres = f"WHERE objectId IN ({oids})" - # concat the statements into a SQL query statement - sqlquery = " ".join([sqlquery, wheres]) +.. contents:: + :local: + :depth: 2 - # GROUP BY statement - groupbys = "GROUP BY objectId" - sqlquery = " ".join([sqlquery, groupbys]) +.. note:: - # LIMIT statement - if limit is not None: - limits = f"LIMIT {limit}" - sqlquery = " ".join([sqlquery, limits]) + This module relies on :mod:`pittgoogle.auth` to authenticate API calls. + The examples given below assume the use of a :ref:`service account ` and + :ref:`environment variables `. In this case, :mod:`pittgoogle.auth` does not + need to be called explicitly. - return sqlquery +Usage Examples +--------------- +.. code-block:: python -def _list_aggcols_sql_statements(columns: List[str]) -> List[str]: - """Create a list of SQL string query segments that will aggregate - all columns not in objectcols. - """ - - objectcols = [ - "objectId", - ] - flatcols = [ - "schemavsn", - "publisher", - "candid", - ] - - # list of requested flatcols - fcols = list(set(columns) & set(flatcols)) - # list of requested columns nested under 'candidate' - ncols = list(set(columns) - set(objectcols) - set(flatcols)) - ncols = [f"candidate.{c}" for c in ncols] - # complete list of columns to be aggregated (group by) objectId - aggcols = fcols + ncols - # attach the ARRAY_AGG, ORDER By, and AS statements to the aggcols - aggcols = [f'ARRAY_AGG({c} ORDER BY candidate.jd) AS {c.split(".")[-1]}' for c in aggcols] - - return aggcols - - -# --- Dry runs -def dry_run(query: str, notify: bool = True): - """Perform a dry run to find out how many bytes the query will process. - Args: - query: SQL query statement - """ - - global user_project_id - _check_client_isinstance() # make sure we have a bigquery.client - - job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False) - query_job = user_bq_client.query(query, job_config=job_config) - - if notify: - nbytes, TiB = query_job.total_bytes_processed, 2**40 - pTiB = nbytes / TiB * 100 # nbytes as a percent of 1 TiB - print("\nQuery statement:") - print(f'\n"{query}"\n') - print(f"will process {nbytes} bytes of data.") - print(f"({pTiB:.3}% of your 1 TiB Free Tier monthly allotment.)") - - -def _dry_run_and_confirm(query: str) -> bool: - # print dry run info - dry_run(query) - # ask user if they want to proceed - cont = input("Continue? 
[y/N]: ") or "N" - do_the_query = cont in ["y", "Y"] - return do_the_query - - -# --- Query for object histories -def query_objects( - columns: List[str], - objectIds: Optional[list] = None, - limit: Optional[int] = None, - format: str = "pandas", - iterator: bool = False, - dry_run: bool = True, -) -> Union[ - str, - pd.DataFrame, - bigquery.job.QueryJob, - Generator[Union[str, pd.DataFrame], None, None], -]: - """Query the alerts database for object histories. - - Args: - columns: Names of columns to select from the alerts table. - The 'objectId' and 'candid' columns are automatically included - and do not need to be in this list. - objectIds: IDs of ZTF objects to include in the query. - limit: Limit the number of objects returned to N <= limit. - format: One of 'pandas', 'json', or 'query_job'. Query results will be - returned in this format. Results returned as 'query_job' may - contain duplicate observations; else duplicates are dropped. - iterator: If True, iterate over the objects and return one at a time. - Else return the full query results together. - This parameter is ignored if `format` == 'query_job'. - dry_run: If True, `pittgoogle.bigquery.dry_run` will be called first and the - user will be asked to confirm before continuing. - - Returns: - Query results in the requested format. - """ - - # make sure we have appropriate column names - goodcols = _query_objects_check_history_column_names(columns) - if len(goodcols) == 0: # user submitted bad columns and wants to abort - return - - # if a bigquery client does not exist, help the user instantiate one - stop = _create_client_if_needed() - if stop: # the user has chosen to exit rather than create a client - return - - # generate the SQL statement to query alerts db and aggregate histories - query = object_history_sql_statement(goodcols, objectIds, limit=limit) # str - - # print dry run results - if dry_run: - do_the_query = _dry_run_and_confirm(query) - if not do_the_query: # user has chosen to abort the query - return + import pittgoogle - # make the API call - query_job = user_bq_client.query(query) + [TODO] - # return the results - if format == "query_job": - return query_job - elif iterator: # return a generator that cycles through the objects/rows - return (format_history_query_results(row=row, format=format) for row in query_job) - else: # format and return all rows at once - return format_history_query_results(query_job=query_job, format=format) +API +---- - -def _query_objects_check_history_column_names(columns: List[str]) -> List[str]: - """Make sure user-submitted column names are appropriate for `query_objects()`. - - Returns one of: - Columns stripped of bad column names. - Empty list if there were bad columns and the user wants to abort the query. - """ - - goodcols, badcols = _split_good_bad_history_column_names(columns) - - try: - assert len(badcols) == 0 - except AssertionError: - msg = ( - "\nYou have requested columns that are not available to `query_objects()`.\n" - "(To view available columns, use `pittgoogle.bigquery.get_history_column_names()`)\n" - f"\nRequested columns:\n\t{columns}\n" - f"Unavailable columns:\n\t{badcols}\n" - "\nProceed without the unavailable columns? 
[y/N] " - ) - proceed = input(msg) or "N" - - if proceed not in ["y", "Y"]: # user wants to exit; return an empty list - return [] - - return goodcols - - -# --- Format query results -def format_history_query_results( - query_job: Optional[bigquery.job.QueryJob] = None, - row: Optional[bigquery.table.Row] = None, - format: str = "pandas", -) -> Union[pd.DataFrame, str]: - """Converts the results of a BigQuery query to the desired format. - Must pass either query_job or row. - Any duplicate observations will be dropped. - - Args: - query_job: Results from a object history query job. SQL statement needed - to create the job can be obtained with object_history_sql_statement(). - Must supply either query_job or row. - - row: A single row from query_job. Must supply either row or query_job. - - format: One of 'pandas' or 'json'. Input query results will be returned - in this format. - - Returns: - histories: Input query results converted to requested format - """ - - # make sure we have an appropriate param combination - do_job, do_row = query_job is not None, row is not None - good_format = format in ["pandas", "json"] - good_combo = (do_job != do_row) and good_format - if not good_combo: - raise ValueError("Must pass one of query_job or row.") - - # convert query_job - if do_job: - histories = _format_history_query_results_to_df(query_job) # df - if format == "json": - histories = histories.reset_index().to_json() # str - - # convert row - if do_row: - histories = _format_history_row_to_df(row) # df - if format == "json": - histories["objectId"] = histories.objectId # persist metadata - histories = histories.reset_index().to_json() # str - - return histories - - -def _format_history_query_results_to_df(query_job: bigquery.job.QueryJob): - """Convert a query_job (containing multiple rows of object history data) - to a DataFrame. - Any duplicate observations will be dropped. - """ - - dflist = [] - for row in query_job: - # convert to DataFrame - df = _format_history_row_to_df(row) - # add the objectId so we can use it to multi-index - df["objectId"] = df.objectId - # set the multi-index and append to the list - dflist.append(df.reset_index().set_index(["objectId", "candid"])) - - histories = pd.concat(dflist) - - return histories - - -def _format_history_row_to_df(row: Union[dict, bigquery.table.Row]): - """Convert a single object's history from a query row to a DataFrame. - Any duplicate observations will be dropped. - """ - - d = dict(row.items()) - oid, cid = d.pop("objectId"), d.pop("candid") - df = pd.DataFrame(data=d, index=pd.Index(cid, name="candid")) - df.drop_duplicates(inplace=True) - df.objectId = oid - return df - - -# --- Cone Search -def cone_search( - center: astropy.coordinates.SkyCoord, - radius: astropy.coordinates.Angle, - columns: List[str], - objectIds: Optional[list] = None, - format: str = "pandas", - iterator: bool = False, - dry_run: bool = True, -) -> Union[str, pd.DataFrame, Generator[Union[str, pd.DataFrame], None, None]]: - """Perform a cone search on the alerts database and return object histories. - This uses the coordinates of the most recent observation to determine - whether an object is within the cone. - - Args: - center: Center of the cone to search within. - radius: Radius of the cone to search within. - columns: Names of history columns to select from the alerts table. - The 'objectId' and 'candid' columns are automatically included - and do not need to be in this list. - objectIds: IDs of ZTF objects to include in the query. 
- format: One of 'pandas', or 'json'. Query results will be - returned in this format. Duplicate observations are dropped. - iterator: If True, iterate over the objects and return one at a time. - Else return the full query results together. - dry_run: If True, `pittgoogle.bigquery.dry_run` will be called first and the - user will be asked to confirm before continuing. - - Returns: - Query results in the requested format. - """ - - # make sure we have required columns - for c in ["jd", "ra", "dec"]: - if c not in columns: - columns.append(c) - - # Performing a dry run prints the SQL query statement, which does not account - # for the cone search. We'll print some things to reduce user confusion. - if dry_run: - print("\nInitiating a cone search.") - - # Query the database for object histories. - objects = query_objects( - columns, - objectIds=objectIds, - format="pandas", - iterator=iterator, - dry_run=dry_run, +""" +import logging +from typing import Optional, Union + +import google.cloud.bigquery as bigquery +from attrs import define, field +from attrs.validators import instance_of, optional + +from .alert import Alert +from .auth import Auth + +LOGGER = logging.getLogger(__name__) + + +@define +class Table: + """Methods and properties for a BigQuery table. + + Parameters + ------------ + name : `str` + Name of the BigQuery table. + dataset : `str` + Name of the BigQuery dataset this table belongs to. + + projectid : `str`, optional + The table owner's Google Cloud project ID. Either this or `auth` is required. Note: + :attr:`pittgoogle.utils.ProjectIds` is a registry containing Pitt-Google's project IDs. + auth : :class:`pittgoogle.auth.Auth`, optional + Credentials for the Google Cloud project that owns this table. If not provided, + it will be created from environment variables when needed. + client : `bigquery.Client`, optional + BigQuery client that will be used to access the table. If not provided, a new client will + be created (using `auth`) the first time it is requested. + """ + + name: str = field() + dataset: str = field() + _projectid: str = field(default=None) + _auth: Auth = field(default=None, validator=optional(instance_of(Auth))) + _client: Optional[bigquery.Client] = field( + default=None, validator=optional(instance_of(bigquery.Client)) ) - # == None if user chose to abort; else DataFrame or generator of same - if objects is None: - return - - if dry_run: - print("\nFiltering for objects within the given cone.") - - # filter out objects not in the cone and return the rest - objects_in_cone = _do_cone_search(objects, center, radius, format, iterator) - return objects_in_cone - - -def _do_cone_search( - objects: Union[pd.DataFrame, Generator[pd.DataFrame, None, None]], - center: astropy.coordinates.SkyCoord, - radius: astropy.coordinates.Angle, - format: str = "pandas", - iterator: bool = False, -) -> Union[str, pd.DataFrame, Generator[Union[str, pd.DataFrame], None, None]]: - """Apply the cone search filter and return appropriate objects.""" - - if iterator: # objects is a generator, return a generator - return _do_cone_search_iterator(objects, center, radius, format) - - else: # objects is single df - return _do_cone_search_all(objects, center, radius, format) - - -def _do_cone_search_iterator( - objects: pd.DataFrame, - center: astropy.coordinates.SkyCoord, - radius: astropy.coordinates.Angle, - format, -): - """Iterate objects, format and yield those that are in the cone. - - Args: - objects: DataFrame containing histories of multiple objectIds. 
- """ - - for df in objects: - in_cone = object_is_in_cone(df, center, radius) - - if in_cone: # format and yield - if format == "json": - df["objectId"] = df.objectId # else metadata is lost - object = df.reset_index().to_json() # str - else: - object = df - yield object - - -def _do_cone_search_all( - objects: pd.DataFrame, - center: astropy.coordinates.SkyCoord, - radius: astropy.coordinates.Angle, - format, -): - """Filter out objects not in the cone, format, and return. - - Args: - objects: DataFrame containing histories of multiple objectIds. - """ - - gb = objects.groupby(level="objectId") - objects_in_cone = gb.filter(lambda df: object_is_in_cone(df, center, radius)) - if format == "json": - objects_in_cone = objects_in_cone.reset_index().to_json() # str - return objects_in_cone - - -def object_is_in_cone( - object: pd.DataFrame, - center: astropy.coordinates.SkyCoord, - radius: astropy.coordinates.Angle, -): - """Checks whether the object's most recent observation has a position that - is within a cone defined by center and radius. - - Args: - object: DataFrame containing the history of a single objectId. - Required columns: ['jd','ra','dec'] - center: Center of the cone to search within. - radius: Radius of the cone to search within. - - Returns: - True if object is within radius of center, else False - """ - - # get the SkyCoords of the most recent observation - # to do: use the epoch with highest S/N instead - obs = object.loc[object["jd"] == object["jd"].max(), :] - obs_coords = coord.SkyCoord(obs["ra"], obs["dec"], frame="icrs", unit="deg") - - # check whether obs_coords are within the cone - dist = center.separation(obs_coords) - in_cone = dist < radius # array with a single bool - in_cone = in_cone[0] - - return in_cone + _table: Optional[bigquery.Table] = field(default=None, init=False) + + @classmethod + def from_cloud( + cls, + name: str, + *, + dataset: Optional[str] = None, + survey: Optional[str] = None, + testid: Optional[str] = None, + ): + """Create a `Table` with a `client` using implicit credentials (no explicit `auth`). + + The `projectid` will be retrieved from the `client`. + + Parameters + ---------- + name : `str` + Name of the table. + dataset : `str`, optional + Name of the dataset containing the table. Either this or a `survey` is required. If a + `testid` is provided, it will be appended to this name following the Pitt-Google naming syntax. + survey : `str`, optional + Name of the survey. This will be used as the name of the dataset if the `dataset` kwarg + is not provided. This kwarg is provided for convenience in cases where the Pitt-Google + naming syntax is used to name resources. + testid : `str`, optional + Pipeline identifier. If this is not `None`, `False`, or `"False"` it will be appended to + the dataset name. This is used in cases where the Pitt-Google naming syntax is used to name + resources. This allows pipeline modules to find the correct resources without interfering + with other pipelines that may have deployed resources with the same base names + (e.g., for development and testing purposes). 
+ """ + if dataset is None: + # [TODO] update the elasticc broker to name the dataset using the survey name only + dataset = survey + # if testid is not False, "False", or None, append it to the dataset + if testid and testid != "False": + dataset = f"{dataset}_{testid}" + client = bigquery.Client() + table = cls(name, dataset=dataset, projectid=client.project, client=client) + # make the get request now to create a connection to the table + _ = table.table + return table + + @property + def auth(self) -> Auth: + """Credentials for the Google Cloud project that owns this table. + + This will be created from environment variables if `self._auth` is None. + """ + if self._auth is None: + self._auth = Auth() + + if (self._projectid != self._auth.GOOGLE_CLOUD_PROJECT) and (self._projectid is not None): + LOGGER.warning(f"setting projectid to match auth: {self._auth.GOOGLE_CLOUD_PROJECT}") + self._projectid = self._auth.GOOGLE_CLOUD_PROJECT + + return self._auth + + @property + def id(self) -> str: + """Fully qualified table ID.""" + return f"{self.projectid}.{self.dataset}.{self.name}" + + @property + def projectid(self) -> str: + """The table owner's Google Cloud project ID.""" + if self._projectid is None: + self._projectid = self.auth.GOOGLE_CLOUD_PROJECT + return self._projectid + + @property + def table(self) -> bigquery.Table: + """Return a BigQuery Table object that's connected to the table. Makes a get request if necessary.""" + if self._table is None: + self._table = self.client.get_table(self.id) + return self._table + + @property + def client(self) -> bigquery.Client: + """BigQuery client for table access. + + Will be created using `self.auth.credentials` if necessary. + """ + if self._client is None: + self._client = bigquery.Client(credentials=self.auth.credentials) + return self._client + + def insert_rows(self, rows: Union[list[dict], list[Alert]]) -> list[dict]: + # if elements of rows are Alerts, need to extract the dicts + myrows = [row.dict if isinstance(row, Alert) else row for row in rows] + errors = self.client.insert_rows(self.table, myrows) + if len(errors) > 0: + LOGGER.warning(f"BigQuery insert error: {errors}") + return errors diff --git a/pittgoogle/exceptions.py b/pittgoogle/exceptions.py index 1c2f58f..9ef37f7 100644 --- a/pittgoogle/exceptions.py +++ b/pittgoogle/exceptions.py @@ -1,3 +1,11 @@ # -*- coding: UTF-8 -*- +class BadRequest(Exception): + """Raised when a Flask request json envelope (e.g., from Cloud Run) is invalid.""" + + class OpenAlertError(Exception): """Raised when unable to deserialize a Pub/Sub message payload.""" + + +class SchemaNotFoundError(Exception): + """Raised when a schema with a given name is not found in the registry.""" diff --git a/pittgoogle/pubsub.py b/pittgoogle/pubsub.py index 28b1ffc..dc99a07 100644 --- a/pittgoogle/pubsub.py +++ b/pittgoogle/pubsub.py @@ -23,11 +23,14 @@ .. 
code-block:: python - subscription = pittgoogle.pubsub.Subscription( - "my-ztf-loop-subscription", - # topic only required if the subscription does not yet exist in Google Cloud - topic=pittgoogle.pubsub.Topic("ztf-loop", pittgoogle.utils.ProjectIds.pittgoogle) - ) + # topic the subscription will be connected to + # only required if the subscription does not yet exist in Google Cloud + topic = pittgoogle.Topic(name="ztf-loop", projectid=pittgoogle.ProjectIds.pittgoogle) + + # choose your own name for the subscription + subscription = pittgoogle.Subscription(name="my-ztf-loop-subscription", topic=topic, schema_name="ztf") + + # make sure the subscription exists and we can connect to it. create it if necessary subscription.touch() Pull a small batch of alerts. Helpful for testing. Not recommended for long-runnining listeners. @@ -76,25 +79,30 @@ def my_batch_callback(results): ---- """ +import importlib.resources +import io +import json import logging import queue from concurrent.futures import ThreadPoolExecutor from time import sleep -from typing import Any, ByteString, Callable, List, Optional, Union +from typing import Any, Callable, List, Optional, Union -from attrs import converters, define, field +import fastavro +import google.cloud.pubsub_v1 as pubsub_v1 +from attrs import define, field from attrs.validators import gt, instance_of, is_callable, optional from google.api_core.exceptions import NotFound -from google.cloud import pubsub_v1 +from .alert import Alert from .auth import Auth -from .exceptions import OpenAlertError -from .utils import Cast +from .exceptions import SchemaNotFoundError LOGGER = logging.getLogger(__name__) +PACKAGE_DIR = importlib.resources.files(__package__) -def msg_callback_example(alert: "Alert") -> "Response": +def msg_callback_example(alert: Alert) -> "Response": print(f"processing message: {alert.metadata['message_id']}") return Response(ack=True, result=alert.dict) @@ -108,6 +116,7 @@ def batch_callback_example(batch: list) -> None: def pull_batch( subscription: Union[str, "Subscription"], max_messages: int = 1, + schema_name: str = str(), **subscription_kwargs, ) -> List["Alert"]: """Pull a single batch of messages from the `subscription`. @@ -118,6 +127,10 @@ def pull_batch( Subscription to be pulled. If `str`, the name of the subscription. max_messages : `int` Maximum number of messages to be pulled. + schema_name : `str` + One of "ztf", "ztf.lite", "elasticc.v0_9_1.alert", "elasticc.v0_9_1.brokerClassification". + Schema name of the alerts in the subscription. Passed to :class:`pittgoogle.pubsub.Alert` + for unpacking. If not provided, some properties of the `Alert` may not be available. subscription_kwargs Keyword arguments sent to :class:`pittgoogle.pubsub.Subscription`. Ignored if `subscription` is a :class:`pittgoogle.pubsub.Subscription`. @@ -129,13 +142,15 @@ def pull_batch( {"subscription": subscription.path, "max_messages": max_messages} ) - message_list = [Alert(msg=msg.message) for msg in response.received_messages] - ack_ids = [msg.ack_id for msg in response.received_messages] + alerts = [ + Alert.from_msg(msg.message, schema_name=schema_name) for msg in response.received_messages + ] + ack_ids = [msg.ack_id for msg in response.received_messages] if len(ack_ids) > 0: subscription.client.acknowledge({"subscription": subscription.path, "ack_ids": ack_ids}) - return message_list + return alerts @define @@ -146,18 +161,61 @@ class Topic: ------------ name : `str` Name of the Pub/Sub topic. 
- projectid : `str` - The topic owner's Google Cloud project ID. Note: :attr:`pittgoogle.utils.ProjectIds` - is a registry containing Pitt-Google's project IDs. + projectid : `str`, optional + The topic owner's Google Cloud project ID. Either this or `auth` is required. Use this + if you are connecting to a subscription owned by a different project than this topic. Note: + :attr:`pittgoogle.utils.ProjectIds` is a registry containing Pitt-Google's project IDs. + auth : :class:`pittgoogle.auth.Auth`, optional + Credentials for the Google Cloud project that owns this topic. If not provided, + it will be created from environment variables when needed. + client : `pubsub_v1.PublisherClient`, optional + Pub/Sub client that will be used to access the topic. If not provided, a new client will + be created (using `auth`) the first time it is requested. """ name: str = field() - projectid: str = field() + _projectid: str = field(default=None) + _auth: Auth = field(default=None, validator=optional(instance_of(Auth))) + _client: Optional[pubsub_v1.PublisherClient] = field( + default=None, validator=optional(instance_of(pubsub_v1.PublisherClient)) + ) - @property - def path(self) -> str: - """Fully qualified path to the topic.""" - return f"projects/{self.projectid}/topics/{self.name}" + @classmethod + def from_cloud( + cls, + name: str, + *, + projectid: str, + survey: Optional[str] = None, + testid: Optional[str] = None, + ): + """Create a `Topic` with a `client` using implicit credentials (no explicit `auth`). + + Parameters + ---------- + name : `str` + Name of the topic. If `survey` and/or `testid` are provided, they will be added to this + name following the Pitt-Google naming syntax. + projectid : `str` + Project ID of the Goodle Cloud project that owns this resource. Project IDs used by + Pitt-Google are listed in the registry for convenience (:class:`pittgoogle.registry.ProjectIds`). + Required because it cannot be retrieved from the `client` and there is no explicit `auth`. + survey : `str`, optional + Name of the survey. If provided, it will be prepended to `name` following the + Pitt-Google naming syntax. + testid : `str`, optional + Pipeline identifier. If this is not `None`, `False`, or `"False"` it will be appended to + the `name` following the Pitt-Google naming syntax. This used to allow pipeline modules + to find the correct resources without interfering with other pipelines that may have + deployed resources with the same base names (e.g., for development and testing purposes). + """ + # if survey and/or testid passed in, use them to construct full name using the pitt-google naming syntax + if survey is not None: + name = f"{survey}-{name}" + # must accommodate False and "False" for consistency with the broker pipeline + if testid and testid != "False": + name = f"{name}-{testid}" + return cls(name, projectid=projectid, client=pubsub_v1.PublisherClient()) @classmethod def from_path(cls, path) -> "Topic": @@ -165,6 +223,113 @@ def from_path(cls, path) -> "Topic": _, projectid, _, name = path.split("/") return cls(name, projectid) + @property + def auth(self) -> Auth: + """Credentials for the Google Cloud project that owns this topic. + + This will be created from environment variables if `self._auth` is None. 
+ """ + if self._auth is None: + self._auth = Auth() + + if (self._projectid != self._auth.GOOGLE_CLOUD_PROJECT) and (self._projectid is not None): + LOGGER.warning(f"setting projectid to match auth: {self._auth.GOOGLE_CLOUD_PROJECT}") + self._projectid = self._auth.GOOGLE_CLOUD_PROJECT + + return self._auth + + @property + def path(self) -> str: + """Fully qualified path to the topic.""" + return f"projects/{self.projectid}/topics/{self.name}" + + @property + def projectid(self) -> str: + """The topic owner's Google Cloud project ID.""" + if self._projectid is None: + self._projectid = self.auth.GOOGLE_CLOUD_PROJECT + return self._projectid + + @property + def client(self) -> pubsub_v1.PublisherClient: + """Pub/Sub client for topic access. + + Will be created using `self.auth.credentials` if necessary. + """ + if self._client is None: + self._client = pubsub_v1.PublisherClient(credentials=self.auth.credentials) + return self._client + + def touch(self) -> None: + """Test the connection to the topic, creating it if necessary.""" + try: + self.client.get_topic(topic=self.path) + LOGGER.info(f"topic exists: {self.path}") + + except NotFound: + self.client.create_topic(name=self.path) + LOGGER.info(f"topic created: {self.path}") + + def delete(self) -> None: + """Delete the topic.""" + try: + self.client.delete_topic(topic=self.path) + except NotFound: + LOGGER.info(f"nothing to delete. topic not found: {self.path}") + else: + LOGGER.info(f"deleted topic: {self.path}") + + def publish(self, alert: "Alert") -> int: + """Publish a message with `alert.dict` as the payload and `alert.attributes` as the attributes. + + If the `alert` has an elasticc schema, the payload will be serialized as schemaless Avro. + Otherwise, json will be used. + """ + # we need to decide which format to use: json, avro with schema, or avro without schema + # the format that pitt-google currently (2023-09-23) uses to publish messages depends on the stream: + # - consumer modules pass on the original alert data packet, as produced by the survey. + # they do not need to use this method (in fact, the consumers do not even use python), + # so we can ignore this case. + # - all other broker pipeline modules (Pitt-Google-Broker repo) use json. + # - modules in the pittgoogle-user repo publish classifications for elasticc, and thus + # use schemaless avro. + # at some point, we should re-evaluate the broker pipeline in particular. + # + # for now, we will get close enough to the current behavior if we assume that: + # - elasticc messages should be published as schemaless avro + # - else, we should publish a json message + # this will match the current behavior in all cases except the elasticc broker pipeline modules. + # neither broker pipeline uses pittgoogle-client at this time (they use pgb-broker-utils), + # so we don't need to update or accommodate them yet. + # + # we'll get the survey name from self.schema.survey, but first we should check whether the + # schema exists so we can be lenient and just fall back to json instead of raising an error. 
+ try: + alert.schema + except SchemaNotFoundError: + avro_schema = None + else: + if alert.schema.survey in ["elasticc"]: + avro_schema = alert.schema.avsc + else: + avro_schema = None + + if not avro_schema: + # serialize using json + message = json.dumps(alert.dict).encode("utf-8") + else: + # serialize as schemaless avro + fout = io.BytesIO() + fastavro.schemaless_writer(fout, avro_schema, alert.dict) + fout.seek(0) + message = fout.getvalue() + + # attribute keys and values must be strings. let's sort the keys while we're at it + attributes = {str(key): str(alert.attributes[key]) for key in sorted(alert.attributes)} + + future = self.client.publish(self.path, data=message, **attributes) + return future.result() + @define class Subscription: @@ -183,6 +348,10 @@ class Subscription: client : `pubsub_v1.SubscriberClient`, optional Pub/Sub client that will be used to access the subscription. This kwarg is useful if you want to reuse a client. If None, a new client will be created. + schema_name : `str` + One of "ztf", "ztf.lite", "elasticc.v0_9_1.alert", "elasticc.v0_9_1.brokerClassification". + Schema name of the alerts in the subscription. Passed to :class:`pittgoogle.pubsub.Alert` + for unpacking. If not provided, some properties of the `Alert` may not be available. """ name: str = field() @@ -191,6 +360,7 @@ class Subscription: _client: Optional[pubsub_v1.SubscriberClient] = field( default=None, validator=optional(instance_of(pubsub_v1.SubscriberClient)) ) + schema_name: str = field(factory=str) @property def projectid(self) -> str: @@ -266,6 +436,19 @@ def delete(self) -> None: else: LOGGER.info(f"deleted subscription: {self.path}") + def pull_batch(self, max_messages: int = 1) -> List["Alert"]: + """Pull a single batch of messages. + + Recommended for testing. Not recommended for long-running listeners (use the + :meth:`~Consumer.stream` method instead). + + Parameters + ---------- + max_messages : `int` + Maximum number of messages to be pulled. + """ + return pull_batch(self, max_messages=max_messages, schema_name=self.schema_name) + @define() class Consumer: @@ -432,86 +615,7 @@ def pull_batch(self, max_messages: int = 1) -> List["Alert"]: max_messages : `int` Maximum number of messages to be pulled. """ - return pull_batch(self.subscription, max_messages) - - -@define(kw_only=True) -class Alert: - """Pitt-Google container for a Pub/Sub message. - - Typical usage is to instantiate an `Alert` using only a `msg`, and then the other attributes - will be automatically extracted and returned (lazily). - - All parameters are keyword only. - - Parameters - ------------ - bytes : `bytes`, optional - The message payload, as returned by Pub/Sub. It may be Avro or JSON serialized depending - on the topic. - dict : `dict`, optional - The message payload as a dictionary. - metadata : `dict`, optional - The message metadata. - msg : `google.cloud.pubsub_v1.types.PubsubMessage`, optional - The Pub/Sub message object, documented at - ``__. 
- """ - - _bytes: Optional[ByteString] = field(default=None) - _dict: Optional[dict] = field(default=None) - _metadata: Optional[dict] = field(default=None) - msg: Optional["pubsub_v1.types.PubsubMessage"] = field(default=None) - """Original Pub/Sub message object.""" - - @property - def bytes(self) -> bytes: - """Message payload in original format (Avro or JSON serialized bytes).""" - if self._bytes is None: - # add try-except when we know what we're looking for - self._bytes = self.msg.data - if self._bytes is None: - # if we add a "path" attribute for the path to an avro file on disk - # we can load it like this: - # with open(self.path, "rb") as f: - # self._bytes = f.read() - pass - return self._bytes - - @property - def dict(self) -> dict: - """Message payload as a dictionary. - - Raises - ------ - :class:`pittgoogle.exceptions.OpenAlertError` - if unable to deserialize the alert bytes. - """ - if self._dict is None: - # this should be rewritten to catch specific errors - # for now, just try avro then json, catching basically all errors in the process - try: - self._dict = Cast.avro_to_dict(self.bytes) - except Exception: - try: - self._dict = Cast.json_to_dict(self.bytes) - except Exception: - raise OpenAlertError("failed to deserialize the alert bytes") - return self._dict - - @property - def metadata(self) -> dict: - """Message metadata as a flat dictionary.""" - if self._metadata is None: - self._metadata = { - "message_id": self.msg.message_id, - "publish_time": self.msg.publish_time, - # ordering must be enabled on the subscription for this to be useful - "ordering_key": self.msg.ordering_key, - # flatten the dict containing our custom attributes - **self.msg.attributes, - } - return self._metadata + return self.subscription.pull_batch(max_messages=max_messages) @define(kw_only=True, frozen=True) @@ -532,5 +636,5 @@ class Response: If there is no batch callback the results will be lost. """ - ack: bool = field(default=True, converter=converters.to_bool) + ack: bool = field(default=True, converter=bool) result: Any = field(default=None) diff --git a/pittgoogle/registry.py b/pittgoogle/registry.py new file mode 100644 index 0000000..29cb75f --- /dev/null +++ b/pittgoogle/registry.py @@ -0,0 +1,65 @@ +# -*- coding: UTF-8 -*- +"""Pitt-Google registries.""" +import importlib.resources +import logging +from typing import Final + +import yaml +from attrs import define + +from . import types_ +from .exceptions import SchemaNotFoundError + +LOGGER = logging.getLogger(__name__) +PACKAGE_DIR = importlib.resources.files(__package__) +SCHEMA_MANIFEST = yaml.safe_load((PACKAGE_DIR / "registry_manifests/schemas.yml").read_text()) + + +@define(frozen=True) +class ProjectIds: + """Registry of Google Cloud Project IDs.""" + + pittgoogle: Final[str] = "ardent-cycling-243415" + """Pitt-Google's production project.""" + + pittgoogle_dev: Final[str] = "avid-heading-329016" + """Pitt-Google's testing and development project.""" + + # pittgoogle_billing: Final[str] = "light-cycle-328823" + # """Pitt-Google's billing project.""" + + elasticc: Final[str] = "elasticc-challenge" + """Project running classifiers for ELAsTiCC alerts and reporting to DESC.""" + + +@define(frozen=True) +class Schemas: + """Registry of schemas used by Pitt-Google.""" + + @classmethod + def get(cls, schema_name: str) -> types_.Schema: + """Return the registered schema called `schema_name`. 
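+ + Parameters + ---------- + schema_name : `str` + Name of the schema to return. For the list of registered names, use `pittgoogle.Schemas.names()`.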
+ + Raises + ------ + :class:`pittgoogle.exceptions.SchemaNotFoundError` + if a schema called `schema_name` is not found + """ + for schema in SCHEMA_MANIFEST: + if schema["name"] != schema_name: + continue + + return types_.Schema( + name=schema["name"], + description=schema["description"], + path=PACKAGE_DIR / schema["path"] if schema["path"] is not None else None, + ) + + raise SchemaNotFoundError( + f"{schema_name} not found. For a list of valid names, use `pittgoogle.Schemas.names()`." + ) + + @classmethod + def names(cls) -> list[str]: + """Return the names of all registered schemas.""" + return [schema["name"] for schema in SCHEMA_MANIFEST] diff --git a/pittgoogle/registry_manifests/schemas.yml b/pittgoogle/registry_manifests/schemas.yml new file mode 100644 index 0000000..1929b99 --- /dev/null +++ b/pittgoogle/registry_manifests/schemas.yml @@ -0,0 +1,16 @@ +# Guidelines: +# - Schema names must start with the name of the survey. If the survey has more than one schema, +# the survey name should be followed by a "." and then one or more schema-specific specifiers. +# - If a schema file is also being registered (path key), it is recommended that the file have the +# same name (path stem) as the schema. Avro is the only file type currently implemented, and the file name +# must end with ".avsc". +# - The path must be relative to the package directory or null if no schema file is being registered. +- name: "elasticc.v0_9_1.alert" + description: "Avro schema of alerts published by ELAsTiCC." + path: "schemas/elasticc/elasticc.v0_9_1.alert.avsc" +- name: "elasticc.v0_9_1.brokerClassification" + description: "Avro schema of alerts to be sent to DESC containing classifications of ELAsTiCC alerts." + path: "schemas/elasticc/elasticc.v0_9_1.brokerClassification.avsc" +- name: "ztf" + description: "ZTF schema. The ZTF survey publishes alerts in Avro format with the schema attached in the header. Pitt-Google publishes ZTF alerts in JSON format. This schema covers both cases."
+ path: null diff --git a/pittgoogle/schemas/elasticc/elasticc.v0_9_1.alert.avsc b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.alert.avsc new file mode 100644 index 0000000..d5b89ea --- /dev/null +++ b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.alert.avsc @@ -0,0 +1,17 @@ +{ + "namespace": "elasticc.v0_9_1", + "type": "record", + "name": "alert", + "doc": "sample avro alert schema v4.1", + "fields": [ + {"name": "alertId", "type": "long", "doc": "unique alert identifer"}, + {"name": "diaSource", "type": "elasticc.v0_9_1.diaSource"}, + {"name": "prvDiaSources", "type": ["null", { + "type": "array", + "items": "elasticc.v0_9_1.diaSource"}], "default": null}, + {"name": "prvDiaForcedSources", "type": ["null", { + "type": "array", + "items": "elasticc.v0_9_1.diaForcedSource"}], "default": null}, + {"name": "diaObject", "type": ["null", "elasticc.v0_9_1.diaObject"], "default": null} + ] +} diff --git a/pittgoogle/schemas/elasticc/elasticc.v0_9_1.brokerClassification.avsc b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.brokerClassification.avsc new file mode 100644 index 0000000..f975f9a --- /dev/null +++ b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.brokerClassification.avsc @@ -0,0 +1,35 @@ +{ + "namespace": "elasticc.v0_9_1", + "type": "record", + "name": "brokerClassfication", + "fields": [ + {"name": "alertId", "type": "long", "doc": "unique alert identifer"}, + {"name": "diaSourceId", "type": "long", "doc": "id of source that triggered this classification"}, + {"name": "elasticcPublishTimestamp", + "type": {"type": "long", "logicalType": "timestamp-millis"}, + "doc": "timestamp from originating ELAsTiCC alert" + }, + {"name": "brokerIngestTimestamp", + "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], + "doc": "timestamp of broker ingestion of ELAsTiCC alert" + }, + {"name": "brokerName", "type": "string", "doc": "Name of broker (never changes)" }, + {"name": "brokerVersion", "type": "string", "doc": "Version/Release of broker's software" }, + {"name": "classifierName", "type": "string", + "doc": "Name of classifier broker is using, including software version" }, + {"name": "classifierParams", "type": "string", + "doc": "Any classifier parameter information worth noting for this classification" }, + {"name": "classifications", "type": { + "type": "array", + "items": { + "type": "record", + "name": "classificationDict", + "fields": [ + {"name": "classId", "type": "int", "doc": "See https://github.com/LSSTDESC/elasticc/tree/main/taxonomy/taxonomy.ipynb for specification" }, + {"name": "probability", "type": "float", "doc": "0-1" } + ] + } + } + } + ] +} diff --git a/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaForcedSource.avsc b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaForcedSource.avsc new file mode 100644 index 0000000..d5d180f --- /dev/null +++ b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaForcedSource.avsc @@ -0,0 +1,13 @@ +{ + "namespace": "elasticc.v0_9_1", + "name": "diaForcedSource", + "type": "record", + "fields": [ + {"name": "diaForcedSourceId", "type": "long"}, + {"name": "diaObjectId", "type": "long"}, + {"name": "midPointTai", "type": "double"}, + {"name": "filterName", "type": "string"}, + {"name": "psFlux", "type": "float"}, + {"name": "psFluxErr", "type": "float"} + ] +} diff --git a/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaNondetectionLimit.avsc b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaNondetectionLimit.avsc new file mode 100644 index 0000000..2cffef3 --- /dev/null +++ 
b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaNondetectionLimit.avsc @@ -0,0 +1,11 @@ +{ + "namespace": "elasticc.v0_9_1", + "name": "diaNondetectionLimit", + "type": "record", + "fields": [ + {"name": "ccdVisitId", "type": "long"}, + {"name": "midPointTai", "type": "double"}, + {"name": "filterName", "type": "string"}, + {"name": "diaNoise", "type": "float"} + ] +} diff --git a/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaObject.avsc b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaObject.avsc new file mode 100644 index 0000000..5b65699 --- /dev/null +++ b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaObject.avsc @@ -0,0 +1,79 @@ +{ + "namespace": "elasticc.v0_9_1", + "name": "diaObject", + "type": "record", + "fields": [ + {"name": "diaObjectId", "type": "long"}, + {"name": "simVersion", "type": ["null", "string"], "doc": "diaObject provenance"}, + {"name": "ra", "type": "double"}, + {"name": "decl", "type": "double"}, + {"name": "mwebv", "type": ["null", "float"], "default": null}, + {"name": "mwebv_err", "type": ["null", "float"], "default": null}, + {"name": "z_final", "type": ["null", "float"], "default": null}, + {"name": "z_final_err", "type": ["null", "float"], "default": null}, + {"name": "hostgal_ellipticity", "type": ["null", "float"], "default": null}, + {"name": "hostgal_sqradius", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zspec", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zspec_err", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_err", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q000", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q010", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q020", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q030", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q040", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q050", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q060", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q070", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q080", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q090", "type": ["null", "float"], "default": null}, + {"name": "hostgal_zphot_q100", "type": ["null", "float"], "default": null}, + {"name": "hostgal_mag_u", "type": ["null", "float"], "default": null}, + {"name": "hostgal_mag_g", "type": ["null", "float"], "default": null}, + {"name": "hostgal_mag_r", "type": ["null", "float"], "default": null}, + {"name": "hostgal_mag_i", "type": ["null", "float"], "default": null}, + {"name": "hostgal_mag_z", "type": ["null", "float"], "default": null}, + {"name": "hostgal_mag_Y", "type": ["null", "float"], "default": null}, + {"name": "hostgal_ra", "type": ["null", "float"], "default": null}, + {"name": "hostgal_dec", "type": ["null", "float"], "default": null}, + {"name": "hostgal_snsep", "type": ["null", "float"], "default": null}, + {"name": "hostgal_magerr_u", "type": ["null", "float"], "default": null}, + {"name": "hostgal_magerr_g", "type": ["null", "float"], "default": null}, + {"name": "hostgal_magerr_r", "type": ["null", "float"], "default": null}, + {"name": "hostgal_magerr_i", "type": ["null", "float"], "default": null}, + {"name": "hostgal_magerr_z", "type": ["null", "float"], "default": 
null}, + {"name": "hostgal_magerr_Y", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_ellipticity", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_sqradius", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zspec", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zspec_err", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_err", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q000", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q010", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q020", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q030", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q040", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q050", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q060", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q070", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q080", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q090", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_zphot_q100", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_mag_u", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_mag_g", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_mag_r", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_mag_i", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_mag_z", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_mag_Y", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_ra", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_dec", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_snsep", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_magerr_u", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_magerr_g", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_magerr_r", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_magerr_i", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_magerr_z", "type": ["null", "float"], "default": null}, + {"name": "hostgal2_magerr_Y", "type": ["null", "float"], "default": null} + ] +} diff --git a/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaSource.avsc b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaSource.avsc new file mode 100644 index 0000000..4906aa7 --- /dev/null +++ b/pittgoogle/schemas/elasticc/elasticc.v0_9_1.diaSource.avsc @@ -0,0 +1,16 @@ +{ + "namespace": "elasticc.v0_9_1", + "name": "diaSource", + "type": "record", + "fields": [ + {"name": "diaSourceId", "type": "long"}, + {"name": "diaObjectId", "type": ["null", "long"], "default": null}, + {"name": "midPointTai", "type": "double"}, + {"name": "filterName", "type": "string"}, + {"name": "ra", "type": "double"}, + {"name": "decl", "type": "double"}, + {"name": "psFlux", "type": "float"}, + {"name": "psFluxErr", "type": "float"}, + {"name": "snr", "type": "float"} + ] +} diff --git a/pittgoogle/schemas/maps/decat.yml b/pittgoogle/schemas/maps/decat.yml new file mode 100644 index 0000000..c150e38 --- /dev/null +++ b/pittgoogle/schemas/maps/decat.yml @@ -0,0 +1,17 @@ +SURVEY: decat +SURVEY_SCHEMA: 
https://github.com/rknop/decat_schema +TOPIC_SYNTAX: decat_yyyymmdd_2021A-0113 # replace yyyymmdd with the date +FILTER_MAP: + g DECam SDSS c0001 4720.0 1520.0: g + r DECam SDSS c0002 6415.0 1480.0: r +objectid: objectid +prv_sources: sources +source: triggersource +sourceid: sourceid +cutout_difference: diffcutout +cutout_science: scicutout +cutout_template: refcutout +filter: filter +mag: mag +magerr: magerr +magzp: magzp diff --git a/pittgoogle/schemas/maps/elasticc.yml b/pittgoogle/schemas/maps/elasticc.yml new file mode 100644 index 0000000..7087ff4 --- /dev/null +++ b/pittgoogle/schemas/maps/elasticc.yml @@ -0,0 +1,23 @@ +SURVEY: elasticc +SURVEY_SCHEMA: https://github.com/LSSTDESC/elasticc/tree/main/alert_schema +SCHEMA_VERSION: v0_9_1 +TOPIC_SYNTAX: +FILTER_MAP: +alertid: alertId +objectid: [diaObject, diaObjectId] +source: diaSource +sourceid: [diaSource, diaSourceId] +prv_sources: prvDiaSources +prv_forced_sources: prvDiaForcedSources +mjd: midPointTai +filter: filterName +mag: magpsf +magerr: sigmapsf +magzp: magzpsci +flux: psFlux +fluxerr: psFluxErr +ra: ra +dec: decl +cutout_science: +cutout_template: +cutout_difference: diff --git a/pittgoogle/schemas/maps/ztf.yml b/pittgoogle/schemas/maps/ztf.yml new file mode 100644 index 0000000..4aaf800 --- /dev/null +++ b/pittgoogle/schemas/maps/ztf.yml @@ -0,0 +1,18 @@ +SURVEY: ztf +SURVEY_SCHEMA: https://zwickytransientfacility.github.io/ztf-avro-alert/schema.html +TOPIC_SYNTAX: ztf_yyyymmdd_programid1 # replace yyyymmdd with the date +FILTER_MAP: + 1: g + 2: r + 3: i +objectid: objectId +prv_sources: prv_candidates +source: candidate +sourceid: candid +cutout_difference: cutoutDifference +cutout_science: cutoutScience +cutout_template: cutoutTemplate +filter: fid +mag: magpsf +magerr: sigmapsf +magzp: magzpsci diff --git a/pittgoogle/types_.py b/pittgoogle/types_.py new file mode 100644 index 0000000..104a769 --- /dev/null +++ b/pittgoogle/types_.py @@ -0,0 +1,86 @@ +# -*- coding: UTF-8 -*- +"""Functions to support working with alerts and related data.""" +import importlib.resources +import logging +from typing import TYPE_CHECKING, Optional + +import fastavro +import yaml +from attrs import define, field + +if TYPE_CHECKING: + import datetime + from pathlib import Path + +LOGGER = logging.getLogger(__name__) +PACKAGE_DIR = importlib.resources.files(__package__) + + +@define(kw_only=True) +class Schema: + """Class for an individual schema. + + This class is not intended to be used directly. Instead, get a schema from the registry: + `pittgoogle.registry.Schemas`. + """ + + name: str = field() + description: str = field() + path: Optional["Path"] = field(default=None) + _map: Optional[dict] = field(default=None, init=False) + _avsc: Optional[dict] = field(default=None, init=False) + + @property + def survey(self) -> str: + """Name of the survey. 
This is the first block (separated by ".") in the schema's name.""" + return self.name.split(".")[0] + + @property + def definition(self) -> str: + """Pointer (e.g., URL) to the survey's schema definition.""" + return self.map["SURVEY_SCHEMA"] + + @property + def map(self) -> dict: + """Mapping of Pitt-Google's generic field names to survey-specific field names.""" + if self._map is None: + yml = PACKAGE_DIR / f"schemas/maps/{self.survey}.yml" + try: + self._map = yaml.safe_load(yml.read_text()) + except FileNotFoundError: + raise ValueError(f"no schema map found for schema name '{self.name}'") + return self._map + + @property + def avsc(self) -> Optional[dict]: + """The Avro schema loaded from the file at `self.path`, or None if a valid file cannot be found.""" + # if the schema has already been loaded, return it + if self._avsc is not None: + return self._avsc + + # if self.path does not point to an existing avro schema file, return None + if (self.path is None) or (self.path.suffix != ".avsc") or (not self.path.is_file()): + return None + + # load the schema and return it + self._avsc = fastavro.schema.load_schema(self.path) + return self._avsc + + +@define(frozen=True) +class PubsubMessageLike: + """Container for an incoming Pub/Sub message that mimics a `google.cloud.pubsub_v1.types.PubsubMessage`. + + It is convenient for the :class:`pittgoogle.Alert` class to work with a message as a + `pubsub_v1.types.PubsubMessage`. However, there are many ways to obtain an alert that do + not result in a `pubsub_v1.types.PubsubMessage` (e.g., an alert packet loaded from disk or + an incoming message to a Cloud Functions or Cloud Run module). In those cases, this class + is used to create an object with the same attributes as a `pubsub_v1.types.PubsubMessage`. + This object is then assigned to the `msg` attribute of the `Alert`. + """ + + data: bytes = field() + attributes: dict = field(factory=dict) + message_id: Optional[str] = field(default=None) + publish_time: Optional["datetime.datetime"] = field(default=None) + ordering_key: Optional[str] = field(default=None) diff --git a/pittgoogle/utils.py b/pittgoogle/utils.py index d2a77ef..cd18980 100644 --- a/pittgoogle/utils.py +++ b/pittgoogle/utils.py @@ -5,10 +5,8 @@ from base64 import b64decode, b64encode from collections import OrderedDict from io import BytesIO -from typing import ClassVar import fastavro -import pandas as pd from astropy.table import Table from astropy.time import Time from attrs import define @@ -16,23 +14,6 @@ LOGGER = logging.getLogger(__name__) -@define -class ProjectIds: - """Registry of Google Cloud Project IDs.""" - - pittgoogle: ClassVar[str] = "ardent-cycling-243415" - """Pitt-Google's production project.""" - - pittgoogle_dev: ClassVar[str] = "avid-heading-329016" - """Pitt-Google's development project.""" - - # pittgoogle_billing: ClassVar[str] = "light-cycle-328823" - # """Pitt-Google's billing project.""" - - elasticc: ClassVar[str] = "elasticc-challenge" - """Project running a classifier for ELAsTiCC alerts and reporting to DESC.""" - - @define class Cast: """Methods to convert data types.""" @@ -124,24 +105,6 @@ def b64avro_to_dict(bytes_data): return Cast.avro_to_dict(b64decode(bytes_data)) # --- Work with alert dictionaries - @staticmethod - def alert_dict_to_dataframe(alert_dict: dict) -> pd.DataFrame: - """Package a ZTF alert dictionary into a dataframe.
- - Adapted from: - https://github.com/ZwickyTransientFacility/ztf-avro-alert/blob/master/notebooks/Filtering_alerts.ipynb - """ - dfc = pd.DataFrame(alert_dict["candidate"], index=[0]) - df_prv = pd.DataFrame(alert_dict["prv_candidates"]) - df = pd.concat([dfc, df_prv], ignore_index=True, sort=True) - df = df[dfc.columns] # return to original column ordering - - # we'll attach some metadata - # note this may not be preserved after all operations - # https://stackoverflow.com/questions/14688306/adding-meta-information-metadata-to-pandas-dataframe - df.objectId = alert_dict["objectId"] - return df - @staticmethod def alert_dict_to_table(alert_dict: dict) -> Table: """Package a ZTF alert dictionary into an Astopy Table."""
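To tie this changeset together, here is a minimal usage sketch of the pieces introduced above (the schema registry, `Topic.publish`, and `Subscription.pull_batch`). It is illustrative only: the topic and subscription names are placeholders, the constructor keywords are inferred from the attrs fields shown in the diff, Google Cloud credentials are assumed to be configured, and the subscription is assumed to already exist.

.. code-block:: python

    import pittgoogle

    # schema registry added in registry.py
    print(pittgoogle.Schemas.names())
    schema = pittgoogle.Schemas.get("elasticc.v0_9_1.alert")
    print(schema.survey, schema.path)

    # load an alert from disk and publish it.
    # "ztf" is not an elasticc schema, so Topic.publish serializes the payload as JSON.
    alert = pittgoogle.Alert.from_path("path/to/alert.avro", schema_name="ztf")
    topic = pittgoogle.Topic(name="my-topic")  # placeholder topic name
    topic.touch()  # create the topic if it does not already exist
    topic.publish(alert)

    # pull a small batch of messages from an existing subscription.
    # schema_name tells the returned Alert objects how to unpack their payloads.
    subscription = pittgoogle.Subscription(name="my-subscription", schema_name="ztf")
    alerts = subscription.pull_batch(max_messages=4)
    print([a.dict for a in alerts])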