Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure black formatter #163

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[flake8]
max-line-length = 88
extend-ignore = E203, E704
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4

- uses: psf/black@stable

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
Expand Down
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"editor.defaultFormatter": "ms-python.black-formatter"
}
25 changes: 17 additions & 8 deletions gcn_email/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,31 @@ def host_port(host_port_str):
# Parse netloc like it is done for HTTP URLs.
# This ensures that we will get the correct behavior for hostname:port
# splitting even for IPv6 addresses.
return urllib.parse.urlparse(f'http://{host_port_str}')
return urllib.parse.urlparse(f"http://{host_port_str}")


@click.command()
@click.option(
'--prometheus', type=host_port, default=':8000', show_default=True,
help='Hostname and port to listen on for Prometheus metric reporting')
"--prometheus",
type=host_port,
default=":8000",
show_default=True,
help="Hostname and port to listen on for Prometheus metric reporting",
)
@click.option(
'--loglevel', type=click.Choice(logging._levelToName.values()),
default='INFO', show_default=True, help='Log level')
"--loglevel",
type=click.Choice(logging._levelToName.values()),
default="INFO",
show_default=True,
help="Log level",
)
def main(prometheus, loglevel):
logging.basicConfig(level=loglevel)

prometheus_client.start_http_server(prometheus.port,
prometheus.hostname or '0.0.0.0')
log.info('Prometheus listening on %s', prometheus.netloc)
prometheus_client.start_http_server(
prometheus.port, prometheus.hostname or "0.0.0.0"
)
log.info("Prometheus listening on %s", prometheus.netloc)

consumer = connect_as_consumer()
subscribe_to_topics(consumer)
Expand Down
65 changes: 37 additions & 28 deletions gcn_email/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
from .helpers import periodic_task
from . import metrics

SESV2 = boto3.client('sesv2')
SESV2 = boto3.client("sesv2")
SENDER = f'GCN Notices <{os.environ["EMAIL_SENDER"]}>'

# Maximum send rate
MAX_SENDS = boto3.client('ses').get_send_quota()['MaxSendRate']
MAX_SENDS = boto3.client("ses").get_send_quota()["MaxSendRate"]

log = logging.getLogger(__name__)


def get_email_notification_subscription_table():
client = boto3.client('ssm')
client = boto3.client("ssm")
result = client.get_parameter(
Name='/RemixGcnProduction/tables/email_notification_subscription')
table_name = result['Parameter']['Value']
return boto3.resource('dynamodb').Table(table_name)
Name="/RemixGcnProduction/tables/email_notification_subscription"
)
table_name = result["Parameter"]["Value"]
return boto3.resource("dynamodb").Table(table_name)


def query_and_project_subscribers(table, topic):
Expand All @@ -43,20 +44,21 @@ def query_and_project_subscribers(table, topic):
"""
try:
response = table.query(
IndexName='byTopic',
IndexName="byTopic",
ProjectionExpression="#topic, recipient",
ExpressionAttributeNames={"#topic": "topic"},
KeyConditionExpression=(Key('topic').eq(topic)))
KeyConditionExpression=(Key("topic").eq(topic)),
)
except Exception:
log.exception('Failed to query recipients')
log.exception("Failed to query recipients")
return []
else:
return [x['recipient'] for x in response['Items']]
return [x["recipient"] for x in response["Items"]]


def connect_as_consumer():
log.info('Connecting to Kafka')
return Consumer(config_from_env(), **{'enable.auto.commit': False})
log.info("Connecting to Kafka")
return Consumer(config_from_env(), **{"enable.auto.commit": False})


@periodic_task(86400)
Expand All @@ -65,26 +67,32 @@ def subscribe_to_topics(consumer: Consumer):
# This may need to be updated if new topics have a format different than
# 'gcn.classic.[text | voevent | binary].[topic]'
topics = [
topic for topic in consumer.list_topics().topics
if topic.startswith('gcn.')]
log.info('Subscribing to topics: %r', topics)
topic for topic in consumer.list_topics().topics if topic.startswith("gcn.")
]
log.info("Subscribing to topics: %r", topics)
consumer.subscribe(topics)


def kafka_message_to_email(message):
topic = message.topic()
email_message = EmailMessage()
if topic.startswith('gcn.classic.text.'):
if topic.startswith("gcn.classic.text."):
email_message.set_content(message.value().decode())
elif topic.startswith('gcn.classic.voevent.'):
elif topic.startswith("gcn.classic.voevent."):
email_message.add_attachment(
message.value(), filename='notice.xml',
maintype='application', subtype='xml')
message.value(),
filename="notice.xml",
maintype="application",
subtype="xml",
)
else:
email_message.add_attachment(
message.value(), filename='notice.bin',
maintype='application', subtype='octet-stream')
email_message['Subject'] = topic
message.value(),
filename="notice.bin",
maintype="application",
subtype="octet-stream",
)
email_message["Subject"] = topic
return email_message.as_bytes()


Expand All @@ -94,19 +102,19 @@ def recieve_alerts(consumer):
for message in consumer.consume(timeout=1):
error = message.error()
if error is not None:
log.error('Error consuming message: %s', error)
log.error("Error consuming message: %s", error)
continue
topic = message.topic()
metrics.received.labels(topic).inc()
recipients = query_and_project_subscribers(table, topic)
if recipients:
log.info('Sending message for topic %s', topic)
log.info("Sending message for topic %s", topic)
email = kafka_message_to_email(message)
for recipient in recipients:
try:
send_raw_ses_message_to_recipient(email, recipient)
except Exception:
log.exception('Failed to send message')
log.exception("Failed to send message")
else:
metrics.sent.labels(topic, recipient).inc()
consumer.commit(message)
Expand All @@ -123,12 +131,13 @@ def recieve_alerts(consumer):
SESV2.exceptions.SendingPausedException,
SESV2.exceptions.TooManyRequestsException,
),
max_time=300
max_time=300,
)
@limits(calls=MAX_SENDS, period=1)
@metrics.send_request_latency_seconds.time()
def send_raw_ses_message_to_recipient(bytes, recipient):
SESV2.send_email(
FromEmailAddress=SENDER,
Destination={'ToAddresses': [recipient]},
Content={'Raw': {'Data': bytes}})
Destination={"ToAddresses": [recipient]},
Content={"Raw": {"Data": bytes}},
)
4 changes: 3 additions & 1 deletion gcn_email/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ def inner_wrap():
try:
function(*args, **kwargs)
except Exception:
log.exception('Periodic task failed')
log.exception("Periodic task failed")
stop.wait(interval)

t = threading.Thread(target=inner_wrap, daemon=True)
t.start()
return stop

return wrap

return outer_wrap
19 changes: 8 additions & 11 deletions gcn_email/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@
import prometheus_client

received = prometheus_client.Counter(
'received',
'Kafka messages received',
labelnames=['topic'],
namespace=__package__)
"received", "Kafka messages received", labelnames=["topic"], namespace=__package__
)


sent = prometheus_client.Counter(
'sent',
'Emails sent',
labelnames=['topic', 'address'],
namespace=__package__)
"sent", "Emails sent", labelnames=["topic", "address"], namespace=__package__
)


send_request_latency_seconds = prometheus_client.Histogram(
'send_request_latency_seconds',
'Time taken to send each message',
namespace=__package__)
"send_request_latency_seconds",
"Time taken to send each message",
namespace=__package__,
)
118 changes: 117 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ click = "^8.1.3"
prometheus-client = "^0.17.0"

[tool.poetry.group.dev.dependencies]
black = "*"
flake8 = "*"

[tool.poetry.scripts]
Expand Down