Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const (

// Values of pg_stat_activity's "state" column that the collector compares
// against when classifying backend rows (see selectPgStatActivity below).
const (
	stateActive = "active"
	stateIdle   = "idle"
)

const selectPgStatActivity = `
Expand Down Expand Up @@ -54,12 +53,14 @@ const selectPgStatActivity = `
FROM pg_stat_activity s
JOIN pg_database d ON s.datid = d.oid AND NOT d.datistemplate AND d.datallowconn
WHERE
s.backend_type != 'client backend' OR
s.pid != pg_backend_pid() AND
s.state != 'idle' AND
(
s.pid != pg_backend_pid() AND
coalesce(TRIM(s.query), '') != '' AND
s.query_id != 0 AND
s.state != 'idle'
s.backend_type != 'client backend' OR
(
coalesce(TRIM(s.query), '') != '' AND
s.query_id != 0
)
)
`

Expand Down Expand Up @@ -112,12 +113,20 @@ type QuerySamples struct {
samples map[SampleKey]*SampleState
}

// SampleKey uses (PID, QueryID, XID) so concurrent executions of the same
// SampleKey uses (PID, QueryID, QueryStartNs) so concurrent executions of the same
// query across backends/transactions are uniquely tracked between scrapes.
type SampleKey struct {
PID int
QueryID int64
XID int32
PID int
QueryID int64
QueryStartNs int64
}

// newSampleKey builds the identity under which a query execution is tracked
// between scrapes: the backend PID, the query ID, and the query_start
// timestamp converted to Unix nanoseconds. A NULL query_start maps to 0 so
// the key remains well-defined for rows without a start time.
func newSampleKey(pid int, queryID int64, queryStart sql.NullTime) SampleKey {
	var startNs int64
	if queryStart.Valid {
		startNs = queryStart.Time.UnixNano()
	}
	return SampleKey{PID: pid, QueryID: queryID, QueryStartNs: startNs}
}

// SampleState buffers state across scrapes and is emitted once the query
Expand Down Expand Up @@ -235,7 +244,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
defer rows.Close()

activeKeys := map[SampleKey]struct{}{}
idleKeys := map[SampleKey]struct{}{}

for rows.Next() {
sample, scanErr := c.scanRow(rows)
Expand All @@ -244,18 +252,12 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
continue
}

key, isIdle, procErr := c.processRow(sample)
key, procErr := c.processRow(sample)
if procErr != nil {
level.Debug(c.logger).Log("msg", "invalid pg_stat_activity set", "queryid", sample.QueryID.Int64, "err", procErr)
continue
}

if isIdle {
c.upsertIdleSample(key, sample)
idleKeys[key] = struct{}{}
continue
}

c.upsertActiveSample(key, sample)
activeKeys[key] = struct{}{}
}
Expand All @@ -265,7 +267,13 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error {
return err
}

c.finalizeSamples(activeKeys, idleKeys)
// finalize samples that are no longer active
for key := range c.samples {
if _, stillActive := activeKeys[key]; stillActive {
continue
}
c.emitAndDeleteSample(key)
}
return nil
}

Expand Down Expand Up @@ -297,33 +305,12 @@ func (c *QuerySamples) scanRow(rows *sql.Rows) (QuerySamplesInfo, error) {
return sample, err
}

func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, bool, error) {
func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, error) {
if err := c.validateQuerySample(sample); err != nil {
return SampleKey{}, false, err
}
key := SampleKey{PID: sample.PID, QueryID: sample.QueryID.Int64, XID: sample.BackendXID.Int32}
if sample.State.Valid && sample.State.String == stateIdle {
return key, true, nil
}
return key, false, nil
}

func (c *QuerySamples) finalizeSamples(activeKeys, idleKeys map[SampleKey]struct{}) {
for key := range idleKeys {
if _, ok := c.samples[key]; ok {
c.emitAndDeleteSample(key)
}
}

for key := range c.samples {
if _, stillActive := activeKeys[key]; stillActive {
continue
}
if _, wasIdle := idleKeys[key]; wasIdle {
continue
}
c.emitAndDeleteSample(key)
return SampleKey{}, err
}
key := newSampleKey(sample.PID, sample.QueryID.Int64, sample.QueryStart)
return key, nil
}

func (c QuerySamples) validateQuerySample(sample QuerySamplesInfo) error {
Expand Down Expand Up @@ -388,17 +375,6 @@ func (t *WaitEventTracker) upsertWaitEvent(sample QuerySamplesInfo, now time.Tim
}
}

// upsertIdleSample records the most recent row for a sample that has been
// observed idle, creating tracking state on first sight of the key.
// NOTE(review): CloseOpen appears to finalize the currently open wait-event
// interval on the tracker so it stops accumulating while idle — confirm
// against the WaitEventTracker implementation.
func (c *QuerySamples) upsertIdleSample(key SampleKey, sample QuerySamplesInfo) {
	// Missing keys yield a nil *SampleState, so a plain lookup suffices.
	state := c.samples[key]
	if state == nil {
		state = &SampleState{tracker: newWaitEventTracker()}
		c.samples[key] = state
	}
	state.LastRow = sample
	state.LastSeenAt = sample.Now
	state.tracker.CloseOpen()
}

func (c *QuerySamples) emitAndDeleteSample(key SampleKey) {
state, ok := c.samples[key]
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,69 +401,6 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) {
require.NoError(t, mock.ExpectationsWereMet())
})

t.Run("xid change finalizes previous sample and starts new", func(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.NoError(t, err)
defer db.Close()

logBuffer := syncbuffer.Buffer{}
lokiClient := loki_fake.NewClient(func() {})

sampleCollector, err := NewQuerySamples(QuerySamplesArguments{
DB: db,
CollectInterval: 10 * time.Millisecond,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
})
require.NoError(t, err)

// Scrape 1: xid=1
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows(columns).AddRow(
now, "testdb", 200, sql.NullInt64{},
"testuser", "testapp", "127.0.0.1", 5432,
"client backend", now.Add(-1*time.Minute), sql.NullInt32{Int32: 1, Valid: true}, sql.NullInt32{},
now.Add(-30*time.Second), "active", now.Add(-10*time.Second), sql.NullString{},
sql.NullString{}, nil, now.Add(-10*time.Second), sql.NullInt64{Int64: 777, Valid: true},
"SELECT 1",
))
// Scrape 2: xid=2 (same pid/queryid)
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows(columns).AddRow(
now, "testdb", 200, sql.NullInt64{},
"testuser", "testapp", "127.0.0.1", 5432,
"client backend", now, sql.NullInt32{Int32: 2, Valid: true}, sql.NullInt32{},
now, "active", now, sql.NullString{},
sql.NullString{}, nil, now, sql.NullInt64{Int64: 777, Valid: true},
"SELECT 1",
))
// Scrape 3: disappear -> finalize xid=2
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows(columns))

require.NoError(t, sampleCollector.Start(t.Context()))

require.EventuallyWithT(t, func(t *assert.CollectT) {
entries := lokiClient.Received()
require.Len(t, entries, 2)
// First emitted: xid=1
require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels)
require.Contains(t, entries[0].Line, `xid="1"`)
require.Contains(t, entries[0].Line, `queryid="777"`)
require.Contains(t, entries[0].Line, `cpu_time="10s"`)
// Second emitted: xid=2
require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[1].Labels)
require.Contains(t, entries[1].Line, `xid="2"`)
require.Contains(t, entries[1].Line, `queryid="777"`)
}, 5*time.Second, 50*time.Millisecond)

sampleCollector.Stop()
require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond)
lokiClient.Stop()
time.Sleep(100 * time.Millisecond)
require.NoError(t, mock.ExpectationsWereMet())
})

t.Run("wait-event merges across scrapes with normalized PID set", func(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.NoError(t, err)
Expand All @@ -478,8 +415,8 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) {
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
})
require.NoError(t, err)

require.NoError(t, err)
// Scrape 1: wait event with unordered/dup PIDs
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows(columns).AddRow(
Expand Down Expand Up @@ -579,61 +516,6 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) {
require.NoError(t, mock.ExpectationsWereMet())
})

// Finalize when row turns idle (non-client backend)
t.Run("finalize when row turns idle", func(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.NoError(t, err)
defer db.Close()

logBuffer := syncbuffer.Buffer{}
lokiClient := loki_fake.NewClient(func() {})

sampleCollector, err := NewQuerySamples(QuerySamplesArguments{
DB: db,
CollectInterval: 10 * time.Millisecond,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)),
})
require.NoError(t, err)

// Scrape 1: parallel worker active
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows(columns).AddRow(
now, "testdb", 401, sql.NullInt64{Int64: 400, Valid: true},
"testuser", "testapp", "127.0.0.1", 5432,
"parallel worker", backendStartTime, sql.NullInt32{Int32: 42, Valid: true}, sql.NullInt32{},
xactStartTime, "active", stateChangeTime, sql.NullString{},
sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 9001, Valid: true},
"SELECT * FROM t",
))
// Scrape 2: same row turns idle (allowed for non-client backend)
mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed().
WillReturnRows(sqlmock.NewRows(columns).AddRow(
now, "testdb", 401, sql.NullInt64{Int64: 400, Valid: true},
"testuser", "testapp", "127.0.0.1", 5432,
"parallel worker", backendStartTime, sql.NullInt32{Int32: 42, Valid: true}, sql.NullInt32{},
xactStartTime, "idle", stateChangeTime, sql.NullString{},
sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 9001, Valid: true},
"SELECT * FROM t",
))

require.NoError(t, sampleCollector.Start(t.Context()))

require.EventuallyWithT(t, func(t *assert.CollectT) {
entries := lokiClient.Received()
require.Len(t, entries, 1)
require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels)
require.Contains(t, entries[0].Line, `leader_pid="400"`)
require.Contains(t, entries[0].Line, `backend_type="parallel worker"`)
}, 5*time.Second, 50*time.Millisecond)

sampleCollector.Stop()
require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond)
lokiClient.Stop()
time.Sleep(100 * time.Millisecond)
require.NoError(t, mock.ExpectationsWereMet())
})

// CPU persists across later waits
t.Run("cpu persists across waits", func(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
Expand Down