Skip to content

Commit

Permalink
Configure black formatter
Browse files Browse the repository at this point in the history
  • Loading branch information
lpsinger committed Jan 31, 2024
1 parent beaa77c commit 746d552
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 49 deletions.
3 changes: 3 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[flake8]
max-line-length = 88
extend-ignore = E203, E704
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4

- uses: psf/black@stable

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
Expand Down
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"editor.defaultFormatter": "ms-python.black-formatter"
}
25 changes: 17 additions & 8 deletions gcn_email/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,31 @@ def host_port(host_port_str):
# Parse netloc like it is done for HTTP URLs.
# This ensures that we will get the correct behavior for hostname:port
# splitting even for IPv6 addresses.
return urllib.parse.urlparse(f'http://{host_port_str}')
return urllib.parse.urlparse(f"http://{host_port_str}")


@click.command()
@click.option(
'--prometheus', type=host_port, default=':8000', show_default=True,
help='Hostname and port to listen on for Prometheus metric reporting')
"--prometheus",
type=host_port,
default=":8000",
show_default=True,
help="Hostname and port to listen on for Prometheus metric reporting",
)
@click.option(
'--loglevel', type=click.Choice(logging._levelToName.values()),
default='INFO', show_default=True, help='Log level')
"--loglevel",
type=click.Choice(logging._levelToName.values()),
default="INFO",
show_default=True,
help="Log level",
)
def main(prometheus, loglevel):
logging.basicConfig(level=loglevel)

prometheus_client.start_http_server(prometheus.port,
prometheus.hostname or '0.0.0.0')
log.info('Prometheus listening on %s', prometheus.netloc)
prometheus_client.start_http_server(
prometheus.port, prometheus.hostname or "0.0.0.0"
)
log.info("Prometheus listening on %s", prometheus.netloc)

consumer = connect_as_consumer()
subscribe_to_topics(consumer)
Expand Down
65 changes: 37 additions & 28 deletions gcn_email/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
from .helpers import periodic_task
from . import metrics

SESV2 = boto3.client('sesv2')
SESV2 = boto3.client("sesv2")
SENDER = f'GCN Notices <{os.environ["EMAIL_SENDER"]}>'

# Maximum send rate
MAX_SENDS = boto3.client('ses').get_send_quota()['MaxSendRate']
MAX_SENDS = boto3.client("ses").get_send_quota()["MaxSendRate"]

log = logging.getLogger(__name__)


def get_email_notification_subscription_table():
client = boto3.client('ssm')
client = boto3.client("ssm")
result = client.get_parameter(
Name='/RemixGcnProduction/tables/email_notification_subscription')
table_name = result['Parameter']['Value']
return boto3.resource('dynamodb').Table(table_name)
Name="/RemixGcnProduction/tables/email_notification_subscription"
)
table_name = result["Parameter"]["Value"]
return boto3.resource("dynamodb").Table(table_name)


