Skip to content

Commit

Permalink
fix(TCOMP-2339): remove static modifier (#682)
Browse files Browse the repository at this point in the history
* fix(TCOMP-2339): remove static modifier
* add a map to manage the per-input queues (QUEUE)
* use constants for optimization
* remove joda dependency

---------

Co-authored-by: yyin-talend <[email protected]>
Co-authored-by: Christophe Le Saec <[email protected]>
  • Loading branch information
3 people committed Oct 31, 2023
1 parent 1fa8d76 commit d40ccfe
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 45 deletions.
7 changes: 6 additions & 1 deletion component-runtime-beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -34,7 +37,6 @@
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.source.ProducerFinder;
import org.talend.sdk.component.runtime.base.Delegated;
import org.talend.sdk.component.runtime.base.LifecycleImpl;
import org.talend.sdk.component.runtime.input.Input;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.manager.service.ProducerFinderImpl;
Expand All @@ -46,9 +48,11 @@
@Slf4j
public class BeamProducerFinder extends ProducerFinderImpl {

static final int CAPACITY = Integer.parseInt(System.getProperty("talend.beam.wrapper.capacity", "100000"));
private static final int QUEUE_SIZE = 200;

static final Queue<Record> QUEUE = new ArrayBlockingQueue<>(CAPACITY, true);
private static final int BEAM_PARALLELISM = 10;

private static final Map<UUID, Queue<Record>> QUEUE = new ConcurrentHashMap<>();

@Override
public Iterator<Record> find(final String familyName, final String inputName, final int version,
Expand All @@ -62,7 +66,10 @@ public Iterator<Record> find(final String familyName, final String inputName, fi
log.warn("Component Kit Mapper instantiation failed, trying to wrap native beam mapper...");
final Object delegate = Delegated.class.cast(mapper).getDelegate();
if (PTransform.class.isInstance(delegate)) {
return new QueueInput(delegate, familyName, inputName, familyName, PTransform.class.cast(delegate));
final UUID uuid = UUID.randomUUID();
QUEUE.put(uuid, new ArrayBlockingQueue<>(QUEUE_SIZE, true));
return new QueueInput(delegate, familyName, inputName, familyName, PTransform.class.cast(delegate),
uuid);
}
throw new IllegalStateException(e);
}
Expand All @@ -72,7 +79,7 @@ Object writeReplace() throws ObjectStreamException {
return new SerializableService(plugin, ProducerFinder.class.getName());
}

static class QueueInput extends LifecycleImpl implements Input, Iterator<Record> {
static class QueueInput implements Iterator<Record>, Serializable {

private final PTransform<PBegin, PCollection<Record>> transform;

Expand All @@ -84,10 +91,14 @@ static class QueueInput extends LifecycleImpl implements Input, Iterator<Record>

private Record next;

private final UUID queueId;

private Thread th;

public QueueInput(final Object delegate, final String rootName, final String name, final String plugin,
final PTransform<PBegin, PCollection<Record>> transform) {
super(delegate, rootName, name, plugin);
final PTransform<PBegin, PCollection<Record>> transform, final UUID queueId) {
this.transform = transform;
this.queueId = queueId;
result = runDataReadingPipeline();
}

Expand All @@ -97,6 +108,9 @@ public boolean hasNext() {
next = findNext();
started = true;
}
if (next == null) {
QUEUE.remove(this.queueId);
}
return next != null;
}

Expand All @@ -111,12 +125,21 @@ public Record next() {
}

private Record findNext() {
Record record = QUEUE.poll();
final Queue<Record> recordQueue = QUEUE.get(this.queueId);

Record record = recordQueue.poll();

int index = 0;
while (record == null && (!end)) {
end = result.getState() != PipelineResult.State.RUNNING;
sleep();
record = QUEUE.poll();
end = result != null && result.getState() != PipelineResult.State.RUNNING;
if (!end && index > 10) {
result.waitUntilFinish();
} else {
index++;
log.debug("findNext NULL, retry : end={}; size:{}", end, recordQueue.size());
sleep();
}
record = recordQueue.poll();
}
return record;
}
Expand All @@ -133,7 +156,7 @@ record = QUEUE.poll();
* <p>
* Not specifying the appropriate classloader can lead to weird exceptions like:
* </p>
*
*
* <pre>
* No translator known for org.apache.beam.repackaged.direct_java.runners.core.construction.SplittableParDo$PrimitiveBoundedRead
* </pre>
Expand All @@ -149,49 +172,63 @@ private PipelineResult runDataReadingPipeline() {

try {
Thread.currentThread().setContextClassLoader(beamAwareClassLoader);

PipelineOptions options = PipelineOptionsFactory.create();
PushRecord pushRecord = new PushRecord();
DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
options.setRunner(DirectRunner.class);
options.setTargetParallelism(BEAM_PARALLELISM);
options.setBlockOnRun(false);
MyDoFn pushRecord = new MyDoFn(this.queueId);
ParDo.SingleOutput<Record, Void> of = ParDo.of(pushRecord);
Pipeline p = Pipeline.create(options);
p.apply(transform).apply(of);

return p.run();
final PipelineResult[] result = new PipelineResult[1];
th = new Thread(() -> {
result[0] = p.run();
});
this.th.start();
while (result[0] == null) {
sleep();
}
return result[0];
} finally {
Thread.currentThread().setContextClassLoader(callerClassLoader);
}
}

private void sleep() {
try {
Thread.sleep(100L);
Thread.sleep(30L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

static class PushRecord extends DoFn<Record, Void> implements Serializable {
static class MyDoFn extends DoFn<Record, Void> {

private final UUID queueId;

public MyDoFn(final UUID queueId) {
this.queueId = queueId;
}

@ProcessElement
public void processElement(final @Element Record record) {
boolean ok = QUEUE.offer(record);
public void processElement(final ProcessContext context) {
final Queue<Record> recordQueue = QUEUE.get(this.queueId);
boolean ok = recordQueue.offer(context.element());
log.debug("queue injected {}; ok={}; thread:{}", recordQueue.size(), ok, Thread.currentThread().getId());

while (!ok) {
if (QUEUE.size() >= CAPACITY) {
final String msg = String.format(
"Wrapper queue if full (capacity: %d). Consider increasing it according data with talend.beam.wrapper.capacity property.",
CAPACITY);
log.error("[processElement] {}", msg);
throw new IllegalStateException(msg);
}
sleep();
ok = QUEUE.offer(record);
ok = recordQueue.offer(context.element());
log.debug("\tqueue injected retry {}; ok={}; thread:{}", recordQueue.size(), ok,
Thread.currentThread().getId());
}
}

private void sleep() {
try {
Thread.sleep(100L);
Thread.sleep(20L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
@WithComponents("org.talend.sdk.component.junit.beam.test")
public class ProducerFinderEnvironmentTest implements Serializable {


@Injected
private BaseComponentsHandler handler;

Expand All @@ -80,6 +81,18 @@ public class ProducerFinderEnvironmentTest implements Serializable {
@Service
private RecordBuilderFactory factory;

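/**
 * Measured run time by record count (timings copied from the original note).
 * Column meaning is presumed from the original header, to be confirmed:
 * ArrayBlockingQueue with a capacity of 7 versus the fixed CAPACITY variable.
 *
 * recordCount | queue capacity 7 | fixed CAPACITY variable
 * 10          | 11sec            | 13sec
 * 100         | 16sec            | 13sec
 * 1000        | 57sec            | 16sec
 * 10000       | ~ 7min55         | 52sec
 * 100000      | 1h34min          | 25min
 */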
private final Integer recordCount = 5000; // 10 100 1000 10000 100000

@BeforeAll
static void forceManagerInit() {
// manager for non environmental tests
Expand All @@ -93,19 +106,25 @@ static void forceManagerInit() {

@Test
void finderWithTacokitFamily() {
final Iterator<Record> recordIterator = getFinder(ComponentManager.instance(), "TckFamily");
final Iterator<Record> recordIterator = getFinder(ComponentManager.instance(), "TckFamily", recordCount);
recordIterator.forEachRemaining(Assertions::assertNotNull);
}

@Test
void finderWithBeamFamily() {
final Iterator<Record> recordIterator = getFinder(ComponentManager.instance(), "BeamFamily");
recordIterator.forEachRemaining(Assertions::assertNotNull);
final Iterator<Record> recordIterator = getFinder(ComponentManager.instance(), "BeamFamily", recordCount);
final int[] total = new int[1];
total[0] = 0;
recordIterator.forEachRemaining((Record rec) -> {
Assertions.assertNotNull(rec);
total[0]++;
});
Assertions.assertEquals(recordCount, total[0], "did not consume all records");
}

@EnvironmentalTest
void runPipelineBeam() {
Mapper mapper = manager.findMapper("BeamFamily", "from", 1, singletonMap("count", "10")).get();
Mapper mapper = manager.findMapper("BeamFamily", "from", 1, singletonMap("count", recordCount.toString())).get();
assertNotNull(mapper);
final Object delegate = Delegated.class.cast(mapper).getDelegate();
assertNotNull(delegate);
Expand All @@ -114,7 +133,7 @@ void runPipelineBeam() {

@EnvironmentalTest
void runPipelineTacokt() {
Mapper mapper = manager.findMapper("TckFamily", "from", 1, singletonMap("count", "10")).get();
Mapper mapper = manager.findMapper("TckFamily", "from", 1, singletonMap("count", recordCount.toString())).get();
assertNotNull(mapper);
runPipeline(TalendIO.read(mapper));
}
Expand All @@ -123,7 +142,7 @@ private void runPipeline(PTransform<PBegin, PCollection<Record>> transform) {
final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
final PTransform<PBegin, PCollection<Record>> start = transform;
final PCollection<Record> out = pipeline.apply(start);
List<Record> records = IntStream.range(0, 10)
List<Record> records = IntStream.range(0, recordCount)
.mapToObj(i -> factory.newRecordBuilder()
.withString("id", "id_" + i)
.build())
Expand All @@ -142,19 +161,20 @@ void runJobWithBeamFamily() {
runJob(handler.asManager(), "BeamFamily");
}

private Iterator<Record> getFinder(final ComponentManager manager, final String family) {
private Iterator<Record> getFinder(final ComponentManager manager, final String family, final int expectedNumber) {
final Container container = manager.findPlugin("test-classes").get();
ProducerFinder finder = (ProducerFinder) container.get(AllServices.class)
.getServices()
.get(ProducerFinder.class);
assertNotNull(finder);
final Iterator<Record> recordIterator = finder.find(family, "from", 1, singletonMap("count", "10"));
final Iterator<Record> recordIterator = finder.find(family, "from", 1,
singletonMap("count", Integer.toString(expectedNumber)));
assertNotNull(recordIterator);
return recordIterator;
}

private void runJob(final ComponentManager manager, final String family) {
final Iterator<Record> recordIterator = getFinder(manager, family);
final Iterator<Record> recordIterator = getFinder(manager, family, recordCount);
assertNotNull(recordIterator);
handler.setInputData(toIterable(recordIterator));
Job
Expand All @@ -166,7 +186,7 @@ private void runJob(final ComponentManager manager, final String family) {
.to("output")
.build()
.run();
assertEquals(10, handler.getCollectedData(Record.class).size());
assertEquals(recordCount, handler.getCollectedData(Record.class).size());
}

static <T> Iterable<T> toIterable(Iterator<T> it) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.ServiceLoader;

import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -47,13 +48,22 @@ class BeamEnvironmentsTest {
@EnvironmentalTest
void execute() throws ClassNotFoundException {
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final String runner = ServiceLoader
Iterator<?> loadClasses = ServiceLoader
.load(classLoader.loadClass("org.apache.beam.sdk.runners.PipelineRunnerRegistrar"))
.iterator()
.next()
.iterator();
// take second runner if exist
Object runner = null;
if (loadClasses.hasNext()) {
runner = loadClasses.next(); // first
}
if (loadClasses.hasNext()) {
runner = loadClasses.next(); // second if exist
}

final String runnerName = runner
.getClass()
.getName();
EXECUTIONS.add(System.getProperty("BeamEnvironmentsTest") + "/" + runner);
EXECUTIONS.add(System.getProperty("BeamEnvironmentsTest") + "/" + runnerName);
}

@AfterAll
Expand Down

0 comments on commit d40ccfe

Please sign in to comment.