Skip to content

Commit

Permalink
feat: Add consumer support for pre-aggregated sessions (4/4) (#1492)
Browse files Browse the repository at this point in the history
  • Loading branch information
Swatinem authored Jan 15, 2021
1 parent df3a0d5 commit 77f0d95
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 2 deletions.
4 changes: 3 additions & 1 deletion snuba/datasets/sessions_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"exited": 1,
"crashed": 2,
"abnormal": 3,
"errored": 4,
}

metrics = MetricsWrapper(environment.metrics, "sessions.processor")
Expand All @@ -40,6 +41,7 @@ def process_message(self, message, metadata) -> Optional[ProcessedMessage]:
duration = MAX_UINT32

errors = _collapse_uint16(message["errors"]) or 0
quantity = _collapse_uint32(message.get("quantity")) or 1

# If a session ends in crashed or abnormal we want to make sure that
# they count as errored too, so we can get the number of health and
Expand All @@ -58,7 +60,7 @@ def process_message(self, message, metadata) -> Optional[ProcessedMessage]:
processed = {
"session_id": str(uuid.UUID(message["session_id"])),
"distinct_id": str(uuid.UUID(message.get("distinct_id") or NIL_UUID)),
"quantity": 1,
"quantity": quantity,
"seq": message["seq"],
"org_id": message["org_id"],
"project_id": message["project_id"],
Expand Down
80 changes: 79 additions & 1 deletion tests/test_sessions_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import simplejson as json

from snuba import settings
from snuba.consumer import KafkaMessageMetadata
from snuba.datasets.storages import StorageKey
from snuba.datasets.storages.factory import get_writable_storage
from tests.base import BaseApiTest
from tests.helpers import write_processed_messages
from snuba.processor import MAX_UINT32
from snuba.utils.metrics.backends.dummy import DummyMetricsBackend

Expand All @@ -25,7 +27,6 @@ def setup_method(self, test_method):
)

self.storage = get_writable_storage(StorageKey.SESSIONS_RAW)
self.generate_manual_session_events()

def generate_manual_session_events(self):
session_1 = "b3ef3211-58a4-4b36-a9a1-5a55df0d9aae"
Expand Down Expand Up @@ -72,6 +73,7 @@ def generate_manual_session_events(self):
).write([json.dumps(session).encode("utf-8") for session in sessions])

def test_manual_session_aggregation(self):
self.generate_manual_session_events()
response = self.app.post(
"/query",
data=json.dumps(
Expand All @@ -97,3 +99,79 @@ def test_manual_session_aggregation(self):
assert data["data"][0]["sessions_errored"] == 10
assert data["data"][0]["users"] == 2
assert data["data"][0]["users_errored"] == 1

def generate_session_events(self):
processor = self.storage.get_table_writer().get_stream_loader().get_processor()
meta = KafkaMessageMetadata(
offset=1, partition=2, timestamp=datetime(1970, 1, 1)
)
template = {
"session_id": "00000000-0000-0000-0000-000000000000",
"distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf",
"duration": None,
"environment": "production",
"org_id": 1,
"project_id": 2,
"release": "[email protected]",
"retention_days": settings.DEFAULT_RETENTION_DAYS,
"seq": 0,
"errors": 0,
"received": datetime.utcnow().timestamp(),
"started": self.started.timestamp(),
}
events = [
processor.process_message(
{
**template,
"status": "exited",
"duration": 1947.49,
"session_id": "8333339f-5675-4f89-a9a0-1c935255ab58",
},
meta,
),
processor.process_message(
{**template, "status": "exited", "quantity": 5}, meta,
),
processor.process_message(
{**template, "status": "errored", "errors": 1, "quantity": 2}, meta,
),
processor.process_message(
{
**template,
"distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf",
"status": "errored",
"errors": 1,
"quantity": 2,
},
meta,
),
]
write_processed_messages(self.storage, events)

def test_session_aggregation(self):
self.generate_session_events()
response = self.app.post(
"/query",
data=json.dumps(
{
"dataset": "sessions",
"organization": 1,
"project": 2,
"selected_columns": [
"sessions",
"sessions_errored",
"users",
"users_errored",
],
"from_date": (self.started - self.skew).isoformat(),
"to_date": (self.started + self.skew).isoformat(),
}
),
)
data = json.loads(response.data)
assert response.status_code == 200, response.data
assert len(data["data"]) == 1, data
assert data["data"][0]["sessions"] == 10
assert data["data"][0]["sessions_errored"] == 4
assert data["data"][0]["users"] == 1
assert data["data"][0]["users_errored"] == 1

0 comments on commit 77f0d95

Please sign in to comment.