Skip to content

Commit 313d9af

Browse files
committed
Cleanup PagesSerdeFactory creation
1 parent c36815a commit 313d9af

22 files changed

+96
-53
lines changed

core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.trino.execution.QueryManager;
2424
import io.trino.execution.QueryState;
2525
import io.trino.execution.buffer.PageDeserializer;
26-
import io.trino.execution.buffer.PagesSerdeFactory;
2726
import io.trino.memory.context.SimpleLocalMemoryContext;
2827
import io.trino.operator.DirectExchangeClient;
2928
import io.trino.operator.DirectExchangeClientSupplier;
@@ -43,10 +42,10 @@
4342
import java.util.concurrent.ExecutionException;
4443

4544
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
46-
import static io.trino.SystemSessionProperties.getExchangeCompressionCodec;
4745
import static io.trino.SystemSessionProperties.getRetryPolicy;
4846
import static io.trino.execution.QueryState.FAILED;
4947
import static io.trino.execution.QueryState.FINISHING;
48+
import static io.trino.execution.buffer.PagesSerdes.createExchangePagesSerdeFactory;
5049
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
5150
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
5251
import static java.util.Objects.requireNonNull;
@@ -94,7 +93,7 @@ public DispatchQuery execute(SessionContext sessionContext, @Language("SQL") Str
9493
}
9594
});
9695

97-
PageDeserializer pageDeserializer = new PagesSerdeFactory(blockEncodingSerde, getExchangeCompressionCodec(dispatchQuery.getSession())).createDeserializer(Optional.empty());
96+
PageDeserializer pageDeserializer = createExchangePagesSerdeFactory(blockEncodingSerde, dispatchQuery.getSession()).createDeserializer(Optional.empty());
9897
for (QueryState state = queryManager.getQueryState(queryId);
9998
(state != FAILED) &&
10099
!exchangeClient.isFinished() &&

core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ public class PagesSerdeFactory
3939
private final CompressionCodec compressionCodec;
4040
private final int blockSizeInBytes;
4141

42-
public PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, CompressionCodec compressionCodec)
42+
// created via PagesSerdes.create*
43+
PagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, CompressionCodec compressionCodec)
4344
{
4445
this(blockEncodingSerde, compressionCodec, SERIALIZED_PAGE_DEFAULT_BLOCK_SIZE_IN_BYTES);
4546
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.execution.buffer;
15+
16+
import io.trino.Session;
17+
import io.trino.spi.block.BlockEncodingSerde;
18+
19+
import static io.trino.SystemSessionProperties.getExchangeCompressionCodec;
20+
21+
public final class PagesSerdes
22+
{
23+
private PagesSerdes() {}
24+
25+
public static PagesSerdeFactory createExchangePagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, Session session)
26+
{
27+
return new PagesSerdeFactory(blockEncodingSerde, getExchangeCompressionCodec(session));
28+
}
29+
30+
public static PagesSerdeFactory createSpillingPagesSerdeFactory(BlockEncodingSerde blockEncodingSerde, CompressionCodec compressionCodec)
31+
{
32+
return new PagesSerdeFactory(blockEncodingSerde, compressionCodec);
33+
}
34+
}

core/trino-main/src/main/java/io/trino/server/protocol/Query.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import io.trino.execution.QueryState;
4343
import io.trino.execution.StageId;
4444
import io.trino.execution.buffer.PageDeserializer;
45-
import io.trino.execution.buffer.PagesSerdeFactory;
4645
import io.trino.memory.context.SimpleLocalMemoryContext;
4746
import io.trino.operator.DirectExchangeClientSupplier;
4847
import io.trino.server.ExternalUriInfo;
@@ -86,10 +85,10 @@
8685
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
8786
import static io.airlift.concurrent.MoreFutures.addTimeout;
8887
import static io.airlift.units.DataSize.Unit.MEGABYTE;
89-
import static io.trino.SystemSessionProperties.getExchangeCompressionCodec;
9088
import static io.trino.SystemSessionProperties.getRetryPolicy;
9189
import static io.trino.execution.QueryState.FAILED;
9290
import static io.trino.execution.QueryState.FINISHING;
91+
import static io.trino.execution.buffer.PagesSerdes.createExchangePagesSerdeFactory;
9392
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
9493
import static io.trino.server.protocol.ProtocolUtil.createColumn;
9594
import static io.trino.server.protocol.ProtocolUtil.toStatementStats;
@@ -262,7 +261,7 @@ private Query(
262261
this.resultsProcessorExecutor = resultsProcessorExecutor;
263262
this.timeoutExecutor = timeoutExecutor;
264263
this.supportsParametricDateTime = session.getClientCapabilities().contains(ClientCapabilities.PARAMETRIC_DATETIME.toString());
265-
deserializer = new PagesSerdeFactory(blockEncodingSerde, getExchangeCompressionCodec(session))
264+
deserializer = createExchangePagesSerdeFactory(blockEncodingSerde, session)
266265
.createDeserializer(session.getExchangeEncryptionKey().map(Ciphers::deserializeAesEncryptionKey));
267266
}
268267

