Update ps_to_storage source code #263
Conversation
@troyraen things look a bit nicer now. Still needs to be tested.
troyraen left a comment:
Looking much better! I'd love to get a few more things reorganized while we're right here working on it to give you a clean start with lsst and these cloud run services.
```diff
- def create_file_metadata(alert_dict: dict, event_id: str) -> dict:
+ def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
```
Would be nice to move this to pittgoogle as well, but skipping for now.
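For reference, a minimal sketch of what the renamed helper might look like, using the survey-agnostic Alert.get accessor and alert.schema.version property discussed elsewhere in this thread. The metadata keys and generic field names are illustrative assumptions, not the actual implementation:

```python
import pittgoogle

def _create_file_metadata(alert: pittgoogle.Alert, event_id: str) -> dict:
    """Sketch: build Cloud Storage blob metadata from a pittgoogle Alert.

    Assumes Alert.get provides survey-agnostic access to ids; the metadata
    keys and generic field names below are hypothetical.
    """
    return {
        "file_origin_message_id": event_id,          # hypothetical key
        "diaObjectId": str(alert.get("objectid")),   # generic id accessor (assumed)
        "diaSourceId": str(alert.get("sourceid")),   # generic id accessor (assumed)
        "schema_version": alert.schema.version,
    }
```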
| "diaSourceId": str(alert_dict["diaSource"]["diaSourceId"]), | ||
| "schema_version": schema_version, | ||
| **attributes, | ||
| "schema_version": alert.schema.version, |
Should we have pittgoogle add this as an attribute automatically at the same time it does the ids? Does the bigquery subscription require this attribute to be called "schema_version"? Can there be other attributes included in here or will the bigquery subscription choke on them?
> Should we have pittgoogle add this as an attribute automatically at the same time it does the ids?

Yes, I think that would be really useful.
> Does the bigquery subscription require this attribute to be called "schema_version"?

As things are set up right now, yes. This can be easily changed when the subscription is created in setup_broker.sh.
> Can there be other attributes included in here or will the bigquery subscription choke on them?

Other attributes can be included (and are currently present in the tests I've performed). From what I've seen so far, there don't seem to be any issues.
@troyraen I'm also realizing that as things are currently written in this script, the messages in the deduplicated alert stream are missing additional attributes:
- diaObjectId
- diaSourceId
Can these attributes be included as well? (if this is already being done in pittgoogle, I'm not seeing where that happens)
> Does the bigquery subscription require this attribute to be called "schema_version"?
>
> As things are set up right now, yes. This can be easily changed when the subscription is created in setup_broker.sh.
Awesome. Let's switch the separator "_" -> "." so that it will match what we do for nested fields -> attributes (e.g., sourceid (lsst) -> "diaSource.diaSourceId"). I've added it to the alert.attributes in mwvgroup/pittgoogle-client#81.
> Can these attributes be included as well? (if this is already being done in pittgoogle, I'm not seeing where that happens)
These should be added the first time you call alert.attributes (which may be when the message is published). If they're not being added, that's a bug.
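If useful, a minimal way to sanity-check that behavior, assuming the ids land in alert.attributes on first access with the "." separator adopted above ("diaSource.diaSourceId" comes from this thread; the diaObject key is an assumed analogue):

```python
import pittgoogle

# `envelope` is a Cloud Run request body, as used elsewhere in this thread.
alert = pittgoogle.Alert.from_cloud_run(envelope, "lsst")
attrs = alert.attributes  # first access should auto-populate the id attributes
missing = {"diaObject.diaObjectId", "diaSource.diaSourceId"} - set(attrs)
if missing:
    print(f"possible bug: id attributes not auto-added: {missing}")
```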
```diff
- def _reformat_alert_data_to_valid_json(
-     alert_dict: dict, attributes: dict
- ) -> pittgoogle.alert.Alert:
+ def _reformat_alert_data_to_valid_json(alert: pittgoogle.Alert) -> dict:
```
Do you think this is something pittgoogle should handle under the hood?
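For reference, a sketch of the minimum mangling described later in this thread (NaN -> None, drop the cutouts), written generically enough that pittgoogle could do it under the hood. The recursion details, the alert.dict payload accessor, and the "cutout" key prefix are assumptions:

```python
import math
import pittgoogle

def _coerce_to_valid_json(value):
    """Sketch: recursively replace NaN with None and drop cutout fields."""
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, dict):
        return {
            key: _coerce_to_valid_json(val)
            for key, val in value.items()
            if not key.startswith("cutout")  # assumed cutout key prefix
        }
    if isinstance(value, list):
        return [_coerce_to_valid_json(item) for item in value]
    return value

def _reformat_alert_data_to_valid_json(alert: pittgoogle.Alert) -> dict:
    return _coerce_to_valid_json(alert.dict)  # alert.dict accessor is an assumption
```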
Co-authored-by: Troy Raen <troy.raen@pitt.edu>
```python
@@ -54,10 +48,6 @@
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))
publisher = TOPIC_ALERTS.client
```
Line 49 is needed to publish messages to TOPIC_BIGQUERY_IMPORT
(At minimum, change TOPIC_ALERTS.client -> TOPIC_BIGQUERY_IMPORT.client. But really, ...)
Hmm, sorry I thought I made another code suggestion lower down in the file that would make this line obsolete, but I must not have finished the thought because the comment isn't there...
The goal is to replace the function publish_valid_json_stream with TOPIC_BIGQUERY_IMPORT.publish(alert) and then line 49 will not be needed. Slightly bigger picture[*], I'm thinking about how to generalize this main.py file so that all the surveys can use the same main.py for their storage modules. This main.py's job should be to publish and store the incoming alerts to the various gcp resources by using pittgoogle.[Alert|Topic|etc] without knowing the actual schema of the alert it is working with. It should apply the SURVEY and other env vars as kwargs to pittgoogle where appropriate and then use methods like Alert.get which provide generic ways to retrieve the alert data.
Hence all my questions about which things from this file can/should be moved to pittgoogle. I'll take a closer look at your responses and try to move some more things over.
[*] Ignore for this pr, but fyi: Longer term thinking is, How to generalize even further so that all cloud_run/ modules can use the same boilerplate template for main.py, adding as little custom code as possible? And-now-I'd-like-a-pony thinking is, How can all cloud_run/ modules use exactly the same main.py, deploy.sh, etc. files and define all of resources and script behaviors they need in a config file (yaml strongly preferred)?
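To make the bigger picture concrete, here is a rough sketch of what a survey-agnostic main.py could reduce to under that plan. The Flask scaffolding mirrors a typical Cloud Run service; Topic.from_cloud, the env var names, and the serializer kwarg (proposed later in this thread) are assumptions about the pittgoogle API, not its current behavior:

```python
import os
import flask
import pittgoogle

# Survey and environment are injected as env vars so this file stays generic.
SURVEY = os.getenv("SURVEY", "lsst")
TESTID = os.getenv("TESTID")          # hypothetical
PROJECT_ID = os.getenv("GCP_PROJECT") # hypothetical

# Topic.from_cloud(...) is an assumed constructor; adjust to the actual API.
TOPIC_ALERTS = pittgoogle.Topic.from_cloud(
    "alerts", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID)
TOPIC_BIGQUERY_IMPORT = pittgoogle.Topic.from_cloud(
    "bigquery-import", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID)

app = flask.Flask(__name__)

@app.route("/", methods=["POST"])
def index():
    envelope = flask.request.get_json()
    # Deserialize without this module knowing the concrete alert schema.
    alert = pittgoogle.Alert.from_cloud_run(envelope, SURVEY)
    # (store-in-bucket step omitted for brevity)
    TOPIC_ALERTS.publish(alert)  # deduplicated "alerts" stream
    # serializer="json" is the kwarg proposed later in this thread.
    TOPIC_BIGQUERY_IMPORT.publish(alert, serializer="json")
    return "", 204
```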
troyraen left a comment:
@hernandezc1 Check out the new pittgoogle additions and my couple of questions below. I think everything is about wrapped up. There's probably just one more thing we should add to pittgoogle for this and that is some way to choose a different serializer when publishing to pubsub.
```python
TOPIC_ALERTS.publish(alert)  # deduplicated "alerts" stream
json_dict = _reformat_alert_data_to_valid_json(alert)
publish_valid_json_stream(
    topic_name=TOPIC_BIGQUERY_IMPORT.name,
```
Is there anything that necessarily has to be different between these two streams? As is, TOPIC_ALERTS will be avro and TOPIC_BIGQUERY will be json. Historically our 'alerts' stream has been in the same format as the incoming survey stream but maybe it doesn't have to be. If we switch TOPIC_ALERTS to json, can we get rid of TOPIC_BIGQUERY altogether and just have that subscription listen to the 'alerts' stream? Or are we transforming for BigQuery in some way that we don't want to do for the 'alerts' stream?
I guess we have to mangle it in at least two ways in order to publish the json: NaN -> None and drop the cutouts, in addition to changing the format itself. Let's leave the two streams separate for now. But is the 'bigquery-import' stream really just the 'alerts' stream in json format, including the mangling required to get it into json but no other changes, or can it be? If so maybe we should name this stream something like 'alerts-json'. That would make the content more clear for folks (including us) who might want to use that topic for other things.
@troyraen I think the questions we should answer before moving forward are: Do we want to publish more than one stream? If so, how do we want to categorize them? What attributes should be specified in the messages?
For example:
- pittgoogle-lsst-alerts - contains the full alert stream in the same format as the incoming survey (which means the cutouts must be included)
  - Attributes: diaObjectId, diaSourceId, ssObjectId, ra, dec, initial_detection (if prvDiaSources = Null, initial_detection = True; False otherwise), ...
- pittgoogle-lsst-storage - contains valid json messages that BigQuery subscriptions can use to store alert data and/or classifications to a BigQuery table
  - Attributes (not all will be applicable): diaObjectId, diaSourceId, ssObjectId, ra, dec, schema_version, classifier_name, classifier_version, name_in_bucket, ...
- pittgoogle-lsst-classifications - contains messages (format TBD) with classification results (currently only have SuperNNova)
  - Attributes (not all will be applicable): diaObjectId, diaSourceId, ra, dec, classifier_name, classifier_version, predicted_class, ...
As of right now, I do believe we should be publishing two different streams: one that contains the cutouts (i.e., the full survey stream) and one that publishes valid JSON messages (in order to use BigQuery subscriptions)
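A sketch of how the proposed attribute set might be assembled, using only the generic Alert.get accessor mentioned earlier in this thread; the generic field names (objectid, sourceid, prvdiasources, ...) are hypothetical:

```python
import pittgoogle

def _build_stream_attributes(alert: pittgoogle.Alert) -> dict:
    """Sketch of the attribute set proposed above; all field names assumed."""
    prv_sources = alert.get("prvdiasources")  # hypothetical generic field name
    return {
        "diaObjectId": str(alert.get("objectid")),
        "diaSourceId": str(alert.get("sourceid")),
        "ra": str(alert.get("ra")),
        "dec": str(alert.get("dec")),
        "schema_version": alert.schema.version,
        # Per the proposal above: True if prvDiaSources is Null, False otherwise.
        "initial_detection": str(prv_sources is None),
    }
```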
@hernandezc1 I think we do want to publish two streams since we have to mangle the data in order to convert it to json. So let's leave the lsst-alerts stream payloads the same as we have done before -- exactly as given to us by LSST. I think the attributes can be the same in both streams. (I don't think the classification streams or attributes are relevant here since this module doesn't even know about them. If pittgoogle needs to handle something for them that's different than what's needed here, let's worry about that later.) Does that sound reasonable to you?
After mwvgroup/pittgoogle-client#82, I think both the conversion to json and adding the attributes will be taken care of under the hood by pittgoogle. So all we should need to do here is this:
```python
TOPIC_ALERTS.publish(alert)
# Publish the same alert as JSON. Data will be coerced to valid JSON by pittgoogle.
TOPIC_BIGQUERY_IMPORT.publish(alert, serializer="json")
```

Regarding the names, I'm currently in favor of lsst-alerts and lsst-alerts-json. What do you think about those? I think the "storage" stream really is just the alerts stream in json format (including the minimum-necessary mangling required to get it into json), so that name would make this clear to anyone else who might want to use it. Also, since we will be changing our Project ID, I think that prepending "pittgoogle-" is unnecessary, so I am in favor of the shorter version without it.
> If pittgoogle needs to handle something for them that's different than what's needed here, let's worry about that later. Does that sound reasonable to you?

Yes! Sounds like a plan.
> Regarding the names, I'm currently in favor of lsst-alerts and lsst-alerts-json. What do you think about those? I think the "storage" stream really is just the alerts stream in json format (including the minimum-necessary mangling required to get it into json)
I'm not opposed, but what I'm trying to say is that the "storage" stream can be more than just a json formatted alert stream. Rather than creating a new topic every time we'd like to use a BigQuery subscription, we can use lsst-storage and filter the messages the BigQuery subscription receives based on the attributes.
| BigQuery Subscription | Filter | Description |
|---|---|---|
| lsst-bigquery-import-v7_4 | attributes.schema_version = "v7_4" AND NOT attributes:classifier_name | Used to write v7_4 alert data to BigQuery. Filters messages with the appropriate schema version and messages that do not contain the classifier_name attribute. |
| lsst-bigquery-import-SuperNNova | attributes.classifier_name = "SuperNNova" | Used to write SuperNNova classifications to BigQuery. Filters messages with the classifier_name attribute and the value of "SuperNNova". |
| lsst-bigquery-import-MicroLIA | attributes.classifier_name = "MicroLIA" | Used to write MicroLIA classifications to BigQuery. Filters messages with the classifier_name attribute and the value of "MicroLIA". |
This reduces the number of Pub/Sub topics needed. Otherwise, you'd need an "lsst-bigquery-import-(classifier name)" topic for each classifier just to be able to write the classifications to BigQuery. What's nice about lsst-storage is that all of these messages should be valid json as well, but now it's much clearer what this topic is used for.
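To illustrate the single-topic idea, here's a sketch of creating one of the filtered BigQuery subscriptions from the table above with the google-cloud-pubsub client; the project, dataset, and table names are placeholders:

```python
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
project = "my-project"  # placeholder

subscriber.create_subscription(
    request={
        "name": subscriber.subscription_path(project, "lsst-bigquery-import-v7_4"),
        "topic": f"projects/{project}/topics/lsst-storage",
        # Only v7_4 alert packets, excluding classifier messages.
        "filter": 'attributes.schema_version = "v7_4" AND NOT attributes:classifier_name',
        "bigquery_config": pubsub_v1.types.BigQueryConfig(
            table=f"{project}.lsst.alerts_v7_4",  # placeholder dataset.table
            write_metadata=True,  # also write message attributes to the table
        ),
    }
)
```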
> Otherwise, you'd need an "lsst-bigquery-import-(classifier name)" topic for each classifier just to be able to write the classifications to BigQuery.
@hernandezc1 Can't the other bigquery subscriptions be attached directly to the topics that are (or will be) already produced by those modules? They will be json already. I'd really like to avoid having to make those modules publish a second message just to get their results into BigQuery.
By the way, let's use all lower case for resource names. So, "supernnova" rather than "SuperNNova", etc.
> Can't the other bigquery subscriptions be attached directly to the topics that are (or will be) already produced by those modules?
@troyraen Yes, we can set things up that way. I'd still like to vouch for a single classification topic (lsst-classifications) as it would be really helpful when trying to filter messages for specific science cases (e.g., lensed supernova). I have more thoughts that I can share tomorrow.
> By the way, let's use all lower case for resource names. So, "supernnova" rather than "SuperNNova", etc.
Ok!
troyraen left a comment:
@hernandezc1 how about you go ahead and merge this PR and open a followup PR to get the rest of it, since there's a lot in here already? Consider this for this PR but I think the rest of it will be easier to see if we can look at a fresh PR.
On the pittgoogle end, I'm adding flexibility so that the user can choose the serializers on the fly. The idea is to be able to handle incoming and outgoing alerts with the same Alert object regardless of the serialization, to make it possible for this main.py to do:
```python
# Load incoming alert. "lsst" assumes Confluent Wire Avro serializers.
alert = pittgoogle.Alert.from_cloud_run(envelope, "lsst")
# ...
# Publish the same alert as Confluent Wire Avro.
TOPIC_ALERTS.publish(alert)
# Publish the same alert as JSON. Data will be coerced to valid JSON by pittgoogle.
TOPIC_BIGQUERY_IMPORT.publish(alert, serializer="json")
```

and then for subscribers to TOPIC_BIGQUERY_IMPORT to do, for example:

```python
# Load "lsst" alert, incoming from TOPIC_BIGQUERY_IMPORT, which is now in JSON format.
alert = pittgoogle.Alert.from_cloud_run(envelope, "lsst", deserializer="json")
# Alternately, declare JSON for both serializer and deserializer. (Note the plural.)
alert = pittgoogle.Alert.from_cloud_run(envelope, "lsst", serializers="json")
```

Co-authored-by: Troy Raen <troy.raen@pitt.edu>
This PR updates the code in main.py and uses version 0.3.12 of the pittgoogle-client. This code has been tested. Notably, the following error message was seen:

"The request was aborted because there was no available instance. Additional troubleshooting documentation can be found at: https://cloud.google.com/run/docs/troubleshooting#abort-request"
Context:
The error message count was 31 out of ~13,500 alerts processed (roughly 0.2%).
Since the default number of container instances is zero, I think it's reasonable to assume that the sudden increase in traffic and a long cold start time are the source of the issue. Worth discussing whether we should raise the minimum number of instances.