Skip to content

Commit

Permalink
Adds function to traverse dict and replace content thats too long
Browse files Browse the repository at this point in the history
  • Loading branch information
dakota002 committed Jan 31, 2024
1 parent debb872 commit 57f4303
Showing 1 changed file with 60 additions and 35 deletions.
95 changes: 60 additions & 35 deletions gcn_email/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,43 @@
from .helpers import periodic_task
from . import metrics

SESV2 = boto3.client('sesv2')
SESV2 = boto3.client("sesv2")
SENDER = f'GCN Notices <{os.environ["EMAIL_SENDER"]}>'

# Maximum send rate
MAX_SENDS = boto3.client('ses').get_send_quota()['MaxSendRate']
MAX_SENDS = boto3.client("ses").get_send_quota()["MaxSendRate"]

log = logging.getLogger(__name__)


REPLACEMENT_TEXT = (
"This content is too large for email. "
"To receive it, see https://gcn.nasa.gov/quickstart"
)


def replace_long_values(data, max_length):
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, (str, bytes)) and len(value) > max_length:
data[key] = REPLACEMENT_TEXT
else:
replace_long_values(value, max_length)
elif isinstance(data, list):
for index, item in enumerate(data):
if isinstance(item, (str, bytes)) and len(item) > max_length:
data[index] = REPLACEMENT_TEXT
else:
replace_long_values(item, max_length)


def get_email_notification_subscription_table():
client = boto3.client('ssm')
client = boto3.client("ssm")
result = client.get_parameter(
Name='/RemixGcnProduction/tables/email_notification_subscription')
table_name = result['Parameter']['Value']
return boto3.resource('dynamodb').Table(table_name)
Name="/RemixGcnProduction/tables/email_notification_subscription"
)
table_name = result["Parameter"]["Value"]
return boto3.resource("dynamodb").Table(table_name)


def query_and_project_subscribers(table, topic):
Expand All @@ -45,20 +67,21 @@ def query_and_project_subscribers(table, topic):
"""
try:
response = table.query(
IndexName='byTopic',
IndexName="byTopic",
ProjectionExpression="#topic, recipient",
ExpressionAttributeNames={"#topic": "topic"},
KeyConditionExpression=(Key('topic').eq(topic)))
KeyConditionExpression=(Key("topic").eq(topic)),
)
except Exception:
log.exception('Failed to query recipients')
log.exception("Failed to query recipients")
return []
else:
return [x['recipient'] for x in response['Items']]
return [x["recipient"] for x in response["Items"]]


def connect_as_consumer():
log.info('Connecting to Kafka')
return Consumer(config_from_env(), **{'enable.auto.commit': False})
log.info("Connecting to Kafka")
return Consumer(config_from_env(), **{"enable.auto.commit": False})


@periodic_task(86400)
Expand All @@ -67,35 +90,36 @@ def subscribe_to_topics(consumer: Consumer):
# This may need to be updated if new topics have a format different than
# 'gcn.classic.[text | voevent | binary].[topic]'
topics = [
topic for topic in consumer.list_topics().topics
if topic.startswith('gcn.')]
log.info('Subscribing to topics: %r', topics)
topic for topic in consumer.list_topics().topics if topic.startswith("gcn.")
]
log.info("Subscribing to topics: %r", topics)
consumer.subscribe(topics)


def kafka_message_to_email(message):
topic = message.topic()
email_message = EmailMessage()
if topic.startswith('gcn.classic.text.'):
if topic.startswith("gcn.classic.text."):
email_message.set_content(message.value().decode())
elif topic.startswith('gcn.classic.voevent.'):
elif topic.startswith("gcn.classic.voevent."):
email_message.add_attachment(
message.value(), filename='notice.xml',
maintype='application', subtype='xml')
elif topic.startswith('gcn.notices.'):
message.value(),
filename="notice.xml",
maintype="application",
subtype="xml",
)
elif topic.startswith("gcn.notices."):
valueJson = json.loads(message.value().decode())
for key in valueJson:
if sys.getsizeof(valueJson[key]) > 512:
valueJson[key] = (
'This content is too large for email. '
'To receive it, see https://gcn.nasa.gov/quickstart'
)
replace_long_values(valueJson, 512)
email_message.set_content(json.dumps(valueJson, indent=4))
else:
email_message.add_attachment(
message.value(), filename='notice.bin',
maintype='application', subtype='octet-stream')
email_message['Subject'] = topic
message.value(),
filename="notice.bin",
maintype="application",
subtype="octet-stream",
)
email_message["Subject"] = topic
return email_message.as_bytes()


Expand All @@ -105,19 +129,19 @@ def recieve_alerts(consumer):
for message in consumer.consume(timeout=1):
error = message.error()
if error is not None:
log.error('Error consuming message: %s', error)
log.error("Error consuming message: %s", error)
continue
topic = message.topic()
metrics.received.labels(topic).inc()
recipients = query_and_project_subscribers(table, topic)
if recipients:
log.info('Sending message for topic %s', topic)
log.info("Sending message for topic %s", topic)
email = kafka_message_to_email(message)
for recipient in recipients:
try:
send_raw_ses_message_to_recipient(email, recipient)
except Exception:
log.exception('Failed to send message')
log.exception("Failed to send message")
else:
metrics.sent.labels(topic, recipient).inc()
consumer.commit(message)
Expand All @@ -134,12 +158,13 @@ def recieve_alerts(consumer):
SESV2.exceptions.SendingPausedException,
SESV2.exceptions.TooManyRequestsException,
),
max_time=300
max_time=300,
)
@limits(calls=MAX_SENDS, period=1)
@metrics.send_request_latency_seconds.time()
def send_raw_ses_message_to_recipient(bytes, recipient):
SESV2.send_email(
FromEmailAddress=SENDER,
Destination={'ToAddresses': [recipient]},
Content={'Raw': {'Data': bytes}})
Destination={"ToAddresses": [recipient]},
Content={"Raw": {"Data": bytes}},
)

0 comments on commit 57f4303

Please sign in to comment.