core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpillerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
4747
import static io.trino.FeaturesConfig.SPILLER_SPILL_PATH;
4848
import static io.trino.cache.SafeCaches.buildNonEvictableCacheWithWeakInvalidateAll;
49+
import static io.trino.execution.buffer.PagesSerdes.createSpillingPagesSerdeFactory;
4950
import static io.trino.spi.StandardErrorCode.OUT_OF_SPILL_SPACE;
5051
import static io.trino.util.Ciphers.createRandomAesEncryptionKey;
5152
import static java.lang.String.format;
@@ -107,7 +108,7 @@ public FileSingleStreamSpillerFactory(
107108
CompressionCodec compressionCodec,
108109
boolean spillEncryptionEnabled)
109110
{
110-
this.serdeFactory = new PagesSerdeFactory(blockEncodingSerde, compressionCodec);
111+
this.serdeFactory = createSpillingPagesSerdeFactory(blockEncodingSerde, compressionCodec);
111112
this.executor = requireNonNull(executor, "executor is null");
112113
this.spillerStats = requireNonNull(spillerStats, "spillerStats cannot be null");
113114
requireNonNull(spillPaths, "spillPaths is null");

core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import io.trino.execution.TaskId;
4444
import io.trino.execution.TaskManagerConfig;
4545
import io.trino.execution.buffer.OutputBuffer;
46-
import io.trino.execution.buffer.PagesSerdeFactory;
4746
import io.trino.metadata.MergeHandle;
4847
import io.trino.metadata.Metadata;
4948
import io.trino.metadata.ResolvedFunction;
@@ -314,7 +313,6 @@
314313
import static io.trino.SystemSessionProperties.getAdaptivePartialAggregationUniqueRowsRatioThreshold;
315314
import static io.trino.SystemSessionProperties.getAggregationOperatorUnspillMemoryLimit;
316315
import static io.trino.SystemSessionProperties.getDynamicRowFilterSelectivityThreshold;
317-
import static io.trino.SystemSessionProperties.getExchangeCompressionCodec;
318316
import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount;
319317
import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageSize;
320318
import static io.trino.SystemSessionProperties.getPagePartitioningBufferPoolSize;
@@ -331,6 +329,7 @@
331329
import static io.trino.SystemSessionProperties.isSpillEnabled;
332330
import static io.trino.cache.CacheUtils.uncheckedCacheGet;
333331
import static io.trino.cache.SafeCaches.buildNonEvictableCache;
332+
import static io.trino.execution.buffer.PagesSerdes.createExchangePagesSerdeFactory;
334333
import static io.trino.metadata.GlobalFunctionCatalog.builtinFunctionName;
335334
import static io.trino.operator.DistinctLimitOperator.DistinctLimitOperatorFactory;
336335
import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier;
@@ -675,7 +674,7 @@ public LocalExecutionPlan plan(
675674
plan.getId(),
676675
outputTypes,
677676
pagePreprocessor,
678-
new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), getExchangeCompressionCodec(session))),
677+
createExchangePagesSerdeFactory(plannerContext.getBlockEncodingSerde(), session)),
679678
physicalOperation),
680679
context);
681680

