diff --git a/snuba/pipeline/stages/query_execution.py b/snuba/pipeline/stages/query_execution.py
index 8c83aa6a7f..5a977f5418 100644
--- a/snuba/pipeline/stages/query_execution.py
+++ b/snuba/pipeline/stages/query_execution.py
@@ -74,6 +74,7 @@ def get_cluster(
     def _process_data(
         self, pipe_input: QueryPipelineData[ClickhouseQuery | CompositeQuery[Table]]
     ) -> QueryResult:
+        print("whereeeee query_execution.py")
         cluster = self.get_cluster(pipe_input.data, pipe_input.query_settings)
         if pipe_input.query_settings.get_dry_run():
             return _dry_run_query_runner(
@@ -147,6 +148,7 @@ def _run_and_apply_column_names(
             concurrent_queries_gauge,
             cluster_name,
         )
+        print("query_execution/result", result)
 
         alias_name_mapping: MutableMapping[str, list[str]] = {}
         for select_col in clickhouse_query.get_selected_columns():
@@ -194,6 +196,7 @@ def _format_storage_query_and_run(
     formatted_query = format_query(clickhouse_query)
     formatted_sql = formatted_query.get_sql()
+    print("formatted_sql print", formatted_sql)
     query_size_bytes = len(formatted_sql.encode("utf-8"))
     span.set_data(
         "query", textwrap.wrap(formatted_sql, 100, break_long_words=False)
     )
diff --git a/snuba/pipeline/stages/query_processing.py b/snuba/pipeline/stages/query_processing.py
index 7b2fd4311c..b7377780d5 100644
--- a/snuba/pipeline/stages/query_processing.py
+++ b/snuba/pipeline/stages/query_processing.py
@@ -29,6 +29,7 @@ class EntityProcessingStage(
     def _process_data(
         self, pipe_input: QueryPipelineData[Request]
     ) -> ClickhouseQuery | CompositeQuery[Table]:
+        print("whereeeee query_processing.py")
         query = pipe_input.data.query
         translated_storage_query = try_translate_storage_query(query)
         if translated_storage_query:
@@ -65,6 +66,7 @@ def _apply_default_subscriptable_mapping(
     def _process_data(
         self, pipe_input: QueryPipelineData[ClickhouseQuery | CompositeQuery[Table]]
     ) -> ClickhouseQuery | CompositeQuery[Table]:
+        print("whereeeee query_processing.py/StorageProcessingStage")
         self._apply_default_subscriptable_mapping(pipe_input.data)
         if isinstance(pipe_input.data, ClickhouseQuery):
             query_plan = build_best_plan(pipe_input.data, pipe_input.query_settings, [])
diff --git a/snuba/query/mql/parser.py b/snuba/query/mql/parser.py
index 2f0065037d..54149454fd 100644
--- a/snuba/query/mql/parser.py
+++ b/snuba/query/mql/parser.py
@@ -1376,6 +1376,7 @@ def _process_data(
             tuple[str, Dataset, dict[str, Any], QuerySettings | None]
         ],
     ) -> LogicalQuery:
+        print("whereeeee mql/parser.py")
         mql_str, dataset, mql_context_dict, settings = pipe_input.data
 
         with sentry_sdk.start_span(op="parser", description="parse_mql_query_initial"):
@@ -1419,6 +1420,7 @@ def _process_data(
             ]
         ],
     ) -> LogicalQuery:
+        print("whereeeee query/mql/parser.py")
         query, settings, custom_processing = pipe_input.data
         with sentry_sdk.start_span(op="processor", description="post_processors"):
             _post_process(
diff --git a/snuba/query/snql/parser.py b/snuba/query/snql/parser.py
index 832e0581d1..e9da22a956 100644
--- a/snuba/query/snql/parser.py
+++ b/snuba/query/snql/parser.py
@@ -1587,6 +1587,7 @@ def _process_data(
             ]
         ],
     ) -> LogicalQuery | CompositeQuery[LogicalDataSource]:
+        print("whereeeee snql/parser.py")
         query, dataset, custom_processing = pipe_input.data
         settings = pipe_input.query_settings
 
diff --git a/snuba/web/db_query.py b/snuba/web/db_query.py
index 4e38d51f53..0d1d5b1a42 100644
--- a/snuba/web/db_query.py
+++ b/snuba/web/db_query.py
@@ -182,6 +182,8 @@ def execute_query(
         with_totals=clickhouse_query.has_totals(),
         robust=robust,
     )
+    print("formatted_query", formatted_query)
print("reader.execute result", result) timer.mark("execute") stats.update( @@ -674,6 +676,7 @@ def db_query( trace_id, robust, ) + print("db_query/result", result) except AllocationPolicyViolations as e: update_query_metadata_and_stats( query=clickhouse_query, diff --git a/snuba/web/query.py b/snuba/web/query.py index 939c1034c0..e3a4e7890c 100644 --- a/snuba/web/query.py +++ b/snuba/web/query.py @@ -57,6 +57,7 @@ def _run_query_pipeline( robust=robust, concurrent_queries_gauge=concurrent_queries_gauge, ).execute(clickhouse_query) + print("query.py/res", res) if res.error: raise res.error elif res.data: diff --git a/snuba/web/rpc/__init__.py b/snuba/web/rpc/__init__.py index 48acf733f6..3b51130575 100644 --- a/snuba/web/rpc/__init__.py +++ b/snuba/web/rpc/__init__.py @@ -129,7 +129,8 @@ def run_rpc_handler( name: str, version: str, data: bytes ) -> ProtobufMessage | ErrorProto: try: - endpoint = RPCEndpoint.get_from_name(name, version)() # type: ignore + endpoint = RPCEndpoint.get_from_name(name, version)() + print("endpoint", endpoint) # type: ignore except (AttributeError, InvalidConfigKeyError) as e: return convert_rpc_exception_to_proto( RPCRequestException( @@ -140,6 +141,7 @@ def run_rpc_handler( try: deserialized_protobuf = endpoint.parse_from_string(data) + print("deserialized_protobuf", deserialized_protobuf) except DecodeError as e: return convert_rpc_exception_to_proto( RPCRequestException( diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index 186f94a851..211e0f7db2 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -228,6 +228,13 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression: # End of special handling, just send to the appropriate bucket if attr_key.type == AttributeKey.Type.TYPE_STRING: + # return f.CAST( + # SubscriptableReference( + # alias=None, column=column("attr_str"), key=literal(attr_key.name) + # ), + # "String", + # alias=alias, + # ) return SubscriptableReference( alias=alias, column=column("attr_str"), key=literal(attr_key.name) ) diff --git a/snuba/web/rpc/v1/endpoint_trace_item_table.py b/snuba/web/rpc/v1/endpoint_trace_item_table.py index ee786d8d28..7f0ffcc886 100644 --- a/snuba/web/rpc/v1/endpoint_trace_item_table.py +++ b/snuba/web/rpc/v1/endpoint_trace_item_table.py @@ -251,6 +251,7 @@ def _execute(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse: uuid.uuid4() ) snuba_request = _build_snuba_request(in_msg) + print("snuba_request", snuba_request) res = run_query( dataset=PluggableDataset(name="eap", all_entities=[]), request=snuba_request, diff --git a/snuba/web/views.py b/snuba/web/views.py index fc4b5ebf60..2f4a134ba4 100644 --- a/snuba/web/views.py +++ b/snuba/web/views.py @@ -287,6 +287,7 @@ def rpc(*, name: str, version: str) -> Response: if isinstance(result_proto, ErrorProto): return Response(result_proto.SerializeToString(), status=result_proto.code) else: + print("views.py/result_proto", result_proto) return Response(result_proto.SerializeToString(), status=200) diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py index 75b3bca420..224c0877b0 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py @@ -84,7 +84,7 @@ def gen_message( "origin": "auto.http.django", "project_id": 1, "received": 
-        "retention_days": 90,
+        "retention_days": 91,
         "segment_id": "8873a98879faf06d",
         "sentry_tags": {
             "category": "http",
@@ -841,6 +841,7 @@ def test_cast_bug(self, setup_teardown: Any) -> None:
         err_msg = ParseDict(err_req, TraceItemTableRequest())
         # just ensuring it doesnt raise an exception
         EndpointTraceItemTable().execute(err_msg)
+        assert False  # deliberate failure so pytest prints the captured debug output
 
 
 class TestUtils:
diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_rachel.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_rachel.py
new file mode 100644
index 0000000000..1e6d5d8518
--- /dev/null
+++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_rachel.py
@@ -0,0 +1,231 @@
+import json
+from datetime import UTC, datetime, timedelta, timezone
+from typing import Any
+from uuid import uuid4
+
+import pytest
+from google.protobuf.timestamp_pb2 import Timestamp
+from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
+    Column,
+    TraceItemTableRequest,
+    TraceItemTableResponse,
+)
+from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta
+from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
+    AttributeKey,
+    VirtualColumnContext,
+)
+
+from tests.base import BaseApiTest
+
+
+def before_now(**kwargs: float) -> datetime:
+    date = datetime.now(UTC) - timedelta(**kwargs)
+    return date - timedelta(microseconds=date.microsecond % 1000)
+
+
+BASE_TIME = datetime.now(timezone.utc).replace(
+    minute=0, second=0, microsecond=0
+) - timedelta(minutes=180)
+
+
+@pytest.mark.clickhouse_db
+@pytest.mark.redis_db
+class TestRachel(BaseApiTest):
+    def create_span(
+        self,
+        extra_data: dict[str, Any] | None = None,
+        start_ts: datetime | None = None,
+        duration: int = 1000,
+        measurements: dict[str, Any] | None = None,
+    ) -> dict[str, Any]:
+        """Create span JSON; with no params passed it should work out of the box."""
+        if start_ts is None:
+            start_ts = datetime.now() - timedelta(days=30)
+        if extra_data is None:
+            extra_data = {}
+        span: dict[str, Any] = {
+            "is_segment": False,
+            "measurements": {},
+            "retention_days": 90,
+            "sentry_tags": {},
+            "tags": {},
+        }
+        # Load some defaults
+        span.update(
+            {
+                "event_id": uuid4().hex,
+                "organization_id": 4555051977080832,
+                "project_id": 4555051977211905,
+                "trace_id": uuid4().hex,
+                "span_id": uuid4().hex[:16],
+                "parent_span_id": uuid4().hex[:16],
+                "segment_id": uuid4().hex[:16],
+                "group_raw": uuid4().hex[:16],
+                "profile_id": uuid4().hex,
+                # Multiply by 1000 because these fields are in milliseconds
+                "start_timestamp_ms": int(start_ts.timestamp() * 1000),
+                "start_timestamp_precise": start_ts.timestamp(),
+                "end_timestamp_precise": start_ts.timestamp() + duration / 1000,
+                "timestamp": int(start_ts.timestamp() * 1000),
+                "received": start_ts.timestamp(),
+                "duration_ms": duration,
+                "exclusive_time_ms": duration,
+            }
+        )
+        # Load any specific custom data
+        span.update(extra_data)
+        # Coerce tag values to strings
+        for tag, value in dict(span["tags"]).items():
+            span["tags"][tag] = str(value)
+        if measurements:
+            span["measurements"] = measurements
+        return span
+
+    def store_spans(
+        self, spans: list[dict[str, Any]], is_eap: bool = False
+    ) -> None:
+        for span in spans:
+            span["ingest_in_eap"] = is_eap
+        assert (
+            self.app.post(
+                f"/tests/entities/{'eap_' if is_eap else ''}spans/insert",
+                data=json.dumps(spans),
+            ).status_code
+            == 200
+        )
+
+    def test_rachel(self) -> None:
+        self.store_spans(
+            [
+                self.create_span(
+                    {
+                        "description": "foo",
+ "sentry_tags": {"status": "success"}, + "tags": {"foo": "five"}, + }, + measurements={"foo": {"value": 5}}, + start_ts=before_now(minutes=10), + ), + ], + is_eap=True, + ) + + # response = self.do_request( + # { + # "field": ["description", "tags[foo,number]", "tags[foo,string]", "tags[foo]"], + # "query": "", + # "orderby": "description", + # "project": 4555051977080832, + # "dataset": "spans", + # } + # ) + + req = TraceItemTableRequest( + meta=RequestMeta( + organization_id=4555051977080832, + referrer="api.organization-events", + project_ids=[4555051977211905], + start_timestamp=Timestamp( + seconds=int( + datetime( + year=2024, + month=8, + day=17, + hour=19, + minute=19, + second=13, + microsecond=417691, + tzinfo=UTC, + ).timestamp() + ) + ), + end_timestamp=Timestamp( + seconds=int( + datetime( + year=2024, + month=11, + day=15, + hour=19, + minute=29, + second=13, + microsecond=417691, + tzinfo=UTC, + ).timestamp() + ) + ), + ), + columns=[ + Column( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="sentry.name"), + label="description", + ), + Column( + key=AttributeKey(type=AttributeKey.TYPE_INT, name="foo"), + label="tags[foo,number]", + ), + Column( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="foo"), + label="tags[foo,string]", + ), + Column( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="foo"), + label="tags[foo]", + ), + Column( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, name="sentry.span_id" + ), + label="id", + ), + Column( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, name="project.name" + ), + label="project.name", + ), + ], + order_by=[ + TraceItemTableRequest.OrderBy( + column=Column( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, name="sentry.name" + ), + label="description", + ) + ) + ], + group_by=[ + AttributeKey(type="TYPE_STRING", name="sentry.name"), + AttributeKey(type="TYPE_INT", name="foo"), + AttributeKey(type="TYPE_STRING", name="foo"), + AttributeKey(type="TYPE_STRING", name="foo"), + AttributeKey(type="TYPE_STRING", name="sentry.span_id"), + AttributeKey(type="TYPE_STRING", name="project.name"), + ], + virtual_column_contexts=[ + VirtualColumnContext( + from_column_name="sentry.project_id", + to_column_name="project.name", + value_map={"4555051977211905": "bar"}, + ) + ], + ) + + response = self.app.post( + "/rpc/EndpointTraceItemTable/v1", + data=req.SerializeToString(), + headers={"referer": "api.organization-events"}, + ) + # + # print("jdflksflkslf") + # print(response.data) + # #print(json.loads(response.data)) + # print(response.status_code) + # print(response.json) + + assert False + assert response.status_code == 200, response.content + datata = data["data"] + assert datata[0]["data"]["tags[foo,number]"] == 5 + assert datata[0]["tags[foo,string]"] == "five" + assert datata[0]["tags[foo]"] == "five"