Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eap): Start decoupling EAP entities at the entity layer #6701

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions snuba/clickhouse/translators/snuba/mappers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Tuple
from typing import Mapping, Optional, Tuple

from snuba.clickhouse.translators.snuba import SnubaClickhouseStrictTranslator
from snuba.clickhouse.translators.snuba.allowed import (
Expand All @@ -9,7 +9,7 @@
SubscriptableReferenceMapper,
ValidColumnMappings,
)
from snuba.query.dsl import arrayElement
from snuba.query.dsl import arrayElement, column, literal
from snuba.query.expressions import Column as ColumnExpr
from snuba.query.expressions import CurriedFunctionCall, Expression
from snuba.query.expressions import FunctionCall as FunctionCallExpr
Expand Down Expand Up @@ -239,6 +239,12 @@ class SubscriptableHashBucketMapper(SubscriptableReferenceMapper):
from_column_name: str
to_col_table: Optional[str]
to_col_name: str
# if specified, casts the result to the specified type.
data_type: Optional[str] = None
# Maps attribute keys to real (normalized) columns. For example, if you add {'sentry.span_id': 'span_id'} here
# and the user requests attr_blah[sentry.span_id], this mapper returns a reference to the actual span_id column
# instead of a hash-bucket lookup. If normalized_columns is specified, data_type must also be specified.
normalized_columns: Optional[Mapping[str, str]] = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that, with the current implementation, if normalized_columns is specified but data_type is not, the code silently ignores normalized_columns and falls into the else block, returning arrayElement(...). If the expectation is that data_type must be specified along with normalized_columns, should we enforce that and return an appropriate error so that the caller does not get unexpected behavior?


