From d40ccfee46e2099aa3f0584878dcc5623c59ac6b Mon Sep 17 00:00:00 2001 From: Emmanuel GALLOIS Date: Tue, 31 Oct 2023 09:21:40 +0100 Subject: [PATCH] fix(TCOMP-2339): remove static modifier (#682) * fix(TCOMP-2339): remove static modifier add a list to manage QUEUE : use constant for optim remove joda dependency --------- Co-authored-by: yyin-talend Co-authored-by: Christophe Le Saec --- component-runtime-beam/pom.xml | 7 +- .../runtime/beam/spi/BeamProducerFinder.java | 97 +++++++++++++------ .../beam/ProducerFinderEnvironmentTest.java | 40 ++++++-- .../environment/BeamEnvironmentsTest.java | 18 +++- 4 files changed, 117 insertions(+), 45 deletions(-) diff --git a/component-runtime-beam/pom.xml b/component-runtime-beam/pom.xml index 4fb2926012879..57509b499c1ba 100644 --- a/component-runtime-beam/pom.xml +++ b/component-runtime-beam/pom.xml @@ -66,7 +66,12 @@ org.apache.beam beam-runners-direct-java ${beam.version} - test + + + joda-time + joda-time + + org.apache.beam diff --git a/component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java b/component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java index 01da5889dc25a..34cc6d873c4be 100644 --- a/component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java +++ b/component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java @@ -20,11 +20,14 @@ import java.util.Iterator; import java.util.Map; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.runners.direct.DirectOptions; +import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -34,7 +37,6 @@ import org.talend.sdk.component.api.record.Record; import org.talend.sdk.component.api.service.source.ProducerFinder; import org.talend.sdk.component.runtime.base.Delegated; -import org.talend.sdk.component.runtime.base.LifecycleImpl; import org.talend.sdk.component.runtime.input.Input; import org.talend.sdk.component.runtime.input.Mapper; import org.talend.sdk.component.runtime.manager.service.ProducerFinderImpl; @@ -46,9 +48,11 @@ @Slf4j public class BeamProducerFinder extends ProducerFinderImpl { - static final int CAPACITY = Integer.parseInt(System.getProperty("talend.beam.wrapper.capacity", "100000")); + private static final int QUEUE_SIZE = 200; - static final Queue QUEUE = new ArrayBlockingQueue<>(CAPACITY, true); + private static final int BEAM_PARALLELISM = 10; + + private static final Map> QUEUE = new ConcurrentHashMap<>(); @Override public Iterator find(final String familyName, final String inputName, final int version, @@ -62,7 +66,10 @@ public Iterator find(final String familyName, final String inputName, fi log.warn("Component Kit Mapper instantiation failed, trying to wrap native beam mapper..."); final Object delegate = Delegated.class.cast(mapper).getDelegate(); if (PTransform.class.isInstance(delegate)) { - return new QueueInput(delegate, familyName, inputName, familyName, PTransform.class.cast(delegate)); + final UUID uuid = UUID.randomUUID(); + QUEUE.put(uuid, new ArrayBlockingQueue<>(QUEUE_SIZE, true)); + return new QueueInput(delegate, familyName, inputName, familyName, PTransform.class.cast(delegate), + uuid); } throw new IllegalStateException(e); } @@ -72,7 +79,7 @@ Object writeReplace() throws ObjectStreamException { return new SerializableService(plugin, ProducerFinder.class.getName()); } - static class QueueInput extends LifecycleImpl implements Input, Iterator { + static class QueueInput implements Iterator, Serializable { private final PTransform> transform; @@ -84,10 +91,14 @@ static class QueueInput extends LifecycleImpl implements Input, Iterator private Record next; + private final UUID queueId; + + private Thread th; + public QueueInput(final Object delegate, final String rootName, final String name, final String plugin, - final PTransform> transform) { - super(delegate, rootName, name, plugin); + final PTransform> transform, final UUID queueId) { this.transform = transform; + this.queueId = queueId; result = runDataReadingPipeline(); } @@ -97,6 +108,9 @@ public boolean hasNext() { next = findNext(); started = true; } + if (next == null) { + QUEUE.remove(this.queueId); + } return next != null; } @@ -111,12 +125,21 @@ public Record next() { } private Record findNext() { - Record record = QUEUE.poll(); + final Queue recordQueue = QUEUE.get(this.queueId); + Record record = recordQueue.poll(); + + int index = 0; while (record == null && (!end)) { - end = result.getState() != PipelineResult.State.RUNNING; - sleep(); - record = QUEUE.poll(); + end = result != null && result.getState() != PipelineResult.State.RUNNING; + if (!end && index > 10) { + result.waitUntilFinish(); + } else { + index++; + log.debug("findNext NULL, retry : end={}; size:{}", end, recordQueue.size()); + sleep(); + } + record = recordQueue.poll(); } return record; } @@ -133,7 +156,7 @@ record = QUEUE.poll(); *

* Not specifying the appropriate classloader can lead to weird exceptions like: *

- * + * *
          * No translator known for org.apache.beam.repackaged.direct_java.runners.core.construction.SplittableParDo$PrimitiveBoundedRead
          * 
@@ -149,14 +172,24 @@ private PipelineResult runDataReadingPipeline() { try { Thread.currentThread().setContextClassLoader(beamAwareClassLoader); - - PipelineOptions options = PipelineOptionsFactory.create(); - PushRecord pushRecord = new PushRecord(); + DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class); + options.setRunner(DirectRunner.class); + options.setTargetParallelism(BEAM_PARALLELISM); + options.setBlockOnRun(false); + MyDoFn pushRecord = new MyDoFn(this.queueId); ParDo.SingleOutput of = ParDo.of(pushRecord); Pipeline p = Pipeline.create(options); p.apply(transform).apply(of); - return p.run(); + final PipelineResult[] result = new PipelineResult[1]; + th = new Thread(() -> { + result[0] = p.run(); + }); + this.th.start(); + while (result[0] == null) { + sleep(); + } + return result[0]; } finally { Thread.currentThread().setContextClassLoader(callerClassLoader); } @@ -164,34 +197,38 @@ private PipelineResult runDataReadingPipeline() { private void sleep() { try { - Thread.sleep(100L); + Thread.sleep(30L); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } - static class PushRecord extends DoFn implements Serializable { + static class MyDoFn extends DoFn { + + private final UUID queueId; + + public MyDoFn(final UUID queueId) { + this.queueId = queueId; + } @ProcessElement - public void processElement(final @Element Record record) { - boolean ok = QUEUE.offer(record); + public void processElement(final ProcessContext context) { + final Queue recordQueue = QUEUE.get(this.queueId); + boolean ok = recordQueue.offer(context.element()); + log.debug("queue injected {}; ok={}; thread:{}", recordQueue.size(), ok, Thread.currentThread().getId()); + while (!ok) { - if (QUEUE.size() >= CAPACITY) { - final String msg = String.format( - "Wrapper queue if full (capacity: %d). Consider increasing it according data with talend.beam.wrapper.capacity property.", - CAPACITY); - log.error("[processElement] {}", msg); - throw new IllegalStateException(msg); - } sleep(); - ok = QUEUE.offer(record); + ok = recordQueue.offer(context.element()); + log.debug("\tqueue injected retry {}; ok={}; thread:{}", recordQueue.size(), ok, + Thread.currentThread().getId()); } } private void sleep() { try { - Thread.sleep(100L); + Thread.sleep(20L); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/component-runtime-testing/component-runtime-beam-junit/src/test/java/org/talend/sdk/component/junit/beam/ProducerFinderEnvironmentTest.java b/component-runtime-testing/component-runtime-beam-junit/src/test/java/org/talend/sdk/component/junit/beam/ProducerFinderEnvironmentTest.java index 75c692058ba24..fbaef47fc47d8 100644 --- a/component-runtime-testing/component-runtime-beam-junit/src/test/java/org/talend/sdk/component/junit/beam/ProducerFinderEnvironmentTest.java +++ b/component-runtime-testing/component-runtime-beam-junit/src/test/java/org/talend/sdk/component/junit/beam/ProducerFinderEnvironmentTest.java @@ -72,6 +72,7 @@ @WithComponents("org.talend.sdk.component.junit.beam.test") public class ProducerFinderEnvironmentTest implements Serializable { + @Injected private BaseComponentsHandler handler; @@ -80,6 +81,18 @@ public class ProducerFinderEnvironmentTest implements Serializable { @Service private RecordBuilderFactory factory; + /** + + * arrayblocking queue capacity + + * 7 fixed CAPACITY variable + + * recordCount value + + * 10 11sec 13sec + + * 100 16sec 13sec + + * 1000 57sec 16sec + + * 10000 ~ 7min55 52sec + + * 100000 1h34min 25min + + */ + private final Integer recordCount = 5000; // 10 100 1000 10000 100000 + @BeforeAll static void forceManagerInit() { // manager for non environmental tests @@ -93,19 +106,25 @@ static void forceManagerInit() { @Test void finderWithTacokitFamily() { - final Iterator recordIterator = getFinder(ComponentManager.instance(), "TckFamily"); + final Iterator recordIterator = getFinder(ComponentManager.instance(), "TckFamily", recordCount); recordIterator.forEachRemaining(Assertions::assertNotNull); } @Test void finderWithBeamFamily() { - final Iterator recordIterator = getFinder(ComponentManager.instance(), "BeamFamily"); - recordIterator.forEachRemaining(Assertions::assertNotNull); + final Iterator recordIterator = getFinder(ComponentManager.instance(), "BeamFamily", recordCount); + final int[] total = new int[1]; + total[0] = 0; + recordIterator.forEachRemaining((Record rec) -> { + Assertions.assertNotNull(rec); + total[0]++; + }); + Assertions.assertEquals(recordCount, total[0], "did not consume all records"); } @EnvironmentalTest void runPipelineBeam() { - Mapper mapper = manager.findMapper("BeamFamily", "from", 1, singletonMap("count", "10")).get(); + Mapper mapper = manager.findMapper("BeamFamily", "from", 1, singletonMap("count", recordCount.toString())).get(); assertNotNull(mapper); final Object delegate = Delegated.class.cast(mapper).getDelegate(); assertNotNull(delegate); @@ -114,7 +133,7 @@ void runPipelineBeam() { @EnvironmentalTest void runPipelineTacokt() { - Mapper mapper = manager.findMapper("TckFamily", "from", 1, singletonMap("count", "10")).get(); + Mapper mapper = manager.findMapper("TckFamily", "from", 1, singletonMap("count", recordCount.toString())).get(); assertNotNull(mapper); runPipeline(TalendIO.read(mapper)); } @@ -123,7 +142,7 @@ private void runPipeline(PTransform> transform) { final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create()); final PTransform> start = transform; final PCollection out = pipeline.apply(start); - List records = IntStream.range(0, 10) + List records = IntStream.range(0, recordCount) .mapToObj(i -> factory.newRecordBuilder() .withString("id", "id_" + i) .build()) @@ -142,19 +161,20 @@ void runJobWithBeamFamily() { runJob(handler.asManager(), "BeamFamily"); } - private Iterator getFinder(final ComponentManager manager, final String family) { + private Iterator getFinder(final ComponentManager manager, final String family, final int expectedNumber) { final Container container = manager.findPlugin("test-classes").get(); ProducerFinder finder = (ProducerFinder) container.get(AllServices.class) .getServices() .get(ProducerFinder.class); assertNotNull(finder); - final Iterator recordIterator = finder.find(family, "from", 1, singletonMap("count", "10")); + final Iterator recordIterator = finder.find(family, "from", 1, + singletonMap("count", Integer.toString(expectedNumber))); assertNotNull(recordIterator); return recordIterator; } private void runJob(final ComponentManager manager, final String family) { - final Iterator recordIterator = getFinder(manager, family); + final Iterator recordIterator = getFinder(manager, family, recordCount); assertNotNull(recordIterator); handler.setInputData(toIterable(recordIterator)); Job @@ -166,7 +186,7 @@ private void runJob(final ComponentManager manager, final String family) { .to("output") .build() .run(); - assertEquals(10, handler.getCollectedData(Record.class).size()); + assertEquals(recordCount, handler.getCollectedData(Record.class).size()); } static Iterable toIterable(Iterator it) { diff --git a/component-runtime-testing/component-runtime-junit/src/test/java/org/talend/sdk/component/junit5/environment/BeamEnvironmentsTest.java b/component-runtime-testing/component-runtime-junit/src/test/java/org/talend/sdk/component/junit5/environment/BeamEnvironmentsTest.java index be7b604f185b4..e37102c1557ce 100644 --- a/component-runtime-testing/component-runtime-junit/src/test/java/org/talend/sdk/component/junit5/environment/BeamEnvironmentsTest.java +++ b/component-runtime-testing/component-runtime-junit/src/test/java/org/talend/sdk/component/junit5/environment/BeamEnvironmentsTest.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.ServiceLoader; import org.junit.jupiter.api.AfterAll; @@ -47,13 +48,22 @@ class BeamEnvironmentsTest { @EnvironmentalTest void execute() throws ClassNotFoundException { final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - final String runner = ServiceLoader + Iterator loadClasses = ServiceLoader .load(classLoader.loadClass("org.apache.beam.sdk.runners.PipelineRunnerRegistrar")) - .iterator() - .next() + .iterator(); + // take second runner if exist + Object runner = null; + if (loadClasses.hasNext()) { + runner = loadClasses.next(); // first + } + if (loadClasses.hasNext()) { + runner = loadClasses.next(); // second if exist + } + + final String runnerName = runner .getClass() .getName(); - EXECUTIONS.add(System.getProperty("BeamEnvironmentsTest") + "/" + runner); + EXECUTIONS.add(System.getProperty("BeamEnvironmentsTest") + "/" + runnerName); } @AfterAll