Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some query metric #14190

Merged
merged 2 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.tsfile.read.reader.IPageReader;
import org.apache.tsfile.read.reader.IPointReader;
import org.apache.tsfile.read.reader.page.AlignedPageReader;
import org.apache.tsfile.read.reader.page.TablePageReader;
import org.apache.tsfile.read.reader.series.PaginationController;
import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.RamUsageEstimator;
Expand Down Expand Up @@ -1228,7 +1229,10 @@ protected static class VersionPageReader {
this.version = new MergeReaderPriority(fileTimestamp, version, offset, isSeq);
this.data = data;
this.isSeq = isSeq;
this.isAligned = data instanceof AlignedPageReader || data instanceof MemAlignedPageReader;
this.isAligned =
data instanceof AlignedPageReader
|| data instanceof MemAlignedPageReader
|| data instanceof TablePageReader;
this.isMem = data instanceof MemPageReader || data instanceof MemAlignedPageReader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1107,45 +1107,45 @@ private void unbindConstructChunkReader(AbstractMetricService metricService) {
/////////////////////////////////////////////////////////////////////////////////////////////////
private static final String READ_CHUNK = "read_chunk";
private static final String ALL = "all";
public static final String READ_CHUNK_ALL = READ_CHUNK + "_" + ALL;
public static final String READ_CHUNK_CACHE = READ_CHUNK + "_" + CACHE;
public static final String READ_CHUNK_FILE = READ_CHUNK + "_" + FILE;
private Timer readChunkAllTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer readChunkCacheTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer readChunkFileTimer = DoNothingMetricManager.DO_NOTHING_TIMER;

private void bindReadChunk(AbstractMetricService metricService) {
readChunkAllTimer =
readChunkCacheTimer =
metricService.getOrCreateTimer(
Metric.SERIES_SCAN_COST.toString(),
MetricLevel.IMPORTANT,
Tag.STAGE.toString(),
READ_CHUNK,
READ_CHUNK_CACHE,
Tag.TYPE.toString(),
NULL,
Tag.FROM.toString(),
ALL);
CACHE);
readChunkFileTimer =
metricService.getOrCreateTimer(
Metric.SERIES_SCAN_COST.toString(),
MetricLevel.IMPORTANT,
Tag.STAGE.toString(),
READ_CHUNK,
READ_CHUNK_FILE,
Tag.TYPE.toString(),
NULL,
Tag.FROM.toString(),
FILE);
}

private void unbindReadChunk(AbstractMetricService metricService) {
readChunkAllTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
readChunkCacheTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
readChunkFileTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
Arrays.asList(ALL, FILE)
Arrays.asList(CACHE, FILE)
.forEach(
from ->
metricService.remove(
MetricType.TIMER,
Metric.SERIES_SCAN_COST.toString(),
Tag.STAGE.toString(),
READ_CHUNK,
READ_CHUNK + "_" + from,
Tag.TYPE.toString(),
NULL,
Tag.FROM.toString(),
Expand Down Expand Up @@ -1571,8 +1571,8 @@ public void recordSeriesScanCost(String type, long cost) {
case READ_TIMESERIES_METADATA_FILE:
readTimeseriesMetadataFileTimer.updateNanos(cost);
break;
case READ_CHUNK_ALL:
readChunkAllTimer.updateNanos(cost);
case READ_CHUNK_CACHE:
readChunkCacheTimer.updateNanos(cost);
break;
case READ_CHUNK_FILE:
readChunkFileTimer.updateNanos(cost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public Operator visitTableScan(TableScanNode node, LocalExecutionPlanContext con
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
TableScanNode.class.getSimpleName());
TableScanOperator.class.getSimpleName());

int maxTsBlockLineNum = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
if (context.getTypeProvider().getTemplatedInfo() != null) {
Expand Down Expand Up @@ -1132,7 +1132,7 @@ public Operator visitStreamSort(StreamSortNode node, LocalExecutionPlanContext c
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
StreamSortNode.class.getSimpleName());
TableStreamSortOperator.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
int sortItemsCount = node.getOrderingScheme().getOrderBy().size();

Expand Down Expand Up @@ -1176,11 +1176,6 @@ public Operator visitStreamSort(StreamSortNode node, LocalExecutionPlanContext c

@Override
public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(), node.getPlanNodeId(), JoinNode.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());

Operator leftChild = node.getLeftChild().accept(this, context);
Expand Down Expand Up @@ -1221,6 +1216,13 @@ public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
}

if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
TableInnerJoinOperator.class.getSimpleName());
return new TableInnerJoinOperator(
operatorContext,
leftChild,
Expand All @@ -1232,6 +1234,13 @@ public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
ASC_TIME_COMPARATOR,
dataTypes);
} else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.FULL) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
TableFullOuterJoinOperator.class.getSimpleName());
return new TableFullOuterJoinOperator(
operatorContext,
leftChild,
Expand Down Expand Up @@ -1357,25 +1366,28 @@ public Operator visitTableDeviceQueryCount(

@Override
public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AggregationNode.class.getSimpleName());

Operator child = node.getChild().accept(this, context);

if (node.getGroupingKeys().isEmpty()) {
return planGlobalAggregation(node, child, context.getTypeProvider(), operatorContext);
return planGlobalAggregation(node, child, context.getTypeProvider(), context);
}

return planGroupByAggregation(node, child, context.getTypeProvider(), operatorContext);
return planGroupByAggregation(node, child, context.getTypeProvider(), context);
}

private Operator planGlobalAggregation(
AggregationNode node, Operator child, TypeProvider typeProvider, OperatorContext context) {

AggregationNode node,
Operator child,
TypeProvider typeProvider,
LocalExecutionPlanContext context) {
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AggregationOperator.class.getSimpleName());
Map<Symbol, AggregationNode.Aggregation> aggregationMap = node.getAggregations();
ImmutableList.Builder<TableAggregator> aggregatorBuilder = new ImmutableList.Builder<>();
Map<Symbol, Integer> childLayout =
Expand All @@ -1393,7 +1405,7 @@ private Operator planGlobalAggregation(
typeProvider,
true,
null)));
return new AggregationOperator(context, child, aggregatorBuilder.build());
return new AggregationOperator(operatorContext, child, aggregatorBuilder.build());
}