def attempt_map(
self,
Expand All @@ -256,12 +262,42 @@ def attempt_map(
if not isinstance(key.value, str):
return None

if (
self.normalized_columns
and key.value in self.normalized_columns
and self.data_type
):
return FunctionCallExpr(
expression.alias,
"CAST",
(
column(self.normalized_columns[key.value]),
literal(self.data_type),
),
)

bucket_idx = fnv_1a(key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
return arrayElement(
expression.alias,
ColumnExpr(None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"),
key,
)
if self.data_type:
return FunctionCallExpr(
expression.alias,
"CAST",
(
arrayElement(
None,
ColumnExpr(
None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"
),
key,
),
literal(self.data_type),
),
)
else:
return arrayElement(
expression.alias,
ColumnExpr(None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"),
key,
)


@dataclass(frozen=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ query_processors:
- quantileTDigestWeighted
- processor: HashBucketFunctionTransformer
args:
hash_bucket_names:
- attr_str
- attr_num
hash_bucket_name_mapping:
attr_str: attr_str
attr_num: attr_num

validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
validators:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
version: v1
kind: entity
name: eap_spans_rpc

schema:
[
{ name: service, type: String },
{ name: trace_id, type: UUID },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a required field given the API we have.

{ name: span_id, type: UInt, args: { size: 64 } },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have some sort of event_id, as a string, required for the RPC. Having it as a string would let us pass any sort of UUID or span ID.

{ name: parent_span_id, type: UInt, args: { size: 64 } },
{ name: segment_id, type: UInt, args: { size: 64 } },
{ name: segment_name, type: String },
{ name: is_segment, type: UInt, args: { size: 8 } },
{ name: start_timestamp, type: DateTime64, args: { precision: 6 } },
{ name: end_timestamp, type: DateTime64, args: { precision: 6 } },
{ name: duration_ms, type: Float, args: { size: 64 } },
{ name: exclusive_time_ms, type: Float, args: { size: 64 } },
{ name: name, type: String },

# these are the required columns for an 'RPC entity' that can be used by EAP RPCs
{ name: organization_id, type: UInt, args: { size: 64 } },
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: time, type: DateTime }, # used by TimeSeriesProcessor
{ name: timestamp, type: DateTime }, # mapped to _sort_timestamp
Comment on lines +23 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to avoid this duplication? I think one timestamp is enough for the required fields.

{ name: retention_days, type: UInt, args: { size: 16 } },
{ name: sampling_factor, type: Float, args: { size: 64 } },
{ name: sampling_weight, type: UInt, args: { size: 64 } },
{ name: sign, type: Int, args: { size: 8 } },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be a required column as we're not using CollapsingMergeTree everywhere.

{ name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } },
{ name: attr_f64, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } },
{ name: attr_i64, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave things that don't exist yet out of it.

]

storages:
- storage: eap_spans
is_writable: true
translation_mappers:
columns:
- mapper: ColumnToColumn
args:
from_table_name: null
from_col_name: timestamp
to_table_name: null
to_col_name: _sort_timestamp

subscriptables:
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_str
to_col_table: null
to_col_name: attr_str
data_type: String
normalized_columns:
sentry.name: name
sentry.service: service
sentry.span_id: span_id
sentry.parent_span_id: parent_span_id
sentry.segment_id: segment_id
sentry.segment_name: segment_name
sentry.start_timestamp: start_timestamp
sentry.end_timestamp: end_timestamp
sentry.timestamp: _sort_timestamp
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_f64
to_col_table: null
to_col_name: attr_num
data_type: Float64
normalized_columns:
sentry.exclusive_time_micro: exclusive_time_micro
sentry.duration_micro: duration_micro
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_i64
to_col_table: null
to_col_name: attr_num
data_type: Int64
normalized_columns:
sentry.organization_id: organization_id
sentry.project_id: project_id

storage_selector:
selector: DefaultQueryStorageSelector

query_processors:
- processor: TimeSeriesProcessor
args:
time_group_columns:
time: timestamp
time_parse_columns:
- start_timestamp
- end_timestamp
- processor: HashBucketFunctionTransformer
args:
hash_bucket_name_mapping:
attr_str: attr_str
attr_f64: attr_num
attr_i64: attr_num
- processor: OptionalAttributeAggregationTransformer
args:
attribute_column_names:
- attr_num
Comment on lines +104 to +105
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be my ignorance speaking but should this be attr_str instead of attr_num since we are talking about column names?

aggregation_names:
- sum
- count
- avg
- avgWeighted
- max
- min
- uniq
curried_aggregation_names:
- quantile
- quantileTDigestWeighted

validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
validators:
- validator: EntityRequiredColumnValidator
args:
required_filter_columns: [organization_id]

required_time_column: timestamp
22 changes: 12 additions & 10 deletions snuba/query/processors/logical/hash_bucket_functions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Sequence
from typing import Mapping

from snuba.query.expressions import Column, Expression, FunctionCall, Literal
from snuba.query.logical import Query
Expand All @@ -22,11 +22,9 @@ class HashBucketFunctionTransformer(LogicalQueryProcessor):
It converts mapExists(attr_str, 'blah') to mapExists(attr_str_{hash('blah')%20}, 'blah')
"""

def __init__(
self,
hash_bucket_names: Sequence[str],
):
self.hash_bucket_names = hash_bucket_names
def __init__(self, hash_bucket_name_mapping: Mapping[str, str]):
super().__init__()
self.hash_bucket_name_mapping = hash_bucket_name_mapping

def process_query(self, query: Query, query_settings: QuerySettings) -> None:
def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
Expand All @@ -40,7 +38,7 @@ def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
if not isinstance(param, Column):
return exp

if param.column_name not in self.hash_bucket_names:
if param.column_name not in self.hash_bucket_name_mapping:
return exp

if exp.function_name not in ("mapKeys", "mapValues"):
Expand All @@ -56,7 +54,7 @@ def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
parameters=(
Column(
None,
column_name=f"{param.column_name}_{i}",
column_name=f"{self.hash_bucket_name_mapping[param.column_name]}_{i}",
table_name=param.table_name,
),
),
Expand All @@ -76,7 +74,7 @@ def transform_map_contains_expression(exp: Expression) -> Expression:
if not isinstance(column, Column):
return exp

if column.column_name not in self.hash_bucket_names:
if column.column_name not in self.hash_bucket_name_mapping:
return exp

if exp.function_name != "mapContains":
Expand All @@ -91,7 +89,11 @@ def transform_map_contains_expression(exp: Expression) -> Expression:
alias=exp.alias,
function_name=exp.function_name,
parameters=(
Column(None, None, f"{column.column_name}_{bucket_idx}"),
Column(
None,
None,
f"{self.hash_bucket_name_mapping[column.column_name]}_{bucket_idx}",
),
key,
),
)
Expand Down
53 changes: 53 additions & 0 deletions tests/clickhouse/translators/snuba/test_translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,59 @@ def test_hash_bucket_tag_translation() -> None:
)


def test_hash_bucket_normalized() -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add unit tests for when:

  1. normalized_columns is specified but data_type is not specified? Based on my comment above, I think this should raise an exception or otherwise signal an error.
  2. Only data_type is specified but normalized_columns is not specified. This should ideally just do the casting.
  3. Neither normalized_columns nor data_type is specified. This should return arrayElement(...)

mapper = SubscriptableHashBucketMapper(
from_column_table=None,
from_column_name="tags_str",
to_col_table=None,
to_col_name="tags_float",
data_type="String",
normalized_columns={"derp.hello": "some_column"},
)

non_normalized_mapped = mapper.attempt_map(
SubscriptableReference(
"tags_str[z]", Column(None, None, "tags_str"), Literal(None, "z")
),
SnubaClickhouseMappingTranslator(TranslationMappers()),
)

normalized_mapped = mapper.attempt_map(
SubscriptableReference(
"tags_str[derp.hello]",
Column(None, None, "tags_str"),
Literal(None, "derp.hello"),
),
SnubaClickhouseMappingTranslator(TranslationMappers()),
)

assert non_normalized_mapped == FunctionCall(
"tags_str[z]",
"CAST",
(
FunctionCall(
None,
"arrayElement",
(
Column(
None,
None,
f"tags_float_{fnv_1a(b'z') % constants.ATTRIBUTE_BUCKETS}",
),
Literal(None, "z"),
),
),
Literal(None, "String"),
),
)

assert normalized_mapped == FunctionCall(
"tags_str[derp.hello]",
"CAST",
(Column(None, None, "some_column"), Literal(None, "String")),
)


def _get_nullable_expr(alias: str) -> FunctionCall:
return FunctionCall(
alias,
Expand Down
1 change: 1 addition & 0 deletions tests/datasets/test_entity_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
EntityKey.DISCOVER,
EntityKey.EVENTS,
EntityKey.EAP_SPANS,
EntityKey.EAP_SPANS_RPC,
EntityKey.SPANS_NUM_ATTRS,
EntityKey.SPANS_STR_ATTRS,
EntityKey.GROUPASSIGNEE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@
condition=binary_condition(
"or",
f.mapContains(column("attr_str"), literal("blah"), alias="x"),
f.mapContains(column("attr_i64"), literal("blah"), alias="y"),
f.mapContains(column("attr_strz"), literal("blah"), alias="z"),
),
),
Expand All @@ -210,6 +211,7 @@
condition=binary_condition(
"or",
f.mapContains(column("attr_str_2"), literal("blah"), alias="x"),
f.mapContains(column("attr_num_2"), literal("blah"), alias="y"),
f.mapContains(column("attr_strz"), literal("blah"), alias="z"),
),
),
Expand All @@ -220,7 +222,9 @@
@pytest.mark.parametrize("pre_format, expected_query", test_data)
def test_format_expressions(pre_format: Query, expected_query: Query) -> None:
copy = deepcopy(pre_format)
HashBucketFunctionTransformer("attr_str").process_query(copy, HTTPQuerySettings())
HashBucketFunctionTransformer(
{"attr_str": "attr_str", "attr_i64": "attr_num"}
).process_query(copy, HTTPQuerySettings())
assert copy.get_selected_columns() == expected_query.get_selected_columns()
assert copy.get_groupby() == expected_query.get_groupby()
assert copy.get_condition() == expected_query.get_condition()
Loading