Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions internal/datastore/mysql/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"context"
"database/sql/driver"
"errors"
"fmt"
"strconv"
"time"
Expand All @@ -13,26 +14,27 @@
)

var (
connectHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
connectHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "spicedb",
Subsystem: "datastore",
Name: "mysql_connect_duration",
Help: "distribution in seconds of time spent opening a new MySQL connection.",
Buckets: []float64{0.01, 0.1, 0.5, 1, 5, 10, 25, 60, 120},
})
}, []string{"db_name"})
connectCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore",
Name: "mysql_connect_count_total",
Help: "number of mysql connections opened.",
}, []string{"success"})
}, []string{"success", "db_name"})
)

// instrumentedConnector wraps the default MySQL driver connector
// to get metrics and tracing when creating a new connection
type instrumentedConnector struct {
conn driver.Connector
drv driver.Driver
conn driver.Connector
drv driver.Driver
dbName string
}

func (d *instrumentedConnector) Connect(ctx context.Context) (driver.Conn, error) {
Expand All @@ -41,11 +43,11 @@

startTime := time.Now()
defer func() {
connectHistogram.Observe(time.Since(startTime).Seconds())
connectHistogram.WithLabelValues(d.dbName).Observe(time.Since(startTime).Seconds())

Check warning on line 46 in internal/datastore/mysql/connection.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/connection.go#L46

Added line #L46 was not covered by tests
}()

conn, err := d.conn.Connect(ctx)
connectCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
connectCount.WithLabelValues(strconv.FormatBool(err == nil), d.dbName).Inc()

Check warning on line 50 in internal/datastore/mysql/connection.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/connection.go#L50

Added line #L50 was not covered by tests
if err != nil {
span.RecordError(err)
log.Ctx(ctx).Error().Err(err).Msg("failed to open mysql connection")
Expand All @@ -59,20 +61,21 @@
return d.drv
}

func instrumentConnector(c driver.Connector) (driver.Connector, error) {
func instrumentConnector(c driver.Connector, dbName string) (driver.Connector, error) {

Check warning on line 64 in internal/datastore/mysql/connection.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/connection.go#L64

Added line #L64 was not covered by tests
err := prometheus.Register(connectHistogram)
if err != nil {
if err != nil && !errors.As(err, &prometheus.AlreadyRegisteredError{}) {

Check warning on line 66 in internal/datastore/mysql/connection.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/connection.go#L66

Added line #L66 was not covered by tests
return nil, fmt.Errorf("unable to register metric: %w", err)
}

err = prometheus.Register(connectCount)
if err != nil {
if err != nil && !errors.As(err, &prometheus.AlreadyRegisteredError{}) {

Check warning on line 71 in internal/datastore/mysql/connection.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/connection.go#L71

Added line #L71 was not covered by tests
return nil, fmt.Errorf("unable to register metric: %w", err)
}

return &instrumentedConnector{
conn: c,
drv: c.Driver(),
conn: c,
drv: c.Driver(),
dbName: dbName,

Check warning on line 78 in internal/datastore/mysql/connection.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/connection.go#L76-L78

Added lines #L76 - L78 were not covered by tests
}, nil
}

Expand Down
10 changes: 5 additions & 5 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,16 @@

var db *sql.DB
if config.enablePrometheusStats {
connector, err = instrumentConnector(connector)
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, "NewMySQLDatastore: unable to instrument connector", err, uri)
}

dbName := "spicedb"
if replicaIndex != primaryInstanceID {
dbName = fmt.Sprintf("spicedb_replica_%d", replicaIndex)
}

connector, err = instrumentConnector(connector, dbName)
if err != nil {
return nil, common.RedactAndLogSensitiveConnString(ctx, "NewMySQLDatastore: unable to instrument connector", err, uri)
}

Check warning on line 186 in internal/datastore/mysql/datastore.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/datastore.go#L183-L186

Added lines #L183 - L186 were not covered by tests

db = sql.OpenDB(connector)
collector := sqlstats.NewStatsCollector(dbName, db)
if err := prometheus.Register(collector); err != nil {
Expand Down
64 changes: 59 additions & 5 deletions internal/datastore/mysql/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package mysql
import (
"context"
"database/sql"
"errors"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -103,7 +104,7 @@ func createMultiDatastoreTest(b testdatastore.RunningEngineForTest, tf multiData
ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...)
require.NoError(t, err)

ds2, err := newMySQLDatastore(ctx, uri, primaryInstanceID, options...)
ds2, err := newMySQLDatastore(ctx, uri, primaryInstanceID+1, options...)
require.NoError(t, err)

secondDS = ds2
Expand Down Expand Up @@ -132,11 +133,30 @@ func TestMySQLRevisionTimestamps(t *testing.T) {
t.Run("TransactionTimestamps", createDatastoreTest(b, TransactionTimestampsTest, defaultOptions...))
}

func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) {
reg := prometheus.NewRegistry()
prometheus.DefaultGatherer = reg
prometheus.DefaultRegisterer = reg
func multiDataStoreWithDuplicateDBNames(t *testing.T, b testdatastore.RunningEngineForTest, errMsg string) func(*testing.T) {
return func(t *testing.T) {
ctx := t.Context()

ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := newMySQLDatastore(ctx, uri, primaryInstanceID, WithEnablePrometheusStats(true))
require.NoError(t, err)

_, err = newMySQLDatastore(ctx, uri, primaryInstanceID, WithEnablePrometheusStats(true))
require.Error(t, err)
require.ErrorContains(t, err, errMsg)

return ds
})
defer failOnError(t, ds.Close)
}
}

func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) {
{
reg := prometheus.NewRegistry()
prometheus.DefaultGatherer = reg
prometheus.DefaultRegisterer = reg
}
t.Run("DatabaseSeeding", createDatastoreTest(b, DatabaseSeedingTest))
t.Run("PrometheusCollector", createDatastoreTest(
b,
Expand All @@ -151,7 +171,41 @@ func additionalMySQLTests(t *testing.T, b testdatastore.RunningEngineForTest) {
t.Run("QuantizedRevisions", func(t *testing.T) {
QuantizedRevisionTest(t, b)
})
{
reg := &unregisterable{*prometheus.NewRegistry()}
prometheus.DefaultGatherer = reg
prometheus.DefaultRegisterer = reg
}
t.Run("DatastoreWithUnregisterableRegistry", func(t *testing.T) {
_ = b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
ds, err := newMySQLDatastore(t.Context(), uri, primaryInstanceID, WithEnablePrometheusStats(true))
require.Error(t, err)
require.ErrorContains(t, err, "NewMySQLDatastore: unable to instrument connector.")
require.Nil(t, ds)
return ds
})
})
t.Run("Locking", createMultiDatastoreTest(b, LockingTest, defaultOptions...))
{
reg := prometheus.NewRegistry()
prometheus.DefaultGatherer = reg
prometheus.DefaultRegisterer = reg
}
t.Run("MultiDatastoreWithPrometheus", createMultiDatastoreTest(b, func(t *testing.T, ds1 datastore.Datastore, ds2 datastore.Datastore) {}, append(defaultOptions, WithEnablePrometheusStats(true))...))
{
reg := prometheus.NewRegistry()
prometheus.DefaultGatherer = reg
prometheus.DefaultRegisterer = reg
}
t.Run("MultiDatastoreWithPrometheusWithDuplicateDBNames", multiDataStoreWithDuplicateDBNames(t, b, "duplicate metrics collector registration attempted"))
}

type unregisterable struct {
prometheus.Registry
}

func (*unregisterable) Register(prometheus.Collector) error {
return errors.New("unregisterable")
}

func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) {
Expand Down
Loading