diff --git a/executor/cluster_reader.go b/executor/cluster_reader.go index 7b49fa1e8bd1c..c96b4b59caa49 100644 --- a/executor/cluster_reader.go +++ b/executor/cluster_reader.go @@ -47,10 +47,15 @@ import ( "google.golang.org/grpc/credentials" ) -const clusterLogBatchSize = 1024 +const clusterLogBatchSize = 256 + +type dummyCloser struct{} + +func (dummyCloser) close() error { return nil } type clusterRetriever interface { retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) + close() error } // ClusterReaderExec executes cluster information retrieving from the cluster components @@ -80,7 +85,13 @@ func (e *ClusterReaderExec) Next(ctx context.Context, req *chunk.Chunk) error { return nil } +// Close implements the Executor Close interface. +func (e *ClusterReaderExec) Close() error { + return e.retriever.close() +} + type clusterConfigRetriever struct { + dummyCloser retrieved bool extractor *plannercore.ClusterTableExtractor } @@ -207,6 +218,7 @@ func (e *clusterConfigRetriever) retrieve(_ context.Context, sctx sessionctx.Con } type clusterServerInfoRetriever struct { + dummyCloser retrieved bool extractor *plannercore.ClusterTableExtractor serverInfoType diagnosticspb.ServerInfoType @@ -370,6 +382,7 @@ type clusterLogRetriever struct { retrieving bool heap *logResponseHeap extractor *plannercore.ClusterLogTableExtractor + cancel context.CancelFunc } type logStreamResult struct { @@ -480,6 +493,9 @@ func (e *clusterLogRetriever) startRetrieving(ctx context.Context, sctx sessionc Patterns: patterns, } + // The retrieve progress may be abort + ctx, e.cancel = context.WithCancel(ctx) + var results []chan logStreamResult for _, srv := range serversInfo { typ := srv.ServerType @@ -595,3 +611,8 @@ func (e *clusterLogRetriever) retrieve(ctx context.Context, sctx sessionctx.Cont return finalRows, nil } + +func (e *clusterLogRetriever) close() error { + e.cancel() + return nil +} diff --git a/executor/diagnostics.go b/executor/diagnostics.go index da5bbd1529675..7f4b0cff335bd 100644 --- a/executor/diagnostics.go +++ b/executor/diagnostics.go @@ -52,6 +52,7 @@ var inspectionRules = []inspectionRule{ } type inspectionRetriever struct { + dummyCloser retrieved bool extractor *plannercore.InspectionResultTableExtractor } diff --git a/executor/metric_reader.go b/executor/metric_reader.go index 85f1f0412eb58..9766f548a13df 100644 --- a/executor/metric_reader.go +++ b/executor/metric_reader.go @@ -38,6 +38,7 @@ const promReadTimeout = time.Second * 10 // MetricRetriever uses to read metric data. type MetricRetriever struct { + dummyCloser table *model.TableInfo tblDef *metricschema.MetricTableDef extractor *plannercore.MetricTableExtractor