(wip) Remove hardcoded references to eap_spans in EAP RPCs #6702

Open · wants to merge 6 commits into master
50 changes: 43 additions & 7 deletions snuba/clickhouse/translators/snuba/mappers.py
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Tuple
from typing import Mapping, Optional, Tuple

from snuba.clickhouse.translators.snuba import SnubaClickhouseStrictTranslator
from snuba.clickhouse.translators.snuba.allowed import (
@@ -9,7 +9,7 @@
SubscriptableReferenceMapper,
ValidColumnMappings,
)
from snuba.query.dsl import arrayElement
from snuba.query.dsl import arrayElement, column, literal
from snuba.query.expressions import Column as ColumnExpr
from snuba.query.expressions import CurriedFunctionCall, Expression
from snuba.query.expressions import FunctionCall as FunctionCallExpr
@@ -239,6 +239,12 @@ class SubscriptableHashBucketMapper(SubscriptableReferenceMapper):
from_column_name: str
to_col_table: Optional[str]
to_col_name: str
# if specified, the result is cast to this type.
data_type: Optional[str] = None
# if you add {'sentry.span_id': 'span_id'} here and the user requests attr_blah[sentry.span_id],
# this mapper returns a reference to the actual span_id column instead of the attr_str buckets.
# if normalized_columns is specified, data_type must also be specified.
normalized_columns: Optional[Mapping[str, str]] = None

def attempt_map(
self,
@@ -256,12 +262,42 @@ def attempt_map(
if not isinstance(key.value, str):
return None

if (
self.normalized_columns
and key.value in self.normalized_columns
and self.data_type
):
return FunctionCallExpr(
expression.alias,
"CAST",
(
column(self.normalized_columns[key.value]),
literal(self.data_type),
),
)

bucket_idx = fnv_1a(key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
return arrayElement(
expression.alias,
ColumnExpr(None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"),
key,
)
if self.data_type:
return FunctionCallExpr(
expression.alias,
"CAST",
(
arrayElement(
None,
ColumnExpr(
None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"
),
key,
),
literal(self.data_type),
),
)
else:
return arrayElement(
expression.alias,
ColumnExpr(None, self.to_col_table, f"{self.to_col_name}_{bucket_idx}"),
key,
)
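
For context, here is a minimal standalone sketch (not part of this diff) of the decision the updated attempt_map makes: a key listed in normalized_columns resolves to the real column, cast to data_type, while any other key falls back to a hashed attr_str bucket. The fnv_1a below is an illustrative 32-bit stand-in and ATTRIBUTE_BUCKETS is set to 20 to match the hash('blah')%20 behaviour described in HashBucketFunctionTransformer's docstring; the real implementations live in snuba.

ATTRIBUTE_BUCKETS = 20  # assumed bucket count, per the %20 in the transformer docstring

def fnv_1a(data: bytes) -> int:
    # 32-bit FNV-1a, used here purely to illustrate bucket selection
    h = 0x811C9DC5
    for byte in data:
        h = ((h ^ byte) * 0x01000193) & 0xFFFFFFFF
    return h

def resolve_attr(key: str, normalized_columns: dict, data_type: str) -> str:
    # Normalized keys bypass the hash buckets and read the real column, cast to data_type.
    if key in normalized_columns:
        return f"CAST({normalized_columns[key]}, '{data_type}')"
    # Everything else is looked up in one of the hashed attr_str_N map columns.
    bucket_idx = fnv_1a(key.encode("utf-8")) % ATTRIBUTE_BUCKETS
    return f"CAST(attr_str_{bucket_idx}['{key}'], '{data_type}')"

print(resolve_attr("sentry.span_id", {"sentry.span_id": "span_id"}, "String"))
# CAST(span_id, 'String')
print(resolve_attr("custom.tag", {"sentry.span_id": "span_id"}, "String"))
# CAST(attr_str_<n>['custom.tag'], 'String')  (n determined by the hash)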


@dataclass(frozen=True)
@@ -90,9 +90,9 @@ query_processors:
- quantileTDigestWeighted
- processor: HashBucketFunctionTransformer
args:
hash_bucket_names:
- attr_str
- attr_num
hash_bucket_name_mapping:
attr_str: attr_str
attr_num: attr_num

validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
validators:
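
Read back from the YAML above, the processor construction changes shape as follows (a sketch, assuming the config loader passes the args block verbatim as keyword arguments):

from snuba.query.processors.logical.hash_bucket_functions import (
    HashBucketFunctionTransformer,
)

# Before: a flat list of bucketed map columns.
#   HashBucketFunctionTransformer(hash_bucket_names=["attr_str", "attr_num"])
# After: an explicit logical-name -> physical-bucket-prefix mapping, which lets
# several logical maps (e.g. attr_f64 and attr_i64 in the new eap_spans_rpc
# entity further down) share one set of physical bucket columns.
processor = HashBucketFunctionTransformer(
    hash_bucket_name_mapping={"attr_str": "attr_str", "attr_num": "attr_num"}
)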
@@ -0,0 +1,124 @@
version: v1
kind: entity
name: eap_spans_rpc

schema:
[
{ name: service, type: String },
{ name: trace_id, type: UUID },
{ name: span_id, type: UInt, args: { size: 64 } },
{ name: parent_span_id, type: UInt, args: { size: 64 } },
{ name: segment_id, type: UInt, args: { size: 64 } },
{ name: segment_name, type: String },
{ name: is_segment, type: UInt, args: { size: 8 } },
{ name: start_timestamp, type: DateTime64, args: { precision: 6 } },
{ name: end_timestamp, type: DateTime64, args: { precision: 6 } },
{ name: duration_ms, type: Float, args: { size: 64 } },
{ name: exclusive_time_ms, type: Float, args: { size: 64 } },
{ name: name, type: String },

# these are the required columns for an 'RPC entity' that can be used by EAP RPCs
{ name: organization_id, type: UInt, args: { size: 64 } },
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: time, type: DateTime }, # used by TimeSeriesProcessor
{ name: timestamp, type: DateTime }, # mapped to _sort_timestamp
{ name: retention_days, type: UInt, args: { size: 16 } },
{ name: sampling_factor, type: Float, args: { size: 64 } },
{ name: sampling_weight, type: UInt, args: { size: 64 } },
{ name: sign, type: Int, args: { size: 8 } },
{ name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } },
{ name: attr_f64, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } },
{ name: attr_i64, type: Map, args: { key: { type: String }, value: { type: Int, args: { size: 64 } } } },
]

storages:
- storage: eap_spans
is_writable: true
translation_mappers:
columns:
- mapper: ColumnToColumn
args:
from_table_name: null
from_col_name: timestamp
to_table_name: null
to_col_name: _sort_timestamp

subscriptables:
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_str
to_col_table: null
to_col_name: attr_str
data_type: String
normalized_columns:
sentry.name: name
sentry.service: service
sentry.span_id: span_id
sentry.parent_span_id: parent_span_id
sentry.segment_id: segment_id
sentry.segment_name: segment_name
sentry.start_timestamp: start_timestamp
sentry.end_timestamp: end_timestamp
sentry.timestamp: _sort_timestamp
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_f64
to_col_table: null
to_col_name: attr_num
data_type: Float64
normalized_columns:
sentry.exclusive_time_micro: exclusive_time_micro
sentry.duration_micro: duration_micro
- mapper: SubscriptableHashBucketMapper
args:
from_column_table: null
from_column_name: attr_i64
to_col_table: null
to_col_name: attr_num
data_type: Int64
normalized_columns:
sentry.organization_id: organization_id
sentry.project_id: project_id

storage_selector:
selector: DefaultQueryStorageSelector

query_processors:
- processor: TimeSeriesProcessor
args:
time_group_columns:
time: timestamp
time_parse_columns:
- start_timestamp
- end_timestamp
- processor: HashBucketFunctionTransformer
args:
hash_bucket_name_mapping:
attr_str: attr_str
attr_f64: attr_num
attr_i64: attr_num
- processor: OptionalAttributeAggregationTransformer
args:
attribute_column_names:
- attr_num
aggregation_names:
- sum
- count
- avg
- avgWeighted
- max
- min
- uniq
curried_aggregation_names:
- quantile
- quantileTDigestWeighted

validate_data_model: do_nothing # in order to reference aliased columns, we shouldn't validate columns purely based on the entity schema
validators:
- validator: EntityRequiredColumnValidator
args:
required_filter_columns: [organization_id]

required_time_column: timestamp
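
To make the intent of the translation_mappers above concrete, here is a rough sketch (illustrative, not taken from the PR or its tests) of where a few subscripts land after passing through the three SubscriptableHashBucketMapper entries; <n> stands for the fnv_1a hash bucket index:

# logical subscript -> translated ClickHouse expression (sketch)
examples = {
    "attr_str[sentry.span_id]": "CAST(span_id, 'String')",  # normalized column
    "attr_str[custom.tag]": "CAST(attr_str_<n>['custom.tag'], 'String')",  # hashed bucket
    "attr_f64[sentry.duration_micro]": "CAST(duration_micro, 'Float64')",  # normalized column
    "attr_i64[sentry.project_id]": "CAST(project_id, 'Int64')",  # normalized column
    "attr_i64[my.counter]": "CAST(attr_num_<n>['my.counter'], 'Int64')",  # attr_i64 buckets live in attr_num
}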
22 changes: 12 additions & 10 deletions snuba/query/processors/logical/hash_bucket_functions.py
@@ -1,4 +1,4 @@
from typing import Sequence
from typing import Mapping

from snuba.query.expressions import Column, Expression, FunctionCall, Literal
from snuba.query.logical import Query
@@ -22,11 +22,9 @@ class HashBucketFunctionTransformer(LogicalQueryProcessor):
It converts mapExists(attr_str, 'blah') to mapExists(attr_str_{hash('blah')%20}, 'blah')
"""

def __init__(
self,
hash_bucket_names: Sequence[str],
):
self.hash_bucket_names = hash_bucket_names
def __init__(self, hash_bucket_name_mapping: Mapping[str, str]):
super().__init__()
self.hash_bucket_name_mapping = hash_bucket_name_mapping

def process_query(self, query: Query, query_settings: QuerySettings) -> None:
def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
@@ -40,7 +38,7 @@ def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
if not isinstance(param, Column):
return exp

if param.column_name not in self.hash_bucket_names:
if param.column_name not in self.hash_bucket_name_mapping:
return exp

if exp.function_name not in ("mapKeys", "mapValues"):
@@ -56,7 +54,7 @@ def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
parameters=(
Column(
None,
column_name=f"{param.column_name}_{i}",
column_name=f"{self.hash_bucket_name_mapping[param.column_name]}_{i}",
table_name=param.table_name,
),
),
@@ -76,7 +74,7 @@ def transform_map_contains_expression(exp: Expression) -> Expression:
if not isinstance(column, Column):
return exp

if column.column_name not in self.hash_bucket_names:
if column.column_name not in self.hash_bucket_name_mapping:
return exp

if exp.function_name != "mapContains":
@@ -91,7 +89,11 @@ def transform_map_contains_expression(exp: Expression) -> Expression:
alias=exp.alias,
function_name=exp.function_name,
parameters=(
Column(None, None, f"{column.column_name}_{bucket_idx}"),
Column(
None,
None,
f"{self.hash_bucket_name_mapping[column.column_name]}_{bucket_idx}",
),
key,
),
)
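
As a quick hand-worked sketch of what the new mapping buys us (not output produced by this code): with hash_bucket_name_mapping routing both numeric maps to attr_num, a predicate on the logical attr_i64 map is rewritten against the physical attr_num bucket columns.

mapping = {"attr_str": "attr_str", "attr_f64": "attr_num", "attr_i64": "attr_num"}
key = "my.counter"
bucket_idx = 7  # stand-in for fnv_1a(key) % 20; the real index comes from the hash

before = f"mapContains(attr_i64, '{key}')"
after = f"mapContains({mapping['attr_i64']}_{bucket_idx}, '{key}')"
print(before, "->", after)
# mapContains(attr_i64, 'my.counter') -> mapContains(attr_num_7, 'my.counter')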