From eb4fe5efb0443e97548cf531063d559a7c8c937f Mon Sep 17 00:00:00 2001 From: Shriyansh Agnihotri Date: Mon, 18 Nov 2024 21:59:52 +0530 Subject: [PATCH] Removing user host name and other path related data captured during telemetry for security and PII protection purpose. As its not required. --- testzeus_hercules/telemetry.py | 68 ++++++++++++++++++++++++++++------ 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/testzeus_hercules/telemetry.py b/testzeus_hercules/telemetry.py index 1d9df3a..cdbd1fa 100644 --- a/testzeus_hercules/telemetry.py +++ b/testzeus_hercules/telemetry.py @@ -8,6 +8,8 @@ from typing import Any, Dict, Optional import sentry_sdk +from sentry_sdk.types import Event, Hint +from sentry_sdk.scrubber import EventScrubber, DEFAULT_DENYLIST, DEFAULT_PII_DENYLIST from pydantic import BaseModel DSN = "https://d14d2ee82f26a3585b2a892fab7fffaa@o4508256143540224.ingest.us.sentry.io/4508256153042944" @@ -15,9 +17,9 @@ # Telemetry flag, default is enabled (1) unless set to "0" in the environment variable ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "1") == "1" -# Initialize Sentry only if telemetry is enabled -if ENABLE_TELEMETRY: - sentry_sdk.init(dsn=DSN, max_breadcrumbs=0, send_default_pii=True) +# custom denylist +denylist = DEFAULT_DENYLIST + ["sys.argv", "argv", "server_name"] +pii_denylist = DEFAULT_PII_DENYLIST + ["sys.argv", "argv", "server_name"] class EventType(Enum): @@ -33,10 +35,51 @@ class EventType(Enum): class EventData(BaseModel): detail: Optional[str] = None - additional_data: Optional[Dict[str, Any]] = None # For any extra details specific to certain events + additional_data: Optional[Dict[str, Any]] = ( + None # For any extra details specific to certain events + ) + + +def my_before_send(event: Event, hint: Hint) -> Event | None: + # Filter out all ZeroDivisionError events. + # Note that the exception type is available in the hint, + # but we should handle the case where the exception info + # is missing. + if hint.get("exc_info", [None])[0] == ZeroDivisionError: + return None + + # We can set extra data on the event's "extra" field. + event["extra"]["session_summary"] = build_final_message() + if "contexts" in event: + contexts = event["contexts"] + # Check if sys.argv exists and redact secrets + if "argv" in contexts: + contexts.pop("argv") + # We have modified the event as desired, so return the event. + # The SDK will then send the returned event to Sentry. + return event + + +# Initialize Sentry only if telemetry is enabled +if ENABLE_TELEMETRY: -def get_installation_id(file_path="installation_id.txt"): + sentry_sdk.init( + dsn=DSN, + before_send=my_before_send, + max_breadcrumbs=0, + send_default_pii=False, + send_client_reports=False, + server_name=None, + event_scrubber=EventScrubber( + denylist=denylist, pii_denylist=pii_denylist, recursive=True + ), + ) + sentry_sdk.set_extra("sys.argv", None) + sentry_sdk.set_user(None) + + +def get_installation_id(file_path="installation_id.txt") -> str: """Generate or load a unique installation ID.""" if os.path.exists(file_path): with open(file_path, "r") as file: @@ -59,7 +102,7 @@ def get_installation_id(file_path="installation_id.txt"): } -def add_event(event_type: EventType, event_data: EventData): +def add_event(event_type: EventType, event_data: EventData) -> None: """ Adds an event to the event collector in the appropriate event_type bucket, only if telemetry is enabled. @@ -80,7 +123,7 @@ def add_event(event_type: EventType, event_data: EventData): event_collector["buckets"][event_type.value]["event_count"] += 1 -async def send_message_to_sentry(): +async def send_message_to_sentry() -> None: """ Sends the final message to Sentry asynchronously, only if telemetry is enabled. """ @@ -89,25 +132,28 @@ async def send_message_to_sentry(): try: message = build_final_message() with sentry_sdk.push_scope() as scope: - scope.set_context("session_summary", message) + scope.set_extra("session_summary", message) sentry_sdk.capture_message("Program execution summary") except Exception as e: print(f"Error sending message to Sentry: {e}") -def build_final_message(): +def build_final_message() -> Dict[str, Any]: """ Builds the final message from collected events, organized by event_type buckets. """ message = { "installation_id": event_collector["installation_id"], "session_start": event_collector["start_time"], - "buckets": {event_type_s: events for event_type_s, events in event_collector["buckets"].items()}, + "buckets": { + event_type_s: events + for event_type_s, events in event_collector["buckets"].items() + }, } return message -def register_shutdown(): +def register_shutdown() -> None: """ Register a shutdown handler to run send_message_to_sentry on exit, only if telemetry is enabled.