Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/common/event-recorder/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ pub trait Event: Send + Sync + Debug {
vec![]
}

/// Add the extra row to the event with the default row.
fn extra_row(&self) -> Result<Row> {
Ok(Row { values: vec![] })
/// Add the extra rows to the event with the default row.
fn extra_rows(&self) -> Result<Vec<Row>> {
Ok(vec![Row { values: vec![] }])
}

/// Returns the event as any type.
Expand Down Expand Up @@ -159,15 +159,17 @@ pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsert

let mut rows: Vec<Row> = Vec::with_capacity(events.len());
for event in events {
let extra_row = event.extra_row()?;
let mut values = Vec::with_capacity(3 + extra_row.values.len());
values.extend([
ValueData::StringValue(event.event_type().to_string()).into(),
ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
]);
values.extend(extra_row.values);
rows.push(Row { values });
let extra_rows = event.extra_rows()?;
for extra_row in extra_rows {
let mut values = Vec::with_capacity(3 + extra_row.values.len());
values.extend([
ValueData::StringValue(event.event_type().to_string()).into(),
ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
]);
values.extend(extra_row.values);
rows.push(Row { values });
}
}

Ok(RowInsertRequests {
Expand Down
6 changes: 3 additions & 3 deletions src/common/frontend/src/slow_query_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ impl Event for SlowQueryEvent {
]
}

fn extra_row(&self) -> Result<Row> {
Ok(Row {
fn extra_rows(&self) -> Result<Vec<Row>> {
Ok(vec![Row {
values: vec![
ValueData::U64Value(self.cost).into(),
ValueData::U64Value(self.threshold).into(),
Expand All @@ -119,7 +119,7 @@ impl Event for SlowQueryEvent {
ValueData::TimestampMillisecondValue(self.promql_start.unwrap_or(0)).into(),
ValueData::TimestampMillisecondValue(self.promql_end.unwrap_or(0)).into(),
],
})
}])
}

fn json_payload(&self) -> Result<String> {
Expand Down
102 changes: 86 additions & 16 deletions src/common/procedure/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,95 @@ impl Event for ProcedureEvent {
schema
}

fn extra_row(&self) -> Result<Row> {
let error_str = match &self.state {
ProcedureState::Failed { error } => format!("{:?}", error),
ProcedureState::PrepareRollback { error } => format!("{:?}", error),
ProcedureState::RollingBack { error } => format!("{:?}", error),
ProcedureState::Retrying { error } => format!("{:?}", error),
ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
_ => "".to_string(),
};
let mut row = vec![
ValueData::StringValue(self.procedure_id.to_string()).into(),
ValueData::StringValue(self.state.as_str_name().to_string()).into(),
ValueData::StringValue(error_str).into(),
];
row.append(&mut self.internal_event.extra_row()?.values);
Ok(Row { values: row })
/// Builds one output row per row produced by the wrapped internal event.
///
/// Each emitted row starts with three procedure-level columns — the
/// procedure id, the state name, and the debug-formatted error (empty when
/// the current state carries no error) — followed by the internal event's
/// own extra values for that row.
fn extra_rows(&self) -> Result<Vec<Row>> {
    // These three values depend only on `self`, not on the internal row,
    // so compute them once instead of once per row.
    let error_str = match &self.state {
        ProcedureState::Failed { error } => format!("{:?}", error),
        ProcedureState::PrepareRollback { error } => format!("{:?}", error),
        ProcedureState::RollingBack { error } => format!("{:?}", error),
        ProcedureState::Retrying { error } => format!("{:?}", error),
        ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
        _ => "".to_string(),
    };
    let procedure_id = self.procedure_id.to_string();
    let state_name = self.state.as_str_name().to_string();

    let internal_event_extra_rows = self.internal_event.extra_rows()?;
    let mut rows = Vec::with_capacity(internal_event_extra_rows.len());
    for mut internal_event_extra_row in internal_event_extra_rows {
        // Per-row cost is now just the clones of the precomputed strings.
        let mut values = vec![
            ValueData::StringValue(procedure_id.clone()).into(),
            ValueData::StringValue(state_name.clone()).into(),
            ValueData::StringValue(error_str.clone()).into(),
        ];
        values.append(&mut internal_event_extra_row.values);
        rows.push(Row { values });
    }

    Ok(rows)
}

/// Returns the event as `&dyn Any`, allowing callers to downcast to the
/// concrete `ProcedureEvent` type.
fn as_any(&self) -> &dyn Any {
    self
}

#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
    use common_event_recorder::Event;

    use crate::{ProcedureEvent, ProcedureId, ProcedureState};

    // Minimal `Event` implementation used as the internal event of a
    // `ProcedureEvent`; its `extra_rows` deliberately returns TWO rows so
    // the test can verify that the procedure-level columns are prepended to
    // every internal row, not just the first one.
    #[derive(Debug)]
    struct TestEvent;

    impl Event for TestEvent {
        fn event_type(&self) -> &str {
            "test_event"
        }

        // One string column contributed by the internal event.
        fn extra_schema(&self) -> Vec<ColumnSchema> {
            vec![ColumnSchema {
                column_name: "test_event_column".to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            }]
        }

        // Two rows, each carrying a distinct marker value so the test can
        // check per-row ordering is preserved.
        fn extra_rows(&self) -> common_event_recorder::error::Result<Vec<Row>> {
            Ok(vec![
                Row {
                    values: vec![ValueData::StringValue("test_event1".to_string()).into()],
                },
                Row {
                    values: vec![ValueData::StringValue("test_event2".to_string()).into()],
                },
            ])
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    #[test]
    fn test_procedure_event_extra_rows() {
        let procedure_event = ProcedureEvent::new(
            ProcedureId::random(),
            Box::new(TestEvent {}),
            ProcedureState::Running,
        );

        let procedure_event_extra_rows = procedure_event.extra_rows().unwrap();
        // One output row per internal row.
        assert_eq!(procedure_event_extra_rows.len(), 2);
        // 3 procedure-level columns (id, state, error) + 1 internal column.
        assert_eq!(procedure_event_extra_rows[0].values.len(), 4);
        assert_eq!(
            procedure_event_extra_rows[0].values[3],
            ValueData::StringValue("test_event1".to_string()).into()
        );
        assert_eq!(procedure_event_extra_rows[1].values.len(), 4);
        assert_eq!(
            procedure_event_extra_rows[1].values[3],
            ValueData::StringValue("test_event2".to_string()).into()
        );
    }
}
71 changes: 38 additions & 33 deletions src/meta-srv/src/events/region_migration_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_event_recorder::Event;
use common_event_recorder::error::{Result, SerializeEventSnafu};
use serde::Serialize;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use store_api::storage::RegionId;

use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};

Expand All @@ -37,37 +37,34 @@ pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_nod
pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";

/// RegionMigrationEvent is the event of region migration.
#[derive(Debug, Serialize)]
#[derive(Debug)]
pub(crate) struct RegionMigrationEvent {
#[serde(skip)]
region_id: RegionId,
#[serde(skip)]
table_id: TableId,
#[serde(skip)]
region_number: u32,
#[serde(skip)]
/// The region ids of the region migration.
region_ids: Vec<RegionId>,
/// The trigger reason of the region migration.
trigger_reason: RegionMigrationTriggerReason,
#[serde(skip)]
/// The source node id of the region migration.
src_node_id: u64,
#[serde(skip)]
/// The source peer address of the region migration.
src_peer_addr: String,
#[serde(skip)]
/// The destination node id of the region migration.
dst_node_id: u64,
#[serde(skip)]
/// The destination peer address of the region migration.
dst_peer_addr: String,
/// The timeout of the region migration.
timeout: Duration,
}

// The following fields will be serialized as the json payload.
#[derive(Debug, Serialize)]
struct Payload {
    // The timeout of the region migration, serialized with `humantime_serde`.
    #[serde(with = "humantime_serde")]
    timeout: Duration,
}

impl RegionMigrationEvent {
pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
// FIXME(weny): handle multiple region ids.
let region_id = ctx.region_ids[0];
Self {
region_id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
region_ids: ctx.region_ids.clone(),
trigger_reason: ctx.trigger_reason,
src_node_id: ctx.from_peer.id,
src_peer_addr: ctx.from_peer.addr.clone(),
Expand Down Expand Up @@ -136,23 +133,31 @@ impl Event for RegionMigrationEvent {
]
}

fn extra_row(&self) -> Result<Row> {
Ok(Row {
values: vec![
ValueData::U64Value(self.region_id.as_u64()).into(),
ValueData::U32Value(self.table_id).into(),
ValueData::U32Value(self.region_number).into(),
ValueData::StringValue(self.trigger_reason.to_string()).into(),
ValueData::U64Value(self.src_node_id).into(),
ValueData::StringValue(self.src_peer_addr.clone()).into(),
ValueData::U64Value(self.dst_node_id).into(),
ValueData::StringValue(self.dst_peer_addr.clone()).into(),
],
})
/// Emits one row per migrating region.
///
/// Every row carries the region-specific columns (region id, table id,
/// region number) followed by the migration-wide columns (trigger reason,
/// source/destination node ids and peer addresses), which are shared by all
/// regions of this migration.
fn extra_rows(&self) -> Result<Vec<Row>> {
    // The trigger reason is identical for every region; format it once
    // instead of once per iteration.
    let trigger_reason = self.trigger_reason.to_string();
    let mut extra_rows = Vec::with_capacity(self.region_ids.len());
    for region_id in &self.region_ids {
        extra_rows.push(Row {
            values: vec![
                ValueData::U64Value(region_id.as_u64()).into(),
                ValueData::U32Value(region_id.table_id()).into(),
                ValueData::U32Value(region_id.region_number()).into(),
                ValueData::StringValue(trigger_reason.clone()).into(),
                ValueData::U64Value(self.src_node_id).into(),
                ValueData::StringValue(self.src_peer_addr.clone()).into(),
                ValueData::U64Value(self.dst_node_id).into(),
                ValueData::StringValue(self.dst_peer_addr.clone()).into(),
            ],
        });
    }

    Ok(extra_rows)
}

fn json_payload(&self) -> Result<String> {
serde_json::to_string(self).context(SerializeEventSnafu)
serde_json::to_string(&Payload {
timeout: self.timeout,
})
.context(SerializeEventSnafu)
}

fn as_any(&self) -> &dyn Any {
Expand Down
24 changes: 19 additions & 5 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub(crate) mod open_candidate_region;
pub mod test_util;
pub(crate) mod update_metadata;
pub(crate) mod upgrade_candidate_region;
pub(crate) mod utils;

use std::any::Any;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -100,9 +101,14 @@ where
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
/// The table catalog.
pub(crate) catalog: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) catalog: Option<String>,
/// The table schema.
pub(crate) schema: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) schema: Option<String>,
/// The catalog and schema of the regions.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub(crate) catalog_and_schema: Vec<(String, String)>,
/// The [Peer] of migration source.
pub(crate) from_peer: Peer,
/// The [Peer] of migration destination.
Expand All @@ -124,9 +130,17 @@ fn default_timeout() -> Duration {

impl PersistentContext {
pub fn lock_key(&self) -> Vec<StringKey> {
let mut lock_keys = Vec::with_capacity(self.region_ids.len() + 2);
lock_keys.push(CatalogLock::Read(&self.catalog).into());
lock_keys.push(SchemaLock::read(&self.catalog, &self.schema).into());
let mut lock_keys =
Vec::with_capacity(self.region_ids.len() + 2 + self.catalog_and_schema.len() * 2);
if let (Some(catalog), Some(schema)) = (&self.catalog, &self.schema) {
lock_keys.push(CatalogLock::Read(catalog).into());
lock_keys.push(SchemaLock::read(catalog, schema).into());
}
for (catalog, schema) in self.catalog_and_schema.iter() {
lock_keys.push(CatalogLock::Read(catalog).into());
lock_keys.push(SchemaLock::read(catalog, schema).into());
}

// Sort the region ids to ensure the same order of region ids.
let mut region_ids = self.region_ids.clone();
region_ids.sort_unstable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,9 @@ mod tests {

fn new_persistent_context() -> PersistentContext {
PersistentContext {
catalog: "greptime".into(),
schema: "public".into(),
catalog: Some("greptime".to_string()),
schema: Some("public".to_string()),
catalog_and_schema: vec![],
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_ids: vec![RegionId::new(1024, 1)],
Expand Down
Loading
Loading