// timeColumnName will only be set for AggTableScan.
Expand Down Expand Up @@ -1438,7 +1450,7 @@ private Operator planGroupByAggregation(
AggregationNode node,
Operator child,
TypeProvider typeProvider,
OperatorContext operatorContext) {
LocalExecutionPlanContext context) {
Map<Symbol, Integer> childLayout =
makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols());

Expand All @@ -1458,6 +1470,13 @@ private Operator planGroupByAggregation(
buildAggregator(
childLayout, k, v, node.getStep(), typeProvider, true, null)));

OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
StreamingAggregationOperator.class.getSimpleName());
return new StreamingAggregationOperator(
operatorContext,
child,
Expand Down Expand Up @@ -1499,6 +1518,13 @@ private Operator planGroupByAggregation(
}

List<Integer> preGroupedChannels = preGroupedChannelsBuilder.build();
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
StreamingHashAggregationOperator.class.getSimpleName());
return new StreamingHashAggregationOperator(
operatorContext,
child,
Expand All @@ -1522,6 +1548,13 @@ private Operator planGroupByAggregation(
(k, v) ->
aggregatorBuilder.add(
buildGroupByAggregator(childLayout, k, v, node.getStep(), typeProvider)));
OperatorContext operatorContext =
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
HashAggregationOperator.class.getSimpleName());

return new HashAggregationOperator(
operatorContext,
Expand Down Expand Up @@ -1739,7 +1772,7 @@ public Operator visitAggregationTableScan(
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
AggregationTableScanNode.class.getSimpleName());
TableAggregationTableScanOperator.class.getSimpleName());
SeriesScanOptions.Builder scanOptionsBuilder =
node.getTimePredicate().isPresent()
? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.plan.relational.sql.parser;

import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
Expand Down Expand Up @@ -54,6 +55,10 @@
import static java.util.Objects.requireNonNull;

public class SqlParser {

private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
PerformanceOverviewMetrics.getInstance();

private static final ANTLRErrorListener LEXER_ERROR_LISTENER =
new BaseErrorListener() {
@Override
Expand Down Expand Up @@ -131,6 +136,7 @@ private Node invokeParser(
Optional<NodeLocation> location,
Function<RelationalSqlParser, ParserRuleContext> parseFunction,
ZoneId zoneId) {
long startTime = System.nanoTime();
try {
RelationalSqlLexer lexer =
new RelationalSqlLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
Expand Down Expand Up @@ -190,6 +196,8 @@ public Token recoverInline(Parser recognizer) throws RecognitionException {
return new AstBuilder(location.orElse(null), zoneId).visit(tree);
} catch (StackOverflowError e) {
throw new ParsingException(name + " is too large (stack overflow while parsing)");
} finally {
PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import java.util.function.Function;
import java.util.function.LongConsumer;

import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.READ_CHUNK_ALL;
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.READ_CHUNK_CACHE;
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.READ_CHUNK_FILE;

/**
Expand Down Expand Up @@ -162,11 +162,13 @@ private Chunk get(
} finally {
if (chunkLoader.isCacheMiss()) {
cacheMissAdder.accept(1);
SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
READ_CHUNK_FILE, System.nanoTime() - startTime);
} else {
cacheHitAdder.accept(1);
SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
READ_CHUNK_CACHE, System.nanoTime() - startTime);
}
SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
READ_CHUNK_ALL, System.nanoTime() - startTime);
}
}

Expand Down
Loading