@@ -940,7 +939,7 @@ private PhysicalOperation createMergeSource(RemoteSourceNode node, LocalExecutio
940939
context.getNextOperatorId(),
941940
node.getId(),
942941
directExchangeClientSupplier,
943-
new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), getExchangeCompressionCodec(session)),
942+
createExchangePagesSerdeFactory(plannerContext.getBlockEncodingSerde(), session),
944943
orderingCompiler,
945944
types,
946945
outputChannels,
@@ -960,7 +959,7 @@ private PhysicalOperation createRemoteSource(RemoteSourceNode node, LocalExecuti
960959
context.getNextOperatorId(),
961960
node.getId(),
962961
directExchangeClientSupplier,
963-
new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), getExchangeCompressionCodec(session)),
962+
createExchangePagesSerdeFactory(plannerContext.getBlockEncodingSerde(), session),
964963
node.getRetryPolicy(),
965964
exchangeManagerRegistry);
966965

core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import io.trino.execution.buffer.BufferState;
3232
import io.trino.execution.buffer.OutputBuffer;
3333
import io.trino.execution.buffer.OutputBufferStateMachine;
34-
import io.trino.execution.buffer.PagesSerdeFactory;
3534
import io.trino.execution.buffer.PartitionedOutputBuffer;
3635
import io.trino.execution.buffer.PipelinedOutputBuffers;
3736
import io.trino.execution.buffer.PipelinedOutputBuffers.OutputBufferId;
@@ -50,7 +49,6 @@
5049
import io.trino.operator.output.TaskOutputOperator.TaskOutputOperatorFactory;
5150
import io.trino.spi.Page;
5251
import io.trino.spi.QueryId;
53-
import io.trino.spi.block.TestingBlockEncodingSerde;
5452
import io.trino.spi.connector.ConnectorSplit;
5553
import io.trino.spiller.SpillSpaceTracker;
5654
import io.trino.sql.planner.LocalExecutionPlanner.LocalExecutionPlan;
@@ -79,9 +77,9 @@
7977
import static io.trino.execution.TaskState.RUNNING;
8078
import static io.trino.execution.TaskTestUtils.TABLE_SCAN_NODE_ID;
8179
import static io.trino.execution.TaskTestUtils.createTestSplitMonitor;
82-
import static io.trino.execution.buffer.CompressionCodec.NONE;
8380
import static io.trino.execution.buffer.PagesSerdeUtil.getSerializedPagePositionCount;
8481
import static io.trino.execution.buffer.PipelinedOutputBuffers.BufferType.PARTITIONED;
82+
import static io.trino.execution.buffer.TestingPagesSerdes.createTestingPagesSerdeFactory;
8583
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
8684
import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE;
8785
import static java.util.Objects.requireNonNull;
@@ -127,7 +125,7 @@ public void testSimple()
127125
TABLE_SCAN_NODE_ID,
128126
outputBuffer,
129127
Function.identity(),
130-
new PagesSerdeFactory(new TestingBlockEncodingSerde(), NONE));
128+
createTestingPagesSerdeFactory());
131129
LocalExecutionPlan localExecutionPlan = new LocalExecutionPlan(
132130
ImmutableList.of(new DriverFactory(
133131
0,

core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.trino.spi.PageBuilder;
2424
import io.trino.spi.block.BlockBuilder;
2525
import io.trino.spi.block.RowBlockBuilder;
26-
import io.trino.spi.block.TestingBlockEncodingSerde;
2726
import io.trino.spi.type.DecimalType;
2827
import io.trino.spi.type.Int128;
2928
import io.trino.spi.type.RowType;
@@ -55,9 +54,9 @@
5554
import static com.google.common.collect.ImmutableList.toImmutableList;
5655
import static io.airlift.slice.Slices.utf8Slice;
5756
import static io.trino.execution.buffer.BenchmarkDataGenerator.createValues;
58-
import static io.trino.execution.buffer.CompressionCodec.NONE;
5957
import static io.trino.execution.buffer.PagesSerdeUtil.readPages;
6058
import static io.trino.execution.buffer.PagesSerdeUtil.writePages;
59+
import static io.trino.execution.buffer.TestingPagesSerdes.createTestingPagesSerdeFactory;
6160
import static io.trino.jmh.Benchmarks.benchmark;
6261
import static io.trino.plugin.tpch.TpchTables.getTablePages;
6362
import static io.trino.spi.type.BigintType.BIGINT;
@@ -207,7 +206,7 @@ public abstract static class TypeBenchmarkData
207206

208207
public void setup(Type type, Function<Random, ?> valueGenerator)
209208
{
210-
PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), NONE);
209+
PagesSerdeFactory serdeFactory = createTestingPagesSerdeFactory();
211210
PageSerializer serializer = serdeFactory.createSerializer(Optional.empty());
212211
PageDeserializer deserializer = serdeFactory.createDeserializer(Optional.empty());
213212
PageBuilder pageBuilder = new PageBuilder(ImmutableList.of(type));
@@ -404,7 +403,7 @@ public static class LineitemBenchmarkData
404403
@Setup
405404
public void setup()
406405
{
407-
PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), NONE);
406+
PagesSerdeFactory serdeFactory = createTestingPagesSerdeFactory();
408407
PageSerializer serializer = serdeFactory.createSerializer(Optional.empty());
409408
PageDeserializer deserializer = serdeFactory.createDeserializer(Optional.empty());
410409

core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkPagesSerde.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.trino.spi.Page;
1919
import io.trino.spi.PageBuilder;
2020
import io.trino.spi.block.BlockBuilder;
21-
import io.trino.spi.block.TestingBlockEncodingSerde;
2221
import io.trino.spi.type.Type;
2322
import org.junit.jupiter.api.Test;
2423
import org.openjdk.jmh.annotations.Benchmark;
@@ -46,6 +45,7 @@
4645
import static io.airlift.slice.Slices.utf8Slice;
4746
import static io.trino.execution.buffer.CompressionCodec.LZ4;
4847
import static io.trino.execution.buffer.CompressionCodec.NONE;
48+
import static io.trino.execution.buffer.TestingPagesSerdes.createTestingPagesSerdeFactory;
4949
import static io.trino.jmh.Benchmarks.benchmark;
5050
import static io.trino.operator.PageAssertions.assertPageEquals;
5151
import static io.trino.spi.type.VarcharType.VARCHAR;
@@ -115,7 +115,7 @@ public static class BenchmarkData
115115
@Setup
116116
public void initialize()
117117
{
118-
PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new TestingBlockEncodingSerde(), compressionCodec);
118+
PagesSerdeFactory serdeFactory = createTestingPagesSerdeFactory(compressionCodec);
119119
Optional<SecretKey> encryptionKey = encrypted ? Optional.of(createRandomAesEncryptionKey()) : Optional.empty();
120120
serializer = serdeFactory.createSerializer(encryptionKey);
121121
deserializer = serdeFactory.createDeserializer(encryptionKey);

core/trino-main/src/test/java/io/trino/execution/buffer/BufferTestUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import static com.google.common.base.Preconditions.checkArgument;
3232
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
3333
import static io.trino.execution.buffer.BufferState.FINISHED;
34+
import static io.trino.execution.buffer.CompressionCodec.LZ4;
35+
import static io.trino.execution.buffer.TestingPagesSerdes.createTestingPagesSerdeFactory;
3436
import static java.util.concurrent.TimeUnit.MILLISECONDS;
3537
import static java.util.concurrent.TimeUnit.SECONDS;
3638
import static org.assertj.core.api.Assertions.assertThat;
@@ -39,7 +41,7 @@ public final class BufferTestUtils
3941
{
4042
private BufferTestUtils() {}
4143

42-
private static final PagesSerdeFactory PAGES_SERDE_FACTORY = new TestingPagesSerdeFactory();
44+
private static final PagesSerdeFactory PAGES_SERDE_FACTORY = createTestingPagesSerdeFactory(LZ4);
4345
static final Duration NO_WAIT = new Duration(0, MILLISECONDS);
4446
static final Duration MAX_WAIT = new Duration(1, SECONDS);
4547
private static final DataSize BUFFERED_PAGE_SIZE = DataSize.ofBytes(serializePage(createPage(42)).getRetainedSize());

0 commit comments

Comments
 (0)