Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(uptime): storage configuration and consumer #6697

Draft
wants to merge 20 commits into
base: jferg/uptime-checks-1
Choose a base branch
from
Draft
3 changes: 2 additions & 1 deletion rust_snuba/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ mod querylog;
mod release_health_metrics;
mod replays;
mod spans;
mod uptime_monitor_checks;
mod utils;

use crate::config::ProcessorConfig;
use crate::types::{InsertBatch, InsertOrReplacement, KafkaMessageMetadata};
use sentry_arroyo::backends::kafka::types::KafkaPayload;
Expand Down Expand Up @@ -54,6 +54,7 @@ define_processing_functions! {
("ProfilesMessageProcessor", "processed-profiles", ProcessingFunctionType::ProcessingFunction(profiles::process_message)),
("QuerylogProcessor", "snuba-queries", ProcessingFunctionType::ProcessingFunction(querylog::process_message)),
("ReplaysProcessor", "ingest-replay-events", ProcessingFunctionType::ProcessingFunction(replays::process_message)),
("UptimeMonitorChecksProcessor", "uptime-results", ProcessingFunctionType::ProcessingFunction(uptime_monitor_checks::process_message)),
("SpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(spans::process_message)),
("EAPSpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(eap_spans::process_message)),
("OutcomesProcessor", "outcomes", ProcessingFunctionType::ProcessingFunction(outcomes::process_message)),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: src/processors/mod.rs
description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"failure\",\n \"status_reason\": {\n \"type\": \"dns_error\",\n \"description\": \"Unable to resolve hostname example.xyz\"\n },\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 100,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": 500\n }\n}\n"
expression: snapshot_payload
---
[
{
"check_status": "failure",
"check_status_reason": "dns_error",
"duration": 100,
"environment": null,
"http_status_code": 500,
"offset": 1,
"organization_id": 1,
"partition": 0,
"project_id": 1,
"region_slug": "global",
"retention_days": 30,
"scheduled_check_time": 1717614062978.0,
"timestamp": 1717614068008.0,
"trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94",
"uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1",
"uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: src/processors/mod.rs
description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"success\",\n \"status_reason\": null,\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 50,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": 200\n }\n}\n"
expression: snapshot_payload
---
[
{
"check_status": "success",
"check_status_reason": null,
"duration": 50,
"environment": null,
"http_status_code": 200,
"offset": 1,
"organization_id": 1,
"partition": 0,
"project_id": 1,
"region_slug": "global",
"retention_days": 30,
"scheduled_check_time": 1717614062978.0,
"timestamp": 1717614068008.0,
"trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94",
"uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1",
"uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: src/processors/mod.rs
description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"failure\",\n \"status_reason\": {\n \"type\": \"timeout\",\n \"description\": \"Check timed out\"\n },\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 100,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": null\n }\n}\n"
expression: snapshot_payload
---
[
{
"check_status": "failure",
"check_status_reason": "timeout",
"duration": 100,
"environment": null,
"http_status_code": 0,
"offset": 1,
"organization_id": 1,
"partition": 0,
"project_id": 1,
"region_slug": "global",
"retention_days": 30,
"scheduled_check_time": 1717614062978.0,
"timestamp": 1717614068008.0,
"trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94",
"uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1",
"uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42"
}
]
157 changes: 157 additions & 0 deletions rust_snuba/src/processors/uptime_monitor_checks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use crate::config::ProcessorConfig;
use anyhow::Context;
use chrono::DateTime;
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::types::{InsertBatch, KafkaMessageMetadata};

/// Consumer entry point: decode one uptime-check Kafka message into an
/// insert batch.
///
/// The batch's origin timestamp is derived from the message's actual
/// check time (fractional part truncated to whole seconds).
pub fn process_message(
    payload: KafkaPayload,
    metadata: KafkaMessageMetadata,
    _config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
    let raw = payload.payload().context("Expected payload")?;
    let (rows, origin_timestamp) = deserialize_message(raw, metadata.partition, metadata.offset)?;
    let origin = DateTime::from_timestamp(origin_timestamp as i64, 0);
    InsertBatch::from_rows(rows, origin)
}

/// Decodes a raw uptime-check JSON payload into rows for insertion.
///
/// Returns the rows together with the message's actual check time, which
/// the caller uses as the batch origin timestamp.
///
/// # Errors
/// Returns an error when the payload is not valid JSON for
/// [`UptimeMonitorCheckMessage`].
pub fn deserialize_message(
    payload: &[u8],
    partition: u16,
    offset: u64,
) -> anyhow::Result<(Vec<UptimeMonitorCheckRow>, f64)> {
    let monitor_message: UptimeMonitorCheckMessage = serde_json::from_slice(payload)?;

    let rows = vec![UptimeMonitorCheckRow {
        // TODO: placeholder ids until the producer includes real
        // organization/project ids in the message (see the TODO on
        // UptimeMonitorCheckMessage).
        organization_id: 1,
        project_id: 1,
        environment: monitor_message.environment,
        uptime_subscription_id: monitor_message.subscription_id,
        uptime_check_id: monitor_message.guid,
        scheduled_check_time: monitor_message.scheduled_check_time_ms,
        timestamp: monitor_message.actual_check_time_ms,
        duration: monitor_message.duration_ms,
        region_slug: Some("global".to_string()),
        check_status: monitor_message.status,
        check_status_reason: monitor_message.status_reason.map(|r| r.r#type),
        // `request_info` is optional (a check that failed before issuing a
        // request, e.g. a DNS error, may omit it entirely). Fall back to 0
        // instead of panicking when it — or the status code inside it — is
        // missing.
        http_status_code: monitor_message
            .request_info
            .and_then(|info| info.http_status_code)
            .unwrap_or(0),
        trace_id: monitor_message.trace_id,
        retention_days: 30,
        partition,
        offset,
    }];

    Ok((rows, monitor_message.actual_check_time_ms))
}

/// Wire format of a single uptime check result as consumed from Kafka.
///
/// Deserialized with serde; field names must match the producer's JSON
/// keys exactly (see the test fixture below for an example payload).
#[derive(Debug, Deserialize)]
struct UptimeMonitorCheckMessage {
    // TODO: add these to the message
    // organization_id: u64,
    // project_id: u64,
    // retention_days: u16,
    // region_slug: Option<String>,
    /// Environment of the checked resource, if any.
    environment: Option<String>,
    /// Id of the uptime subscription that produced this check.
    subscription_id: Uuid,
    /// Unique id of this individual check execution.
    guid: Uuid,
    /// When the check was scheduled to run — epoch milliseconds,
    /// per the `_ms` suffix (TODO confirm with the producer).
    scheduled_check_time_ms: f64,
    /// When the check actually ran (same unit as the scheduled time).
    actual_check_time_ms: f64,
    /// How long the check took, in milliseconds.
    duration_ms: u64,
    /// Outcome of the check, e.g. "success" or "failure".
    status: String,
    /// Detail on why the check has its status; null on success.
    status_reason: Option<CheckStatusReason>,
    /// Trace id correlating this check with other telemetry.
    trace_id: Uuid,
    /// HTTP request details, when the check performed a request.
    request_info: Option<RequestInfo>,
}
/// HTTP request/response details attached to a check result.
#[derive(Debug, Deserialize)]
pub struct RequestInfo {
    /// Response status code; None when no response was received.
    pub http_status_code: Option<u16>,
}
/// Reason accompanying a non-successful check status.
#[derive(Debug, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct CheckStatusReason {
    /// The type of the status reason, e.g. "timeout" or "dns_error"
    pub r#type: String,

    /// A human readable description of the status reason
    pub description: String,
}

/// A single row written to the uptime_monitor_checks storage.
///
/// Serialized with serde; field names become the keys of the insert
/// payload.
#[derive(Debug, Default, Serialize)]
pub struct UptimeMonitorCheckRow {
    // Placeholder value (1) until the producer sends real ids.
    organization_id: u64,
    // Placeholder value (1) until the producer sends real ids.
    project_id: u64,
    environment: Option<String>,
    uptime_subscription_id: Uuid,
    uptime_check_id: Uuid,
    // Epoch-millisecond timestamps carried over from the message unchanged.
    scheduled_check_time: f64,
    timestamp: f64,
    duration: u64,
    // Hard-coded to "global" for now; the message carries no region yet.
    region_slug: Option<String>,
    check_status: String,
    // Machine-readable reason type (e.g. "timeout"); None on success.
    check_status_reason: Option<String>,
    // 0 when the check produced no HTTP status code.
    http_status_code: u16,
    trace_id: Uuid,
    retention_days: u16,
    // Kafka coordinates of the source message, for debugging/dedup.
    partition: u16,
    offset: u64,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a well-formed check payload end-to-end and verifies the
    /// placeholder ids, pass-through fields, and hard-coded defaults
    /// (region_slug, retention_days) on the resulting row.
    #[test]
    fn test_parse_monitor_checkin() {
        // NOTE(review): `status_reason.type` is a prose string here,
        // while real payloads use machine-readable kinds like
        // "timeout"/"dns_error" — any string is accepted either way.
        let data = r#"{
            "environment": "prod",
            "subscription_id": "123e4567-e89b-12d3-a456-426614174000",
            "guid": "550e8400-e29b-41d4-a716-446655440000",
            "scheduled_check_time_ms": 1702659277,
            "actual_check_time_ms": 1702659277,
            "duration_ms": 100,
            "status": "ok",
            "status_reason": {
                "type": "Request successful",
                "description": "Request successful"
            },
            "http_status_code": 200,
            "trace_id": "550e8400-e29b-41d4-a716-446655440000",
            "request_info": {
                "request_type": "GET",
                "http_status_code": 200
            }
        }"#;

        let (rows, timestamp) = deserialize_message(data.as_bytes(), 0, 0).unwrap();
        let monitor_row = rows.first().unwrap();

        // Placeholder org/project ids — see deserialize_message.
        assert_eq!(monitor_row.organization_id, 1);
        assert_eq!(monitor_row.project_id, 1);
        assert_eq!(monitor_row.environment, Some("prod".to_string()));
        assert_eq!(
            monitor_row.uptime_subscription_id,
            Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap()
        );
        assert_eq!(monitor_row.duration, 100);
        assert_eq!(monitor_row.timestamp, 1702659277.0);
        assert_eq!(monitor_row.region_slug, Some("global".to_string()));
        assert_eq!(&monitor_row.check_status, "ok");
        assert_eq!(
            monitor_row.check_status_reason,
            Some("Request successful".to_string())
        );
        assert_eq!(monitor_row.http_status_code, 200);
        assert_eq!(monitor_row.retention_days, 30);
        assert_eq!(monitor_row.partition, 0);
        assert_eq!(monitor_row.offset, 0);
        // The returned origin timestamp is the actual check time.
        assert_eq!(timestamp, 1702659277.0);
    }
}
67 changes: 67 additions & 0 deletions scripts/load_uptime_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""Load synthetic uptime-check rows into a local ClickHouse instance.

