Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,12 @@ def register_plugins(plugins)

def start_workers
@worker_threads.clear # In case we're restarting the pipeline
@outputs_registered.make_false
# @outputs_registered.make_false
begin
maybe_setup_out_plugins
worker_stage = lir_execution.worker_stage

register_plugins(worker_stage.outputs)
register_plugins(worker_stage.filters)

pipeline_workers = safe_pipeline_worker_count
@preserve_event_order = preserve_event_order?(pipeline_workers)
Expand Down Expand Up @@ -297,7 +300,7 @@ def start_workers

workers_init_start = Time.now
worker_loops = pipeline_workers.times
.map { Thread.new { init_worker_loop } }
.map { Thread.new { init_worker_loop(worker_stage) } }
.map(&:value)
workers_init_elapsed = Time.now - workers_init_start

Expand Down Expand Up @@ -585,11 +588,11 @@ def close_plugin_and_ignore(plugin)
end

# @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception
def init_worker_loop
def init_worker_loop(worker_stage)
begin
org.logstash.execution.WorkerLoop.new(
filter_queue_client, # QueueReadClient
lir_execution, # CompiledPipeline
worker_stage, # CompiledPipeline.WorkerStage
@worker_observer, # WorkerObserver
# pipeline reporter counters
@events_consumed, # LongAdder
Expand Down
156 changes: 113 additions & 43 deletions logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -73,20 +76,18 @@ public final class CompiledPipeline {
*/
private final EventCondition.Compiler conditionalCompiler = new EventCondition.Compiler();

/**
* Configured inputs.
*/
private final Collection<IRubyObject> inputs;
private final InputStage inputStage;

/**
* Configured Filters, indexed by their ID as returned by {@link PluginVertex#getId()}.
*/
private final Map<String, AbstractFilterDelegatorExt> filters;
private final WorkerStage workerStage;

/**
* Configured outputs.
*/
private final Map<String, AbstractOutputDelegatorExt> outputs;
private final Lock workerStageReadLock;
private final Lock workerStageWriteLock;

{
final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
workerStageReadLock = readWriteLock.readLock();
workerStageWriteLock = readWriteLock.writeLock();
}

/**
* Parsed pipeline configuration graph.
Expand All @@ -111,6 +112,50 @@ public void notify(ConditionalEvaluationError err) {
}
}

public class InputStage {
final Collection<IRubyObject> inputs;

InputStage(final ConfigVariableExpander configVariableExpander) {
try {
this.inputs = setupInputs(configVariableExpander);
} catch (final Exception e) {
throw new IllegalStateException("Unable to configure plugins for input stage: " + e.getMessage(), e);
}
}
}

public class WorkerStage {
final Map<String, AbstractFilterDelegatorExt> filters;
final Map<String, AbstractOutputDelegatorExt> outputs;

WorkerStage(final ConfigVariableExpander configVariableExpander) {
try {
this.filters = Map.copyOf(setupFilters(configVariableExpander));
this.outputs = Map.copyOf(setupOutputs(configVariableExpander));
} catch (final Exception e) {
throw new IllegalStateException("Unable to configure plugins for worker stage: " + e.getMessage(), e);
}
}

public Collection<AbstractOutputDelegatorExt> getOutputs() {
return outputs.values();
}

public Collection<AbstractFilterDelegatorExt> getFilters() {
return filters.values();
}

public CompiledExecution buildExecution() {
return buildExecution(false);
}

public CompiledExecution buildExecution(final boolean orderedExecution) {
return orderedExecution
? new CompiledPipeline.CompiledOrderedExecution(this)
: new CompiledPipeline.CompiledUnorderedExecution(this);
}
}

public CompiledPipeline(
final PipelineIR pipelineIR,
final RubyIntegration.PluginFactory pluginFactory)
Expand All @@ -130,24 +175,36 @@ public CompiledPipeline(
try (ConfigVariableExpander cve = new ConfigVariableExpander(
secretStore,
EnvironmentVariableProvider.defaultProvider())) {
inputs = setupInputs(cve);
filters = setupFilters(cve);
outputs = setupOutputs(cve);
this.inputStage = new InputStage(cve);
this.workerStage = withLock(workerStageWriteLock, () -> new WorkerStage(cve));
} catch (Exception e) {
throw new IllegalStateException("Unable to configure plugins: " + e.getMessage(), e);
}
}

public WorkerStage workerStage() {
return withLock(workerStageReadLock, () -> workerStage);
}

static <T> T withLock(final Lock lock, final Supplier<T> supplier) {
lock.lock();
try {
return supplier.get();
} finally {
lock.unlock();
}
}

public Collection<AbstractOutputDelegatorExt> outputs() {
return Collections.unmodifiableCollection(outputs.values());
return withLock(workerStageReadLock, () -> Collections.unmodifiableCollection(this.workerStage.outputs.values()));
}

public Collection<AbstractFilterDelegatorExt> filters() {
return Collections.unmodifiableCollection(filters.values());
return withLock(workerStageReadLock, () -> Collections.unmodifiableCollection(this.workerStage.filters.values()));
}

public Collection<IRubyObject> inputs() {
return Collections.unmodifiableCollection(inputs);
return Collections.unmodifiableCollection(this.inputStage.inputs);
}

/**
Expand All @@ -156,6 +213,7 @@ public Collection<IRubyObject> inputs() {
* unordered execution model.
* @return CompiledPipeline.CompiledExecution the compiled pipeline
*/
@Deprecated // use WorkerStage#buildExecution
public CompiledPipeline.CompiledExecution buildExecution() {
return buildExecution(false);
}
Expand All @@ -167,10 +225,11 @@ public CompiledPipeline.CompiledExecution buildExecution() {
* @param orderedExecution determines whether to build an execution that enforces order or not
* @return CompiledPipeline.CompiledExecution the compiled pipeline
*/
@Deprecated // use WorkerStage#buildExecution
public CompiledPipeline.CompiledExecution buildExecution(boolean orderedExecution) {
return orderedExecution
? new CompiledPipeline.CompiledOrderedExecution()
: new CompiledPipeline.CompiledUnorderedExecution();
? new CompiledPipeline.CompiledOrderedExecution(this.workerStage)
: new CompiledPipeline.CompiledUnorderedExecution(this.workerStage);
}

/**
Expand Down Expand Up @@ -296,28 +355,14 @@ public static Object expandConfigVariableKeepingSecrets(ConfigVariableExpander c
return expandConfigVariable(cve, valueToExpand, true);
}

/**
* Checks if a certain {@link Vertex} represents a {@link AbstractFilterDelegatorExt}.
* @param vertex Vertex to check
* @return True iff {@link Vertex} represents a {@link AbstractFilterDelegatorExt}
*/
private boolean isFilter(final Vertex vertex) {
return filters.containsKey(vertex.getId());
}

/**
* Checks if a certain {@link Vertex} represents an output.
* @param vertex Vertex to check
* @return True iff {@link Vertex} represents an output
*/
private boolean isOutput(final Vertex vertex) {
return outputs.containsKey(vertex.getId());
}

public final class CompiledOrderedExecution extends CompiledExecution {

@SuppressWarnings({"unchecked"}) private final RubyArray<RubyEvent> EMPTY_ARRAY = RubyUtil.RUBY.newEmptyArray();

CompiledOrderedExecution(WorkerStage workerStage) {
super(workerStage);
}

@Override
public int compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
return compute(batch.events(), flush, shutdown);
Expand Down Expand Up @@ -353,6 +398,10 @@ private void _compute(final RubyArray<RubyEvent> batch, final RubyArray<RubyEven

public final class CompiledUnorderedExecution extends CompiledExecution {

CompiledUnorderedExecution(WorkerStage workerStage) {
super(workerStage);
}

@Override
public int compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
return compute(batch.events(), flush, shutdown);
Expand Down Expand Up @@ -396,10 +445,13 @@ public abstract class CompiledExecution implements Execution<QueueBatch> {
*/
private final Map<String, Dataset> plugins = new HashMap<>(50);

private final WorkerStage workerStage;

protected final Dataset compiledFilters;
protected final Dataset compiledOutputs;

CompiledExecution() {
CompiledExecution(final WorkerStage workerStage) {
this.workerStage = workerStage;
compiledFilters = compileFilters();
compiledOutputs = compileOutputs();
}
Expand All @@ -425,7 +477,7 @@ private Dataset compileFilters() {
*/
private Dataset compileOutputs() {
final Collection<Vertex> outputNodes = pipelineIR.getGraph()
.allLeaves().filter(CompiledPipeline.this::isOutput)
.allLeaves().filter(this::isOutput)
.collect(Collectors.toList());
if (outputNodes.isEmpty()) {
return Dataset.IDENTITY;
Expand All @@ -450,7 +502,7 @@ private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> dat
final ComputeStepSyntaxElement<Dataset> prepared =
DatasetCompiler.filterDataset(
flatten(datasets, vertex),
filters.get(vertexId)
workerStage.filters.get(vertexId)
);

plugins.put(vertexId, prepared.instantiate());
Expand All @@ -473,8 +525,8 @@ private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> dat
final ComputeStepSyntaxElement<Dataset> prepared =
DatasetCompiler.outputDataset(
flatten(datasets, vertex),
outputs.get(vertexId),
outputs.size() == 1
workerStage.outputs.get(vertexId),
workerStage.outputs.size() == 1
);

plugins.put(vertexId, prepared.instantiate());
Expand Down Expand Up @@ -576,5 +628,23 @@ private Collection<Dataset> compileDependencies(
}
).collect(Collectors.toList());
}

/**
* Checks if a certain {@link Vertex} represents a {@link AbstractFilterDelegatorExt}.
* @param vertex Vertex to check
* @return True iff {@link Vertex} represents a {@link AbstractFilterDelegatorExt}
*/
private boolean isFilter(final Vertex vertex) {
return this.workerStage.filters.containsKey(vertex.getId());
}

/**
* Checks if a certain {@link Vertex} represents an output.
* @param vertex Vertex to check
* @return True iff {@link Vertex} represents an output
*/
private boolean isOutput(final Vertex vertex) {
return this.workerStage.outputs.containsKey(vertex.getId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public final class WorkerLoop implements Runnable {

public WorkerLoop(
final QueueReadClient readClient,
final CompiledPipeline compiledPipeline,
final CompiledPipeline.WorkerStage compiledPipeline,
final WorkerObserver workerObserver,
final LongAdder consumedCounter,
final LongAdder filteredCounter,
Expand Down
Loading