def query_and_project_subscribers(table, topic):
Expand All @@ -43,20 +44,21 @@ def query_and_project_subscribers(table, topic):
"""
try:
response = table.query(
IndexName='byTopic',
IndexName="byTopic",
ProjectionExpression="#topic, recipient",
ExpressionAttributeNames={"#topic": "topic"},
KeyConditionExpression=(Key('topic').eq(topic)))
KeyConditionExpression=(Key("topic").eq(topic)),
)
except Exception:
log.exception('Failed to query recipients')
log.exception("Failed to query recipients")
return []
else:
return [x['recipient'] for x in response['Items']]
return [x["recipient"] for x in response["Items"]]


def connect_as_consumer():
log.info('Connecting to Kafka')
return Consumer(config_from_env(), **{'enable.auto.commit': False})
log.info("Connecting to Kafka")
return Consumer(config_from_env(), **{"enable.auto.commit": False})


@periodic_task(86400)
Expand All @@ -65,26 +67,32 @@ def subscribe_to_topics(consumer: Consumer):
# This may need to be updated if new topics have a format different than
# 'gcn.classic.[text | voevent | binary].[topic]'
topics = [
topic for topic in consumer.list_topics().topics
if topic.startswith('gcn.')]
log.info('Subscribing to topics: %r', topics)
topic for topic in consumer.list_topics().topics if topic.startswith("gcn.")
]
log.info("Subscribing to topics: %r", topics)
consumer.subscribe(topics)


def kafka_message_to_email(message):
topic = message.topic()
email_message = EmailMessage()
if topic.startswith('gcn.classic.text.'):
if topic.startswith("gcn.classic.text."):
email_message.set_content(message.value().decode())
elif topic.startswith('gcn.classic.voevent.'):
elif topic.startswith("gcn.classic.voevent."):
email_message.add_attachment(
message.value(), filename='notice.xml',
maintype='application', subtype='xml')
message.value(),
filename="notice.xml",
maintype="application",
subtype="xml",
)
else:
email_message.add_attachment(
message.value(), filename='notice.bin',
maintype='application', subtype='octet-stream')
email_message['Subject'] = topic
message.value(),
filename="notice.bin",
maintype="application",
subtype="octet-stream",
)
email_message["Subject"] = topic
return email_message.as_bytes()


Expand All @@ -94,19 +102,19 @@ def recieve_alerts(consumer):
for message in consumer.consume(timeout=1):
error = message.error()
if error is not None:
log.error('Error consuming message: %s', error)
log.error("Error consuming message: %s", error)
continue
topic = message.topic()
metrics.received.labels(topic).inc()
recipients = query_and_project_subscribers(table, topic)
if recipients:
log.info('Sending message for topic %s', topic)
log.info("Sending message for topic %s", topic)
email = kafka_message_to_email(message)
for recipient in recipients:
try:
send_raw_ses_message_to_recipient(email, recipient)
except Exception:
log.exception('Failed to send message')
log.exception("Failed to send message")
else:
metrics.sent.labels(topic, recipient).inc()
consumer.commit(message)
Expand All @@ -123,12 +131,13 @@ def recieve_alerts(consumer):
SESV2.exceptions.SendingPausedException,
SESV2.exceptions.TooManyRequestsException,
),
max_time=300
max_time=300,
)
@limits(calls=MAX_SENDS, period=1)
@metrics.send_request_latency_seconds.time()
def send_raw_ses_message_to_recipient(bytes, recipient):
SESV2.send_email(
FromEmailAddress=SENDER,
Destination={'ToAddresses': [recipient]},
Content={'Raw': {'Data': bytes}})
Destination={"ToAddresses": [recipient]},
Content={"Raw": {"Data": bytes}},
)
4 changes: 3 additions & 1 deletion gcn_email/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ def inner_wrap():
try:
function(*args, **kwargs)
except Exception:
log.exception('Periodic task failed')
log.exception("Periodic task failed")
stop.wait(interval)

t = threading.Thread(target=inner_wrap, daemon=True)
t.start()
return stop

return wrap

return outer_wrap
19 changes: 8 additions & 11 deletions gcn_email/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@
import prometheus_client

received = prometheus_client.Counter(
'received',
'Kafka messages received',
labelnames=['topic'],
namespace=__package__)
"received", "Kafka messages received", labelnames=["topic"], namespace=__package__
)


sent = prometheus_client.Counter(
'sent',
'Emails sent',
labelnames=['topic', 'address'],
namespace=__package__)
"sent", "Emails sent", labelnames=["topic", "address"], namespace=__package__
)


send_request_latency_seconds = prometheus_client.Histogram(
'send_request_latency_seconds',
'Time taken to send each message',
namespace=__package__)
"send_request_latency_seconds",
"Time taken to send each message",
namespace=__package__,
)
118 changes: 117 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ click = "^8.1.3"
prometheus-client = "^0.17.0"

[tool.poetry.group.dev.dependencies]
black = "*"
flake8 = "*"

[tool.poetry.scripts]
Expand Down

0 comments on commit 746d552

Please sign in to comment.