Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/sql-server-util/examples/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use futures::StreamExt;
use mz_ore::future::InTask;
use mz_sql_server_util::cdc::CdcEvent;
use mz_sql_server_util::config::TunnelConfig;
use mz_sql_server_util::{Client, Config};
use mz_sql_server_util::{Client, Config, LoggingSqlServerCdcMetrics};
use tracing_subscriber::EnvFilter;

#[tokio::main]
Expand All @@ -65,7 +65,8 @@ async fn main() -> Result<(), anyhow::Error> {
tracing::info!("connection 1 successful!");

let capture_instances = ["materialize_t1", "materialize_t2"];
let mut cdc_handle = client_1.cdc(capture_instances);
let metrics = LoggingSqlServerCdcMetrics;
let mut cdc_handle = client_1.cdc(capture_instances, metrics);

cdc_handle.wait_for_ready().await?;

Expand Down
23 changes: 18 additions & 5 deletions src/sql-server-util/src/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ use tiberius::numeric::Numeric;

use crate::desc::{SqlServerQualifiedTableName, SqlServerTableRaw};
use crate::inspect::DDLEvent;
use crate::{Client, SqlServerError, TransactionIsolationLevel};
use crate::{Client, SqlServerCdcMetrics, SqlServerError, TransactionIsolationLevel};

/// A stream of changes from a table in SQL Server that has CDC enabled.
///
/// SQL Server does not have an API to push or notify consumers of changes, so we periodically
/// poll the upstream source.
///
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/change-data-capture-tables-transact-sql?view=sql-server-ver16>
pub struct CdcStream<'a> {
pub struct CdcStream<'a, M: SqlServerCdcMetrics> {
/// Client we use for querying SQL Server.
client: &'a mut Client,
/// Upstream capture instances we'll list changes from.
Expand All @@ -97,18 +97,22 @@ pub struct CdcStream<'a> {
/// we'll wait this duration for SQL Server to report an [`Lsn`] and thus indicate CDC is
/// ready to go.
max_lsn_wait: Duration,
/// Metrics.
metrics: M,
}

