diff --git a/mantis-connectors/mantis-connector-iceberg/build.gradle b/mantis-connectors/mantis-connector-iceberg/build.gradle index 15e6fe360..0f954a262 100644 --- a/mantis-connectors/mantis-connector-iceberg/build.gradle +++ b/mantis-connectors/mantis-connector-iceberg/build.gradle @@ -20,11 +20,24 @@ apply plugin: 'mantis' ext { hadoopVersion = '2.7.3' icebergVersion = '0.9.+' - junitVersion = '5.3.+' + junitVersion = '5.4.+' mockitoVersion = '2.18.+' + parquetVersion = '1.12.0' } dependencies { + configurations { + // we need the parquet dependency to be present in the testing classpath, + // hence we extend from the shadow configuration + testImplementation.extendsFrom shadow + all { + // we want a parquet version of at least 1.12.0 because we need this fix + // https://issues.apache.org/jira/browse/PARQUET-1851 + resolutionStrategy { + force "org.apache.parquet:parquet-hadoop:${parquetVersion}" + } + } + } implementation project(":mantis-runtime") // We only need the Configuration interface. Users can bring their own hadoop-common version. 
@@ -41,6 +54,7 @@ dependencies { shadow "org.slf4j:slf4j-log4j12:$slf4jVersion" testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion" + testImplementation "org.junit.jupiter:junit-jupiter-params:$junitVersion" testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" testImplementation "org.mockito:mockito-core:$mockitoVersion" diff --git a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/config/SinkConfig.java b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/config/SinkConfig.java index f61121122..83728736e 100644 --- a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/config/SinkConfig.java +++ b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/config/SinkConfig.java @@ -24,10 +24,12 @@ import io.mantisrx.connector.iceberg.sink.committer.config.CommitterConfig; import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig; import io.mantisrx.runtime.parameter.Parameters; +import lombok.RequiredArgsConstructor; /** * Convenient base config used by {@link WriterConfig} and {@link CommitterConfig}. 
*/ +@RequiredArgsConstructor public class SinkConfig { private final String catalog; diff --git a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage.java b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage.java index ef03733dd..aac9d7367 100644 --- a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage.java +++ b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStage.java @@ -16,6 +16,7 @@ package io.mantisrx.connector.iceberg.sink.writer; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.mantisrx.connector.iceberg.sink.codecs.IcebergCodecs; import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig; import io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties; @@ -34,6 +35,8 @@ import io.mantisrx.runtime.parameter.type.IntParameter; import io.mantisrx.runtime.parameter.type.StringParameter; import io.mantisrx.runtime.parameter.validator.Validators; +import io.mantisrx.runtime.scheduler.MantisRxSingleThreadScheduler; +import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -75,6 +78,7 @@ public static ScalarToScalar.Config config() { return new ScalarToScalar.Config() .description("") .codec(IcebergCodecs.dataFile()) + .serialInput() .withParameters(parameters()); } @@ -119,15 +123,27 @@ public static Transformer newTransformer(Context context) { LocationProvider locationProvider = context.getServiceLocator().service(LocationProvider.class); IcebergWriterFactory factory = new DefaultIcebergWriterFactory(config, workerInfo, table, locationProvider); - IcebergWriterPool writerPool = new FixedIcebergWriterPool( - factory, - config.getWriterFlushFrequencyBytes(), - 
config.getWriterMaximumPoolSize()); + IcebergWriterPool writerPool = new FixedIcebergWriterPool(factory, config); WriterMetrics metrics = new WriterMetrics(); PartitionerFactory partitionerFactory = context.getServiceLocator().service(PartitionerFactory.class); Partitioner partitioner = partitionerFactory.getPartitioner(table); - return new Transformer(config, metrics, writerPool, partitioner, Schedulers.computation(), Schedulers.io()); + return newTransformer(config, metrics, writerPool, partitioner, context.getWorkerInfo()); + } + + @VisibleForTesting + static Transformer newTransformer( + WriterConfig writerConfig, + WriterMetrics writerMetrics, + IcebergWriterPool writerPool, + Partitioner partitioner, + WorkerInfo workerInfo) { + int workerIdx = workerInfo.getWorkerIndex(); + String nameFormat = "IcebergWriter (" + (workerIdx + 1) + ")-%d"; + Scheduler executingService = new MantisRxSingleThreadScheduler( + new ThreadFactoryBuilder().setNameFormat(nameFormat).build()); + return new Transformer(writerConfig, writerMetrics, writerPool, partitioner, + Schedulers.computation(), executingService); } public IcebergWriterStage() { diff --git a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/config/WriterConfig.java b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/config/WriterConfig.java index cd7d9b4c9..a821d769b 100644 --- a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/config/WriterConfig.java +++ b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/config/WriterConfig.java @@ -52,6 +52,19 @@ public WriterConfig(Parameters parameters, Configuration hadoopConfig) { this.hadoopConfig = hadoopConfig; } + public WriterConfig(String catalog, String database, String table, int writerRowGroupSize, + long writerFlushFrequencyBytes, long writerFlushFrequencyMsec, + String 
writerFileFormat, int writerMaximumPoolSize, + Configuration hadoopConfig) { + super(catalog, database, table); + this.writerRowGroupSize = writerRowGroupSize; + this.writerFlushFrequencyBytes = writerFlushFrequencyBytes; + this.writerFlushFrequencyMsec = writerFlushFrequencyMsec; + this.writerFileFormat = writerFileFormat; + this.writerMaximumPoolSize = writerMaximumPoolSize; + this.hadoopConfig = hadoopConfig; + } + /** * Returns an int representing maximum number of rows that should exist in a file. */ diff --git a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/pool/FixedIcebergWriterPool.java b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/pool/FixedIcebergWriterPool.java index ba7b69ef6..cb87786ce 100644 --- a/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/pool/FixedIcebergWriterPool.java +++ b/mantis-connectors/mantis-connector-iceberg/src/main/java/io/mantisrx/connector/iceberg/sink/writer/pool/FixedIcebergWriterPool.java @@ -43,6 +43,10 @@ public class FixedIcebergWriterPool implements IcebergWriterPool { private final long flushFrequencyBytes; private final int maximumPoolSize; + public FixedIcebergWriterPool(IcebergWriterFactory factory, WriterConfig writerConfig) { + this(factory, writerConfig.getWriterFlushFrequencyBytes(), writerConfig.getWriterMaximumPoolSize()); + } + public FixedIcebergWriterPool(IcebergWriterFactory factory, long flushFrequencyBytes, int maximumPoolSize) { this.factory = factory; this.flushFrequencyBytes = flushFrequencyBytes; diff --git a/mantis-connectors/mantis-connector-iceberg/src/main/resources/log4j.properties b/mantis-connectors/mantis-connector-iceberg/src/main/resources/log4j.properties index 09d89d53c..4d33ac1eb 100644 --- a/mantis-connectors/mantis-connector-iceberg/src/main/resources/log4j.properties +++ 
b/mantis-connectors/mantis-connector-iceberg/src/main/resources/log4j.properties @@ -19,4 +19,4 @@ log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at end of file +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n \ No newline at end of file diff --git a/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergTableExtension.java b/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergTableExtension.java new file mode 100644 index 000000000..353c63ec1 --- /dev/null +++ b/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergTableExtension.java @@ -0,0 +1,109 @@ +/* + * Copyright 2021 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.mantisrx.connector.iceberg.sink.writer; + +import com.google.common.io.Files; +import java.io.File; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.LocationProvider; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +/** + * Junit Jupiter Extension to create Iceberg Tables for unit testing with a specified schema, + * properties, etc.... and to also clean the files after unit tests. + *

+ * The way to use the IcebergTableExtension is by adding the following code to your test class. This + * creates the table before the test is executed. + *

+ *     @RegisterExtension
+ *     static IcebergTableExtension tableExtension =
+ *       IcebergTableExtension.builder()
+ *           .schema(SCHEMA)
+ *           .spec(SPEC)
+ *           .build();
+ * 
+ * + *

The created table can be obtained by the {@link IcebergTableExtension#getTable()} method. + */ +@Slf4j +@Builder +public class IcebergTableExtension implements BeforeAllCallback, BeforeEachCallback, + AfterEachCallback { + + private File rootDir; + + @Getter + @Builder.Default + private String catalog = "catalog"; + + @Getter + @Builder.Default + private String database = "database"; + + @Getter + @Builder.Default + private String tableName = "table"; + + @Getter + private Schema schema; + private PartitionSpec spec; + + @Getter + private Table table; + + @Override + public void beforeAll(ExtensionContext context) throws Exception { + log.info("Before All"); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + log.info("Before Each"); + if (rootDir == null) { + rootDir = Files.createTempDir(); + } + + final File tableDir = new File(rootDir, getTableIdentifier().toString()); + final HadoopTables tables = new HadoopTables(); + table = tables.create(schema, spec, tableDir.getPath()); + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + FileUtils.deleteDirectory(rootDir); + rootDir = null; + } + + public LocationProvider getLocationProvider() { + return table.locationProvider(); + } + + public TableIdentifier getTableIdentifier() { + return TableIdentifier.of(catalog, database, tableName); + } +} diff --git a/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterEndToEndTest.java b/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterEndToEndTest.java new file mode 100644 index 000000000..173913b8f --- /dev/null +++ b/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterEndToEndTest.java @@ -0,0 +1,162 @@ +/* + * Copyright 2021 Netflix, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.connector.iceberg.sink.writer; + +import static io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties.WRITER_FILE_FORMAT_DEFAULT; +import static io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties.WRITER_FLUSH_FREQUENCY_BYTES_DEFAULT; +import static io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties.WRITER_MAXIMUM_POOL_SIZE_DEFAULT; +import static io.mantisrx.connector.iceberg.sink.writer.config.WriterProperties.WRITER_ROW_GROUP_SIZE_DEFAULT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.mantisrx.connector.iceberg.sink.writer.IcebergWriterStage.Transformer; +import io.mantisrx.connector.iceberg.sink.writer.config.WriterConfig; +import io.mantisrx.connector.iceberg.sink.writer.factory.DefaultIcebergWriterFactory; +import io.mantisrx.connector.iceberg.sink.writer.factory.IcebergWriterFactory; +import io.mantisrx.connector.iceberg.sink.writer.metrics.WriterMetrics; +import io.mantisrx.connector.iceberg.sink.writer.partitioner.Partitioner; +import io.mantisrx.connector.iceberg.sink.writer.pool.FixedIcebergWriterPool; +import io.mantisrx.connector.iceberg.sink.writer.pool.IcebergWriterPool; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.MantisJobDurationType; +import io.mantisrx.runtime.WorkerInfo; +import java.util.ArrayList; +import java.util.List; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import rx.Observable; +import rx.Subscription; + +@Slf4j +public class IcebergWriterEndToEndTest { + + // records of the following shape + // { + // "partition": 1, + // "id": 2 + // } + private static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "partition", Types.IntegerType.get()), + Types.NestedField.required(2, "id", Types.IntegerType.get())); + + private static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("partition").build(); + + @RegisterExtension + static IcebergTableExtension tableExtension = + IcebergTableExtension.builder() + .schema(SCHEMA) + .spec(SPEC) + .build(); + + private static final WorkerInfo WORKER_INFO = + new WorkerInfo("testJobName", "jobId", 1, 1, 1, MantisJobDurationType.Perpetual, + "host"); + + private Partitioner partitioner = record -> { + GenericRecord partitionRecord = GenericRecord.create(SPEC.schema()); + partitionRecord.setField("partition", 1); + return partitionRecord; + }; + + private Context stageContext = mock(Context.class); + + @BeforeEach + public void initStageContext() { + when(stageContext.getWorkerInfo()).thenReturn(WORKER_INFO); + } + + @Test + public void testTransformerEndToEnd() throws Exception { + final WriterConfig writerConfig = new WriterConfig( + tableExtension.getCatalog(), + tableExtension.getDatabase(), + tableExtension.getTableName(), + WRITER_ROW_GROUP_SIZE_DEFAULT, + Long.parseLong(WRITER_FLUSH_FREQUENCY_BYTES_DEFAULT), 
+ 100L, + WRITER_FILE_FORMAT_DEFAULT, + WRITER_MAXIMUM_POOL_SIZE_DEFAULT, + new Configuration()); + + final IcebergWriterFactory writerFactory = + new DefaultIcebergWriterFactory(writerConfig, WORKER_INFO, tableExtension.getTable(), + tableExtension.getLocationProvider()); + + final IcebergWriterPool writerPool = new FixedIcebergWriterPool(writerFactory, writerConfig); + Transformer transformer = + IcebergWriterStage.newTransformer(writerConfig, new WriterMetrics(), writerPool, partitioner, + WORKER_INFO); + + final int size = 1000; + + // odd numbers observable + Observable oddObservable = + Observable + .interval(0, 1, TimeUnit.MILLISECONDS) + .filter(val -> (val % 2 == 1)) + .takeUntil(val -> val > size) + .map(val -> { + GenericRecord record = GenericRecord.create(SCHEMA); + record.setField("partition", 1); + record.setField("id", val); + return record; + }) + .publish() + .autoConnect(); + + // even numbers observable + Observable evenObservable = + Observable + .interval(size / 2, 1, TimeUnit.MILLISECONDS) + .filter(val -> (val % 2 == 0)) + .takeUntil(val -> val + (size /2) > size) + .map(val -> { + GenericRecord record = GenericRecord.create(SCHEMA); + record.setField("partition", 1); + record.setField("id", val); + return record; + }) + .publish() + .autoConnect(); + + Observable recordObservable = Observable.merge(oddObservable, evenObservable); + + Observable dataFileObservable = transformer.call(recordObservable); + + AtomicReference failure = new AtomicReference<>(); + List dataFileList = new ArrayList<>(); + Subscription subscription = dataFileObservable.subscribe(dataFileList::add, failure::set); + Thread.sleep(2 * size); + if (failure.get() != null) { + throw new Exception(failure.get()); + } + log.info("Collected {} data files successfully", dataFileList.size()); + } +} diff --git a/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStageTest.java 
b/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStageTest.java index aadeb76ff..6e07c713a 100644 --- a/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStageTest.java +++ b/mantis-connectors/mantis-connector-iceberg/src/test/java/io/mantisrx/connector/iceberg/sink/writer/IcebergWriterStageTest.java @@ -37,6 +37,8 @@ import io.mantisrx.connector.iceberg.sink.writer.pool.FixedIcebergWriterPool; import io.mantisrx.connector.iceberg.sink.writer.pool.IcebergWriterPool; import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.MantisJobDurationType; +import io.mantisrx.runtime.WorkerInfo; import io.mantisrx.runtime.lifecycle.ServiceLocator; import io.mantisrx.runtime.parameter.Parameters; import java.io.IOException; @@ -124,6 +126,9 @@ record = GenericRecord.create(SCHEMA); this.context = mock(Context.class); when(this.context.getParameters()).thenReturn(parameters); when(this.context.getServiceLocator()).thenReturn(serviceLocator); + when(this.context.getWorkerInfo()).thenReturn( + new WorkerInfo("testJobName", "jobId", 1, 1, 1, MantisJobDurationType.Perpetual, + "host")); // Flow Observable source = Observable.interval(1, TimeUnit.MILLISECONDS, this.scheduler) diff --git a/mantis-connectors/mantis-connector-iceberg/src/test/resources/log4j.properties b/mantis-connectors/mantis-connector-iceberg/src/test/resources/log4j.properties index 09d89d53c..4d33ac1eb 100644 --- a/mantis-connectors/mantis-connector-iceberg/src/test/resources/log4j.properties +++ b/mantis-connectors/mantis-connector-iceberg/src/test/resources/log4j.properties @@ -19,4 +19,4 @@ log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at 
end of file +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n \ No newline at end of file diff --git a/mantis-runtime/build.gradle b/mantis-runtime/build.gradle index 0fb40d3fd..df078f622 100644 --- a/mantis-runtime/build.gradle +++ b/mantis-runtime/build.gradle @@ -28,5 +28,3 @@ dependencies { testImplementation "org.mockito:mockito-core:$mockitoVersion" } - - diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/WorkerInfo.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/WorkerInfo.java index e3f66f781..b67e63a9e 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/WorkerInfo.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/WorkerInfo.java @@ -17,7 +17,7 @@ package io.mantisrx.runtime; import io.mantisrx.common.WorkerPorts; - +import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting; public class WorkerInfo { @@ -30,9 +30,9 @@ public class WorkerInfo { private final WorkerPorts workerPorts; - private final MantisJobDurationType durationType; + @VisibleForTesting public WorkerInfo(String jobName, String jobId, int stageNumber, int workerIndex, int workerNumber, MantisJobDurationType durationType, String host) { this(jobName, jobId, stageNumber, workerIndex, workerNumber, durationType, host, new WorkerPorts(-1, -1, -1, -1, -1)); } @@ -46,7 +46,6 @@ public WorkerInfo(String jobName, String jobId, int stageNumber, int workerIndex this.durationType = durationType; this.host = host; this.workerPorts = workerPorts; - } /** @@ -88,11 +87,9 @@ public String getHost() { } public WorkerPorts getWorkerPorts() { - return workerPorts; } - @Override public String toString() { return "WorkerInfo{" +