Skip to content

Commit

Permalink
[sdlf-monitoring] firehose lambda: use logging, do not merge log events
Browse files Browse the repository at this point in the history
  • Loading branch information
cnfait committed Sep 9, 2024
1 parent 3617796 commit 7631f6c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
6 changes: 6 additions & 0 deletions sdlf-cicd/template-cicd-domain-roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,12 @@ Resources:
"iam:PassedToService":
- cloudtrail.amazonaws.com
- firehose.amazonaws.com
- logs.amazonaws.com
- Effect: Allow
Action:
- iam:PassRole
Resource:
- !Sub arn:${AWS::Partition}:iam::${AWS::AccountId}:role/sdlf-domain-*--*rCloudwatch* # no iam:PassedToService for CloudWatchLogs
- Effect: Allow
Action:
- s3:CreateStorageLensGroup
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import base64
import gzip
import json
import logging
import os

import boto3
from botocore.exceptions import BotoCoreError, ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

firehose_endpoint_url = "https://firehose." + os.getenv("AWS_REGION") + ".amazonaws.com"
firehose = boto3.client("firehose", endpoint_url=firehose_endpoint_url)

Expand All @@ -22,7 +26,7 @@ def put_records(stream_name, records, attempts_made, max_attempts):
)
except (BotoCoreError, ClientError) as err:
if attempts_made + 1 < max_attempts:
print(f"Some records failed while calling PutRecords, retrying. {str(err)}")
logger.info(f"Some records failed while calling PutRecords, retrying. {str(err)}")
put_records(stream_name, records, attempts_made + 1, max_attempts)
else:
raise Exception(f"Could not put records after {max_attempts} attempts. {str(err)}")
Expand All @@ -31,7 +35,7 @@ def put_records(stream_name, records, attempts_made, max_attempts):
codes = [rec["ErrorCode"] for rec in response["RequestResponses"] if "ErrorCode" in rec]
failed = [records[i] for i, rec in enumerate(response["RequestResponses"]) if "ErrorCode" in rec]
if attempts_made + 1 < max_attempts:
print(f"Some records failed while calling PutRecords, retrying. Individual error codes: {codes}")
logger.info(f"Some records failed while calling PutRecords, retrying. Individual error codes: {codes}")
put_records(stream_name, failed, attempts_made + 1, max_attempts)
else:
raise Exception(f"Could not put records after {max_attempts} attempts. Individual error codes: {codes}")
Expand All @@ -42,7 +46,7 @@ def put_records(stream_name, records, attempts_made, max_attempts):
def lambda_handler(event, context):
records_out = []
records_to_reingest = []
inputDataByRecId = {}
input_data_by_rec_id = {}

for record in event["records"]:
compressed_payload = base64.b64decode(record["data"])
Expand All @@ -58,18 +62,17 @@ def lambda_handler(event, context):
)
else:
transformed_log_events = [transform_log_event(log_event) for log_event in data["logEvents"]]
payload = "".join(transformed_log_events)
encoded_payload = base64.b64encode(payload.encode("utf-8")).decode("utf-8")

records_out.append(
{
"recordId": record["recordId"],
"result": "Ok",
"data": encoded_payload,
}
)
for transformed_log_event in transformed_log_events:
records_out.append(
{
"recordId": record["recordId"],
"result": "Ok",
"data": base64.b64encode(transformed_log_event.encode("utf-8")).decode("utf-8"),
}
)

inputDataByRecId[record["recordId"]] = compressed_payload
input_data_by_rec_id[record["recordId"]] = compressed_payload

projected_size = sum(
[
Expand All @@ -79,10 +82,11 @@ def lambda_handler(event, context):
]
)
MAX_SIZE = 4000000
logger.info(f"Projected size: {projected_size} / {MAX_SIZE}")
for idx, record in enumerate(records_out):
if projected_size > MAX_SIZE:
if record["result"] != "ProcessingFailed":
records_to_reingest.append({"Data": inputDataByRecId[record["recordId"]]})
records_to_reingest.append({"Data": input_data_by_rec_id[record["recordId"]]})
projected_size -= len(record["data"])
del record["data"]
record["result"] = "Dropped"
Expand All @@ -93,11 +97,11 @@ def lambda_handler(event, context):

try:
put_records(stream_name, records_to_reingest, 0, 20)
print(f'Reingested {len(records_to_reingest)} records out of {len(event["records"])}')
logger.info(f'Reingested {len(records_to_reingest)} records out of {len(event["records"])}')
except Exception as err:
print(f"Failed to reingest records. {str(err)}")
logger.info(f"Failed to reingest records. {str(err)}")
raise err
else:
print("No records needed to be reingested.")
logger.info("No records needed to be reingested.")

return {"records": records_out}
2 changes: 1 addition & 1 deletion sdlf-monitoring/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ Resources:
"kms:ViaService": !Sub "s3.${AWS::Region}.amazonaws.com"
StringLike:
"kms:EncryptionContext:aws:s3:arn":
- !Sub arn:${AWS::Partition}:s3:::${rCloudwatchLogsFirehoseBucket}/*
- !Sub arn:${AWS::Partition}:s3:::${rCloudwatchLogsFirehoseBucket}
- !If
- RunInVpc
- Effect: Allow
Expand Down

0 comments on commit 7631f6c

Please sign in to comment.