From a211301a8b2017af86b306b738f41b7000c9ae78 Mon Sep 17 00:00:00 2001 From: David Tsukernik Date: Fri, 20 Dec 2024 12:26:17 -0800 Subject: [PATCH 1/3] write script to send scrubbed data into a gcs bucket --- snuba/manual_jobs/extract_span_data.py | 102 +++++++++++++++++ snuba/utils/hashes.py | 11 ++ tests/manual_jobs/test_extract_span_data.py | 119 ++++++++++++++++++++ 3 files changed, 232 insertions(+) create mode 100644 snuba/manual_jobs/extract_span_data.py create mode 100644 tests/manual_jobs/test_extract_span_data.py diff --git a/snuba/manual_jobs/extract_span_data.py b/snuba/manual_jobs/extract_span_data.py new file mode 100644 index 00000000000..badcef129c3 --- /dev/null +++ b/snuba/manual_jobs/extract_span_data.py @@ -0,0 +1,102 @@ +from datetime import datetime + +from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster +from snuba.clusters.storage_sets import StorageSetKey +from snuba.manual_jobs import Job, JobLogger, JobSpec + + +class ExtractSpanData(Job): + def __init__(self, job_spec: JobSpec) -> None: + super().__init__(job_spec) + + def _generate_spans_query(self): + # Columns that should not be hashed (numeric and date types) + numeric_columns = { + "organization_id", + "project_id", + "span_id", + "parent_span_id", + "segment_id", + "is_segment", + "_sort_timestamp", + "start_timestamp", + "end_timestamp", + "duration_micro", + "exclusive_time_micro", + "retention_days", + "sampling_factor", + "sampling_weight", + "sign", + } + + base_columns = [ + "organization_id", + "project_id", + "service", + "trace_id", + "span_id", + "parent_span_id", + "segment_id", + "segment_name", + "is_segment", + "_sort_timestamp", + "start_timestamp", + "end_timestamp", + "duration_micro", + "exclusive_time_micro", + "retention_days", + "name", + "sampling_factor", + "sampling_weight", + "sign", + ] + + map_columns = [] + for prefix in ["attr_str_", "attr_num_"]: + map_columns.extend(f"{prefix}{i}" for i in range(20)) + + all_columns = base_columns + map_columns + + scrubbed_columns = [] + for col in all_columns: + if col in numeric_columns or col.startswith("attr_num"): + scrubbed_columns.append(col) + elif col.startswith("attr_str"): + scrubbed_columns.append( + f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}" + ) + else: + scrubbed_columns.append(f"cityHash64({col}) AS {col}") + + query = f""" + SELECT + {', '.join(scrubbed_columns)} + FROM {self.table_name} + WHERE _sort_timestamp BETWEEN toDateTime('{self.start_timestamp}') AND toDateTime('{self.end_timestamp}') + AND organization_id IN {self.organization_ids} + LIMIT {self.limit} + """ + + return query + + def execute(self, logger: JobLogger) -> None: + cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM) + connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY) + + current_time = datetime.now().strftime("%Y%m%d_%H%M%S") + file_name = f"scrubbed_spans_data_{current_time}.csv.gz" + + query = f""" + INSERT INTO FUNCTION gcs('https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}', + 'CSVWithNames', + '', + 'gzip' + ) + {self._generate_spans_query()} + """ + + logger.info("Executing query") + connection.execute(query=query) + logger.info( + f"Data written to GCS bucket: https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}" + ) diff --git a/snuba/utils/hashes.py b/snuba/utils/hashes.py index d9cd7ad2f85..b03e94e3395 100644 --- a/snuba/utils/hashes.py +++ b/snuba/utils/hashes.py @@ -7,3 +7,14 @@ def fnv_1a(b: bytes) -> int: res = res ^ byt res = (res * fnv_1a_32_prime) & 0xFFFFFFFF # force 32 bit return res + + +def fnv_1a_64(b: bytes) -> int: + fnv_1a_64_prime = 1099511628211 + fnv_1a_64_offset_basis = 14695981039346656037 + + res = fnv_1a_64_offset_basis + for byt in b: + res = res ^ byt + res = (res * fnv_1a_64_prime) & 0xFFFFFFFFFFFFFFFF # force 64 bit + return res diff --git a/tests/manual_jobs/test_extract_span_data.py b/tests/manual_jobs/test_extract_span_data.py new file mode 100644 index 00000000000..bd2e62c2773 --- /dev/null +++ b/tests/manual_jobs/test_extract_span_data.py @@ -0,0 +1,119 @@ +import random +import uuid +from datetime import datetime, timedelta +from typing import Any, Mapping + +import pytest + +from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.manual_jobs import JobSpec +from snuba.manual_jobs.runner import run_job +from tests.helpers import write_raw_unprocessed_events + + +def _gen_message( + dt: datetime, + organization_id: int, + measurements: dict[str, dict[str, float]] | None = None, + tags: dict[str, str] | None = None, +) -> Mapping[str, Any]: + measurements = measurements or {} + tags = tags or {} + return { + "description": "/api/0/relays/projectconfigs/", + "duration_ms": 152, + "event_id": "d826225de75d42d6b2f01b957d51f18f", + "exclusive_time_ms": 0.228, + "is_segment": True, + "data": { + "sentry.environment": "development", + "thread.name": "uWSGIWorker1Core0", + "thread.id": "8522009600", + "sentry.segment.name": "/api/0/relays/projectconfigs/", + "sentry.sdk.name": "sentry.python.django", + "sentry.sdk.version": "2.7.0", + "my.float.field": 101.2, + "my.int.field": 2000, + "my.neg.field": -100, + "my.neg.float.field": -101.2, + "my.true.bool.field": True, + "my.false.bool.field": False, + }, + "measurements": { + "num_of_spans": {"value": 50.0}, + "eap.measurement": {"value": random.choice([1, 100, 1000])}, + **measurements, + }, + "organization_id": organization_id, + "origin": "auto.http.django", + "project_id": 1, + "received": 1721319572.877828, + "retention_days": 90, + "segment_id": "8873a98879faf06d", + "sentry_tags": { + "category": "http", + "environment": "development", + "op": "http.server", + "platform": "python", + "sdk.name": "sentry.python.django", + "sdk.version": "2.7.0", + "status": "ok", + "status_code": "200", + "thread.id": "8522009600", + "thread.name": "uWSGIWorker1Core0", + "trace.status": "ok", + "transaction": "/api/0/relays/projectconfigs/", + "transaction.method": "POST", + "transaction.op": "http.server", + "user": "ip:127.0.0.1", + }, + "span_id": "123456781234567D", + "tags": { + "http.status_code": "200", + "relay_endpoint_version": "3", + "relay_id": "88888888-4444-4444-8444-cccccccccccc", + "relay_no_cache": "False", + "relay_protocol_version": "3", + "relay_use_post_or_schedule": "True", + "relay_use_post_or_schedule_rejected": "version", + "spans_over_limit": "False", + "server_name": "blah", + "color": random.choice(["red", "green", "blue"]), + "location": random.choice(["mobile", "frontend", "backend"]), + **tags, + }, + "trace_id": uuid.uuid4().hex, + "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)), + "start_timestamp_precise": dt.timestamp(), + "end_timestamp_precise": dt.timestamp() + 1, + } + + +@pytest.mark.clickhouse_db +@pytest.mark.redis_db +@pytest.mark.skip(reason="can't test writing to GCS") +def test_extract_span_data() -> None: + BASE_TIME = datetime.utcnow().replace( + minute=0, second=0, microsecond=0 + ) - timedelta(minutes=180) + organization_ids = [0, 1] + spans_storage = get_storage(StorageKey("eap_spans")) + messages = [ + _gen_message(BASE_TIME - timedelta(minutes=i), organization_id) + for organization_id in organization_ids + for i in range(20) + ] + + write_raw_unprocessed_events(spans_storage, messages) # type: ignore + + run_job( + JobSpec( + "jobid", + "ExtractSpanData", + False, + { + "organization_ids": [0, 1], + }, + ) + ) From 2b5e68b399a2202ae51582223a1ba0dbb66f2afd Mon Sep 17 00:00:00 2001 From: David Tsukernik Date: Mon, 6 Jan 2025 07:31:14 -0800 Subject: [PATCH 2/3] Update script --- snuba/manual_jobs/extract_span_data.py | 24 +++++++----------- snuba/utils/hashes.py | 11 -------- tests/manual_jobs/test_extract_span_data.py | 28 ++++++++++++++------- 3 files changed, 28 insertions(+), 35 deletions(-) diff --git a/snuba/manual_jobs/extract_span_data.py b/snuba/manual_jobs/extract_span_data.py index badcef129c3..23f568528ec 100644 --- a/snuba/manual_jobs/extract_span_data.py +++ b/snuba/manual_jobs/extract_span_data.py @@ -1,5 +1,3 @@ -from datetime import datetime - from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster from snuba.clusters.storage_sets import StorageSetKey from snuba.manual_jobs import Job, JobLogger, JobSpec @@ -10,11 +8,10 @@ def __init__(self, job_spec: JobSpec) -> None: super().__init__(job_spec) def _generate_spans_query(self): - # Columns that should not be hashed (numeric and date types) - numeric_columns = { - "organization_id", - "project_id", + # Columns that should not be scrubbed + unscrubbed_columns = { "span_id", + "trace_id", "parent_span_id", "segment_id", "is_segment", @@ -59,14 +56,14 @@ def _generate_spans_query(self): scrubbed_columns = [] for col in all_columns: - if col in numeric_columns or col.startswith("attr_num"): + if col in unscrubbed_columns or col.startswith("attr_num"): scrubbed_columns.append(col) elif col.startswith("attr_str"): scrubbed_columns.append( - f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}" + f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}_scrubbed" ) else: - scrubbed_columns.append(f"cityHash64({col}) AS {col}") + scrubbed_columns.append(f"cityHash64({col}) AS {col}_scrubbed") query = f""" SELECT @@ -83,13 +80,10 @@ def execute(self, logger: JobLogger) -> None: cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM) connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY) - current_time = datetime.now().strftime("%Y%m%d_%H%M%S") - file_name = f"scrubbed_spans_data_{current_time}.csv.gz" - query = f""" - INSERT INTO FUNCTION gcs('https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}', + INSERT INTO FUNCTION gcs('{self.gcp_bucket_name}/{self.output_file_path}', 'CSVWithNames', - '', + 'auto', 'gzip' ) {self._generate_spans_query()} @@ -98,5 +92,5 @@ def execute(self, logger: JobLogger) -> None: logger.info("Executing query") connection.execute(query=query) logger.info( - f"Data written to GCS bucket: https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}" + f"Data written to GCS bucket: {self.gcp_bucket_name}/{self.output_file_path}" ) diff --git a/snuba/utils/hashes.py b/snuba/utils/hashes.py index b03e94e3395..d9cd7ad2f85 100644 --- a/snuba/utils/hashes.py +++ b/snuba/utils/hashes.py @@ -7,14 +7,3 @@ def fnv_1a(b: bytes) -> int: res = res ^ byt res = (res * fnv_1a_32_prime) & 0xFFFFFFFF # force 32 bit return res - - -def fnv_1a_64(b: bytes) -> int: - fnv_1a_64_prime = 1099511628211 - fnv_1a_64_offset_basis = 14695981039346656037 - - res = fnv_1a_64_offset_basis - for byt in b: - res = res ^ byt - res = (res * fnv_1a_64_prime) & 0xFFFFFFFFFFFFFFFF # force 64 bit - return res diff --git a/tests/manual_jobs/test_extract_span_data.py b/tests/manual_jobs/test_extract_span_data.py index bd2e62c2773..121d3efa248 100644 --- a/tests/manual_jobs/test_extract_span_data.py +++ b/tests/manual_jobs/test_extract_span_data.py @@ -8,6 +8,7 @@ from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey from snuba.manual_jobs import JobSpec +from snuba.manual_jobs.job_status import JobStatus from snuba.manual_jobs.runner import run_job from tests.helpers import write_raw_unprocessed_events @@ -92,7 +93,7 @@ def _gen_message( @pytest.mark.clickhouse_db @pytest.mark.redis_db -@pytest.mark.skip(reason="can't test writing to GCS") +# @pytest.mark.skip(reason="can't test writing to GCS") def test_extract_span_data() -> None: BASE_TIME = datetime.utcnow().replace( minute=0, second=0, microsecond=0 @@ -107,13 +108,22 @@ def test_extract_span_data() -> None: write_raw_unprocessed_events(spans_storage, messages) # type: ignore - run_job( - JobSpec( - "jobid", - "ExtractSpanData", - False, - { - "organization_ids": [0, 1], - }, + assert ( + run_job( + JobSpec( + "jobid", + "ExtractSpanData", + False, + { + "organization_ids": [0, 1], + "start_timestamp": (BASE_TIME - timedelta(minutes=30)).isoformat(), + "end_timestamp": (BASE_TIME + timedelta(hours=24)).isoformat(), + "table_name": "snuba_test.eap_spans_2_local", + "limit": 1000000, + "output_file_path": "scrubbed_spans_data.csv.gz", + "gcp_bucket_name": "test-bucket", + }, + ) ) + == JobStatus.FINISHED ) From 9e8e76a9f70e463fe9b9e4de5ba994e74c7983d4 Mon Sep 17 00:00:00 2001 From: David Tsukernik Date: Mon, 6 Jan 2025 08:35:32 -0800 Subject: [PATCH 3/3] fix mypy issues --- snuba/manual_jobs/extract_span_data.py | 41 +++++++++++++++++---- tests/manual_jobs/test_extract_span_data.py | 2 +- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/snuba/manual_jobs/extract_span_data.py b/snuba/manual_jobs/extract_span_data.py index 23f568528ec..0644aab08f9 100644 --- a/snuba/manual_jobs/extract_span_data.py +++ b/snuba/manual_jobs/extract_span_data.py @@ -1,3 +1,5 @@ +from typing import Any, Mapping, Optional + from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster from snuba.clusters.storage_sets import StorageSetKey from snuba.manual_jobs import Job, JobLogger, JobSpec @@ -5,9 +7,32 @@ class ExtractSpanData(Job): def __init__(self, job_spec: JobSpec) -> None: + self.__validate_job_params(job_spec.params) super().__init__(job_spec) - def _generate_spans_query(self): + def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None: + assert params + required_params = [ + "organization_ids", + "start_timestamp", + "end_timestamp", + "table_name", + "limit", + "gcp_bucket_name", + "output_file_path", + ] + for param in required_params: + assert param in params + + self._organization_ids = params["organization_ids"] + self._start_timestamp = params["start_timestamp"] + self._end_timestamp = params["end_timestamp"] + self._table_name = params["table_name"] + self._limit = params["limit"] + self._gcp_bucket_name = params["gcp_bucket_name"] + self._output_file_path = params["output_file_path"] + + def _generate_spans_query(self) -> str: # Columns that should not be scrubbed unscrubbed_columns = { "span_id", @@ -48,7 +73,7 @@ def _generate_spans_query(self): "sign", ] - map_columns = [] + map_columns: list[str] = [] for prefix in ["attr_str_", "attr_num_"]: map_columns.extend(f"{prefix}{i}" for i in range(20)) @@ -68,10 +93,10 @@ def _generate_spans_query(self): query = f""" SELECT {', '.join(scrubbed_columns)} - FROM {self.table_name} - WHERE _sort_timestamp BETWEEN toDateTime('{self.start_timestamp}') AND toDateTime('{self.end_timestamp}') - AND organization_id IN {self.organization_ids} - LIMIT {self.limit} + FROM {self._table_name} + WHERE _sort_timestamp BETWEEN toDateTime('{self._start_timestamp}') AND toDateTime('{self._end_timestamp}') + AND organization_id IN {self._organization_ids} + LIMIT {self._limit} """ return query @@ -81,7 +106,7 @@ def execute(self, logger: JobLogger) -> None: connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY) query = f""" - INSERT INTO FUNCTION gcs('{self.gcp_bucket_name}/{self.output_file_path}', + INSERT INTO FUNCTION gcs('{self._gcp_bucket_name}/{self._output_file_path}', 'CSVWithNames', 'auto', 'gzip' @@ -92,5 +117,5 @@ def execute(self, logger: JobLogger) -> None: logger.info("Executing query") connection.execute(query=query) logger.info( - f"Data written to GCS bucket: {self.gcp_bucket_name}/{self.output_file_path}" + f"Data written to GCS bucket: {self._gcp_bucket_name}/{self._output_file_path}" ) diff --git a/tests/manual_jobs/test_extract_span_data.py b/tests/manual_jobs/test_extract_span_data.py index 121d3efa248..3ea9e2f7cf0 100644 --- a/tests/manual_jobs/test_extract_span_data.py +++ b/tests/manual_jobs/test_extract_span_data.py @@ -93,7 +93,7 @@ def _gen_message( @pytest.mark.clickhouse_db @pytest.mark.redis_db -# @pytest.mark.skip(reason="can't test writing to GCS") +@pytest.mark.skip(reason="can't test writing to GCS") def test_extract_span_data() -> None: BASE_TIME = datetime.utcnow().replace( minute=0, second=0, microsecond=0