diff --git a/internal/datastore/mysql/connection.go b/internal/datastore/mysql/connection.go index 10562ff63..d3ab5c886 100644 --- a/internal/datastore/mysql/connection.go +++ b/internal/datastore/mysql/connection.go @@ -3,6 +3,7 @@ package mysql import ( "context" "database/sql/driver" + "errors" "fmt" "strconv" "time" @@ -13,26 +14,27 @@ import ( ) var ( - connectHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + connectHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "spicedb", Subsystem: "datastore", Name: "mysql_connect_duration", Help: "distribution in seconds of time spent opening a new MySQL connection.", Buckets: []float64{0.01, 0.1, 0.5, 1, 5, 10, 25, 60, 120}, - }) + }, []string{"db_name"}) connectCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "spicedb", Subsystem: "datastore", Name: "mysql_connect_count_total", Help: "number of mysql connections opened.", - }, []string{"success"}) + }, []string{"success", "db_name"}) ) // instrumentedConnector wraps the default MySQL driver connector // to get metrics and tracing when creating a new connection type instrumentedConnector struct { - conn driver.Connector - drv driver.Driver + conn driver.Connector + drv driver.Driver + dbName string } func (d *instrumentedConnector) Connect(ctx context.Context) (driver.Conn, error) { @@ -41,11 +43,11 @@ func (d *instrumentedConnector) Connect(ctx context.Context) (driver.Conn, error startTime := time.Now() defer func() { - connectHistogram.Observe(time.Since(startTime).Seconds()) + connectHistogram.WithLabelValues(d.dbName).Observe(time.Since(startTime).Seconds()) }() conn, err := d.conn.Connect(ctx) - connectCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc() + connectCount.WithLabelValues(strconv.FormatBool(err == nil), d.dbName).Inc() if err != nil { span.RecordError(err) log.Ctx(ctx).Error().Err(err).Msg("failed to open mysql connection") @@ -59,20 +61,21 @@ func (d *instrumentedConnector) Driver() driver.Driver { return d.drv } -func instrumentConnector(c driver.Connector) (driver.Connector, error) { +func instrumentConnector(c driver.Connector, dbName string) (driver.Connector, error) { err := prometheus.Register(connectHistogram) - if err != nil { + if err != nil && !errors.As(err, &prometheus.AlreadyRegisteredError{}) { return nil, fmt.Errorf("unable to register metric: %w", err) } err = prometheus.Register(connectCount) - if err != nil { + if err != nil && !errors.As(err, &prometheus.AlreadyRegisteredError{}) { return nil, fmt.Errorf("unable to register metric: %w", err) } return &instrumentedConnector{ - conn: c, - drv: c.Driver(), + conn: c, + drv: c.Driver(), + dbName: dbName, }, nil } diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 854b6f846..f410016fd 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -175,16 +175,16 @@ func newMySQLDatastore(ctx context.Context, uri string, replicaIndex int, option var db *sql.DB if config.enablePrometheusStats { - connector, err = instrumentConnector(connector) - if err != nil { - return nil, common.RedactAndLogSensitiveConnString(ctx, "NewMySQLDatastore: unable to instrument connector", err, uri) - } - dbName := "spicedb" if replicaIndex != primaryInstanceID { dbName = fmt.Sprintf("spicedb_replica_%d", replicaIndex) } + connector, err = instrumentConnector(connector, dbName) + if err != nil { + return nil, common.RedactAndLogSensitiveConnString(ctx, "NewMySQLDatastore: unable to instrument connector", err, uri) + } + db = sql.OpenDB(connector) collector := sqlstats.NewStatsCollector(dbName, db) if err := prometheus.Register(collector); err != nil { diff --git a/internal/datastore/mysql/datastore_test.go b/internal/datastore/mysql/datastore_test.go index b682f752b..2e67d4ee9 100644 --- a/internal/datastore/mysql/datastore_test.go +++ b/internal/datastore/mysql/datastore_test.go @@ -6,6 +6,7 @@ package mysql import ( "context" "database/sql" + "errors" "fmt" "testing" "time" @@ -103,7 +104,7 @@ func createMultiDatastoreTest(b testdatastore.RunningEngineForTest, tf multiData ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...) require.NoError(t, err) - ds2, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...) + ds2, err := newMySQLDatastore(ctx, uri, primaryInstanceID+1, options...) require.NoError(t, err) secondDS = ds2 @@ -132,11 +133,30 @@ func TestMySQLRevisionTimestamps(t *testing.T) { t.Run("TransactionTimestamps", createDatastoreTest(b, TransactionTimestampsTest, defaultOptions...)) } -func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) { - reg := prometheus.NewRegistry() - prometheus.DefaultGatherer = reg - prometheus.DefaultRegisterer = reg +func multiDataStoreWithDuplicateDBNames(t *testing.T, b testdatastore.RunningEngineForTest, errMsg string) func(*testing.T) { + return func(t *testing.T) { + ctx := t.Context() + + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, WithEnablePrometheusStats(true)) + require.NoError(t, err) + + _, err = newMySQLDatastore(ctx, uri, primaryInstanceID, WithEnablePrometheusStats(true)) + require.Error(t, err) + require.ErrorContains(t, err, errMsg) + + return ds + }) + defer failOnError(t, ds.Close) + } +} +func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) { + { + reg := prometheus.NewRegistry() + prometheus.DefaultGatherer = reg + prometheus.DefaultRegisterer = reg + } t.Run("DatabaseSeeding", createDatastoreTest(b, DatabaseSeedingTest)) t.Run("PrometheusCollector", createDatastoreTest( b, @@ -151,7 +171,41 @@ func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) { t.Run("QuantizedRevisions", func(t *testing.T) { QuantizedRevisionTest(t, b) }) + { + reg := &unregisterable{*prometheus.NewRegistry()} + prometheus.DefaultGatherer = reg + prometheus.DefaultRegisterer = reg + } + t.Run("DatastoreWithUnregisterableRegistry", func(t *testing.T) { + _ = b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := newMySQLDatastore(t.Context(), uri, primaryInstanceID, WithEnablePrometheusStats(true)) + require.Error(t, err) + require.ErrorContains(t, err, "NewMySQLDatastore: unable to instrument connector.") + require.Nil(t, ds) + return ds + }) + }) t.Run("Locking", createMultiDatastoreTest(b, LockingTest, defaultOptions...)) + { + reg := prometheus.NewRegistry() + prometheus.DefaultGatherer = reg + prometheus.DefaultRegisterer = reg + } + t.Run("MultiDatastoreWithPrometheus", createMultiDatastoreTest(b, func(t *testing.T, ds1 datastore.Datastore, ds2 datastore.Datastore) {}, append(defaultOptions, WithEnablePrometheusStats(true))...)) + { + reg := prometheus.NewRegistry() + prometheus.DefaultGatherer = reg + prometheus.DefaultRegisterer = reg + } + t.Run("MultiDatastoreWithPrometheusWithDuplicateDBNames", multiDataStoreWithDuplicateDBNames(t, b, "duplicate metrics collector registration attempted")) +} + +type unregisterable struct { + prometheus.Registry +} + +func (*unregisterable) Register(prometheus.Collector) error { + return errors.New("unregisterable") } func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) {