From c0b191f560d29fcab82e1c5fb37f68eac6a9c3bb Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 10 Oct 2025 17:42:51 -0300 Subject: [PATCH 1/9] support xid being 0 when the transaction starts, without emitting additional events --- .../postgres/collector/query_samples.go | 37 ++- .../postgres/collector/query_samples_test.go | 255 ++++++++++++++++-- 2 files changed, 272 insertions(+), 20 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 216a0fc020..5d5ded9860 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -112,12 +112,13 @@ type QuerySamples struct { samples map[SampleKey]*SampleState } -// SampleKey uses (PID, QueryID, XID) so concurrent executions of the same +// SampleKey uses (PID, QueryID, XID, XactStartNs) so concurrent executions of the same // query across backends/transactions are uniquely tracked between scrapes. 
type SampleKey struct { - PID int - QueryID int64 - XID int32 + PID int + QueryID int64 + XID int32 + XactStartNs int64 } // SampleState buffers state across scrapes and is emitted once the query @@ -250,6 +251,8 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { continue } + key = c.tryMigrateKey(key) + if isIdle { c.upsertIdleSample(key, sample) idleKeys[key] = struct{}{} @@ -301,13 +304,37 @@ func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, bool, err if err := c.validateQuerySample(sample); err != nil { return SampleKey{}, false, err } - key := SampleKey{PID: sample.PID, QueryID: sample.QueryID.Int64, XID: sample.BackendXID.Int32} + key := newSampleKey(sample) if sample.State.Valid && sample.State.String == stateIdle { return key, true, nil } return key, false, nil } +// tryMigrateKey migrates an existing sample keyed by (PID,QueryID,XID=0,XactStart) +// to the new key (PID,QueryID,XID>0) once a real XID becomes available. This avoids +// emitting a partial sample for the pre-XID phase of the same execution. +func (c *QuerySamples) tryMigrateKey(key SampleKey) SampleKey { + if key.XID == 0 { + return key + } + zeroKey := SampleKey{PID: key.PID, QueryID: key.QueryID, XID: 0, XactStartNs: key.XactStartNs} + if state, ok := c.samples[zeroKey]; ok { + c.samples[key] = state + delete(c.samples, zeroKey) + } + return key +} + +// newSampleKey constructs a stable 4-tuple key for an execution instance. 
+func newSampleKey(sample QuerySamplesInfo) SampleKey { + key := SampleKey{PID: sample.PID, QueryID: sample.QueryID.Int64, XID: sample.BackendXID.Int32} + if sample.XactStart.Valid { + key.XactStartNs = sample.XactStart.Time.UnixNano() + } + return key +} + func (c *QuerySamples) finalizeSamples(activeKeys, idleKeys map[SampleKey]struct{}) { for key := range idleKeys { if _, ok := c.samples[key]; ok { diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index daf87aea37..6b2089d235 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -401,7 +401,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, mock.ExpectationsWereMet()) }) - t.Run("xid change finalizes previous sample and starts new", func(t *testing.T) { + t.Run("xid change migrates state and emits single sample", func(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() @@ -417,27 +417,27 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { }) require.NoError(t, err) - // Scrape 1: xid=1 + // Scrape 1: xid=0, with xact_start present (coalesced key) mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 200, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, - "client backend", now.Add(-1*time.Minute), sql.NullInt32{Int32: 1, Valid: true}, sql.NullInt32{}, + "client backend", now.Add(-1*time.Minute), sql.NullInt32{}, sql.NullInt32{}, now.Add(-30*time.Second), "active", now.Add(-10*time.Second), sql.NullString{}, sql.NullString{}, nil, now.Add(-10*time.Second), sql.NullInt64{Int64: 777, Valid: true}, "SELECT 1", )) - // Scrape 2: xid=2 (same pid/queryid) + // Scrape 2: xid=2 appears (same pid/queryid, same xact_start) -> migrate mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 200, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, "client backend", now, sql.NullInt32{Int32: 2, Valid: true}, sql.NullInt32{}, - now, "active", now, sql.NullString{}, - sql.NullString{}, nil, now, sql.NullInt64{Int64: 777, Valid: true}, + now.Add(-30*time.Second), "active", now, sql.NullString{}, + sql.NullString{}, nil, now.Add(-10*time.Second), sql.NullInt64{Int64: 777, Valid: true}, "SELECT 1", )) - // Scrape 3: disappear -> finalize xid=2 + // Scrape 3: disappear -> finalize single sample mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns)) @@ -445,16 +445,11 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.EventuallyWithT(t, func(t *assert.CollectT) { entries := lokiClient.Received() - require.Len(t, entries, 2) - // First emitted: xid=1 + require.Len(t, entries, 1) + // Single emission after disappearance, with final XID=2 require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `xid="1"`) + require.Contains(t, entries[0].Line, `xid="2"`) require.Contains(t, entries[0].Line, `queryid="777"`) - require.Contains(t, entries[0].Line, `cpu_time="10s"`) - // Second emitted: xid=2 - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[1].Labels) - require.Contains(t, entries[1].Line, `xid="2"`) - require.Contains(t, entries[1].Line, `queryid="777"`) }, 5*time.Second, 50*time.Millisecond) sampleCollector.Stop() @@ -755,4 +750,234 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { time.Sleep(100 * time.Millisecond) require.NoError(t, mock.ExpectationsWereMet()) }) + + // Migration: start with xid=0 (coalesced via xact_start), then XID appears; expect single emission with final XID + t.Run("xid migration preserves cpu and wait events", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + logBuffer := syncbuffer.Buffer{} + lokiClient := loki_fake.NewClient(func() {}) + + sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + }) + require.NoError(t, err) + + // Scrape 1: xid=0, active CPU snapshot (10s), xact_start present + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
+ WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, "testdb", 600, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, + now.Add(-30*time.Second), "active", now.Add(-10*time.Second), sql.NullString{}, + sql.NullString{}, nil, now.Add(-30*time.Second), sql.NullInt64{Int64: 4321, Valid: true}, + "SELECT * FROM t", + )) + // Scrape 2: xid=42 appears, waiting with wait_event; migration should occur + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, "testdb", 600, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{Int32: 42, Valid: true}, sql.NullInt32{}, + now.Add(-30*time.Second), "waiting", now.Add(-7*time.Second), sql.NullString{String: "Lock", Valid: true}, + sql.NullString{String: "relation", Valid: true}, pq.Int64Array{601}, now.Add(-30*time.Second), sql.NullInt64{Int64: 4321, Valid: true}, + "SELECT * FROM t", + )) + // Scrape 3: disappear -> finalize + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
+ WillReturnRows(sqlmock.NewRows(columns)) + + require.NoError(t, sampleCollector.Start(t.Context())) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + entries := lokiClient.Received() + require.Len(t, entries, 2) + // First emitted: query sample with final XID and preserved CPU time + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Contains(t, entries[0].Line, `xid="42"`) + require.Contains(t, entries[0].Line, `queryid="4321"`) + require.Contains(t, entries[0].Line, `cpu_time="10s"`) + // Second emitted: wait event with xid=42 and wait_time=7s + require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) + require.Contains(t, entries[1].Line, `xid="42"`) + require.Contains(t, entries[1].Line, `wait_time="7s"`) + }, 5*time.Second, 50*time.Millisecond) + + sampleCollector.Stop() + require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) + lokiClient.Stop() + time.Sleep(100 * time.Millisecond) + require.NoError(t, mock.ExpectationsWereMet()) + }) + + // If xact_start changes between scrapes, do not migrate; emit two samples (one for each execution) + t.Run("xid change with different xact_start does not migrate", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + logBuffer := syncbuffer.Buffer{} + lokiClient := loki_fake.NewClient(func() {}) + + sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + }) + require.NoError(t, err) + + // Scrape 1: xid=0, xact_start A + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
+ WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, "testdb", 610, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, + now.Add(-40*time.Second), "active", now.Add(-10*time.Second), sql.NullString{}, + sql.NullString{}, nil, now.Add(-40*time.Second), sql.NullInt64{Int64: 5001, Valid: true}, + "SELECT * FROM t", + )) + // Scrape 2: xid=77, xact_start B (different); no migration should happen; zero-XID sample will finalize now + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, "testdb", 610, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{Int32: 77, Valid: true}, sql.NullInt32{}, + now.Add(-35*time.Second), "active", now, sql.NullString{}, + sql.NullString{}, nil, now.Add(-35*time.Second), sql.NullInt64{Int64: 5001, Valid: true}, + "SELECT * FROM t", + )) + // Scrape 3: disappear -> finalize XID=77 sample + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
+ WillReturnRows(sqlmock.NewRows(columns)) + + require.NoError(t, sampleCollector.Start(t.Context())) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + entries := lokiClient.Received() + // First emission (after scrape 2 finalization) is xid=0 sample; second emission (after disappear) is xid=77 + require.Len(t, entries, 2) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Contains(t, entries[0].Line, `xid="0"`) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[1].Labels) + require.Contains(t, entries[1].Line, `xid="77"`) + }, 5*time.Second, 50*time.Millisecond) + + sampleCollector.Stop() + require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) + lokiClient.Stop() + time.Sleep(100 * time.Millisecond) + require.NoError(t, mock.ExpectationsWereMet()) + }) + + // If XID is never assigned (read-only), finalize single sample with xid=0 + t.Run("finalize with xid zero when never assigned", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + logBuffer := syncbuffer.Buffer{} + lokiClient := loki_fake.NewClient(func() {}) + + sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + }) + require.NoError(t, err) + + // Scrape 1: xid=0 active + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
+ WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, "testdb", 700, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, + now.Add(-20*time.Second), "active", now.Add(-5*time.Second), sql.NullString{}, + sql.NullString{}, nil, now.Add(-20*time.Second), sql.NullInt64{Int64: 7001, Valid: true}, + "SELECT * FROM t", + )) + // Scrape 2: disappear -> finalize + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns)) + + require.NoError(t, sampleCollector.Start(t.Context())) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + entries := lokiClient.Received() + require.Len(t, entries, 1) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Contains(t, entries[0].Line, `xid="0"`) + require.Contains(t, entries[0].Line, `queryid="7001"`) + }, 5*time.Second, 50*time.Millisecond) + + sampleCollector.Stop() + require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) + lokiClient.Stop() + time.Sleep(100 * time.Millisecond) + require.NoError(t, mock.ExpectationsWereMet()) + }) + + // Migration when xact_start is NULL: we still coalesce using zero xact_start key + t.Run("xid migration with null xact_start", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + logBuffer := syncbuffer.Buffer{} + lokiClient := loki_fake.NewClient(func() {}) + + sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + }) + require.NoError(t, err) + + // Scrape 1: xid=0, xact_start NULL + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
+ WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, "testdb", 800, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, + sql.NullTime{Valid: false}, "active", now.Add(-3*time.Second), sql.NullString{}, + sql.NullString{}, nil, now.Add(-3*time.Second), sql.NullInt64{Int64: 8001, Valid: true}, + "SELECT 1", + )) + // Scrape 2: xid appears -> migrate + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, "testdb", 800, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{Int32: 99, Valid: true}, sql.NullInt32{}, + sql.NullTime{Valid: false}, "active", now.Add(-2*time.Second), sql.NullString{}, + sql.NullString{}, nil, now.Add(-3*time.Second), sql.NullInt64{Int64: 8001, Valid: true}, + "SELECT 1", + )) + // Scrape 3: disappear -> finalize single sample with final XID + mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
+ WillReturnRows(sqlmock.NewRows(columns)) + + require.NoError(t, sampleCollector.Start(t.Context())) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + entries := lokiClient.Received() + require.Len(t, entries, 1) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Contains(t, entries[0].Line, `xid="99"`) + require.Contains(t, entries[0].Line, `queryid="8001"`) + }, 5*time.Second, 50*time.Millisecond) + + sampleCollector.Stop() + require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) + lokiClient.Stop() + time.Sleep(100 * time.Millisecond) + require.NoError(t, mock.ExpectationsWereMet()) + }) } From f8fea52a9de9dd0ab28f871f9233e2c91b911dcd Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 10 Oct 2025 17:59:48 -0300 Subject: [PATCH 2/9] remove unneeded func --- .../postgres/collector/query_samples.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 5d5ded9860..e22e8a3e25 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -304,7 +304,10 @@ func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, bool, err if err := c.validateQuerySample(sample); err != nil { return SampleKey{}, false, err } - key := newSampleKey(sample) + key := SampleKey{PID: sample.PID, QueryID: sample.QueryID.Int64, XID: sample.BackendXID.Int32} + if sample.XactStart.Valid { + key.XactStartNs = sample.XactStart.Time.UnixNano() + } if sample.State.Valid && sample.State.String == stateIdle { return key, true, nil } @@ -326,15 +329,6 @@ func (c *QuerySamples) tryMigrateKey(key SampleKey) SampleKey { return key } -// newSampleKey constructs a stable 4-tuple key for an 
execution instance. -func newSampleKey(sample QuerySamplesInfo) SampleKey { - key := SampleKey{PID: sample.PID, QueryID: sample.QueryID.Int64, XID: sample.BackendXID.Int32} - if sample.XactStart.Valid { - key.XactStartNs = sample.XactStart.Time.UnixNano() - } - return key -} - func (c *QuerySamples) finalizeSamples(activeKeys, idleKeys map[SampleKey]struct{}) { for key := range idleKeys { if _, ok := c.samples[key]; ok { From 6f483cb20afb52da17c181d8e2d9337a800c35fc Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 10 Oct 2025 18:52:20 -0300 Subject: [PATCH 3/9] some other fixes --- .../postgres/collector/query_samples.go | 48 ++- .../postgres/collector/query_samples_test.go | 289 +----------------- 2 files changed, 20 insertions(+), 317 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index e22e8a3e25..59890480ee 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -54,12 +54,18 @@ const selectPgStatActivity = ` FROM pg_stat_activity s JOIN pg_database d ON s.datid = d.oid AND NOT d.datistemplate AND d.datallowconn WHERE - s.backend_type != 'client backend' OR + s.pid != pg_backend_pid() AND ( - s.pid != pg_backend_pid() AND - coalesce(TRIM(s.query), '') != '' AND - s.query_id != 0 AND - s.state != 'idle' + ( + s.backend_type != 'client backend' AND + s.state != 'idle' + ) OR + ( + s.pid != pg_backend_pid() AND + coalesce(TRIM(s.query), '') != '' AND + s.query_id != 0 AND + s.state != 'idle' + ) ) ` @@ -112,13 +118,12 @@ type QuerySamples struct { samples map[SampleKey]*SampleState } -// SampleKey uses (PID, QueryID, XID, XactStartNs) so concurrent executions of the same +// SampleKey uses (PID, QueryID, QueryStartNs) so concurrent executions of the same // query across backends/transactions are uniquely 
tracked between scrapes. type SampleKey struct { - PID int - QueryID int64 - XID int32 - XactStartNs int64 + PID int + QueryID int64 + QueryStartNs int64 } // SampleState buffers state across scrapes and is emitted once the query @@ -251,8 +256,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { continue } - key = c.tryMigrateKey(key) - if isIdle { c.upsertIdleSample(key, sample) idleKeys[key] = struct{}{} @@ -304,9 +307,9 @@ func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, bool, err if err := c.validateQuerySample(sample); err != nil { return SampleKey{}, false, err } - key := SampleKey{PID: sample.PID, QueryID: sample.QueryID.Int64, XID: sample.BackendXID.Int32} - if sample.XactStart.Valid { - key.XactStartNs = sample.XactStart.Time.UnixNano() + key := SampleKey{PID: sample.PID, QueryID: sample.QueryID.Int64, QueryStartNs: 0} + if sample.QueryStart.Valid { + key.QueryStartNs = sample.QueryStart.Time.UnixNano() } if sample.State.Valid && sample.State.String == stateIdle { return key, true, nil @@ -314,21 +317,6 @@ func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, bool, err return key, false, nil } -// tryMigrateKey migrates an existing sample keyed by (PID,QueryID,XID=0,XactStart) -// to the new key (PID,QueryID,XID>0) once a real XID becomes available. This avoids -// emitting a partial sample for the pre-XID phase of the same execution. 
-func (c *QuerySamples) tryMigrateKey(key SampleKey) SampleKey { - if key.XID == 0 { - return key - } - zeroKey := SampleKey{PID: key.PID, QueryID: key.QueryID, XID: 0, XactStartNs: key.XactStartNs} - if state, ok := c.samples[zeroKey]; ok { - c.samples[key] = state - delete(c.samples, zeroKey) - } - return key -} - func (c *QuerySamples) finalizeSamples(activeKeys, idleKeys map[SampleKey]struct{}) { for key := range idleKeys { if _, ok := c.samples[key]; ok { diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index 6b2089d235..394e132a47 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -401,63 +401,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, mock.ExpectationsWereMet()) }) - t.Run("xid change migrates state and emits single sample", func(t *testing.T) { - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.NoError(t, err) - defer db.Close() - - logBuffer := syncbuffer.Buffer{} - lokiClient := loki_fake.NewClient(func() {}) - - sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), - }) - require.NoError(t, err) - - // Scrape 1: xid=0, with xact_start present (coalesced key) - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 200, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", now.Add(-1*time.Minute), sql.NullInt32{}, sql.NullInt32{}, - now.Add(-30*time.Second), "active", now.Add(-10*time.Second), sql.NullString{}, - sql.NullString{}, nil, now.Add(-10*time.Second), sql.NullInt64{Int64: 777, Valid: true}, - "SELECT 1", - )) - // Scrape 2: xid=2 appears (same pid/queryid, same xact_start) -> migrate - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 200, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", now, sql.NullInt32{Int32: 2, Valid: true}, sql.NullInt32{}, - now.Add(-30*time.Second), "active", now, sql.NullString{}, - sql.NullString{}, nil, now.Add(-10*time.Second), sql.NullInt64{Int64: 777, Valid: true}, - "SELECT 1", - )) - // Scrape 3: disappear -> finalize single sample - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows(columns)) - - require.NoError(t, sampleCollector.Start(t.Context())) - - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 1) - // Single emission after disappearance, with final XID=2 - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `xid="2"`) - require.Contains(t, entries[0].Line, `queryid="777"`) - }, 5*time.Second, 50*time.Millisecond) - - sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) - }) + // Removed: identity no longer depends on XID t.Run("wait-event merges across scrapes with normalized PID set", func(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) @@ -473,6 +417,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), }) + require.NoError(t, err) // Scrape 1: wait event with unordered/dup PIDs @@ -750,234 +695,4 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { time.Sleep(100 * time.Millisecond) require.NoError(t, mock.ExpectationsWereMet()) }) - - // Migration: start with xid=0 (coalesced via xact_start), then XID appears; expect single emission with final XID - t.Run("xid migration preserves cpu and wait events", func(t *testing.T) { - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.NoError(t, err) - defer db.Close() - - logBuffer := syncbuffer.Buffer{} - lokiClient := loki_fake.NewClient(func() {}) - - sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: 
log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), - }) - require.NoError(t, err) - - // Scrape 1: xid=0, active CPU snapshot (10s), xact_start present - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 600, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, - now.Add(-30*time.Second), "active", now.Add(-10*time.Second), sql.NullString{}, - sql.NullString{}, nil, now.Add(-30*time.Second), sql.NullInt64{Int64: 4321, Valid: true}, - "SELECT * FROM t", - )) - // Scrape 2: xid=42 appears, waiting with wait_event; migration should occur - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 600, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{Int32: 42, Valid: true}, sql.NullInt32{}, - now.Add(-30*time.Second), "waiting", now.Add(-7*time.Second), sql.NullString{String: "Lock", Valid: true}, - sql.NullString{String: "relation", Valid: true}, pq.Int64Array{601}, now.Add(-30*time.Second), sql.NullInt64{Int64: 4321, Valid: true}, - "SELECT * FROM t", - )) - // Scrape 3: disappear -> finalize - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows(columns)) - - require.NoError(t, sampleCollector.Start(t.Context())) - - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 2) - // First emitted: query sample with final XID and preserved CPU time - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `xid="42"`) - require.Contains(t, entries[0].Line, `queryid="4321"`) - require.Contains(t, entries[0].Line, `cpu_time="10s"`) - // Second emitted: wait event with xid=42 and wait_time=7s - require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) - require.Contains(t, entries[1].Line, `xid="42"`) - require.Contains(t, entries[1].Line, `wait_time="7s"`) - }, 5*time.Second, 50*time.Millisecond) - - sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) - }) - - // If xact_start changes between scrapes, do not migrate; emit two samples (one for each execution) - t.Run("xid change with different xact_start does not migrate", func(t *testing.T) { - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.NoError(t, err) - defer db.Close() - - logBuffer := syncbuffer.Buffer{} - lokiClient := loki_fake.NewClient(func() {}) - - sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), - }) - require.NoError(t, err) - - // Scrape 1: xid=0, xact_start A - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 610, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, - now.Add(-40*time.Second), "active", now.Add(-10*time.Second), sql.NullString{}, - sql.NullString{}, nil, now.Add(-40*time.Second), sql.NullInt64{Int64: 5001, Valid: true}, - "SELECT * FROM t", - )) - // Scrape 2: xid=77, xact_start B (different); no migration should happen; zero-XID sample will finalize now - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 610, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{Int32: 77, Valid: true}, sql.NullInt32{}, - now.Add(-35*time.Second), "active", now, sql.NullString{}, - sql.NullString{}, nil, now.Add(-35*time.Second), sql.NullInt64{Int64: 5001, Valid: true}, - "SELECT * FROM t", - )) - // Scrape 3: disappear -> finalize XID=77 sample - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows(columns)) - - require.NoError(t, sampleCollector.Start(t.Context())) - - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - // First emission (after scrape 2 finalization) is xid=0 sample; second emission (after disappear) is xid=77 - require.Len(t, entries, 2) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `xid="0"`) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[1].Labels) - require.Contains(t, entries[1].Line, `xid="77"`) - }, 5*time.Second, 50*time.Millisecond) - - sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) - }) - - // If XID is never assigned (read-only), finalize single sample with xid=0 - t.Run("finalize with xid zero when never assigned", func(t *testing.T) { - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.NoError(t, err) - defer db.Close() - - logBuffer := syncbuffer.Buffer{} - lokiClient := loki_fake.NewClient(func() {}) - - sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), - }) - require.NoError(t, err) - - // Scrape 1: xid=0 active - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 700, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, - now.Add(-20*time.Second), "active", now.Add(-5*time.Second), sql.NullString{}, - sql.NullString{}, nil, now.Add(-20*time.Second), sql.NullInt64{Int64: 7001, Valid: true}, - "SELECT * FROM t", - )) - // Scrape 2: disappear -> finalize - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(columns)) - - require.NoError(t, sampleCollector.Start(t.Context())) - - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 1) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `xid="0"`) - require.Contains(t, entries[0].Line, `queryid="7001"`) - }, 5*time.Second, 50*time.Millisecond) - - sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) - }) - - // Migration when xact_start is NULL: we still coalesce using zero xact_start key - t.Run("xid migration with null xact_start", func(t *testing.T) { - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.NoError(t, err) - defer db.Close() - - logBuffer := syncbuffer.Buffer{} - lokiClient := loki_fake.NewClient(func() {}) - - sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), - }) - require.NoError(t, err) - - // Scrape 1: xid=0, xact_start NULL - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 800, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, - sql.NullTime{Valid: false}, "active", now.Add(-3*time.Second), sql.NullString{}, - sql.NullString{}, nil, now.Add(-3*time.Second), sql.NullInt64{Int64: 8001, Valid: true}, - "SELECT 1", - )) - // Scrape 2: xid appears -> migrate - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 800, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", backendStartTime, sql.NullInt32{Int32: 99, Valid: true}, sql.NullInt32{}, - sql.NullTime{Valid: false}, "active", now.Add(-2*time.Second), sql.NullString{}, - sql.NullString{}, nil, now.Add(-3*time.Second), sql.NullInt64{Int64: 8001, Valid: true}, - "SELECT 1", - )) - // Scrape 3: disappear -> finalize single sample with final XID - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows(columns)) - - require.NoError(t, sampleCollector.Start(t.Context())) - - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 1) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `xid="99"`) - require.Contains(t, entries[0].Line, `queryid="8001"`) - }, 5*time.Second, 50*time.Millisecond) - - sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) - }) } From ccd14782e9c223a6f08d1ece6edb65578b0d77c8 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 10 Oct 2025 18:54:59 -0300 Subject: [PATCH 4/9] nit --- .../postgres/collector/query_samples_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index 394e132a47..93c0a9a128 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -401,8 +401,6 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, mock.ExpectationsWereMet()) }) - // Removed: identity no longer depends on XID - t.Run("wait-event merges across scrapes with normalized PID set", func(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) @@ -419,7 +417,6 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { }) require.NoError(t, err) - // Scrape 1: wait event with unordered/dup PIDs mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns).AddRow( From 0f1a238e8e8598fe5888367e4a1d5c74272fba17 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 10 Oct 2025 18:56:16 -0300 Subject: [PATCH 5/9] nit --- .../postgres/collector/query_samples.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 59890480ee..45835c3011 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -55,16 +55,15 @@ const selectPgStatActivity = ` JOIN pg_database d ON s.datid = d.oid AND NOT d.datistemplate AND d.datallowconn WHERE s.pid != pg_backend_pid() AND + s.state != 'idle' AND ( ( - s.backend_type != 'client backend' AND - s.state != 'idle' + s.backend_type != 'client backend' AND ) OR ( s.pid != pg_backend_pid() AND coalesce(TRIM(s.query), '') != '' AND - s.query_id != 0 AND - s.state != 'idle' + s.query_id != 0 ) ) ` From 9d6474a9842e53dc437e8e09b0dec374ca282ddf Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 10 Oct 2025 18:58:54 -0300 Subject: [PATCH 6/9] removing another unneeded test --- .../postgres/collector/query_samples_test.go | 55 ------------------- 1 file changed, 55 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index 93c0a9a128..bea9074b08 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -516,61 +516,6 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, mock.ExpectationsWereMet()) }) - // Finalize when row turns idle (non-client backend) - t.Run("finalize when 
row turns idle", func(t *testing.T) { - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.NoError(t, err) - defer db.Close() - - logBuffer := syncbuffer.Buffer{} - lokiClient := loki_fake.NewClient(func() {}) - - sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), - }) - require.NoError(t, err) - - // Scrape 1: parallel worker active - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 401, sql.NullInt64{Int64: 400, Valid: true}, - "testuser", "testapp", "127.0.0.1", 5432, - "parallel worker", backendStartTime, sql.NullInt32{Int32: 42, Valid: true}, sql.NullInt32{}, - xactStartTime, "active", stateChangeTime, sql.NullString{}, - sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 9001, Valid: true}, - "SELECT * FROM t", - )) - // Scrape 2: same row turns idle (allowed for non-client backend) - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, "testdb", 401, sql.NullInt64{Int64: 400, Valid: true}, - "testuser", "testapp", "127.0.0.1", 5432, - "parallel worker", backendStartTime, sql.NullInt32{Int32: 42, Valid: true}, sql.NullInt32{}, - xactStartTime, "idle", stateChangeTime, sql.NullString{}, - sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 9001, Valid: true}, - "SELECT * FROM t", - )) - - require.NoError(t, sampleCollector.Start(t.Context())) - - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 1) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `leader_pid="400"`) - require.Contains(t, entries[0].Line, `backend_type="parallel worker"`) - }, 5*time.Second, 50*time.Millisecond) - - sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) - }) - // CPU persists across later waits t.Run("cpu persists across waits", func(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) From b267eced105886afb88ca1bd7888fefd9a8aaa32 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 10 Oct 2025 19:00:49 -0300 Subject: [PATCH 7/9] fix query --- .../postgres/collector/query_samples.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 45835c3011..005702229a 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -57,9 +57,7 @@ const selectPgStatActivity = ` s.pid != pg_backend_pid() AND s.state != 
'idle' AND ( - ( - s.backend_type != 'client backend' AND - ) OR + s.backend_type != 'client backend' OR ( s.pid != pg_backend_pid() AND coalesce(TRIM(s.query), '') != '' AND From 9f8422366bfe6fefd296df8c3e9213a1bd07e749 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 10 Oct 2025 19:21:17 -0300 Subject: [PATCH 8/9] removing idle transaction handling --- .../postgres/collector/query_samples.go | 56 ++++--------------- 1 file changed, 11 insertions(+), 45 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 005702229a..cbeb606ef2 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -25,7 +25,6 @@ const ( const ( stateActive = "active" - stateIdle = "idle" ) const selectPgStatActivity = ` @@ -238,7 +237,6 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { defer rows.Close() activeKeys := map[SampleKey]struct{}{} - idleKeys := map[SampleKey]struct{}{} for rows.Next() { sample, scanErr := c.scanRow(rows) @@ -247,18 +245,12 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { continue } - key, isIdle, procErr := c.processRow(sample) + key, procErr := c.processRow(sample) if procErr != nil { level.Debug(c.logger).Log("msg", "invalid pg_stat_activity set", "queryid", sample.QueryID.Int64, "err", procErr) continue } - if isIdle { - c.upsertIdleSample(key, sample) - idleKeys[key] = struct{}{} - continue - } - c.upsertActiveSample(key, sample) activeKeys[key] = struct{}{} } @@ -268,7 +260,13 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { return err } - c.finalizeSamples(activeKeys, idleKeys) + // finalize samples that are no longer active + for key := range c.samples { + if _, stillActive := activeKeys[key]; stillActive { + continue + } + 
c.emitAndDeleteSample(key) + } return nil } @@ -300,36 +298,15 @@ func (c *QuerySamples) scanRow(rows *sql.Rows) (QuerySamplesInfo, error) { return sample, err } -func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, bool, error) { +func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, error) { if err := c.validateQuerySample(sample); err != nil { - return SampleKey{}, false, err + return SampleKey{}, err } key := SampleKey{PID: sample.PID, QueryID: sample.QueryID.Int64, QueryStartNs: 0} if sample.QueryStart.Valid { key.QueryStartNs = sample.QueryStart.Time.UnixNano() } - if sample.State.Valid && sample.State.String == stateIdle { - return key, true, nil - } - return key, false, nil -} - -func (c *QuerySamples) finalizeSamples(activeKeys, idleKeys map[SampleKey]struct{}) { - for key := range idleKeys { - if _, ok := c.samples[key]; ok { - c.emitAndDeleteSample(key) - } - } - - for key := range c.samples { - if _, stillActive := activeKeys[key]; stillActive { - continue - } - if _, wasIdle := idleKeys[key]; wasIdle { - continue - } - c.emitAndDeleteSample(key) - } + return key, nil } func (c QuerySamples) validateQuerySample(sample QuerySamplesInfo) error { @@ -394,17 +371,6 @@ func (t *WaitEventTracker) upsertWaitEvent(sample QuerySamplesInfo, now time.Tim } } -func (c *QuerySamples) upsertIdleSample(key SampleKey, sample QuerySamplesInfo) { - state, ok := c.samples[key] - if !ok { - state = &SampleState{tracker: newWaitEventTracker()} - c.samples[key] = state - } - state.LastRow = sample - state.LastSeenAt = sample.Now - state.tracker.CloseOpen() -} - func (c *QuerySamples) emitAndDeleteSample(key SampleKey) { state, ok := c.samples[key] if !ok { From 9d9dd1175bbd17daa73af7643bddc529991a43be Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 21 Oct 2025 17:15:09 -0300 Subject: [PATCH 9/9] fix --- .../postgres/collector/query_samples.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git 
a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index cbeb606ef2..42529dc6dc 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -57,8 +57,7 @@ const selectPgStatActivity = ` s.state != 'idle' AND ( s.backend_type != 'client backend' OR - ( - s.pid != pg_backend_pid() AND + ( coalesce(TRIM(s.query), '') != '' AND s.query_id != 0 ) @@ -122,6 +121,14 @@ type SampleKey struct { QueryStartNs int64 } +func newSampleKey(pid int, queryID int64, queryStart sql.NullTime) SampleKey { + key := SampleKey{PID: pid, QueryID: queryID, QueryStartNs: 0} + if queryStart.Valid { + key.QueryStartNs = queryStart.Time.UnixNano() + } + return key +} + // SampleState buffers state across scrapes and is emitted once the query // turns idle or disappears, avoiding partial/duplicate emissions. type SampleState struct { @@ -302,10 +309,7 @@ func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, error) { if err := c.validateQuerySample(sample); err != nil { return SampleKey{}, err } - key := SampleKey{PID: sample.PID, QueryID: sample.QueryID.Int64, QueryStartNs: 0} - if sample.QueryStart.Valid { - key.QueryStartNs = sample.QueryStart.Time.UnixNano() - } + key := newSampleKey(sample.PID, sample.QueryID.Int64, sample.QueryStart) return key, nil }