diff --git a/snuba/clickhouse/translators/snuba/mappers.py b/snuba/clickhouse/translators/snuba/mappers.py
index 452d140ef0..a2f8704812 100644
--- a/snuba/clickhouse/translators/snuba/mappers.py
+++ b/snuba/clickhouse/translators/snuba/mappers.py
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
-from typing import Optional, Tuple
+from typing import Mapping, Optional, Tuple
 
 from snuba.clickhouse.translators.snuba import SnubaClickhouseStrictTranslator
 from snuba.clickhouse.translators.snuba.allowed import (
@@ -9,7 +9,7 @@
     SubscriptableReferenceMapper,
     ValidColumnMappings,
 )
-from snuba.query.dsl import arrayElement
+from snuba.query.dsl import arrayElement, column, literal
 from snuba.query.expressions import Column as ColumnExpr
 from snuba.query.expressions import CurriedFunctionCall, Expression
 from snuba.query.expressions import FunctionCall as FunctionCallExpr
@@ -239,6 +239,12 @@ class SubscriptableHashBucketMapper(SubscriptableReferenceMapper):
     from_column_name: str
     to_col_table: Optional[str]
     to_col_name: str
+    # if specified, casts the result to the specified type.
+    data_type: Optional[str] = None
+    # if you add {'sentry.span_id': 'span_id'} here, then if the user requests attr_blah[sentry.span_id],
+    # this mapper will return a reference to the actual column instead of attr_str.
+    # if specified, data_type must also be specified.
+    normalized_columns: Optional[Mapping[str, str]] = None
 
     def attempt_map(
         self,
@@ -256,12 +262,42 @@ def attempt_map(
         if not isinstance(key.value, str):
             return None
 
+        if (
+            self.normalized_columns
+            and key.value in self.normalized_columns
+            and self.data_type
+        ):
+            return FunctionCallExpr(
+                expression.alias,
+                "CAST",
+                (
+                    column(self.normalized_columns[key.value]),
+                    literal(self.data_type),
+                ),
+            )
+
         bucket_idx = fnv_1a(key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
-        return arrayElement(
-            expression.alias,
-            ColumnExpr(None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"),
-            key,
-        )
+        if self.data_type:
+            return FunctionCallExpr(
+                expression.alias,
+                "CAST",
+                (
+                    arrayElement(
+                        None,
+                        ColumnExpr(
+                            None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"
+                        ),
+                        key,
+                    ),
+                    literal(self.data_type),
+                ),
+            )
+        else:
+            return arrayElement(
+                expression.alias,
+                ColumnExpr(None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"),
+                key,
+            )
 
 
 @dataclass(frozen=True)
diff --git a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml
index 3a3e3e5975..5d01842a91 100644
--- a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml
+++ b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans.yaml
@@ -90,9 +90,9 @@ query_processors:
       - quantileTDigestWeighted
   - processor: HashBucketFunctionTransformer
     args:
-      hash_bucket_names:
-        - attr_str
-        - attr_num
+      hash_bucket_name_mapping:
+        attr_str: attr_str
+        attr_num: attr_num
 
 validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
 validators:
diff --git a/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans_rpc.yaml b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans_rpc.yaml
new file mode 100644
index 0000000000..32a00c77db
--- /dev/null
+++ b/snuba/datasets/configuration/events_analytics_platform/entities/eap_spans_rpc.yaml
@@ -0,0 +1,124 @@
+version: v1
+kind: entity
+name: eap_spans_rpc
+
+schema:
+  [
+    { name: service, type: String },
+    { name: trace_id, type: UUID },
+    { name: span_id, type: UInt, args: { size: 64 } },
+    { name: parent_span_id, type: UInt, args: { size: 64 } },
+    { name: segment_id, type: UInt, args: { size: 64 } },
+    { name: segment_name, type: String },
+    { name: is_segment, type: UInt, args: { size: 8 } },
+    { name: start_timestamp, type: DateTime64, args: { precision: 6 } },
+    { name: end_timestamp, type: DateTime64, args: { precision: 6 } },
+    { name: duration_ms, type: Float, args: { size: 64 } },
+    { name: exclusive_time_ms, type: Float, args: { size: 64 } },
+    { name: name, type: String },
+
+    # these are the required columns for an 'RPC entity' that can be used by EAP RPCs
+    { name: organization_id, type: UInt, args: { size: 64 } },
+    { name: project_id, type: UInt, args: { size: 64 } },
+    { name: time, type: DateTime }, # used by TimeSeriesProcessor
+    { name: timestamp, type: DateTime }, # mapped to _sort_timestamp
+    { name: retention_days, type: UInt, args: { size: 16 } },
+    { name: sampling_factor, type: Float, args: { size: 64 } },
+    { name: sampling_weight, type: UInt, args: { size: 64 } },
+    { name: sign, type: Int, args: { size: 8 } },
+    { name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } },
+    { name: attr_f64, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } },
+    { name: attr_i64, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } },
+  ]
+
+storages:
+  - storage: eap_spans
+    is_writable: true
+    translation_mappers:
+      columns:
+        - mapper: ColumnToColumn
+          args:
+            from_table_name: null
+            from_col_name: timestamp
+            to_table_name: null
+            to_col_name: _sort_timestamp
+
+      subscriptables:
+        - mapper: SubscriptableHashBucketMapper
+          args:
+            from_column_table: null
+            from_column_name: attr_str
+            to_col_table: null
+            to_col_name: attr_str
+            data_type: String
+            normalized_columns:
+              sentry.name: name
+              sentry.service: service
+              sentry.span_id: span_id
+              sentry.parent_span_id: parent_span_id
+              sentry.segment_id: segment_id
+              sentry.segment_name: segment_name
+              sentry.start_timestamp: start_timestamp
+              sentry.end_timestamp: end_timestamp
+              sentry.timestamp: _sort_timestamp
+        - mapper: SubscriptableHashBucketMapper
+          args:
+            from_column_table: null
+            from_column_name: attr_f64
+            to_col_table: null
+            to_col_name: attr_num
+            data_type: Float64
+            normalized_columns:
+              sentry.exclusive_time_micro: exclusive_time_micro
+              sentry.duration_micro: duration_micro
+        - mapper: SubscriptableHashBucketMapper
+          args:
+            from_column_table: null
+            from_column_name: attr_i64
+            to_col_table: null
+            to_col_name: attr_num
+            data_type: Int64
+            normalized_columns:
+              sentry.organization_id: organization_id
+              sentry.project_id: project_id
+
+storage_selector:
+  selector: DefaultQueryStorageSelector
+
+query_processors:
+  - processor: TimeSeriesProcessor
+    args:
+      time_group_columns:
+        time: timestamp
+      time_parse_columns:
+        - start_timestamp
+        - end_timestamp
+  - processor: HashBucketFunctionTransformer
+    args:
+      hash_bucket_name_mapping:
+        attr_str: attr_str
+        attr_f64: attr_num
+        attr_i64: attr_num
+  - processor: OptionalAttributeAggregationTransformer
+    args:
+      attribute_column_names:
+        - attr_num
+      aggregation_names:
+        - sum
+        - count
+        - avg
+        - avgWeighted
+        - max
+        - min
+        - uniq
+      curried_aggregation_names:
+        - quantile
+        - quantileTDigestWeighted
+
+validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
+validators:
+  - validator: EntityRequiredColumnValidator
+    args:
+      required_filter_columns: [organization_id]
+
+required_time_column: timestamp
diff --git a/snuba/query/processors/logical/hash_bucket_functions.py b/snuba/query/processors/logical/hash_bucket_functions.py
index 2527b66fe5..5e7ff1717d 100644
--- a/snuba/query/processors/logical/hash_bucket_functions.py
+++ b/snuba/query/processors/logical/hash_bucket_functions.py
@@ -1,4 +1,4 @@
-from typing import Sequence
+from typing import Mapping
 
 from snuba.query.expressions import Column, Expression, FunctionCall, Literal
 from snuba.query.logical import Query
@@ -22,11 +22,9 @@ class HashBucketFunctionTransformer(LogicalQueryProcessor):
     It converts mapExists(attr_str, 'blah') to mapExists(attr_str_{hash('blah')%20}, 'blah')
     """
 
-    def __init__(
-        self,
-        hash_bucket_names: Sequence[str],
-    ):
-        self.hash_bucket_names = hash_bucket_names
+    def __init__(self, hash_bucket_name_mapping: Mapping[str, str]):
+        super().__init__()
+        self.hash_bucket_name_mapping = hash_bucket_name_mapping
 
     def process_query(self, query: Query, query_settings: QuerySettings) -> None:
         def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
@@ -40,7 +38,7 @@ def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
            if not isinstance(param, Column):
                return exp
 
-            if param.column_name not in self.hash_bucket_names:
+            if param.column_name not in self.hash_bucket_name_mapping:
                return exp
 
            if exp.function_name not in ("mapKeys", "mapValues"):
@@ -56,7 +54,7 @@ def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
                        parameters=(
                            Column(
                                None,
-                                column_name=f"{param.column_name}_{i}",
+                                column_name=f"{self.hash_bucket_name_mapping[param.column_name]}_{i}",
                                table_name=param.table_name,
                            ),
                        ),
@@ -76,7 +74,7 @@ def transform_map_contains_expression(exp: Expression) -> Expression:
            if not isinstance(column, Column):
                return exp
 
-            if column.column_name not in self.hash_bucket_names:
+            if column.column_name not in self.hash_bucket_name_mapping:
                return exp
 
            if exp.function_name != "mapContains":
@@ -91,7 +89,11 @@ def transform_map_contains_expression(exp: Expression) -> Expression:
                alias=exp.alias,
                function_name=exp.function_name,
                parameters=(
-                    Column(None, None, f"{column.column_name}_{bucket_idx}"),
+                    Column(
+                        None,
+                        None,
+                        f"{self.hash_bucket_name_mapping[column.column_name]}_{bucket_idx}",
+                    ),
                    key,
                ),
            )
diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py
index 923f0c9aa5..de08b62ccd 100644
--- a/snuba/web/rpc/common/common.py
+++ b/snuba/web/rpc/common/common.py
@@ -1,7 +1,7 @@
 from datetime import datetime, timedelta
-from typing import Final, Mapping, Sequence, Set
+from typing import Sequence
 
-from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta
+from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemName
 from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
     AttributeKey,
     VirtualColumnContext,
@@ -11,8 +11,11 @@
     TraceItemFilter,
 )
 
+from snuba.datasets.entities.entity_key import EntityKey
+from snuba.datasets.entities.factory import get_entity
 from snuba.query import Query
 from snuba.query.conditions import combine_and_conditions, combine_or_conditions
+from snuba.query.data_source.simple import Entity
 from snuba.query.dsl import Functions as f
 from snuba.query.dsl import (
     and_cond,
@@ -72,86 +75,25 @@ def transform(exp: Expression) -> Expression:
     query.transform_expressions(transform)
 
 
-# These are the columns which aren't stored in attr_str_ nor attr_num_ in clickhouse
-NORMALIZED_COLUMNS: Final[Mapping[str, AttributeKey.Type.ValueType]] = {
-    "sentry.organization_id": AttributeKey.Type.TYPE_INT,
-    "sentry.project_id": AttributeKey.Type.TYPE_INT,
-    "sentry.service": AttributeKey.Type.TYPE_STRING,
-    "sentry.span_id": AttributeKey.Type.TYPE_STRING,  # this is converted by a processor on the storage
-    "sentry.parent_span_id": AttributeKey.Type.TYPE_STRING,  # this is converted by a processor on the storage
-    "sentry.segment_id": AttributeKey.Type.TYPE_STRING,  # this is converted by a processor on the storage
-    "sentry.segment_name": AttributeKey.Type.TYPE_STRING,
-    "sentry.is_segment": AttributeKey.Type.TYPE_BOOLEAN,
-    "sentry.duration_ms": AttributeKey.Type.TYPE_FLOAT,
-    "sentry.exclusive_time_ms": AttributeKey.Type.TYPE_FLOAT,
-    "sentry.retention_days": AttributeKey.Type.TYPE_INT,
-    "sentry.name": AttributeKey.Type.TYPE_STRING,
-    "sentry.sampling_weight": AttributeKey.Type.TYPE_FLOAT,
-    "sentry.sampling_factor": AttributeKey.Type.TYPE_FLOAT,
-    "sentry.timestamp": AttributeKey.Type.TYPE_UNSPECIFIED,
-    "sentry.start_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED,
-    "sentry.end_timestamp": AttributeKey.Type.TYPE_UNSPECIFIED,
-}
-
-TIMESTAMP_COLUMNS: Final[Set[str]] = {
-    "sentry.timestamp",
-    "sentry.start_timestamp",
-    "sentry.end_timestamp",
-}
-
-
 def attribute_key_to_expression(attr_key: AttributeKey) -> Expression:
-    def _build_label_mapping_key(attr_key: AttributeKey) -> str:
-        return attr_key.name + "_" + AttributeKey.Type.Name(attr_key.type)
-
     if attr_key.type == AttributeKey.Type.TYPE_UNSPECIFIED:
         raise BadSnubaRPCRequestException(
             f"attribute key {attr_key.name} must have a type specified"
         )
-    alias = _build_label_mapping_key(attr_key)
-
-    if attr_key.name == "sentry.trace_id":
-        if attr_key.type == AttributeKey.Type.TYPE_STRING:
-            return f.CAST(column("trace_id"), "String", alias=alias)
-        raise BadSnubaRPCRequestException(
-            f"Attribute {attr_key.name} must be requested as a string, got {attr_key.type}"
-        )
-
-    if attr_key.name in TIMESTAMP_COLUMNS:
-        if attr_key.type == AttributeKey.Type.TYPE_STRING:
-            return f.CAST(
-                column(attr_key.name[len("sentry.") :]), "String", alias=alias
-            )
-        if attr_key.type == AttributeKey.Type.TYPE_INT:
-            return f.CAST(column(attr_key.name[len("sentry.") :]), "Int64", alias=alias)
-        if attr_key.type == AttributeKey.Type.TYPE_FLOAT:
-            return f.CAST(
-                column(attr_key.name[len("sentry.") :]), "Float64", alias=alias
-            )
-        raise BadSnubaRPCRequestException(
-            f"Attribute {attr_key.name} must be requested as a string, float, or integer, got {attr_key.type}"
-        )
+    alias = attr_key.name + "_" + AttributeKey.Type.Name(attr_key.type)
 
-    if attr_key.name in NORMALIZED_COLUMNS:
-        if NORMALIZED_COLUMNS[attr_key.name] == attr_key.type:
-            return column(attr_key.name[len("sentry.") :], alias=attr_key.name)
-        raise BadSnubaRPCRequestException(
-            f"Attribute {attr_key.name} must be requested as {NORMALIZED_COLUMNS[attr_key.name]}, got {attr_key.type}"
-        )
-
-    # End of special handling, just send to the appropriate bucket
     if attr_key.type == AttributeKey.Type.TYPE_STRING:
         return SubscriptableReference(
             alias=alias, column=column("attr_str"), key=literal(attr_key.name)
         )
     if attr_key.type == AttributeKey.Type.TYPE_FLOAT:
         return SubscriptableReference(
-            alias=alias, column=column("attr_num"), key=literal(attr_key.name)
+            alias=alias, column=column("attr_f64"), key=literal(attr_key.name)
         )
     if attr_key.type == AttributeKey.Type.TYPE_INT:
         return f.CAST(
             SubscriptableReference(
-                alias=None, column=column("attr_num"), key=literal(attr_key.name)
+                alias=None, column=column("attr_i64"), key=literal(attr_key.name)
             ),
             "Int64",
             alias=alias,
@@ -160,7 +102,7 @@ def _build_label_mapping_key(attr_key: AttributeKey) -> str:
         return f.CAST(
             SubscriptableReference(
                 alias=None,
-                column=column("attr_num"),
+                column=column("attr_f64"),
                 key=literal(attr_key.name),
             ),
             "Boolean",
@@ -209,20 +151,19 @@ def apply_virtual_columns(
     mapped_column_to_context = {c.to_column_name: c for c in virtual_column_contexts}
 
     def transform_expressions(expression: Expression) -> Expression:
-        # virtual columns will show up as `attr_str[virtual_column_name]` or `attr_num[virtual_column_name]`
+        # virtual columns will show up as `attr_str[virtual_column_name]`
         if not isinstance(expression, SubscriptableReference):
             return expression
 
         if expression.column.column_name != "attr_str":
             return expression
+
         context = mapped_column_to_context.get(str(expression.key.value))
         if context:
             attribute_expression = attribute_key_to_expression(
                 AttributeKey(
                     name=context.from_column_name,
-                    type=NORMALIZED_COLUMNS.get(
-                        context.from_column_name, AttributeKey.TYPE_STRING
-                    ),
+                    type=AttributeKey.TYPE_STRING,
                 )
             )
             return f.transform(
@@ -340,8 +281,6 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
 
     if item_filter.HasField("exists_filter"):
         k = item_filter.exists_filter.key
-        if k.name in NORMALIZED_COLUMNS.keys():
-            return f.isNotNull(column(k.name))
         if k.type == AttributeKey.Type.TYPE_STRING:
             return f.mapContains(column("attr_str"), literal(k.name))
         else:
@@ -363,6 +302,24 @@ def project_id_and_org_conditions(meta: RequestMeta) -> Expression:
     )
 
 
+def entity_key_from_trace_item_name(name: TraceItemName.ValueType) -> EntityKey:
+    # TODO type is not always specified, fix that then delete this
+    if name == TraceItemName.TRACE_ITEM_NAME_UNSPECIFIED:
+        return EntityKey("eap_spans_rpc")
+    if name == TraceItemName.TRACE_ITEM_NAME_EAP_SPANS:
+        return EntityKey("eap_spans_rpc")
+    raise BadSnubaRPCRequestException(f"unknown trace item type: ${name}")
+
+
+def entity_from_trace_item_name(name: TraceItemName.ValueType) -> Entity:
+    entity_key = entity_key_from_trace_item_name(name)
+    return Entity(
+        key=entity_key,
+        schema=get_entity(entity_key).get_data_model(),
+        sample=None,
+    )
+
+
 def timestamp_in_range_condition(start_ts: int, end_ts: int) -> Expression:
     return and_cond(
         f.less(
diff --git a/snuba/web/rpc/v1/endpoint_time_series.py b/snuba/web/rpc/v1/endpoint_time_series.py
index d263852fe2..49635d36c3 100644
--- a/snuba/web/rpc/v1/endpoint_time_series.py
+++ b/snuba/web/rpc/v1/endpoint_time_series.py
@@ -16,11 +16,8 @@
 
 from snuba.attribution.appid import AppID
 from snuba.attribution.attribution_info import AttributionInfo
-from snuba.datasets.entities.entity_key import EntityKey
-from snuba.datasets.entities.factory import get_entity
 from snuba.datasets.pluggable_dataset import PluggableDataset
 from snuba.query import OrderBy, OrderByDirection, SelectedExpression
-from snuba.query.data_source.simple import Entity
 from snuba.query.dsl import Functions as f
 from snuba.query.dsl import column
 from snuba.query.logical import Query
@@ -38,6 +35,7 @@
 from snuba.web.rpc.common.common import (
     attribute_key_to_expression,
     base_conditions_and,
+    entity_from_trace_item_name,
     trace_item_filters_to_expression,
     treeify_or_and_conditions,
 )
@@ -188,12 +186,7 @@ def _convert_result_timeseries(
 
 
 def _build_query(request: TimeSeriesRequest) -> Query:
-    # TODO: This is hardcoded still
-    entity = Entity(
-        key=EntityKey("eap_spans"),
-        schema=get_entity(EntityKey("eap_spans")).get_data_model(),
-        sample=None,
-    )
+    entity = entity_from_trace_item_name(request.meta.trace_item_name)
 
     aggregation_columns = [
         SelectedExpression(
diff --git a/snuba/web/rpc/v1/endpoint_trace_item_table.py b/snuba/web/rpc/v1/endpoint_trace_item_table.py
index 7bece3f811..7cfcad3a6e 100644
--- a/snuba/web/rpc/v1/endpoint_trace_item_table.py
+++ b/snuba/web/rpc/v1/endpoint_trace_item_table.py
@@ -20,11 +20,8 @@
 
 from snuba.attribution.appid import AppID
 from snuba.attribution.attribution_info import AttributionInfo
-from snuba.datasets.entities.entity_key import EntityKey
-from snuba.datasets.entities.factory import get_entity
 from snuba.datasets.pluggable_dataset import PluggableDataset
 from snuba.query import OrderBy, OrderByDirection, SelectedExpression
-from snuba.query.data_source.simple import Entity
 from snuba.query.logical import Query
 from snuba.query.query_settings import HTTPQuerySettings
 from snuba.request import Request as SnubaRequest
@@ -41,6 +38,7 @@
     apply_virtual_columns,
     attribute_key_to_expression,
     base_conditions_and,
+    entity_from_trace_item_name,
     trace_item_filters_to_expression,
     treeify_or_and_conditions,
 )
@@ -79,12 +77,7 @@ def _convert_order_by(
 
 
 def _build_query(request: TraceItemTableRequest) -> Query:
-    # TODO: This is hardcoded still
-    entity = Entity(
-        key=EntityKey("eap_spans"),
-        schema=get_entity(EntityKey("eap_spans")).get_data_model(),
-        sample=None,
-    )
+    entity = entity_from_trace_item_name(request.meta.trace_item_name)
 
     selected_columns = []
     for column in request.columns:
diff --git a/tests/clickhouse/translators/snuba/test_translation.py b/tests/clickhouse/translators/snuba/test_translation.py
index 1dffa817b7..7c8c03ea97 100644
--- a/tests/clickhouse/translators/snuba/test_translation.py
+++ b/tests/clickhouse/translators/snuba/test_translation.py
@@ -146,6 +146,59 @@ def test_hash_bucket_tag_translation() -> None:
     )
 
 
+def test_hash_bucket_normalized() -> None:
+    mapper = SubscriptableHashBucketMapper(
+        from_column_table=None,
+        from_column_name="tags_str",
+        to_col_table=None,
+        to_col_name="tags_float",
+        data_type="String",
+        normalized_columns={"derp.hello": "some_column"},
+    )
+
+    non_normalized_mapped = mapper.attempt_map(
+        SubscriptableReference(
+            "tags_str[z]", Column(None, None, "tags_str"), Literal(None, "z")
+        ),
+        SnubaClickhouseMappingTranslator(TranslationMappers()),
+    )
+
+    normalized_mapped = mapper.attempt_map(
+        SubscriptableReference(
+            "tags_str[derp.hello]",
+            Column(None, None, "tags_str"),
+            Literal(None, "derp.hello"),
+        ),
+        SnubaClickhouseMappingTranslator(TranslationMappers()),
+    )
+
+    assert non_normalized_mapped == FunctionCall(
+        "tags_str[z]",
+        "CAST",
+        (
+            FunctionCall(
+                None,
+                "arrayElement",
+                (
+                    Column(
+                        None,
+                        None,
+                        f"tags_float_{fnv_1a(b'z') % constants.ATTRIBUTE_BUCKETS}",
+                    ),
+                    Literal(None, "z"),
+                ),
+            ),
+            Literal(None, "String"),
+        ),
+    )
+
+    assert normalized_mapped == FunctionCall(
+        "tags_str[derp.hello]",
+        "CAST",
+        (Column(None, None, "some_column"), Literal(None, "String")),
+    )
+
+
 def _get_nullable_expr(alias: str) -> FunctionCall:
     return FunctionCall(
         alias,
diff --git a/tests/datasets/test_entity_factory.py b/tests/datasets/test_entity_factory.py
index d7e0f39d41..79a8cd5cff 100644
--- a/tests/datasets/test_entity_factory.py
+++ b/tests/datasets/test_entity_factory.py
@@ -15,6 +15,7 @@
     EntityKey.DISCOVER,
     EntityKey.EVENTS,
     EntityKey.EAP_SPANS,
+    EntityKey.EAP_SPANS_RPC,
     EntityKey.SPANS_NUM_ATTRS,
     EntityKey.SPANS_STR_ATTRS,
     EntityKey.GROUPASSIGNEE,
diff --git a/tests/query/processors/test_hash_bucket_functions_processor.py b/tests/query/processors/test_hash_bucket_functions_processor.py
index a259ca1066..ac3faf68bb 100644
--- a/tests/query/processors/test_hash_bucket_functions_processor.py
+++ b/tests/query/processors/test_hash_bucket_functions_processor.py
@@ -196,6 +196,7 @@
         condition=binary_condition(
             "or",
             f.mapContains(column("attr_str"), literal("blah"), alias="x"),
+            f.mapContains(column("attr_i64"), literal("blah"), alias="y"),
             f.mapContains(column("attr_strz"), literal("blah"), alias="z"),
         ),
     ),
@@ -210,6 +211,7 @@
         condition=binary_condition(
             "or",
             f.mapContains(column("attr_str_2"), literal("blah"), alias="x"),
+            f.mapContains(column("attr_num_2"), literal("blah"), alias="y"),
             f.mapContains(column("attr_strz"), literal("blah"), alias="z"),
         ),
     ),
@@ -220,7 +222,9 @@
 @pytest.mark.parametrize("pre_format, expected_query", test_data)
 def test_format_expressions(pre_format: Query, expected_query: Query) -> None:
     copy = deepcopy(pre_format)
-    HashBucketFunctionTransformer("attr_str").process_query(copy, HTTPQuerySettings())
+    HashBucketFunctionTransformer(
+        {"attr_str": "attr_str", "attr_i64": "attr_num"}
+    ).process_query(copy, HTTPQuerySettings())
     assert copy.get_selected_columns() == expected_query.get_selected_columns()
     assert copy.get_groupby() == expected_query.get_groupby()
     assert copy.get_condition() == expected_query.get_condition()