impl<'a> CdcStream<'a> {
impl<'a, M: SqlServerCdcMetrics> CdcStream<'a, M> {
pub(crate) fn new(
client: &'a mut Client,
capture_instances: BTreeMap<Arc<str>, Option<Lsn>>,
metrics: M,
) -> Self {
CdcStream {
client,
capture_instances,
poll_interval: Duration::from_secs(1),
max_lsn_wait: Duration::from_secs(10),
metrics,
}
}

Expand Down Expand Up @@ -164,6 +168,13 @@ impl<'a> CdcStream<'a> {
// as it will be just be locking the table(s).
let mut fencing_client = self.client.new_connection().await?;
let mut fence_txn = fencing_client.transaction().await?;
let qualified_table_name = format!(
"{schema_name}.{table_name}",
schema_name = &table.schema_name,
table_name = &table.name
);
self.metrics
.snapshot_table_lock_start(&qualified_table_name);
fence_txn
.lock_table_shared(&table.schema_name, &table.name)
.await?;
Expand Down Expand Up @@ -204,7 +215,7 @@ impl<'a> CdcStream<'a> {
// the table no longer needs to be locked. Any writes that happen to the upstream table
// will have an LSN higher than our captured LSN, and will be read from CDC.
fence_txn.rollback().await?;

self.metrics.snapshot_table_lock_end(&qualified_table_name);
let lsn = txn.get_lsn().await?;

tracing::info!(%source_id, ?lsn, "timely-{worker_id} starting snapshot");
Expand All @@ -225,7 +236,9 @@ impl<'a> CdcStream<'a> {
}

/// Consume `self` returning a [`Stream`] of [`CdcEvent`]s.
pub fn into_stream(mut self) -> impl Stream<Item = Result<CdcEvent, SqlServerError>> + use<'a> {
pub fn into_stream(
mut self,
) -> impl Stream<Item = Result<CdcEvent, SqlServerError>> + use<'a, M> {
async_stream::try_stream! {
// Initialize all of our start LSNs.
self.initialize_start_lsns().await?;
Expand Down
26 changes: 24 additions & 2 deletions src/sql-server-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,16 +341,17 @@ impl Client {
/// `capture_instances`.
///
/// [`CdcStream`]: crate::cdc::CdcStream
pub fn cdc<I>(&mut self, capture_instances: I) -> crate::cdc::CdcStream<'_>
pub fn cdc<I, M>(&mut self, capture_instances: I, metrics: M) -> crate::cdc::CdcStream<'_, M>
where
I: IntoIterator,
I::Item: Into<Arc<str>>,
M: SqlServerCdcMetrics,
{
let instances = capture_instances
.into_iter()
.map(|i| (i.into(), None))
.collect();
crate::cdc::CdcStream::new(self, instances)
crate::cdc::CdcStream::new(self, instances, metrics)
}
}

Expand Down Expand Up @@ -945,6 +946,27 @@ pub fn quote_identifier(ident: &str) -> String {
quoted
}

pub trait SqlServerCdcMetrics {
/// Called before the table lock is aquired
fn snapshot_table_lock_start(&self, table_name: &str);
/// Called after the table lock is released
fn snapshot_table_lock_end(&self, table_name: &str);
}

/// A simple implementation of [`SqlServerCdcMetrics`] that uses the tracing framework to log
/// the start and end conditions.
pub struct LoggingSqlServerCdcMetrics;

impl SqlServerCdcMetrics for LoggingSqlServerCdcMetrics {
fn snapshot_table_lock_start(&self, table_name: &str) {
tracing::info!("snapshot_table_lock_start: {table_name}");
}

fn snapshot_table_lock_end(&self, table_name: &str) {
tracing::info!("snapshot_table_lock_end: {table_name}");
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
15 changes: 14 additions & 1 deletion src/storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl StorageMetrics {
source::SourceMetrics::new(&self.source_defs.source_defs, id, worker_id)
}

/// Get a `PgMetrics` for the given id.
/// Get a `PgSourceMetrics` for the given id.
pub(crate) fn get_postgres_source_metrics(
&self,
id: GlobalId,
Expand All @@ -154,6 +154,19 @@ impl StorageMetrics {
source::mysql::MySqlSourceMetrics::new(&self.source_defs.mysql_defs, id)
}

/// Get a `SqlServerSourceMetrics` for the given id.
pub(crate) fn get_sql_server_source_metrics(
&self,
source_id: GlobalId,
worker_id: usize,
) -> source::sql_server::SqlServerSourceMetrics {
source::sql_server::SqlServerSourceMetrics::new(
&self.source_defs.sql_server_defs,
source_id,
worker_id,
)
}

/// Get an `OffsetCommitMetrics` for the given id.
pub(crate) fn get_offset_commit_metrics(&self, id: GlobalId) -> source::OffsetCommitMetrics {
source::OffsetCommitMetrics::new(&self.source_defs.source_defs, id)
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/metrics/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use prometheus::core::{AtomicI64, AtomicU64};
pub mod kafka;
pub mod mysql;
pub mod postgres;
pub mod sql_server;

/// Definitions for general metrics about sources that are not specific to the source type.
///
Expand Down Expand Up @@ -225,6 +226,7 @@ pub(crate) struct SourceMetricDefs {
pub(crate) postgres_defs: postgres::PgSourceMetricDefs,
pub(crate) mysql_defs: mysql::MySqlSourceMetricDefs,
pub(crate) kafka_source_defs: kafka::KafkaSourceMetricDefs,
pub(crate) sql_server_defs: sql_server::SqlServerSourceMetricDefs,
/// A cluster-wide counter shared across all sources.
pub(crate) bytes_read: IntCounter,
}
Expand All @@ -236,6 +238,7 @@ impl SourceMetricDefs {
postgres_defs: postgres::PgSourceMetricDefs::register_with(registry),
mysql_defs: mysql::MySqlSourceMetricDefs::register_with(registry),
kafka_source_defs: kafka::KafkaSourceMetricDefs::register_with(registry),
sql_server_defs: sql_server::SqlServerSourceMetricDefs::register_with(registry),
bytes_read: registry.register(metric!(
name: "mz_bytes_read_total",
help: "Count of bytes read from sources",
Expand Down
177 changes: 177 additions & 0 deletions src/storage/src/metrics/source/sql_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics for Postgres.

use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::Mutex;

use mz_ore::metric;
use mz_ore::metrics::{
DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, IntGaugeVec, MetricsRegistry,
UIntGaugeVec,
};
use mz_repr::GlobalId;
use prometheus::core::{AtomicF64, AtomicI64, AtomicU64};

/// Definitions for Postgres source metrics.
#[derive(Clone, Debug)]
pub(crate) struct SqlServerSourceMetricDefs {
pub(crate) ignored_messages: IntCounterVec,
pub(crate) insert_messages: IntCounterVec,
pub(crate) update_messages: IntCounterVec,
pub(crate) delete_messages: IntCounterVec,
pub(crate) snapshot_table_count: UIntGaugeVec,
pub(crate) snapshot_table_size_latency: GaugeVec,
pub(crate) snapshot_table_lock: IntGaugeVec,
}

impl SqlServerSourceMetricDefs {
pub(crate) fn register_with(registry: &MetricsRegistry) -> Self {
// Every metric must have a worker specific id associated with it. These are later wrapped
// in a DeleteOnDrop helper. If the label was just `source_id` and one worker completed, it
// would call the DeleteOnDrop code that deregestiers the metric for `source_id`. Other
// workers may still be running, but the metrics registry will no longer record or report
// metrics for that `source_id`.
Self {
ignored_messages: registry.register(metric!(
name: "mz_sql_server_per_source_ignored_messages",
help: "The number of messages ignored because of an irrelevant type or relation_id",
var_labels: ["source_id", "worker_id"],
)),
insert_messages: registry.register(metric!(
name: "mz_sql_server_per_source_inserts",
help: "The number of inserts for all tables in this source",
var_labels: ["source_id", "worker_id"],
)),
update_messages: registry.register(metric!(
name: "mz_sql_server_per_source_updates",
help: "The number of updates for all tables in this source",
var_labels: ["source_id", "worker_id"],
)),
delete_messages: registry.register(metric!(
name: "mz_sql_server_per_source_deletes",
help: "The number of deletes for all tables in this source",
var_labels: ["source_id", "worker_id"],
)),
snapshot_table_count: registry.register(metric!(
name: "mz_sql_server_snapshot_table_count",
help: "The number of tables that SQL Server still needs to snapshot",
var_labels: ["source_id", "worker_id"],
)),
snapshot_table_size_latency: registry.register(metric!(
name: "mz_sql_server_snapshot_count_latency",
help: "The wall time used to obtain snapshot sizes.",
var_labels: ["source_id", "worker_id", "table_name"],
)),
snapshot_table_lock: registry.register(metric!(
name: "mz_sql_server_snapshot_table_lock",
help: "The upstream tables locked for snapshot.",
var_labels: ["source_id", "worker_id", "table_name"],
)),
}
}
}
#[derive(Clone)]
/// Metrics for Postgres sources.
pub(crate) struct SqlServerSourceMetrics {
// stored as String to avoid having to convert them repeatedly.
source_id: String,
worker_id: String,
defs: SqlServerSourceMetricDefs,
// Currently, this structure is not accessed across threads.
snapshot_table_size_latency:
Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicF64, Vec<String>>>>>,
snapshot_table_lock_count:
Rc<Mutex<BTreeMap<String, DeleteOnDropGauge<AtomicI64, Vec<String>>>>>,

pub(crate) inserts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
pub(crate) updates: DeleteOnDropCounter<AtomicU64, Vec<String>>,
pub(crate) deletes: DeleteOnDropCounter<AtomicU64, Vec<String>>,
pub(crate) ignored: DeleteOnDropCounter<AtomicU64, Vec<String>>,
pub(crate) snapshot_table_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}

impl SqlServerSourceMetrics {
/// Create a `SqlServerSourceMetrics` from the `SqlServerSourceMetricDefs`.
pub(crate) fn new(
defs: &SqlServerSourceMetricDefs,
source_id: GlobalId,
worker_id: usize,
) -> Self {
let source_id_labels = &[source_id.to_string(), worker_id.to_string()];
Self {
source_id: source_id.to_string(),
worker_id: worker_id.to_string(),
defs: defs.clone(),
inserts: defs
.insert_messages
.get_delete_on_drop_metric(source_id_labels.to_vec()),
updates: defs
.update_messages
.get_delete_on_drop_metric(source_id_labels.to_vec()),
deletes: defs
.delete_messages
.get_delete_on_drop_metric(source_id_labels.to_vec()),
ignored: defs
.ignored_messages
.get_delete_on_drop_metric(source_id_labels.to_vec()),
snapshot_table_count: defs
.snapshot_table_count
.get_delete_on_drop_metric(source_id_labels.to_vec()),
snapshot_table_size_latency: Default::default(),
snapshot_table_lock_count: Default::default(),
}
}

pub fn set_snapshot_table_size_latency(&self, table_name: &str, latency: f64) {
let mut snapshot_table_size_latency =
self.snapshot_table_size_latency.lock().expect("poisoned");
match snapshot_table_size_latency.entry(table_name.to_string()) {
std::collections::btree_map::Entry::Vacant(vacant_entry) => {
let labels = vec![
self.source_id.clone(),
self.worker_id.clone(),
table_name.to_string(),
];
let metric = self
.defs
.snapshot_table_size_latency
.get_delete_on_drop_metric(labels);
vacant_entry.insert(metric).set(latency);
}
std::collections::btree_map::Entry::Occupied(occupied_entry) => {
occupied_entry.get().set(latency)
}
}
}

pub fn update_snapshot_table_lock_count(&self, table_name: &str, delta: i64) {
let mut snapshot_table_lock_count =
self.snapshot_table_lock_count.lock().expect("poisoned");
match snapshot_table_lock_count.entry(table_name.to_string()) {
std::collections::btree_map::Entry::Vacant(vacant_entry) => {
let labels = vec![
self.source_id.clone(),
self.worker_id.clone(),
table_name.to_string(),
];
let metric = self
.defs
.snapshot_table_lock
.get_delete_on_drop_metric(labels);
vacant_entry.insert(metric).add(delta);
}
std::collections::btree_map::Entry::Occupied(occupied_entry) => {
occupied_entry.get().add(delta);
}
}
}
}
5 changes: 5 additions & 0 deletions src/storage/src/source/sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,16 @@ impl SourceRender for SqlServerSourceConnection {
source_outputs.insert(*id, output_info);
}

let metrics = config
.metrics
.get_sql_server_source_metrics(config.id, config.worker_id);

let (repl_updates, uppers, repl_errs, repl_token) = replication::render(
scope.clone(),
config.clone(),
source_outputs.clone(),
self.clone(),
metrics,
);

let (progress_errs, progress_probes, progress_token) = progress::render(
Expand Down
Loading