Skip to content
This repository has been archived by the owner on Jan 15, 2025. It is now read-only.

Commit

Permalink
Merge pull request #62 from dynatrace-oss/feature/APM-326214-log-forw…
Browse files Browse the repository at this point in the history
…arders-add-log-forwarder-source-stack-name-as-a-log-attribute

APM-326214 Added Log Forwarder source stack name as a log attribute
  • Loading branch information
IrynaKudlaienko authored Feb 2, 2022
2 parents d071e63 + 876b5ab commit 0eda0a3
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 22 deletions.
1 change: 1 addition & 0 deletions dynatrace-aws-log-forwarder-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Resources:
TenantId: !Ref TenantId
- !Ref DynatraceEnvironmentUrl
VERIFY_SSL: !Ref VerifySSLTargetActiveGate
CLOUD_LOG_FORWARDER: !Sub '${AWS::AccountId}:${AWS::Region}:${AWS::StackName}'
Role: !GetAtt LambdaRole.Arn
VpcConfig: !If
- DeployAGwithVPC
Expand Down
8 changes: 3 additions & 5 deletions src/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def handler(event, lambda_context):
dt_url = os.environ.get('DYNATRACE_ENV_URL')
dt_token = os.environ.get('DYNATRACE_API_KEY')
verify_SSL = os.environ.get('VERIFY_SSL', 'false') == 'true'
cloud_log_forwarder = os.environ.get('CLOUD_LOG_FORWARDER', "")

try:
with open('version.txt') as versionFile:
Expand All @@ -41,11 +42,8 @@ def handler(event, lambda_context):

records = event['records']

context = Context(function_name=lambda_context.function_name,
dt_url=dt_url,
dt_token=dt_token,
debug=debug_flag,
verify_SSL=verify_SSL)
context = Context(function_name=lambda_context.function_name, dt_url=dt_url, dt_token=dt_token, debug=debug_flag,
verify_SSL=verify_SSL, cloud_log_forwarder=cloud_log_forwarder)

try:
is_logs, plaintext_records = input_records_decoder.check_records_list_if_logs_end_decode(records, context)
Expand Down
2 changes: 1 addition & 1 deletion src/logs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def process_log_request(decoded_records: List[str], context: Context, batch_meta
all_logs: List[Dict] = []

for record in decoded_records:
all_logs.extend(extract_dt_logs_from_single_record(record, batch_metadata))
all_logs.extend(extract_dt_logs_from_single_record(record, batch_metadata, context))

print(f"Extracted {len(all_logs)} log entries from {len(decoded_records)} records given")

Expand Down
8 changes: 5 additions & 3 deletions src/logs/transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from logs.metadata_engine.metadata_engine import MetadataEngine
from logs.models.batch_metadata import BatchMetadata
from util.context import Context

metadata_engine = MetadataEngine()

Expand All @@ -29,7 +30,7 @@ class RecordMetadata:


def extract_dt_logs_from_single_record(
record_data_decoded: str, batch_metadata: BatchMetadata) -> List[Dict]:
record_data_decoded: str, batch_metadata: BatchMetadata, context: Context) -> List[Dict]:
logs: List[Dict] = []
record = json.loads(record_data_decoded)

