diff --git a/docs/sources/reference/components/database_observability/database_observability.postgres.md b/docs/sources/reference/components/database_observability/database_observability.postgres.md index 452676b310..31f031a3d4 100644 --- a/docs/sources/reference/components/database_observability/database_observability.postgres.md +++ b/docs/sources/reference/components/database_observability/database_observability.postgres.md @@ -49,8 +49,8 @@ You can use the following blocks with `database_observability.postgres`: | Block | Description | Required | |------------------------------------|---------------------------------------------------|----------| -| [`cloud_provider`][cloud_provider] | Provide Cloud Provider information. | no | -| `cloud_provider` > [`aws`][aws] | Provide AWS database host information. | no | +| [`cloud_provider`][cloud_provider] | Provide Cloud Provider information. | no | +| `cloud_provider` > [`aws`][aws] | Provide AWS database host information. | no | | [`query_details`][query_details] | Configure the queries collector. | no | | [`query_samples`][query_samples] | Configure the query samples collector. | no | | [`schema_details`][schema_details] | Configure the schema and table details collector. | no | @@ -90,10 +90,10 @@ The `aws` block supplies the [ARN](https://docs.aws.amazon.com/IAM/latest/UserGu ### `query_samples` -| Name | Type | Description | Default | Required | -|---------------------------|------------|---------------------------------------------------------|---------|----------| -| `collect_interval` | `duration` | How frequently to collect information from database. | `"15s"` | no | -| `disable_query_redaction` | `bool` | Collect unredacted SQL query text including parameters. | `false` | no | +| Name | Type | Description | Default | Required | +|---------------------------|------------|---------------------------------------------------------------|---------|----------| +| `collect_interval` | `duration` | How frequently to collect information from database. | `"15s"` | no | +| `disable_query_redaction` | `bool` | Collect unredacted SQL query text (might include parameters). | `false` | no | ### `schema_details` diff --git a/internal/component/database_observability/mysql/collector/query_samples.go b/internal/component/database_observability/mysql/collector/query_samples.go index 3503a0535c..4b6ca5f651 100644 --- a/internal/component/database_observability/mysql/collector/query_samples.go +++ b/internal/component/database_observability/mysql/collector/query_samples.go @@ -127,7 +127,7 @@ func (c *QuerySamples) Name() string { func (c *QuerySamples) Start(ctx context.Context) error { if c.disableQueryRedaction { - level.Warn(c.logger).Log("msg", "collector started with query redaction disabled. Query samples will include complete SQL text including query parameters.") + level.Warn(c.logger).Log("msg", "collector started with query redaction disabled. SQL text in query samples may include query parameters.") } else { level.Debug(c.logger).Log("msg", "collector started") } diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 42529dc6dc..848982f691 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -24,7 +24,8 @@ const ( ) const ( - stateActive = "active" + queryTextClause = ", s.query" + stateActive = "active" ) const selectPgStatActivity = ` @@ -48,8 +49,8 @@ const selectPgStatActivity = ` s.wait_event, pg_blocking_pids(s.pid) as blocked_by_pids, s.query_start, - s.query_id, - s.query + s.query_id + %s FROM pg_stat_activity s JOIN pg_database d ON s.datid = d.oid AND NOT d.datistemplate AND d.datallowconn WHERE @@ -57,7 +58,7 @@ const selectPgStatActivity = ` s.state != 'idle' AND ( s.backend_type != 'client backend' OR - ( + ( coalesce(TRIM(s.query), '') != '' AND s.query_id != 0 ) @@ -194,7 +195,11 @@ func (c *QuerySamples) Name() string { } func (c *QuerySamples) Start(ctx context.Context) error { - level.Debug(c.logger).Log("msg", "collector started") + if c.disableQueryRedaction { + level.Warn(c.logger).Log("msg", "collector started with query redaction disabled. SQL text in query samples may include query parameters.") + } else { + level.Debug(c.logger).Log("msg", "collector started") + } c.running.Store(true) ctx, cancel := context.WithCancel(ctx) @@ -236,7 +241,13 @@ func (c *QuerySamples) Stop() { } func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { - rows, err := c.dbConnection.QueryContext(ctx, selectPgStatActivity) + queryTextField := "" + if c.disableQueryRedaction { + queryTextField = queryTextClause + } + + query := fmt.Sprintf(selectPgStatActivity, queryTextField) + rows, err := c.dbConnection.QueryContext(ctx, query) if err != nil { return fmt.Errorf("failed to query pg_stat_activity: %w", err) } @@ -279,7 +290,7 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { func (c *QuerySamples) scanRow(rows *sql.Rows) (QuerySamplesInfo, error) { sample := QuerySamplesInfo{} - err := rows.Scan( + scanArgs := []interface{}{ &sample.Now, &sample.DatabaseName, &sample.PID, @@ -300,8 +311,11 @@ func (c *QuerySamples) scanRow(rows *sql.Rows) (QuerySamplesInfo, error) { &sample.BlockedByPIDs, &sample.QueryStart, &sample.QueryID, - &sample.Query, - ) + } + if c.disableQueryRedaction { + scanArgs = append(scanArgs, &sample.Query) + } + err := rows.Scan(scanArgs...) return sample, err } @@ -314,8 +328,10 @@ func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, error) { } func (c QuerySamples) validateQuerySample(sample QuerySamplesInfo) error { - if sample.Query.Valid && sample.Query.String == "" { - return fmt.Errorf("insufficient privilege to access query. sample set: %+v", sample) + if c.disableQueryRedaction { + if sample.Query.Valid && sample.Query.String == "" { + return fmt.Errorf("insufficient privilege to access query sample set: %+v", sample) + } } if !sample.DatabaseName.Valid { @@ -426,13 +442,8 @@ func (c *QuerySamples) buildQuerySampleLabels(state *SampleState) string { } } - queryText := state.LastRow.Query.String - if !c.disableQueryRedaction { - queryText = redact(queryText) - } - labels := fmt.Sprintf( - `datname="%s" pid="%d" leader_pid="%s" user="%s" app="%s" client="%s" backend_type="%s" state="%s" xid="%d" xmin="%d" xact_time="%s" query_time="%s" queryid="%d" query="%s" engine="postgres"`, + `datname="%s" pid="%d" leader_pid="%s" user="%s" app="%s" client="%s" backend_type="%s" state="%s" xid="%d" xmin="%d" xact_time="%s" query_time="%s" queryid="%d"`, state.LastRow.DatabaseName.String, state.LastRow.PID, leaderPID, @@ -446,11 +457,13 @@ func (c *QuerySamples) buildQuerySampleLabels(state *SampleState) string { xactDuration, queryDuration, state.LastRow.QueryID.Int64, - queryText, ) if state.LastCpuTime != "" { labels = fmt.Sprintf(`%s cpu_time="%s"`, labels, state.LastCpuTime) } + if c.disableQueryRedaction && state.LastRow.Query.Valid { + labels = fmt.Sprintf(`%s query="%s"`, labels, state.LastRow.Query.String) + } return labels } @@ -460,12 +473,8 @@ func (c *QuerySamples) buildWaitEventLabels(state *SampleState, we WaitEventOccu if state.LastRow.LeaderPID.Valid { leaderPID = fmt.Sprintf(`%d`, state.LastRow.LeaderPID.Int64) } - queryText := state.LastRow.Query.String - if !c.disableQueryRedaction { - queryText = redact(queryText) - } return fmt.Sprintf( - `datname="%s" pid="%d" leader_pid="%s" user="%s" backend_type="%s" state="%s" xid="%d" xmin="%d" wait_time="%s" wait_event_type="%s" wait_event="%s" wait_event_name="%s" blocked_by_pids="%v" queryid="%d" query="%s" engine="postgres"`, + `datname="%s" pid="%d" leader_pid="%s" user="%s" backend_type="%s" state="%s" xid="%d" xmin="%d" wait_time="%s" wait_event_type="%s" wait_event="%s" wait_event_name="%s" blocked_by_pids="%v" queryid="%d"`, state.LastRow.DatabaseName.String, state.LastRow.PID, leaderPID, @@ -480,7 +489,6 @@ func (c *QuerySamples) buildWaitEventLabels(state *SampleState, we WaitEventOccu waitEventFullName, we.BlockedByPIDs, state.LastRow.QueryID.Int64, - queryText, ) } diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index a1c8e0dc3d..6630b71ddb 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -40,31 +40,28 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "active query without wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", }).AddRow( now, "testdb", 100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, "client backend", backendStartTime, sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true}, xactStartTime, "active", stateChangeTime, sql.NullString{}, sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 123, Valid: true}, - "SELECT * FROM users", )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", })) }, @@ -72,37 +69,34 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="100" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="500" xmin="400" xact_time="2m0s" query_time="30s" queryid="123" query="SELECT * FROM users" engine="postgres" cpu_time="10s"`, + `level="info" datname="testdb" pid="100" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="500" xmin="400" xact_time="2m0s" query_time="30s" queryid="123" cpu_time="10s"`, }, }, { name: "parallel query with leader PID", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", }).AddRow( now, "testdb", 101, sql.NullInt64{Int64: 100, Valid: true}, "testuser", "testapp", "127.0.0.1", 5432, "parallel worker", now, sql.NullInt32{}, sql.NullInt32{}, now, "active", now, sql.NullString{}, sql.NullString{}, nil, now, sql.NullInt64{Int64: 123, Valid: true}, - "SELECT * FROM large_table", )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", })) }, @@ -110,41 +104,34 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - fmt.Sprintf(`level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" state="active" xid="0" xmin="0" xact_time="%s" query_time="%s" queryid="123" query="SELECT * FROM large_table" engine="postgres" cpu_time="%s"`, - time.Duration(0).String(), - time.Duration(0).String(), - time.Duration(0).String(), - ), + `level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" state="active" xid="0" xmin="0" xact_time="0s" query_time="0s" queryid="123" cpu_time="0s"`, // time.Duration(0).String(), }, }, { name: "query with wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", }).AddRow( now, "testdb", 102, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, xactStartTime, "waiting", stateChangeTime, sql.NullString{String: "Lock", Valid: true}, sql.NullString{String: "relation", Valid: true}, pq.Int64Array{103, 104}, now, sql.NullInt64{Int64: 124, Valid: true}, - "UPDATE users SET status = 'active'", )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", })) }, @@ -153,14 +140,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_WAIT_EVENT}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="waiting" xid="0" xmin="0" xact_time="2m0s" query_time="0s" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`, - `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124" query="UPDATE users SET status = ?" engine="postgres"`, + `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="waiting" xid="0" xmin="0" xact_time="2m0s" query_time="0s" queryid="124"`, + `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124"`, }, }, { name: "insufficient privilege query - no loki entries expected", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -177,7 +164,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { "", )) // Second scrape: empty to complete cycle - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -187,38 +174,36 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { "query", })) }, - expectedErrorLine: `err="insufficient privilege to access query`, - expectedLabels: []model.LabelSet{}, // No Loki entries expected - expectedLines: []string{}, // No Loki entries expected + disableQueryRedaction: true, + expectedErrorLine: `err="insufficient privilege to access query`, + expectedLabels: []model.LabelSet{}, // No Loki entries expected + expectedLines: []string{}, // No Loki entries expected }, { name: "null database name - no loki entries expected", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", }).AddRow( now, sql.NullString{Valid: false}, 104, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, "client backend", now, sql.NullInt32{}, sql.NullInt32{}, now, "active", now, sql.NullString{}, sql.NullString{}, nil, now, sql.NullInt64{Int64: 126, Valid: true}, - "SELECT * FROM users", )) // Second scrape: empty to complete cycle - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", "backend_type", "backend_start", "backend_xid", "backend_xmin", "xact_start", "state", "state_change", "wait_event_type", "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", })) }, expectedErrorLine: `err="database name is not valid`, @@ -228,7 +213,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "query with redaction disabled", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -245,7 +230,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { "SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'", )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows([]string{ "now", "datname", "pid", "leader_pid", "usename", "application_name", "client_addr", "client_port", @@ -260,7 +245,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="106" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="0" xmin="0" xact_time="2m0s" query_time="30s" queryid="128" query="SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'" engine="postgres" cpu_time="10s"`, + `level="info" datname="testdb" pid="106" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="0" xmin="0" xact_time="2m0s" query_time="30s" queryid="128" cpu_time="10s" query="SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'"`, }, }, } @@ -330,7 +315,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { require.Equal(t, len(tc.expectedLines), len(lokiEntries)) for i, entry := range lokiEntries { require.Equal(t, tc.expectedLabels[i], entry.Labels) - require.Contains(t, entry.Line, tc.expectedLines[i]) + require.Equal(t, tc.expectedLines[i], entry.Line) } }) } @@ -363,15 +348,16 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { lokiClient := loki_fake.NewClient(func() {}) sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DisableQueryRedaction: true, }) require.NoError(t, err) // First scrape: active row - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 1000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -381,7 +367,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Second scrape: no rows -> finalize - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -390,7 +376,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { entries := lokiClient.Received() require.Len(t, entries, 1) require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `datname="testdb" pid="1000" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="10" xmin="20" xact_time="2m0s" query_time="30s" queryid="999" query="SELECT * FROM t" engine="postgres" cpu_time="10s"`) + require.Equal(t, `level="info" datname="testdb" pid="1000" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="10" xmin="20" xact_time="2m0s" query_time="30s" queryid="999" cpu_time="10s" query="SELECT * FROM t"`, entries[0].Line) expectedTimestamp := time.Unix(0, now.UnixNano()) require.True(t, entries[0].Timestamp.Equal(expectedTimestamp)) }, 5*time.Second, 50*time.Millisecond) @@ -411,15 +397,16 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { lokiClient := loki_fake.NewClient(func() {}) sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DisableQueryRedaction: true, }) require.NoError(t, err) // Scrape 1: wait event with unordered/dup PIDs - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 300, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -429,7 +416,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 2: same wait event with normalized PIDs - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 300, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -439,7 +426,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 3: disappear - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -449,8 +436,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.Len(t, entries, 2) require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) - require.Contains(t, entries[1].Line, `wait_time="12s"`) - require.Contains(t, entries[1].Line, `blocked_by_pids="[103 104]"`) + require.Equal(t, `level="info" datname="testdb" pid="300" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="12s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124"`, entries[1].Line) }, 5*time.Second, 50*time.Millisecond) sampleCollector.Stop() @@ -469,15 +455,16 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { lokiClient := loki_fake.NewClient(func() {}) sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DisableQueryRedaction: true, }) require.NoError(t, err) // Scrape 1: wait event - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 301, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -487,7 +474,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 2: active with no wait -> close occurrence - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 301, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -497,7 +484,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 3: disappear - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -506,8 +493,9 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { entries := lokiClient.Received() require.Len(t, entries, 2) require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Equal(t, `level="info" datname="testdb" pid="301" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="0" xmin="0" xact_time="2m0s" query_time="0s" queryid="555" cpu_time="0s" query="UPDATE users SET status = 'active'"`, entries[0].Line) require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) - require.Contains(t, entries[1].Line, `wait_time="10s"`) + require.Equal(t, `level="info" datname="testdb" pid="301" leader_pid="" user="testuser" backend_type="client backend" state="active" xid="0" xmin="0" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="555"`, entries[1].Line) }, 5*time.Second, 50*time.Millisecond) sampleCollector.Stop() @@ -517,7 +505,6 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, mock.ExpectationsWereMet()) }) - // CPU persists across later waits t.Run("cpu persists across waits", func(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) @@ -527,15 +514,16 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { lokiClient := loki_fake.NewClient(func() {}) sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DisableQueryRedaction: true, }) require.NoError(t, err) // Scrape 1: active CPU snapshot (10s) - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 402, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -545,7 +533,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 2: waiting with wait_event; state_change 7s ago - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 402, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -555,7 +543,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 3: disappear -> finalize - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -564,10 +552,9 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { entries := lokiClient.Received() require.Len(t, entries, 2) require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `cpu_time="10s"`) + require.Equal(t, `level="info" datname="testdb" pid="402" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="waiting" xid="0" xmin="0" xact_time="2m0s" query_time="30s" queryid="9002" cpu_time="10s" query="SELECT * FROM t"`, entries[0].Line) require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) - require.Contains(t, entries[1].Line, `state="waiting"`) - require.Contains(t, entries[1].Line, `wait_time="7s"`) + require.Equal(t, `level="info" datname="testdb" pid="402" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="7s" wait_event_type="IO" wait_event="DataFileRead" wait_event_name="IO:DataFileRead" blocked_by_pids="[501]" queryid="9002"`, entries[1].Line) }, 5*time.Second, 50*time.Millisecond) sampleCollector.Stop() @@ -577,7 +564,6 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, mock.ExpectationsWereMet()) }) - // New occurrence when blocked_by_pids set changes t.Run("wait-event starts new occurrence on set change", func(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) @@ -587,15 +573,16 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { lokiClient := loki_fake.NewClient(func() {}) sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ - DB: db, - CollectInterval: 10 * time.Millisecond, - EntryHandler: lokiClient, - Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DB: db, + CollectInterval: 10 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DisableQueryRedaction: true, }) require.NoError(t, err) // Scrape 1: wait event set A - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 403, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -605,7 +592,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE t SET c=1", )) // Scrape 2: same event, set changes -> new occurrence - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 403, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -615,7 +602,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE t SET c=1", )) // Scrape 3: disappear -> finalize - mock.ExpectQuery(selectPgStatActivity).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -624,12 +611,11 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { entries := lokiClient.Received() require.Len(t, entries, 3) require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Equal(t, `level="info" datname="testdb" pid="403" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="waiting" xid="0" xmin="0" xact_time="2m0s" query_time="30s" queryid="9003" query="UPDATE t SET c=1"`, entries[0].Line) require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) - require.Contains(t, entries[1].Line, `state="waiting"`) - require.Contains(t, entries[1].Line, `blocked_by_pids="[103]"`) + require.Equal(t, `level="info" datname="testdb" pid="403" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="5s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103]" queryid="9003"`, entries[1].Line) require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[2].Labels) - require.Contains(t, entries[2].Line, `blocked_by_pids="[103 104]"`) - require.Contains(t, entries[2].Line, `wait_time="8s"`) + require.Equal(t, `level="info" datname="testdb" pid="403" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="8s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="9003"`, entries[2].Line) }, 5*time.Second, 50*time.Millisecond) sampleCollector.Stop()