@@ -100,6 +100,10 @@ func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
100
100
101
101
bb := bufs .Get (workerID )
102
102
103
+ // Write the marker of a regular data block.
104
+ bb .B = append (bb .B , 0 )
105
+
106
+ // Marshal the data block.
103
107
bb .B = db .Marshal (bb .B )
104
108
105
109
if len (bb .B ) < 1024 * 1024 {
@@ -117,7 +121,10 @@ func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
117
121
}
118
122
}
119
123
120
- if err := vtstorage .RunQuery (ctx , cp .TenantIDs , cp .Query , writeBlock ); err != nil {
124
+ qctx := cp .NewQueryContext (ctx )
125
+ defer cp .UpdatePerQueryStatsMetrics ()
126
+
127
+ if err := vtstorage .RunQuery (qctx , writeBlock ); err != nil {
121
128
return err
122
129
}
123
130
if errGlobal != nil {
@@ -131,7 +138,13 @@ func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
131
138
}
132
139
}
133
140
134
- return nil
141
+ // Send the query stats block.
142
+ bb := bufs .Get (0 )
143
+ // Write the marker of query stats block.
144
+ bb .B = append (bb .B , 1 )
145
+ // Marshal the block itself
146
+ bb .B = marshalQueryStatsBlock (bb .B , qctx )
147
+ return sendBuf (bb )
135
148
}
136
149
137
150
func processFieldNamesRequest (ctx context.Context , w http.ResponseWriter , r * http.Request ) error {
@@ -140,12 +153,15 @@ func processFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt
140
153
return err
141
154
}
142
155
143
- fieldNames , err := vtstorage .GetFieldNames (ctx , cp .TenantIDs , cp .Query )
156
+ qctx := cp .NewQueryContext (ctx )
157
+ defer cp .UpdatePerQueryStatsMetrics ()
158
+
159
+ fieldNames , err := vtstorage .GetFieldNames (qctx )
144
160
if err != nil {
145
161
return fmt .Errorf ("cannot obtain field names: %w" , err )
146
162
}
147
163
148
- return writeValuesWithHits (w , fieldNames , cp .DisableCompression )
164
+ return writeValuesWithHits (w , qctx , fieldNames , cp .DisableCompression )
149
165
}
150
166
151
167
func processFieldValuesRequest (ctx context.Context , w http.ResponseWriter , r * http.Request ) error {
@@ -161,12 +177,15 @@ func processFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
161
177
return err
162
178
}
163
179
164
- fieldValues , err := vtstorage .GetFieldValues (ctx , cp .TenantIDs , cp .Query , fieldName , uint64 (limit ))
180
+ qctx := cp .NewQueryContext (ctx )
181
+ defer cp .UpdatePerQueryStatsMetrics ()
182
+
183
+ fieldValues , err := vtstorage .GetFieldValues (qctx , fieldName , uint64 (limit ))
165
184
if err != nil {
166
185
return fmt .Errorf ("cannot obtain field values: %w" , err )
167
186
}
168
187
169
- return writeValuesWithHits (w , fieldValues , cp .DisableCompression )
188
+ return writeValuesWithHits (w , qctx , fieldValues , cp .DisableCompression )
170
189
}
171
190
172
191
func processStreamFieldNamesRequest (ctx context.Context , w http.ResponseWriter , r * http.Request ) error {
@@ -175,12 +194,15 @@ func processStreamFieldNamesRequest(ctx context.Context, w http.ResponseWriter,
175
194
return err
176
195
}
177
196
178
- fieldNames , err := vtstorage .GetStreamFieldNames (ctx , cp .TenantIDs , cp .Query )
197
+ qctx := cp .NewQueryContext (ctx )
198
+ defer cp .UpdatePerQueryStatsMetrics ()
199
+
200
+ fieldNames , err := vtstorage .GetStreamFieldNames (qctx )
179
201
if err != nil {
180
202
return fmt .Errorf ("cannot obtain stream field names: %w" , err )
181
203
}
182
204
183
- return writeValuesWithHits (w , fieldNames , cp .DisableCompression )
205
+ return writeValuesWithHits (w , qctx , fieldNames , cp .DisableCompression )
184
206
}
185
207
186
208
func processStreamFieldValuesRequest (ctx context.Context , w http.ResponseWriter , r * http.Request ) error {
@@ -196,12 +218,15 @@ func processStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter,
196
218
return err
197
219
}
198
220
199
- fieldValues , err := vtstorage .GetStreamFieldValues (ctx , cp .TenantIDs , cp .Query , fieldName , uint64 (limit ))
221
+ qctx := cp .NewQueryContext (ctx )
222
+ defer cp .UpdatePerQueryStatsMetrics ()
223
+
224
+ fieldValues , err := vtstorage .GetStreamFieldValues (qctx , fieldName , uint64 (limit ))
200
225
if err != nil {
201
226
return fmt .Errorf ("cannot obtain stream field values: %w" , err )
202
227
}
203
228
204
- return writeValuesWithHits (w , fieldValues , cp .DisableCompression )
229
+ return writeValuesWithHits (w , qctx , fieldValues , cp .DisableCompression )
205
230
}
206
231
207
232
func processStreamsRequest (ctx context.Context , w http.ResponseWriter , r * http.Request ) error {
@@ -215,12 +240,15 @@ func processStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R
215
240
return err
216
241
}
217
242
218
- streams , err := vtstorage .GetStreams (ctx , cp .TenantIDs , cp .Query , uint64 (limit ))
243
+ qctx := cp .NewQueryContext (ctx )
244
+ defer cp .UpdatePerQueryStatsMetrics ()
245
+
246
+ streams , err := vtstorage .GetStreams (qctx , uint64 (limit ))
219
247
if err != nil {
220
248
return fmt .Errorf ("cannot obtain streams: %w" , err )
221
249
}
222
250
223
- return writeValuesWithHits (w , streams , cp .DisableCompression )
251
+ return writeValuesWithHits (w , qctx , streams , cp .DisableCompression )
224
252
}
225
253
226
254
func processStreamIDsRequest (ctx context.Context , w http.ResponseWriter , r * http.Request ) error {
@@ -234,19 +262,33 @@ func processStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
234
262
return err
235
263
}
236
264
237
- streamIDs , err := vtstorage .GetStreamIDs (ctx , cp .TenantIDs , cp .Query , uint64 (limit ))
265
+ qctx := cp .NewQueryContext (ctx )
266
+ defer cp .UpdatePerQueryStatsMetrics ()
267
+
268
+ streamIDs , err := vtstorage .GetStreamIDs (qctx , uint64 (limit ))
238
269
if err != nil {
239
270
return fmt .Errorf ("cannot obtain streams: %w" , err )
240
271
}
241
272
242
- return writeValuesWithHits (w , streamIDs , cp .DisableCompression )
273
+ return writeValuesWithHits (w , qctx , streamIDs , cp .DisableCompression )
243
274
}
244
275
245
276
type commonParams struct {
246
277
TenantIDs []logstorage.TenantID
247
278
Query * logstorage.Query
248
279
249
280
DisableCompression bool
281
+
282
+ // qs contains execution statistics for the Query.
283
+ qs logstorage.QueryStats
284
+ }
285
+
286
+ func (cp * commonParams ) NewQueryContext (ctx context.Context ) * logstorage.QueryContext {
287
+ return logstorage .NewQueryContext (ctx , & cp .qs , cp .TenantIDs , cp .Query )
288
+ }
289
+
290
+ func (cp * commonParams ) UpdatePerQueryStatsMetrics () {
291
+ vtstorage .UpdatePerQueryStatsMetrics (& cp .qs )
250
292
}
251
293
252
294
func getCommonParams (r * http.Request , expectedProtocolVersion string ) (* commonParams , error ) {
@@ -287,12 +329,18 @@ func getCommonParams(r *http.Request, expectedProtocolVersion string) (*commonPa
287
329
return cp , nil
288
330
}
289
331
290
- func writeValuesWithHits (w http.ResponseWriter , vhs []logstorage.ValueWithHits , disableCompression bool ) error {
332
+ func writeValuesWithHits (w http.ResponseWriter , qctx * logstorage. QueryContext , vhs []logstorage.ValueWithHits , disableCompression bool ) error {
291
333
var b []byte
334
+
335
+ // Marshal vhs at first
336
+ b = encoding .MarshalUint64 (b , uint64 (len (vhs )))
292
337
for i := range vhs {
293
338
b = vhs [i ].Marshal (b )
294
339
}
295
340
341
+ // Marshal query stats block after that
342
+ b = marshalQueryStatsBlock (b , qctx )
343
+
296
344
if ! disableCompression {
297
345
b = zstd .CompressLevel (nil , b , 1 )
298
346
}
@@ -306,6 +354,13 @@ func writeValuesWithHits(w http.ResponseWriter, vhs []logstorage.ValueWithHits,
306
354
return nil
307
355
}
308
356
357
+ func marshalQueryStatsBlock (dst []byte , qctx * logstorage.QueryContext ) []byte {
358
+ queryDurationNsecs := qctx .QueryDurationNsecs ()
359
+ db := qctx .QueryStats .CreateDataBlock (queryDurationNsecs )
360
+ dst = db .Marshal (dst )
361
+ return dst
362
+ }
363
+
309
364
func getInt64FromRequest (r * http.Request , argName string ) (int64 , error ) {
310
365
s := r .FormValue (argName )
311
366
n , err := strconv .ParseInt (s , 10 , 64 )
0 commit comments