Expand All @@ -39,13 +40,13 @@ def extract_dt_logs_from_single_record(
record_metadata = RecordMetadata(record["logGroup"], record["logStream"])

for log_event in record["logEvents"]:
log_entry = transform_single_log_entry(log_event, batch_metadata, record_metadata)
log_entry = transform_single_log_entry(log_event, batch_metadata, record_metadata, context)
logs.append(log_entry)

return logs


def transform_single_log_entry(log_event, batch_metadata, record_metadata) -> Dict:
def transform_single_log_entry(log_event, batch_metadata, record_metadata, context: Context) -> Dict:
parsed_record = {
'content': log_event["message"],
'cloud.provider': 'aws',
Expand All @@ -56,6 +57,7 @@ def transform_single_log_entry(log_event, batch_metadata, record_metadata) -> Di
'aws.region': batch_metadata.region,
'aws.account.id': batch_metadata.account_id,
'severity': 'INFO',
'cloud.log_forwarder': context.cloud_log_forwarder
}

if "timestamp" in log_event:
Expand Down
4 changes: 3 additions & 1 deletion src/util/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@


class Context:
def __init__(self, function_name: Text, dt_url: str, dt_token: str, debug: bool, verify_SSL: bool):
def __init__(self, function_name: Text, dt_url: str, dt_token: str, debug: bool, verify_SSL: bool,
cloud_log_forwarder: str):
self.function_name: Text = function_name
self.dt_url = dt_url
self.dt_token = dt_token
self.debug: bool = debug
self.verify_SSL: bool = verify_SSL
self.cloud_log_forwarder = cloud_log_forwarder
self.sfm = SelfMonitoringContext(function_name)
3 changes: 2 additions & 1 deletion tests/perf/logs/test_transformation_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from util.context import Context

BATCH_METADATA = BatchMetadata("444000444", "us-east-1", "aws")
CONTEXT = Context("function-name", "dt-url", "dt-token", False, False, "log.forwarder")

CLOUDTRAIL_USER_IDENTITY = {
"type": "AssumedRole",
Expand Down Expand Up @@ -104,7 +105,7 @@ def test_full_transformation(testcase: dict):
start_sec = time.time()
for i in range(repeat_record):
logs_sent = logs.transformation.extract_dt_logs_from_single_record(
json.dumps(record_data_decoded), BATCH_METADATA)
json.dumps(record_data_decoded), BATCH_METADATA, CONTEXT)
end_sec = time.time()

if time_limit_sec:
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/full_flow/test_full_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"DYNATRACE_API_KEY": "token",
"VERIFY_SSL": "false",
"DEBUG": "false",
"CLOUD_LOG_FORWARDER": "444652832050:us-east-1:log_forwarder"
})
@pytest.mark.parametrize("testcase", [
({
Expand Down Expand Up @@ -108,3 +109,6 @@ def test_full_flow(testcase: dict):
sent_logs = json.loads(request_body)

assert len(sent_logs) == number_of_logs_expected

for log in sent_logs:
assert log["cloud.log_forwarder"] == "444652832050:us-east-1:log_forwarder"
4 changes: 2 additions & 2 deletions tests/unit/logs/test_input_records_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_check_records_list_if_logs_end_decode(self):
]

is_logs, decoded_records = input_records_decoder.check_records_list_if_logs_end_decode(
records, Context("function-name", "dt-url", "dt-token", False, False))
records, Context("function-name", "dt-url", "dt-token", False, False, "log.forwarder"))

self.assertTrue(is_logs)

Expand All @@ -45,7 +45,7 @@ def test_check_records_list_if_logs_end_decode(self):
self.assertEqual(decoded_records[1], expected_second)

def test_check_records_list_if_logs_end_decode_not_logs(self):
context = Context("function-name", "dt-url", "dt-token", False, False)
context = Context("function-name", "dt-url", "dt-token", False, False, "log.forwarder")

records = [
{
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/logs/test_logs_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def create_log_entry_with_random_len_msg(min_length = 1, max_length = len(log_me
class Test(TestCase):

def test_prepare_serialized_batches(self):
context = Context("function-name", "dt-url", "dt-token", False, False)
context = Context("function-name", "dt-url", "dt-token", False, False, "log.forwarder")
how_many_logs = 20000
logs = [create_log_entry_with_random_len_msg() for x in range(how_many_logs)]

Expand All @@ -66,7 +66,7 @@ def test_prepare_serialized_batches(self):
self.assertGreater(batches_total_length, logs_total_length)

def test_request_and_content_length(self):
context = Context("function-name", "dt-url", "dt-token", False, False)
context = Context("function-name", "dt-url", "dt-token", False, False, "log.forwarder")
how_many_logs = 10
logs = [create_log_entry_with_random_len_msg(750, 900) for x in range(how_many_logs)]

Expand Down Expand Up @@ -106,7 +106,7 @@ def test_request_and_content_length(self):
self.assertGreater(batches_total_length, logs_total_length)

def test_entries_in_batch(self):
context = Context("function-name", "dt-url", "dt-token", False, False)
context = Context("function-name", "dt-url", "dt-token", False, False, "log.forwarder")
how_many_logs = 20000
logs = [create_log_entry_with_random_len_msg() for x in range(how_many_logs)]

Expand Down Expand Up @@ -136,7 +136,7 @@ def test_entries_in_batch(self):
self.assertGreater(batches_total_length, logs_total_length)

def test_trim_fields(self):
context = Context("function-name", "dt-url", "dt-token", False, False)
context = Context("function-name", "dt-url", "dt-token", False, False, "log.forwarder")
string_with_900_chars = log_message

logs_sender.DYNATRACE_LOG_INGEST_CONTENT_MAX_LENGTH = 600
Expand Down
38 changes: 34 additions & 4 deletions tests/unit/logs/test_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
from logs.models.batch_metadata import BatchMetadata
from util.context import Context

BATCH_METADATA = BatchMetadata("444000444", "us-east-1", "aws")


def get_context(log_forwarder_setup="log.forwarder"):
    """Build a throwaway test Context whose cloud_log_forwarder is *log_forwarder_setup*."""
    context = Context(
        "function-name",
        "dt-url",
        "dt-token",
        False,
        False,
        log_forwarder_setup,
    )
    return context


@patch.object(MetadataEngine, 'apply')
def test_metadata_engine_input(metadata_engine_apply_mock):
Expand All @@ -43,9 +49,8 @@ def test_metadata_engine_input(metadata_engine_apply_mock):
],
})

batch_metadata = BatchMetadata("444000444", "us-east-1", "aws")
actual_output = logs.transformation.extract_dt_logs_from_single_record(
input_entry, batch_metadata)
input_entry, BATCH_METADATA, get_context())

assert metadata_engine_apply_mock.call_count == 2

Expand Down Expand Up @@ -77,8 +82,33 @@ def test_control_message():
],
})

