Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(TCOMP-2701): Endless running job with lookup join #889

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -35,6 +34,7 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.record.Schema;
import org.talend.sdk.component.api.service.source.ProducerFinder;
import org.talend.sdk.component.runtime.base.Delegated;
import org.talend.sdk.component.runtime.input.Input;
Expand All @@ -48,11 +48,24 @@
@Slf4j
public class BeamProducerFinder extends ProducerFinderImpl {

private static final int QUEUE_SIZE = 200;
private static final int QUEUE_SIZE = Integer.parseInt(System.getProperty("talend.beam.wrapper.capacity", "1000"));

private static final int BEAM_PARALLELISM = 10;

private static final Map<UUID, Queue<Record>> QUEUE = new ConcurrentHashMap<>();
private static final Map<UUID, ArrayBlockingQueue<Record>> QUEUE = new ConcurrentHashMap<>();

/**
 * Sentinel record marking the end of a record stream in a blocking queue.
 * <p>
 * The producer thread puts this instance into the queue once the pipeline has finished, and the
 * consumer compares each taken record against it by reference ({@code ==}) to detect that no
 * more records will arrive. It carries no data: both accessors below return {@code null}, so it
 * must never be handed to downstream code as a real record.
 * </p>
 */
private static final Record END_OF_QUEUE = new Record() {

// No schema: this instance is a marker only, never a data record.
@Override
public Schema getSchema() {
return null;
}

// No values: every lookup yields null regardless of the requested type or field name.
@Override
public <T> T get(final Class<T> expectedType, final String name) {
return null;
}
};

@Override
public Iterator<Record> find(final String familyName, final String inputName, final int version,
Expand All @@ -67,8 +80,11 @@
final Object delegate = Delegated.class.cast(mapper).getDelegate();
if (PTransform.class.isInstance(delegate)) {
final UUID uuid = UUID.randomUUID();
QUEUE.put(uuid, new ArrayBlockingQueue<>(QUEUE_SIZE, true));
return new QueueInput(delegate, familyName, inputName, familyName, PTransform.class.cast(delegate),
ArrayBlockingQueue<Record> abq = new ArrayBlockingQueue<>(QUEUE_SIZE, true);
log.debug("Create and add blocking queue {}.", uuid);
QUEUE.put(uuid, abq);
return new BlockingQueueIterator(delegate, familyName, inputName, familyName,
PTransform.class.cast(delegate),
uuid);
}
throw new IllegalStateException(e);
Expand All @@ -79,27 +95,22 @@
return new SerializableService(plugin, ProducerFinder.class.getName());
}

static class QueueInput implements Iterator<Record>, Serializable {
static class BlockingQueueIterator implements Iterator<Record>, Serializable {

private final PTransform<PBegin, PCollection<Record>> transform;

private final PipelineResult result;

private boolean started;

private boolean end;

private Record next;

private final UUID queueId;

private Thread th;

public QueueInput(final Object delegate, final String rootName, final String name, final String plugin,
public BlockingQueueIterator(final Object delegate, final String rootName, final String name,

Check warning on line 108 in component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java

View check run for this annotation

sonar-eks / Component Runtime Sonarqube Results

component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java#L108

Remove these unused method parameters "delegate", "rootName", "name", "plugin".
final String plugin,
final PTransform<PBegin, PCollection<Record>> transform, final UUID queueId) {
this.transform = transform;
this.queueId = queueId;
result = runDataReadingPipeline();
runDataReadingPipeline();
}

@Override
Expand All @@ -109,6 +120,7 @@
started = true;
}
if (next == null) {
log.debug("Remove blocking queue {}.", this.queueId);
QUEUE.remove(this.queueId);
}
return next != null;
Expand All @@ -121,26 +133,27 @@
}
final Record current = next;
next = findNext();

return current;
}

private Record findNext() {
final Queue<Record> recordQueue = QUEUE.get(this.queueId);

Record record = recordQueue.poll();

int index = 0;
while (record == null && (!end)) {
end = result != null && result.getState() != PipelineResult.State.RUNNING;
if (!end && index > 10) {
result.waitUntilFinish();
} else {
index++;
log.debug("findNext NULL, retry : end={}; size:{}", end, recordQueue.size());
sleep();
final ArrayBlockingQueue<Record> recordQueue = QUEUE.get(this.queueId);

Record record = null;

Check warning on line 143 in component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java

View check run for this annotation

sonar-eks / Component Runtime Sonarqube Results

component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java#L143

Rename this variable to not match a restricted identifier.
try {
log.debug("Take next element from blocking queue: {}, thread: {}.", this.queueId,
Thread.currentThread().getId());
record = recordQueue.take();
if (record == END_OF_QUEUE) {
log.debug("END_OF_QUEUE reached for blocking queue {}, in thread: {}.", this.queueId,
Thread.currentThread().getId());
return null;
}
record = recordQueue.poll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return record;
}

Expand All @@ -166,7 +179,7 @@
* various TCK framework wrappers that are in place.</i>
* </p>
*/
private PipelineResult runDataReadingPipeline() {
private void runDataReadingPipeline() {
final ClassLoader beamAwareClassLoader = Pipeline.class.getClassLoader();
final ClassLoader callerClassLoader = Thread.currentThread().getContextClassLoader();

Expand All @@ -175,21 +188,26 @@
DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
options.setRunner(DirectRunner.class);
options.setTargetParallelism(BEAM_PARALLELISM);
options.setBlockOnRun(false);
options.setBlockOnRun(true);
MyDoFn pushRecord = new MyDoFn(this.queueId);
ParDo.SingleOutput<Record, Void> of = ParDo.of(pushRecord);
Pipeline p = Pipeline.create(options);
p.apply(transform).apply(of);

final PipelineResult[] result = new PipelineResult[1];
th = new Thread(() -> {
result[0] = p.run();
Thread th = new Thread(() -> {
log.debug("Start thread {} to produce elements in blocking queue {}.",
Thread.currentThread().getId(), this.queueId);
PipelineResult pipelineResult = p.run();
pipelineResult.waitUntilFinish();
try {
QUEUE.get(this.queueId).put(END_OF_QUEUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
this.th.start();
while (result[0] == null) {
sleep();
}
return result[0];

th.start();

} finally {
Thread.currentThread().setContextClassLoader(callerClassLoader);
}
Expand All @@ -214,21 +232,12 @@

@ProcessElement
public void processElement(final ProcessContext context) {
final Queue<Record> recordQueue = QUEUE.get(this.queueId);
boolean ok = recordQueue.offer(context.element());
log.debug("queue injected {}; ok={}; thread:{}", recordQueue.size(), ok, Thread.currentThread().getId());

while (!ok) {
sleep();
ok = recordQueue.offer(context.element());
log.debug("\tqueue injected retry {}; ok={}; thread:{}", recordQueue.size(), ok,
Thread.currentThread().getId());
}
}

private void sleep() {
final ArrayBlockingQueue<Record> recordQueue = QUEUE.get(this.queueId);
try {
Thread.sleep(20L);
log.debug("Add an element in blocking queue: {}, thread: {}.", this.queueId,
Thread.currentThread().getId());
Record record = context.element();

Check warning on line 239 in component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java

View check run for this annotation

sonar-eks / Component Runtime Sonarqube Results

component-runtime-beam/src/main/java/org/talend/sdk/component/runtime/beam/spi/BeamProducerFinder.java#L239

Rename this variable to not match a restricted identifier.
recordQueue.put(record);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
Loading
Loading