
Commit

Merge pull request #34 from dynatrace-oss/APM-313824-exception-throttling

 APM-313824 add exception throttling
equardo authored Aug 4, 2021
2 parents 8f1624f + 649805a commit 3d9ae63
Showing 7 changed files with 77 additions and 35 deletions.
9 changes: 6 additions & 3 deletions src/index.py
@@ -48,17 +48,20 @@ def handler(event, lambda_context):
         result = TransformationResult.Ok

     except CallThrottlingException:
-        log_multiline_message("Call Throttling Exception, Kinesis batch will be marked as OK, but some data is dropped")
+        log_multiline_message("Call Throttling Exception, Kinesis batch will be marked as OK, but some data is dropped",
+                              "call-throttling-exception")
         result = TransformationResult.Ok

     except Exception as e:
-        log_error_with_stacktrace(e, "Exception caught in top-level handler")
+        log_error_with_stacktrace(e, "Exception caught in top-level handler",
+                                  "top-level-handler-exception")
         result = TransformationResult.ProcessingFailed

     try:
         context.sfm.push_sfm_to_cloudwatch()
     except Exception as e:
-        log_error_with_stacktrace(e, "SelfMonitoring push to Cloudwatch failed")
+        log_error_with_stacktrace(e, "SelfMonitoring push to Cloudwatch failed",
+                                  "sfm-push-exception")

     return kinesis_data_transformation_response(records, result)

6 changes: 4 additions & 2 deletions src/logs/input_records_decoder.py
@@ -40,7 +40,8 @@ def check_records_list_if_logs_end_decode(records, context: Context) -> Tuple[bo
             print("Fully decoded logs payloads (base64 decode + ungzip)")
             return True, records_data_plaintext
         except Exception as e:
-            log_error_with_stacktrace(e, "Ungzip failed")
+            log_error_with_stacktrace(e, "Ungzip failed",
+                                      "ungzip-failed-exception")
             return False, []

     return False, []
@@ -80,4 +81,5 @@ def sfm_report_kinesis_records_age(records, context):
             context.sfm.kinesis_record_age(age_sec)

     except Exception as e:
-        log_error_with_stacktrace(e, "Failed to calculate Kinesis Record Delay Self Monitoring")
+        log_error_with_stacktrace(e, "Failed to calculate Kinesis Record Delay Self Monitoring",
+                                  "sfm-record-delay-calc-exception")
3 changes: 2 additions & 1 deletion src/logs/logs_sender.py
@@ -104,7 +104,8 @@ def prepare_batches(logs: List[Dict], context: Context) -> List[Batch]:
         if next_entry_serialized_len > DYNATRACE_LOG_INGEST_REQUEST_MAX_SIZE:
             # shouldn't happen as we are already truncating the content field, but just for safety
             logging.log_multiline_message(f"Dropping entry, as it's size is {next_entry_serialized_len}, "
-                                          f"bigger than max entry size: {DYNATRACE_LOG_INGEST_REQUEST_MAX_SIZE}")
+                                          f"bigger than max entry size: {DYNATRACE_LOG_INGEST_REQUEST_MAX_SIZE}",
+                                          "entry-bigger-than-max-size")

         batch_length_if_added_entry = new_batch_len + 1 + next_entry_serialized_len  # +1 is for comma
         batch_entries_if_added_entry = len(logs_for_next_batch) + 1
4 changes: 2 additions & 2 deletions src/logs/main.py
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import statistics
 import time
 from typing import List, Dict
@@ -31,7 +30,8 @@ def process_log_request(decoded_records: List[str], context: Context, batch_meta

     print(f"Extracted {len(all_logs)} log entries from {len(decoded_records)} records given")

-    debug_log_multiline_message("Log entries to be sent to DT: " + str(all_logs), context)
+    debug_log_multiline_message("Log entries to be sent to DT: " + str(all_logs), context,
+                                "logs-send-details")

     sfm_report_logs_age(all_logs, context)

37 changes: 24 additions & 13 deletions src/logs/metadata_engine/metadata_engine.py
@@ -69,10 +69,12 @@ def __init__(self, source: str, condition: str):
         self._source_value_extractor = _SOURCE_VALUE_EXTRACTOR_MAP.get(source.casefold(), None)

         if not self._source_value_extractor:
-            logging.warning(f"Unsupported source type: '{source}'")
+            logging.warning(f"Unsupported source type: '{source}'",
+                            "metadata-unsupported-source-type")
             self.valid = False
         if not self._evaluator or not self._operand:
-            logging.warning(f"Failed to parse condition macro for expression: '{condition}'")
+            logging.warning(f"Failed to parse condition macro for expression: '{condition}'",
+                            "metadata-condition-parsing-failure")
             self.valid = False

     def match(self, record: Dict, parsed_record: Dict) -> bool:
@@ -118,7 +120,8 @@ def _load_configs(self):
                 else:
                     self.rules.extend(_create_config_rules(config_json))
             except Exception as ex:
-                logging.exception(f"Failed to load configuration file: '{config_file_path}'")
+                logging.exception(f"Failed to load configuration file: '{config_file_path}'",
+                                  "config-load-exception")

     def apply(self, record: Dict, parsed_record: Dict):
         try:
@@ -130,7 +133,8 @@ def apply(self, record: Dict, parsed_record: Dict):
             if self.default_rule:
                 _apply_rule(self.default_rule, record, parsed_record)
         except Exception as ex:
-            logging.exception(f"Encountered exception when running Rule Engine. ")
+            logging.exception(f"Encountered exception when running Rule Engine. ",
+                              "rule-engine-exception")


 def _check_if_rule_applies(rule: ConfigRule, record: Dict, parsed_record: Dict):
@@ -145,7 +149,8 @@ def _apply_rule(rule, record, parsed_record):
         try:
             record["log_content"] = json.loads(parsed_record.get("content", {}))
         except Exception as ex:
-            logging.log_error_with_stacktrace(ex, f"Encountered exception when parsing log content as json, requested by rule for {rule.entity_type_name}")
+            logging.log_error_with_stacktrace(ex, f"Encountered exception when parsing log content as json, requested by rule for {rule.entity_type_name}",
+                                              "json-rule-parsing-exception")
         else:
             record["log_content"] = parsed_record.get("content", "")

@@ -159,7 +164,8 @@ def _apply_rule(rule, record, parsed_record):
                 if attribute.priority is not None:
                     record[attribute.key] = value
             except Exception as ex:
-                logging.log_error_without_stacktrace(f"Encountered exception when evaluating attribute {attribute} of rule for {rule.entity_type_name}")
+                logging.log_error_without_stacktrace(f"Encountered exception when evaluating attribute {attribute} of rule for {rule.entity_type_name}",
+                                                     "rule-attribute-evaluation-exception")

     record.pop("log_content", {})

@@ -178,7 +184,8 @@ def parse_aws_loggroup_with_grok_pattern(loggroup, pattern) -> dict:
     extracted_values = grok.match(loggroup)

     if not extracted_values:
-        logging.warning(f"Failed to match logGroup '{loggroup}' against the pattern '{pattern}'")
+        logging.warning(f"Failed to match logGroup '{loggroup}' against the pattern '{pattern}'",
+                        "loggroup-pattern-matching-failure")
         return {}

     return extracted_values
@@ -198,7 +205,8 @@ def _create_sources(sources_json: List[Dict]) -> List[SourceMatcher]:
         if source_matcher and source_matcher.valid:
             result.append(source_matcher)
         else:
-            logging.warning(f"Encountered invalid rule source, parameters were: source= {source}, condition = {condition}")
+            logging.warning(f"Encountered invalid rule source, parameters were: source= {source}, condition = {condition}",
+                            "metadata-invalid-rule-source")
             return []

     return result
@@ -215,7 +223,8 @@ def _create_attributes(attributes_json: List[Dict]) -> List[Attribute]:
         if key and pattern:
             result.append(Attribute(key, priority, pattern))
         else:
-            logging.warning(f"Encountered invalid rule attribute with missing parameter, parameters were: key = {key}, pattern = {pattern}")
+            logging.warning(f"Encountered invalid rule attribute with missing parameter, parameters were: key = {key}, pattern = {pattern}",
+                            "metadata-attribute-missing-parameter")

     # attributes without priority are executed last
     result.sort(key= lambda attribute: attribute.priority if attribute.priority is not None else inf)
@@ -225,19 +234,21 @@ def _create_attributes(attributes_json: List[Dict]) -> List[Attribute]:
 def _create_config_rule(entity_name: str, rule_json: Dict) -> Optional[ConfigRule]:
     sources_json = rule_json.get("sources", [])
     if entity_name != "default" and not sources_json:
-        logging.warning(f"Encountered invalid rule with missing sources for config entry named {entity_name}")
+        logging.warning(f"Encountered invalid rule with missing sources for config entry named {entity_name}",
+                        "metadata-rule-missing-sources")
         return None
     sources = _create_sources(sources_json)
     if entity_name != "default" and not sources:
-        logging.warning(f"Encountered invalid rule with invalid sources for config entry named {entity_name}: {sources_json}")
+        logging.warning(f"Encountered invalid rule with invalid sources for config entry named {entity_name}: {sources_json}",
+                        "metadata-rule-invalid-sources")
         return None
     attributes = _create_attributes(rule_json.get("attributes", []))

     aws_loggroup_pattern = rule_json.get("aws", {}).get("logGroup", None)
     log_content_parse_type = rule_json.get("aws", {}).get("logContentParseAs", None)

     return ConfigRule(entity_type_name=entity_name, source_matchers=sources, attributes=attributes,
-                      aws_loggroup_pattern=aws_loggroup_pattern, log_content_parse_type = log_content_parse_type)
+                      aws_loggroup_pattern=aws_loggroup_pattern, log_content_parse_type=log_content_parse_type)


 def _create_config_rules(config_json: Dict) -> List[ConfigRule]:
@@ -247,4 +258,4 @@ def _create_config_rules(config_json: Dict) -> List[ConfigRule]:


 def _is_json_file(file: str) -> bool:
-    return file.endswith(".json")
+    return file.endswith(".json")
4 changes: 2 additions & 2 deletions src/util/http_client.py
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import ssl
 import time
 import urllib.error
@@ -61,5 +60,6 @@ def perform_http_request_for_json(url, encoded_body_bytes, method, headers, veri

     context.sfm.request_finished_with_status_code(status, duration_ms)

-    log_multiline_message(f"Response: call duration {duration_ms}ms, status code {status}, body '{body}'")
+    log_multiline_message(f"Response: call duration {duration_ms}ms, status code {status}, body '{body}'",
+                          "http-response-details")
     return status, body
49 changes: 37 additions & 12 deletions src/util/logging.py
@@ -17,30 +17,55 @@

 from util.context import Context

+LOG_THROTTLING_LIMIT_PER_CALLER = 10
+
+log_call_count = dict()
+
+
+def log_multiline_message(message: Text, caller: Text):
+    # display logs called from one spot no more than the specified amount of times
+    if check_if_caller_exceeded_limit(caller):
+        return
+
-def log_multiline_message(message: Text):
     # need to modify endline char to have multiline log record not split into multiple log entries in CloudWatch:
     message = message.replace('\n', ' ')
     print(message)


-def debug_log_multiline_message(message: Text, context: Context):
+def check_if_caller_exceeded_limit(caller):
+    log_calls_performed = log_call_count.get(caller, 0)
+    log_calls_left = LOG_THROTTLING_LIMIT_PER_CALLER - log_calls_performed
+
+    if log_calls_left == 0:
+        log_call_count[caller] = log_calls_performed + 1
+        print(f"Logging calls from caller '{caller}' exceeded the throttling limit of",
+              f"{LOG_THROTTLING_LIMIT_PER_CALLER}. Further logs from this caller will be discarded")
+
+    caller_exceeded_limit = log_calls_left <= 0
+    if not caller_exceeded_limit:
+        log_call_count[caller] = log_calls_performed + 1
+
+    return caller_exceeded_limit
+
+
+def debug_log_multiline_message(message: Text, context: Context, caller: Text):
     if context.debug:
-        log_multiline_message("DEBUG: " + message)
+        log_multiline_message("DEBUG: " + message, caller)


-def log_error_with_stacktrace(e: Exception, msg):
-    log_multiline_message(f"Exception '{e}' occurred. Additional message: '{msg}'")
-    log_multiline_message(traceback.format_exc())
+def log_error_with_stacktrace(e: Exception, msg, caller: Text):
+    log_multiline_message(f"Exception '{e}' occurred. Additional message: '{msg}' " + traceback.format_exc(), caller)


-def log_error_without_stacktrace(msg):
-    log_multiline_message(msg + "Exception details: " + traceback.format_exc(limit=0))
+def log_error_without_stacktrace(msg, caller: Text):
+    log_multiline_message(msg + "Exception details: " + traceback.format_exc(limit=0), caller)


 # wrappers for metadata_engine (there is different logging approach there):

-def exception(msg):
-    log_multiline_message(msg + "Exception details: " + traceback.format_exc())
+def exception(msg, caller):
+    log_multiline_message(msg + "Exception details: " + traceback.format_exc(), caller)


-def warning(msg):
-    log_multiline_message("WARNING: " + msg)
+def warning(msg, caller):
+    log_multiline_message("WARNING: " + msg, caller)

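Net effect of the commit: every logging helper in util/logging.py now requires a caller ID string, and output is throttled per caller — the first LOG_THROTTLING_LIMIT_PER_CALLER (10) messages from a given ID are printed, the next call prints a one-time throttling notice, and all later calls from that ID are silently discarded. A minimal sketch of that behavior (the driver loop and the caller ID "demo-caller" are illustrative, not part of the commit):

    from util.logging import log_multiline_message

    # Hypothetical driver, assuming util/logging.py from this commit is importable.
    # Every call passes the same caller ID, so all calls count against one limit.
    for i in range(12):
        log_multiline_message(f"message {i}", "demo-caller")

    # Prints "message 0" through "message 9" (10 lines), then the one-time
    # throttling notice for 'demo-caller'; "message 10" and "message 11" are dropped.

Note that the counters live in a module-level dict, so in a warm Lambda container the limit spans invocations rather than resetting on each handler call.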