batch_metadata = BatchMetadata("444000444", "us-east-1", "aws")
parsed_logs = logs.transformation.extract_dt_logs_from_single_record(
control_record, batch_metadata)
control_record, BATCH_METADATA, get_context())

assert len(parsed_logs) == 0


def test_log_forwarder_setup():
    """Check that the context's forwarder setup string is attached to each parsed log entry."""
    # given: a minimal CloudWatch Logs payload with one log event
    record_json = json.dumps({
        "messageType": "DATA_MESSAGE",
        "owner": "444652832050",
        "logGroup": "API-Gateway-Execution-Logs",
        "logStream": "2021-02-04-logstream",
        "subscriptionFilters": ["b-SubscriptionFilter0-1I0DE5MAAFV5G"],
        "logEvents": [
            {
                "id": "35958590510527767165636549608812769529777864588249006080",
                "timestamp": "12345",
                "message": "Test message",
            }
        ]
    })

    # when: transforming with a context that carries a custom forwarder setup
    expected_setup = "MyLogForwarderSetup"
    parsed_logs = logs.transformation.extract_dt_logs_from_single_record(
        record_json, BATCH_METADATA, get_context(expected_setup))

    # then: the 'cloud.log_forwarder' attribute is propagated from the context
    assert parsed_logs[0]['cloud.log_forwarder'] == expected_setup
3 changes: 2 additions & 1 deletion tests/unit/logs/test_transformation_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from util.context import Context

BATCH_METADATA = BatchMetadata("444000444", "us-east-1", "aws")
CONTEXT = Context("function-name", "dt-url", "dt-token", False, False, "log.forwarder")

CLOUDTRAIL_USER_IDENTITY = {
"type": "AssumedRole",
Expand Down Expand Up @@ -1001,7 +1002,7 @@ def test_full_transformation(testcase: dict):
expect_first_log_contains = testcase["expect_first_log_contains"]

logs_sent = logs.transformation.extract_dt_logs_from_single_record(
json.dumps(record_data_decoded), BATCH_METADATA)
json.dumps(record_data_decoded), BATCH_METADATA, CONTEXT)

assert len(logs_sent) == len(record_data_decoded["logEvents"])

Expand Down

0 comments on commit 0eda0a3

Please sign in to comment.