diff --git a/src/sql-server-util/examples/cdc.rs b/src/sql-server-util/examples/cdc.rs
index fa88e88e0ae0f..88491ee415e49 100644
--- a/src/sql-server-util/examples/cdc.rs
+++ b/src/sql-server-util/examples/cdc.rs
@@ -42,7 +42,7 @@ use futures::StreamExt;
 use mz_ore::future::InTask;
 use mz_sql_server_util::cdc::CdcEvent;
 use mz_sql_server_util::config::TunnelConfig;
-use mz_sql_server_util::{Client, Config};
+use mz_sql_server_util::{Client, Config, LoggingSqlServerCdcMetrics};
 use tracing_subscriber::EnvFilter;
 
 #[tokio::main]
@@ -65,7 +65,8 @@ async fn main() -> Result<(), anyhow::Error> {
     tracing::info!("connection 1 successful!");
 
     let capture_instances = ["materialize_t1", "materialize_t2"];
-    let mut cdc_handle = client_1.cdc(capture_instances);
+    let metrics = LoggingSqlServerCdcMetrics;
+    let mut cdc_handle = client_1.cdc(capture_instances, metrics);
 
     cdc_handle.wait_for_ready().await?;
 
diff --git a/src/sql-server-util/src/cdc.rs b/src/sql-server-util/src/cdc.rs
index f001de1d6667d..b0136076bfcc0 100644
--- a/src/sql-server-util/src/cdc.rs
+++ b/src/sql-server-util/src/cdc.rs
@@ -75,7 +75,7 @@ use tiberius::numeric::Numeric;
 
 use crate::desc::{SqlServerQualifiedTableName, SqlServerTableRaw};
 use crate::inspect::DDLEvent;
-use crate::{Client, SqlServerError, TransactionIsolationLevel};
+use crate::{Client, SqlServerCdcMetrics, SqlServerError, TransactionIsolationLevel};
 
 /// A stream of changes from a table in SQL Server that has CDC enabled.
 ///
@@ -83,7 +83,7 @@ use crate::{Client, SqlServerError, TransactionIsolationLevel};
 /// poll the upstream source.
 ///
 /// See:
-pub struct CdcStream<'a> {
+pub struct CdcStream<'a, M: SqlServerCdcMetrics> {
     /// Client we use for querying SQL Server.
     client: &'a mut Client,
     /// Upstream capture instances we'll list changes from.
@@ -97,18 +97,22 @@ pub struct CdcStream<'a> {
     /// we'll wait this duration for SQL Server to report an [`Lsn`] and thus indicate CDC is
     /// ready to go.
     max_lsn_wait: Duration,
+    /// Metrics.
+    metrics: M,
 }
 
-impl<'a> CdcStream<'a> {
+impl<'a, M: SqlServerCdcMetrics> CdcStream<'a, M> {
     pub(crate) fn new(
         client: &'a mut Client,
         capture_instances: BTreeMap<Arc<str>, Option<Lsn>>,
+        metrics: M,
     ) -> Self {
         CdcStream {
             client,
             capture_instances,
             poll_interval: Duration::from_secs(1),
             max_lsn_wait: Duration::from_secs(10),
+            metrics,
         }
     }
 
@@ -164,6 +168,13 @@ impl<'a> CdcStream<'a> {
         // as it will be just be locking the table(s).
         let mut fencing_client = self.client.new_connection().await?;
         let mut fence_txn = fencing_client.transaction().await?;
+        let qualified_table_name = format!(
+            "{schema_name}.{table_name}",
+            schema_name = &table.schema_name,
+            table_name = &table.name
+        );
+        self.metrics
+            .snapshot_table_lock_start(&qualified_table_name);
         fence_txn
             .lock_table_shared(&table.schema_name, &table.name)
             .await?;
@@ -204,7 +215,7 @@ impl<'a> CdcStream<'a> {
         // the table no longer needs to be locked. Any writes that happen to the upstream table
         // will have an LSN higher than our captured LSN, and will be read from CDC.
         fence_txn.rollback().await?;
-
+        self.metrics.snapshot_table_lock_end(&qualified_table_name);
         let lsn = txn.get_lsn().await?;
         tracing::info!(%source_id, ?lsn, "timely-{worker_id} starting snapshot");
 
@@ -225,7 +236,9 @@ impl<'a> CdcStream<'a> {
     }
 
     /// Consume `self` returning a [`Stream`] of [`CdcEvent`]s.
-    pub fn into_stream(mut self) -> impl Stream<Item = Result<CdcEvent, SqlServerError>> + use<'a> {
+    pub fn into_stream(
+        mut self,
+    ) -> impl Stream<Item = Result<CdcEvent, SqlServerError>> + use<'a, M> {
         async_stream::try_stream! {
             // Initialize all of our start LSNs.
             self.initialize_start_lsns().await?;
diff --git a/src/sql-server-util/src/lib.rs b/src/sql-server-util/src/lib.rs
index 7c2ff4ccf2acd..5d9ccc43bef31 100644
--- a/src/sql-server-util/src/lib.rs
+++ b/src/sql-server-util/src/lib.rs
@@ -341,16 +341,17 @@ impl Client {
     /// `capture_instances`.
     ///
     /// [`CdcStream`]: crate::cdc::CdcStream
-    pub fn cdc<I>(&mut self, capture_instances: I) -> crate::cdc::CdcStream<'_>
+    pub fn cdc<I, M>(&mut self, capture_instances: I, metrics: M) -> crate::cdc::CdcStream<'_, M>
     where
         I: IntoIterator,
         I::Item: Into<Arc<str>>,
+        M: SqlServerCdcMetrics,
     {
         let instances = capture_instances
             .into_iter()
             .map(|i| (i.into(), None))
             .collect();
-        crate::cdc::CdcStream::new(self, instances)
+        crate::cdc::CdcStream::new(self, instances, metrics)
     }
 }
 
@@ -945,6 +946,27 @@ pub fn quote_identifier(ident: &str) -> String {
     quoted
 }
 
+pub trait SqlServerCdcMetrics {
+    /// Called before the table lock is acquired.
+    fn snapshot_table_lock_start(&self, table_name: &str);
+    /// Called after the table lock is released.
+    fn snapshot_table_lock_end(&self, table_name: &str);
+}
+
+/// A simple implementation of [`SqlServerCdcMetrics`] that uses the tracing framework to log
+/// the start and end conditions.
+pub struct LoggingSqlServerCdcMetrics;
+
+impl SqlServerCdcMetrics for LoggingSqlServerCdcMetrics {
+    fn snapshot_table_lock_start(&self, table_name: &str) {
+        tracing::info!("snapshot_table_lock_start: {table_name}");
+    }
+
+    fn snapshot_table_lock_end(&self, table_name: &str) {
+        tracing::info!("snapshot_table_lock_end: {table_name}");
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/src/storage/src/metrics.rs b/src/storage/src/metrics.rs
index 1703d7fa3e136..285f20a52920a 100644
--- a/src/storage/src/metrics.rs
+++ b/src/storage/src/metrics.rs
@@ -138,7 +138,7 @@ impl StorageMetrics {
         source::SourceMetrics::new(&self.source_defs.source_defs, id, worker_id)
     }
 
-    /// Get a `PgMetrics` for the given id.
+    /// Get a `PgSourceMetrics` for the given id.
     pub(crate) fn get_postgres_source_metrics(
         &self,
         id: GlobalId,
@@ -154,6 +154,19 @@ impl StorageMetrics {
         source::mysql::MySqlSourceMetrics::new(&self.source_defs.mysql_defs, id)
     }
 
+    /// Get a `SqlServerSourceMetrics` for the given id.
+    pub(crate) fn get_sql_server_source_metrics(
+        &self,
+        source_id: GlobalId,
+        worker_id: usize,
+    ) -> source::sql_server::SqlServerSourceMetrics {
+        source::sql_server::SqlServerSourceMetrics::new(
+            &self.source_defs.sql_server_defs,
+            source_id,
+            worker_id,
+        )
+    }
+
     /// Get an `OffsetCommitMetrics` for the given id.
     pub(crate) fn get_offset_commit_metrics(&self, id: GlobalId) -> source::OffsetCommitMetrics {
         source::OffsetCommitMetrics::new(&self.source_defs.source_defs, id)
diff --git a/src/storage/src/metrics/source.rs b/src/storage/src/metrics/source.rs
index 0f305037cad03..bb8878ebd1881 100644
--- a/src/storage/src/metrics/source.rs
+++ b/src/storage/src/metrics/source.rs
@@ -25,6 +25,7 @@ use prometheus::core::{AtomicI64, AtomicU64};
 pub mod kafka;
 pub mod mysql;
 pub mod postgres;
+pub mod sql_server;
 
 /// Definitions for general metrics about sources that are not specific to the source type.
 ///
@@ -225,6 +226,7 @@ pub(crate) struct SourceMetricDefs {
     pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
     pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
     pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
+    pub(crate) sql_server_defs: sql_server::SqlServerSourceMetricDefs,
     /// A cluster-wide counter shared across all sources.
     pub(crate) bytes_read: IntCounter,
 }
@@ -236,6 +238,7 @@ impl SourceMetricDefs {
             postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
             mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
             kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
+            sql_server_defs: sql_server::SqlServerSourceMetricDefs::register_with(registry),
             bytes_read: registry.register(metric!(
                 name: "mz_bytes_read_total",
                 help: "Count of bytes read from sources",
diff --git a/src/storage/src/metrics/source/sql_server.rs b/src/storage/src/metrics/source/sql_server.rs
new file mode 100644
index 0000000000000..16de5566d3e90
--- /dev/null
+++ b/src/storage/src/metrics/source/sql_server.rs
@@ -0,0 +1,177 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! Metrics for SQL Server sources.
+
+use std::collections::BTreeMap;
+use std::rc::Rc;
+use std::sync::Mutex;
+
+use mz_ore::metric;
+use mz_ore::metrics::{
+    DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, IntGaugeVec, MetricsRegistry,
+    UIntGaugeVec,
+};
+use mz_repr::GlobalId;
+use prometheus::core::{AtomicF64, AtomicI64, AtomicU64};
+
+/// Definitions for SQL Server source metrics.
+#[derive(Clone, Debug)]
+pub(crate) struct SqlServerSourceMetricDefs {
+    pub(crate) ignored_messages: IntCounterVec,
+    pub(crate) insert_messages: IntCounterVec,
+    pub(crate) update_messages: IntCounterVec,
+    pub(crate) delete_messages: IntCounterVec,
+    pub(crate) snapshot_table_count: UIntGaugeVec,
+    pub(crate) snapshot_table_size_latency: GaugeVec,
+    pub(crate) snapshot_table_lock: IntGaugeVec,
+}
+
+impl SqlServerSourceMetricDefs {
+    pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
+        // Every metric must have a worker-specific id associated with it. These are later wrapped
+        // in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
+        // would call the DeleteOnDrop code that deregisters the metric for `source_id`. Other
+        // workers may still be running, but the metrics registry will no longer record or report
+        // metrics for that `source_id`.
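+        // For example, workers "0" and "1" of the same source own the distinct label sets
+        // `[source_id, "0"]` and `[source_id, "1"]`, so one worker dropping its metrics
+        // leaves the other worker's series registered.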
+        Self {
+            ignored_messages: registry.register(metric!(
+                name: "mz_sql_server_per_source_ignored_messages",
+                help: "The number of messages ignored because of an irrelevant type or relation_id",
+                var_labels: ["source_id", "worker_id"],
+            )),
+            insert_messages: registry.register(metric!(
+                name: "mz_sql_server_per_source_inserts",
+                help: "The number of inserts for all tables in this source",
+                var_labels: ["source_id", "worker_id"],
+            )),
+            update_messages: registry.register(metric!(
+                name: "mz_sql_server_per_source_updates",
+                help: "The number of updates for all tables in this source",
+                var_labels: ["source_id", "worker_id"],
+            )),
+            delete_messages: registry.register(metric!(
+                name: "mz_sql_server_per_source_deletes",
+                help: "The number of deletes for all tables in this source",
+                var_labels: ["source_id", "worker_id"],
+            )),
+            snapshot_table_count: registry.register(metric!(
+                name: "mz_sql_server_snapshot_table_count",
+                help: "The number of tables that SQL Server still needs to snapshot",
+                var_labels: ["source_id", "worker_id"],
+            )),
+            snapshot_table_size_latency: registry.register(metric!(
+                name: "mz_sql_server_snapshot_count_latency",
+                help: "The wall time used to obtain snapshot sizes.",
+                var_labels: ["source_id", "worker_id", "table_name"],
+            )),
+            snapshot_table_lock: registry.register(metric!(
+                name: "mz_sql_server_snapshot_table_lock",
+                help: "The upstream tables locked for snapshot.",
+                var_labels: ["source_id", "worker_id", "table_name"],
+            )),
+        }
+    }
+}
+/// Metrics for SQL Server sources.
+#[derive(Clone)]
+pub(crate) struct SqlServerSourceMetrics {
+    // stored as String to avoid having to convert them repeatedly.
+    source_id: String,
+    worker_id: String,
+    defs: SqlServerSourceMetricDefs,
+    // Currently, this structure is not accessed across threads.
+    snapshot_table_size_latency:
+        Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
+    snapshot_table_lock_count:
+        Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicI64, Vec<String>>>>>,
+
+    pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
+    pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
+    pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
+    pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
+    pub(crate) snapshot_table_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
+}
+
+impl SqlServerSourceMetrics {
+    /// Create a `SqlServerSourceMetrics` from the `SqlServerSourceMetricDefs`.
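+    ///
+    /// Each metric is labeled with this worker's `(source_id, worker_id)` pair.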
+    pub(crate) fn new(
+        defs: &SqlServerSourceMetricDefs,
+        source_id: GlobalId,
+        worker_id: usize,
+    ) -> Self {
+        let source_id_labels = &[source_id.to_string(), worker_id.to_string()];
+        Self {
+            source_id: source_id.to_string(),
+            worker_id: worker_id.to_string(),
+            defs: defs.clone(),
+            inserts: defs
+                .insert_messages
+                .get_delete_on_drop_metric(source_id_labels.to_vec()),
+            updates: defs
+                .update_messages
+                .get_delete_on_drop_metric(source_id_labels.to_vec()),
+            deletes: defs
+                .delete_messages
+                .get_delete_on_drop_metric(source_id_labels.to_vec()),
+            ignored: defs
+                .ignored_messages
+                .get_delete_on_drop_metric(source_id_labels.to_vec()),
+            snapshot_table_count: defs
+                .snapshot_table_count
+                .get_delete_on_drop_metric(source_id_labels.to_vec()),
+            snapshot_table_size_latency: Default::default(),
+            snapshot_table_lock_count: Default::default(),
+        }
+    }
+
+    pub fn set_snapshot_table_size_latency(&self, table_name: &str, latency: f64) {
+        let mut snapshot_table_size_latency =
+            self.snapshot_table_size_latency.lock().expect("poisoned");
+        match snapshot_table_size_latency.entry(table_name.to_string()) {
+            std::collections::btree_map::Entry::Vacant(vacant_entry) => {
+                let labels = vec![
+                    self.source_id.clone(),
+                    self.worker_id.clone(),
+                    table_name.to_string(),
+                ];
+                let metric = self
+                    .defs
+                    .snapshot_table_size_latency
+                    .get_delete_on_drop_metric(labels);
+                vacant_entry.insert(metric).set(latency);
+            }
+            std::collections::btree_map::Entry::Occupied(occupied_entry) => {
+                occupied_entry.get().set(latency)
+            }
+        }
+    }
+
+    pub fn update_snapshot_table_lock_count(&self, table_name: &str, delta: i64) {
+        let mut snapshot_table_lock_count =
+            self.snapshot_table_lock_count.lock().expect("poisoned");
+        match snapshot_table_lock_count.entry(table_name.to_string()) {
+            std::collections::btree_map::Entry::Vacant(vacant_entry) => {
+                let labels = vec![
+                    self.source_id.clone(),
+                    self.worker_id.clone(),
+                    table_name.to_string(),
+                ];
+                let metric = self
+                    .defs
+                    .snapshot_table_lock
+                    .get_delete_on_drop_metric(labels);
+                vacant_entry.insert(metric).add(delta);
+            }
+            std::collections::btree_map::Entry::Occupied(occupied_entry) => {
+                occupied_entry.get().add(delta);
+            }
+        }
+    }
+}
diff --git a/src/storage/src/source/sql_server.rs b/src/storage/src/source/sql_server.rs
index 9dff0e79bb26e..e083401c0c1e1 100644
--- a/src/storage/src/source/sql_server.rs
+++ b/src/storage/src/source/sql_server.rs
@@ -157,11 +157,16 @@ impl SourceRender for SqlServerSourceConnection {
             source_outputs.insert(*id, output_info);
         }
 
+        let metrics = config
+            .metrics
+            .get_sql_server_source_metrics(config.id, config.worker_id);
+
         let (repl_updates, uppers, repl_errs, repl_token) = replication::render(
             scope.clone(),
             config.clone(),
             source_outputs.clone(),
             self.clone(),
+            metrics,
         );
 
         let (progress_errs, progress_probes, progress_token) = progress::render(
diff --git a/src/storage/src/source/sql_server/replication.rs b/src/storage/src/source/sql_server/replication.rs
index 458aeb50f2f61..fedb265272ca5 100644
--- a/src/storage/src/source/sql_server/replication.rs
+++ b/src/storage/src/source/sql_server/replication.rs
@@ -22,6 +22,7 @@ use itertools::Itertools;
 use mz_ore::cast::CastFrom;
 use mz_ore::future::InTask;
 use mz_repr::{Diff, GlobalId, Row, RowArena};
+use mz_sql_server_util::SqlServerCdcMetrics;
 use mz_sql_server_util::cdc::{CdcEvent, Lsn, Operation as CdcOperation};
 use mz_sql_server_util::desc::SqlServerRowDecoder;
 use mz_sql_server_util::inspect::get_latest_restore_history_id;
@@ -40,6 +41,7 @@
 use timely::dataflow::operators::{CapabilitySet, Concat, Map};
 use timely::dataflow::{Scope, Stream as TimelyStream};
 use timely::progress::{Antichain, Timestamp};
+use crate::metrics::source::sql_server::SqlServerSourceMetrics;
 use crate::source::RawSourceCreationConfig;
 use crate::source::sql_server::{
     DefiniteError, ReplicationError, SourceOutputInfo, TransientError,
@@ -58,6 +60,7 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
     config: RawSourceCreationConfig,
     outputs: BTreeMap<GlobalId, SourceOutputInfo>,
     source: SqlServerSourceConnection,
+    metrics: SqlServerSourceMetrics,
 ) -> (
     StackedCollection)>,
     TimelyStream,
@@ -134,6 +137,7 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
             // A worker *must* emit a count even if not responsible for snapshotting a table
             // as statistic summarization will return null if any worker hasn't set a value.
             // This will also reset snapshot stats for any exports not snapshotting.
+            metrics.snapshot_table_count.set(u64::cast_from(capture_instance_to_snapshot.len()));
             if !capture_instance_to_snapshot.is_empty() {
                 for stats in config.statistics.values() {
                     stats.set_snapshot_records_known(0);
@@ -177,15 +181,23 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
            // happens outside the snapshot transaction the totals might be off, so we won't assert
            // that we get exactly this many rows later.
            for table in &snapshot_tables {
+                let qualified_table_name = format!("{schema_name}.{table_name}",
+                    schema_name = &table.schema_name,
+                    table_name = &table.name);
+                let size_calc_start = Instant::now();
                 let table_total = mz_sql_server_util::inspect::snapshot_size(&mut client, &table.schema_name, &table.name).await?;
+                metrics.set_snapshot_table_size_latency(
+                    &qualified_table_name,
+                    size_calc_start.elapsed().as_secs_f64()
+                );
                 for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
                     export_stat.set_snapshot_records_known(u64::cast_from(table_total));
                     export_stat.set_snapshot_records_staged(0);
                 }
             }
-
+            let cdc_metrics = PrometheusSqlServerCdcMetrics { inner: &metrics };
             let mut cdc_handle = client
-                .cdc(capture_instances.keys().cloned())
+                .cdc(capture_instances.keys().cloned(), cdc_metrics)
                 .max_lsn_wait(MAX_LSN_WAIT.get(config.config.config_set()));
 
             // Snapshot any instance that requires it.
@@ -208,7 +220,7 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
 
             for table in snapshot_tables {
                 // TODO(sql_server3): filter columns to only select columns required for Source.
-                let (snapshot_lsn, snapshot)= cdc_handle
+                let (snapshot_lsn, snapshot) = cdc_handle
                     .snapshot(&table, config.worker_id, config.id)
                     .await?;
 
@@ -267,7 +279,7 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
                 }
                 tracing::info!(%config.id, %table.name, %table.schema_name, %snapshot_lsn, "timely-{worker_id} snapshot complete");
-
+                metrics.snapshot_table_count.dec();
                 // final update for snapshot_staged, using the staged values as the total is an estimate
                 for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
                     export_stat.set_snapshot_records_staged(snapshot_staged);
                 }
@@ -385,6 +397,7 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
                         if errored_instances.contains(&capture_instance) {
                             // outputs for this captured instance are in an errored state, so they are not
                             // emitted
+                            metrics.ignored.inc_by(u64::cast_from(changes.len()));
                         }
 
                         let Some(partition_indexes) = capture_instances.get(&capture_instance) else {
@@ -410,7 +423,8 @@ pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
                                     lsn,
                                     &rewinds,
                                     &data_output,
-                                    data_cap_set
+                                    data_cap_set,
+                                    &metrics
                                 ).await?
                             },
                             CdcEvent::SchemaUpdate { capture_instance, table, ddl_event } => {
@@ -470,15 +484,23 @@ async fn handle_data_event(
     rewinds: &BTreeMap,
     data_output: &StackedAsyncOutputHandle)>,
     data_cap_set: &CapabilitySet,
+    metrics: &SqlServerSourceMetrics,
 ) -> Result<(), TransientError> {
     for change in changes {
         let (sql_server_row, diff): (_, _) = match change {
-            CdcOperation::Insert(sql_server_row) | CdcOperation::UpdateNew(sql_server_row) => {
+            CdcOperation::Insert(sql_server_row) => {
+                metrics.inserts.inc();
                 (sql_server_row, Diff::ONE)
             }
-            CdcOperation::Delete(sql_server_row) | CdcOperation::UpdateOld(sql_server_row) => {
+            CdcOperation::UpdateNew(sql_server_row) => {
+                metrics.updates.inc();
+                (sql_server_row, Diff::ONE)
+            }
+            CdcOperation::Delete(sql_server_row) => {
+                metrics.deletes.inc();
                 (sql_server_row, Diff::MINUS_ONE)
             }
+            CdcOperation::UpdateOld(sql_server_row) => (sql_server_row, Diff::MINUS_ONE),
         };
 
         // Try to decode a row, returning a SourceError if it fails.
@@ -571,3 +593,18 @@ async fn return_definite_error(
         ReplicationError::DefiniteError(Rc::new(err)),
     );
 }
+
+/// Provides an implementation of [`SqlServerCdcMetrics`] that updates [`SqlServerSourceMetrics`].
+struct PrometheusSqlServerCdcMetrics<'a> {
+    inner: &'a SqlServerSourceMetrics,
+}
+
+impl<'a> SqlServerCdcMetrics for PrometheusSqlServerCdcMetrics<'a> {
+    fn snapshot_table_lock_start(&self, table_name: &str) {
+        self.inner.update_snapshot_table_lock_count(table_name, 1);
+    }
+
+    fn snapshot_table_lock_end(&self, table_name: &str) {
+        self.inner.update_snapshot_table_lock_count(table_name, -1);
+    }
+}
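For reference, here is a minimal sketch (not part of the diff) of how a caller outside the storage crate could implement the new `SqlServerCdcMetrics` trait and hand it to `Client::cdc`, mirroring the `LoggingSqlServerCdcMetrics` usage in `examples/cdc.rs`. The `LockGauge` type and `stream_changes` helper are hypothetical names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

use mz_sql_server_util::{Client, SqlServerCdcMetrics};

/// Hypothetical metrics sink: tracks how many snapshot table locks are currently held.
#[derive(Default)]
struct LockGauge {
    held: AtomicU64,
}

impl SqlServerCdcMetrics for LockGauge {
    fn snapshot_table_lock_start(&self, table_name: &str) {
        // Invoked just before the shared table lock is taken for the snapshot fence.
        self.held.fetch_add(1, Ordering::Relaxed);
        tracing::debug!("lock start: {table_name}");
    }

    fn snapshot_table_lock_end(&self, table_name: &str) {
        // Invoked once the fencing transaction is rolled back and the lock is released.
        self.held.fetch_sub(1, Ordering::Relaxed);
        tracing::debug!("lock end: {table_name}");
    }
}

/// Hypothetical driver: wires the metrics value into a CDC stream.
async fn stream_changes(client: &mut Client) -> Result<(), anyhow::Error> {
    let mut cdc_handle = client.cdc(["materialize_t1"], LockGauge::default());
    cdc_handle.wait_for_ready().await?;
    // ... consume cdc_handle.into_stream() as in examples/cdc.rs ...
    Ok(())
}
```

The hooks fire around the shared lock taken during `CdcStream::snapshot`: `snapshot_table_lock_start` right before `lock_table_shared`, and `snapshot_table_lock_end` after `fence_txn.rollback()`, which is how `PrometheusSqlServerCdcMetrics` drives the `mz_sql_server_snapshot_table_lock` gauge introduced above.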