Generates one check per minute over 90 days for each project and posts
them to the ClickHouse HTTP endpoint as a single JSONEachRow insert per
project. Every 100th check is marked as a failure with HTTP 500.
"""

import datetime
import json
import random
import uuid

import requests

# Generate and insert data for uptime checks for each project.
# Rows start at midnight today and run forward one minute at a time.
base_time = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

# NOTE(review): this column list should be reconciled with the
# uptime_monitor_checks storage schema, which declares `duration_ms` and
# `region_slug` rather than `duration` and `region_id`.
query = """
INSERT INTO default.uptime_monitor_checks_local (
    organization_id, project_id, environment, uptime_subscription_id, uptime_check_id,
    scheduled_check_time, timestamp, duration, region_id, check_status,
    check_status_reason, http_status_code, trace_id, retention_days
) FORMAT JSONEachRow
"""

total_records = 0

for project_id in range(1, 2):
    # Small stable pool of subscription ids per project. The storage
    # schema and the Rust row both declare uptime_subscription_id as a
    # UUID, so UUID strings are used rather than integers.
    subscription_ids = [str(uuid.uuid4()) for _ in range(3)]
    project_data = []
    for minute in range(24 * 60 * 90):  # 24 hours * 60 minutes * 90 days
        timestamp = base_time + datetime.timedelta(minutes=minute)
        # Checks run a few seconds after their scheduled slot.
        scheduled_time = timestamp - datetime.timedelta(seconds=random.randint(1, 30))
        http_status = (
            500 if minute % 100 == 0 else 200
        )  # Every 100th record gets status 500
        check_status = "failure" if http_status == 500 else "success"
        project_data.append(
            {
                "organization_id": 1,
                "project_id": project_id,
                "environment": "production",
                "uptime_subscription_id": random.choice(subscription_ids),
                "uptime_check_id": str(uuid.uuid4()),
                "scheduled_check_time": scheduled_time.strftime("%Y-%m-%d %H:%M:%S"),
                "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                "duration": random.randint(1, 1000),
                # NOTE(review): schema names this column region_slug (a
                # string) — confirm which is correct for the live table.
                "region_id": random.randint(1, 3),
                "check_status": check_status,
                "check_status_reason": "Timeout error"
                if check_status == "failure"
                else None,
                "http_status_code": http_status,
                "trace_id": str(uuid.uuid4()),
                "retention_days": 30,
            }
        )

    # One insert per project; rows are newline-delimited JSON objects.
    response = requests.post(
        "http://localhost:8123",
        params={"query": query},
        data="\n".join(json.dumps(row) for row in project_data),
    )

    if response.status_code == 200:
        total_records += len(project_data)
        print(
            f"Successfully inserted {len(project_data)} records for project {project_id}"
        )
    else:
        print(f"Error inserting data for project {project_id}: {response.text}")

print(f"Total records inserted: {total_records}")
1 change: 1 addition & 0 deletions snuba/clusters/storage_sets.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"EVENTS_ANALYTICS_PLATFORM": "events_analytics_platform",
"GROUP_ATTRIBUTES": "group_attributes",
"PROFILE_CHUNKS": "profile_chunks",
"UPTIME_MONITOR_CHECKS": "uptime_monitor_checks",
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Writable ClickHouse storage for uptime monitor check results, populated
# by the UptimeMonitorChecksProcessor consumer in rust_snuba.
version: v1
kind: writable_storage
name: uptime_monitor_checks
storage:
  key: uptime_monitor_checks
  set_key: uptime_monitor_checks
readiness_state: limited
schema:
  columns:
    [
      { name: organization_id, type: UInt, args: { size: 64 } },
      { name: project_id, type: UInt, args: { size: 64 } },
      { name: environment, type: String, args: { schema_modifiers: [nullable, low_cardinality] } },
      { name: uptime_subscription_id, type: UUID },
      { name: uptime_check_id, type: UUID },
      # Millisecond-precision timestamps for scheduled vs. actual run time.
      { name: scheduled_check_time, type: DateTime64, args: { precision: 3 } },
      { name: timestamp, type: DateTime64, args: { precision: 3 } },
      # NOTE(review): the Rust row (UptimeMonitorCheckRow) serializes this
      # value under the key `duration` — confirm the column name matches
      # what the consumer writes.
      { name: duration_ms, type: UInt, args: { size: 64 } },
      # NOTE(review): region_slug is Option<String> on the Rust row but
      # not nullable here — confirm this is intended.
      { name: region_slug, type: String, args: { schema_modifiers: [low_cardinality] } },
      { name: check_status, type: String, args: { schema_modifiers: [low_cardinality] } },
      { name: check_status_reason, type: String, args: { schema_modifiers: [nullable, low_cardinality] } },
      { name: http_status_code, type: UInt, args: { size: 16 } },
      { name: trace_id, type: UUID },
      { name: retention_days, type: UInt, args: { size: 16 } },
    ]
  local_table_name: uptime_monitor_checks_local
  dist_table_name: uptime_monitor_checks_dist


mandatory_condition_checkers:
  - condition: ProjectIdEnforcer

stream_loader:
  processor: UptimeMonitorChecksProcessor
  # NOTE(review): the processor is registered against topic
  # "uptime-results" in rust_snuba/src/processors/mod.rs — confirm which
  # topic name is canonical.
  default_topic: snuba-uptime-results

allocation_policies:
  - name: ConcurrentRateLimitAllocationPolicy
    args:
      required_tenant_types:
        - organization_id
        - referrer
        - project_id
      default_config_overrides:
        is_enforced: 0
  - name: ReferrerGuardRailPolicy
    args:
      required_tenant_types:
        - referrer
      default_config_overrides:
        is_enforced: 0
        is_active: 0
  - name: BytesScannedRejectingPolicy
    args:
      required_tenant_types:
        - organization_id
        - project_id
        - referrer
      default_config_overrides:
        is_active: 0
        is_enforced: 0
Loading
Loading