diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index de1c64bd0e1..c95c9d5461c 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -259,9 +259,12 @@ def register_plugins(plugins) def start_workers @worker_threads.clear # In case we're restarting the pipeline - @outputs_registered.make_false + # @outputs_registered.make_false begin - maybe_setup_out_plugins + worker_stage = lir_execution.worker_stage + + register_plugins(worker_stage.outputs) + register_plugins(worker_stage.filters) pipeline_workers = safe_pipeline_worker_count @preserve_event_order = preserve_event_order?(pipeline_workers) @@ -297,7 +300,7 @@ def start_workers workers_init_start = Time.now worker_loops = pipeline_workers.times - .map { Thread.new { init_worker_loop } } + .map { Thread.new { init_worker_loop(worker_stage) } } .map(&:value) workers_init_elapsed = Time.now - workers_init_start @@ -585,11 +588,11 @@ def close_plugin_and_ignore(plugin) end # @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception - def init_worker_loop + def init_worker_loop(worker_stage) begin org.logstash.execution.WorkerLoop.new( filter_queue_client, # QueueReadClient - lir_execution, # CompiledPipeline + worker_stage, # CompiledPipeline.WorkerStage @worker_observer, # WorkerObserver # pipeline reporter counters @events_consumed, # LongAdder diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index e851e0fa899..bf1bc8b0d73 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -54,6 +54,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -73,20 +76,18 @@ public final class CompiledPipeline { */ private final EventCondition.Compiler conditionalCompiler = new EventCondition.Compiler(); - /** - * Configured inputs. - */ - private final Collection inputs; + private final InputStage inputStage; - /** - * Configured Filters, indexed by their ID as returned by {@link PluginVertex#getId()}. - */ - private final Map filters; + private final WorkerStage workerStage; - /** - * Configured outputs. - */ - private final Map outputs; + private final Lock workerStageReadLock; + private final Lock workerStageWriteLock; + + { + final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + workerStageReadLock = readWriteLock.readLock(); + workerStageWriteLock = readWriteLock.writeLock(); + } /** * Parsed pipeline configuration graph. @@ -111,6 +112,50 @@ public void notify(ConditionalEvaluationError err) { } } + public class InputStage { + final Collection inputs; + + InputStage(final ConfigVariableExpander configVariableExpander) { + try { + this.inputs = setupInputs(configVariableExpander); + } catch (final Exception e) { + throw new IllegalStateException("Unable to configure plugins for input stage: " + e.getMessage(), e); + } + } + } + + public class WorkerStage { + final Map filters; + final Map outputs; + + WorkerStage(final ConfigVariableExpander configVariableExpander) { + try { + this.filters = Map.copyOf(setupFilters(configVariableExpander)); + this.outputs = Map.copyOf(setupOutputs(configVariableExpander)); + } catch (final Exception e) { + throw new IllegalStateException("Unable to configure plugins for worker stage: " + e.getMessage(), e); + } + } + + public Collection getOutputs() { + return outputs.values(); + } + + public Collection getFilters() { + return filters.values(); + } + + public CompiledExecution buildExecution() { + return buildExecution(false); + } + + public CompiledExecution buildExecution(final boolean orderedExecution) { + return orderedExecution + ? new CompiledPipeline.CompiledOrderedExecution(this) + : new CompiledPipeline.CompiledUnorderedExecution(this); + } + } + public CompiledPipeline( final PipelineIR pipelineIR, final RubyIntegration.PluginFactory pluginFactory) @@ -130,24 +175,36 @@ public CompiledPipeline( try (ConfigVariableExpander cve = new ConfigVariableExpander( secretStore, EnvironmentVariableProvider.defaultProvider())) { - inputs = setupInputs(cve); - filters = setupFilters(cve); - outputs = setupOutputs(cve); + this.inputStage = new InputStage(cve); + this.workerStage = withLock(workerStageWriteLock, () -> new WorkerStage(cve)); } catch (Exception e) { throw new IllegalStateException("Unable to configure plugins: " + e.getMessage(), e); } } + public WorkerStage workerStage() { + return withLock(workerStageReadLock, () -> workerStage); + } + + static T withLock(final Lock lock, final Supplier supplier) { + lock.lock(); + try { + return supplier.get(); + } finally { + lock.unlock(); + } + } + public Collection outputs() { - return Collections.unmodifiableCollection(outputs.values()); + return withLock(workerStageReadLock, () -> Collections.unmodifiableCollection(this.workerStage.outputs.values())); } public Collection filters() { - return Collections.unmodifiableCollection(filters.values()); + return withLock(workerStageReadLock, () -> Collections.unmodifiableCollection(this.workerStage.filters.values())); } public Collection inputs() { - return Collections.unmodifiableCollection(inputs); + return Collections.unmodifiableCollection(this.inputStage.inputs); } /** @@ -156,6 +213,7 @@ public Collection inputs() { * unordered execution model. * @return CompiledPipeline.CompiledExecution the compiled pipeline */ + @Deprecated // use WorkerStage#buildExecution public CompiledPipeline.CompiledExecution buildExecution() { return buildExecution(false); } @@ -167,10 +225,11 @@ public CompiledPipeline.CompiledExecution buildExecution() { * @param orderedExecution determines whether to build an execution that enforces order or not * @return CompiledPipeline.CompiledExecution the compiled pipeline */ + @Deprecated // use WorkerStage#buildExecution public CompiledPipeline.CompiledExecution buildExecution(boolean orderedExecution) { return orderedExecution - ? new CompiledPipeline.CompiledOrderedExecution() - : new CompiledPipeline.CompiledUnorderedExecution(); + ? new CompiledPipeline.CompiledOrderedExecution(this.workerStage) + : new CompiledPipeline.CompiledUnorderedExecution(this.workerStage); } /** @@ -296,28 +355,14 @@ public static Object expandConfigVariableKeepingSecrets(ConfigVariableExpander c return expandConfigVariable(cve, valueToExpand, true); } - /** - * Checks if a certain {@link Vertex} represents a {@link AbstractFilterDelegatorExt}. - * @param vertex Vertex to check - * @return True iff {@link Vertex} represents a {@link AbstractFilterDelegatorExt} - */ - private boolean isFilter(final Vertex vertex) { - return filters.containsKey(vertex.getId()); - } - - /** - * Checks if a certain {@link Vertex} represents an output. - * @param vertex Vertex to check - * @return True iff {@link Vertex} represents an output - */ - private boolean isOutput(final Vertex vertex) { - return outputs.containsKey(vertex.getId()); - } - public final class CompiledOrderedExecution extends CompiledExecution { @SuppressWarnings({"unchecked"}) private final RubyArray EMPTY_ARRAY = RubyUtil.RUBY.newEmptyArray(); + CompiledOrderedExecution(WorkerStage workerStage) { + super(workerStage); + } + @Override public int compute(final QueueBatch batch, final boolean flush, final boolean shutdown) { return compute(batch.events(), flush, shutdown); @@ -353,6 +398,10 @@ private void _compute(final RubyArray batch, final RubyArray { */ private final Map plugins = new HashMap<>(50); + private final WorkerStage workerStage; + protected final Dataset compiledFilters; protected final Dataset compiledOutputs; - CompiledExecution() { + CompiledExecution(final WorkerStage workerStage) { + this.workerStage = workerStage; compiledFilters = compileFilters(); compiledOutputs = compileOutputs(); } @@ -425,7 +477,7 @@ private Dataset compileFilters() { */ private Dataset compileOutputs() { final Collection outputNodes = pipelineIR.getGraph() - .allLeaves().filter(CompiledPipeline.this::isOutput) + .allLeaves().filter(this::isOutput) .collect(Collectors.toList()); if (outputNodes.isEmpty()) { return Dataset.IDENTITY; @@ -450,7 +502,7 @@ private Dataset filterDataset(final Vertex vertex, final Collection dat final ComputeStepSyntaxElement prepared = DatasetCompiler.filterDataset( flatten(datasets, vertex), - filters.get(vertexId) + workerStage.filters.get(vertexId) ); plugins.put(vertexId, prepared.instantiate()); @@ -473,8 +525,8 @@ private Dataset outputDataset(final Vertex vertex, final Collection dat final ComputeStepSyntaxElement prepared = DatasetCompiler.outputDataset( flatten(datasets, vertex), - outputs.get(vertexId), - outputs.size() == 1 + workerStage.outputs.get(vertexId), + workerStage.outputs.size() == 1 ); plugins.put(vertexId, prepared.instantiate()); @@ -576,5 +628,23 @@ private Collection compileDependencies( } ).collect(Collectors.toList()); } + + /** + * Checks if a certain {@link Vertex} represents a {@link AbstractFilterDelegatorExt}. + * @param vertex Vertex to check + * @return True iff {@link Vertex} represents a {@link AbstractFilterDelegatorExt} + */ + private boolean isFilter(final Vertex vertex) { + return this.workerStage.filters.containsKey(vertex.getId()); + } + + /** + * Checks if a certain {@link Vertex} represents an output. + * @param vertex Vertex to check + * @return True iff {@link Vertex} represents an output + */ + private boolean isOutput(final Vertex vertex) { + return this.workerStage.outputs.containsKey(vertex.getId()); + } } } diff --git a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java index 9d849e42da4..0f052c65c04 100644 --- a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java +++ b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java @@ -52,7 +52,7 @@ public final class WorkerLoop implements Runnable { public WorkerLoop( final QueueReadClient readClient, - final CompiledPipeline compiledPipeline, + final CompiledPipeline.WorkerStage compiledPipeline, final WorkerObserver workerObserver, final LongAdder consumedCounter, final LongAdder filteredCounter, diff --git a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java index 3aa9458979e..bcf3fa8585d 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java @@ -136,7 +136,7 @@ public void buildsTrivialPipeline() throws Exception { Collections.emptyMap(), Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); + ).workerStage().buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); final Collection outputEvents = EVENT_SINKS.get(runId); MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1)); MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true)); @@ -159,7 +159,7 @@ public void buildsStraightPipeline() throws Exception { Collections.singletonMap("mockfilter", () -> IDENTITY_FILTER), Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); + ).workerStage().buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); final Collection outputEvents = EVENT_SINKS.get(runId); MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1)); MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true)); @@ -192,7 +192,7 @@ public void buildsForkedPipeline() throws Exception { filters, Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); + ).workerStage().buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); final Collection outputEvents = EVENT_SINKS.get(runId); MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1)); MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true)); @@ -314,7 +314,7 @@ private void verifyRegex(String operator, int expectedEvents) Collections.singletonMap("mockaddfilter", () -> null), Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution() + ).workerStage().buildExecution() .compute(RubyUtil.RUBY.newArray(testEvent), false, false); final Collection outputEvents = EVENT_SINKS.get(runId); MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(expectedEvents)); @@ -348,7 +348,7 @@ public void equalityCheckOnCompositeField() throws Exception { filters, Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); + ).workerStage().buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); final Collection outputEvents = EVENT_SINKS.get(runId); MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1)); MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true)); @@ -374,7 +374,7 @@ public void conditionalWithNullField() throws Exception { filters, Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); + ).workerStage().buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); final Collection outputEvents = EVENT_SINKS.get(runId); MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1)); MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true)); @@ -401,7 +401,7 @@ public void conditionalNestedMetaFieldPipeline() throws Exception { filters, Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); + ).workerStage().buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); final Collection outputEvents = EVENT_SINKS.get(runId); MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1)); MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true)); @@ -436,7 +436,7 @@ public void moreThan255Parents() throws Exception { filters, Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); + ).workerStage().buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); final Collection outputEvents = EVENT_SINKS.get(runId); MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1)); MatcherAssert.assertThat(outputEvents.contains(testEvent), CoreMatchers.is(true)); @@ -484,7 +484,7 @@ private void verifyComparison(final boolean expected, final String conditional, Collections.singletonMap("mockaddfilter", () -> ADD_FIELD_FILTER), Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution() + ).workerStage().buildExecution() .compute(RubyUtil.RUBY.newArray(testEvent), false, false); final Collection outputEvents = EVENT_SINKS.get(runId); MatcherAssert.assertThat(outputEvents.size(), CoreMatchers.is(1)); @@ -591,9 +591,9 @@ public void testCacheCompiledClassesWithDifferentId() throws IOException, Invali // actual test: compiling a pipeline with an extra filter should only create 1 extra class ComputeStepSyntaxElement.cleanClassCache(); - cBaselinePipeline.buildExecution(); + cBaselinePipeline.workerStage().buildExecution(); final int cachedBefore = ComputeStepSyntaxElement.classCacheSize(); - cPipelineWithDifferentId.buildExecution(); + cPipelineWithDifferentId.workerStage().buildExecution(); final int cachedAfter = ComputeStepSyntaxElement.classCacheSize(); final String message = String.format("unexpected cache size, cachedAfter: %d, cachedBefore: %d", cachedAfter, cachedBefore); @@ -627,9 +627,9 @@ public void testReuseCompiledClasses() throws IOException, InvalidIRException { // test: compiling a much bigger pipeline and asserting no additional classes are generated ComputeStepSyntaxElement.cleanClassCache(); - cBaselinePipeline.buildExecution(); + cBaselinePipeline.workerStage().buildExecution(); final int cachedBefore = ComputeStepSyntaxElement.classCacheSize(); - cPipelineTwiceAsBig.buildExecution(); + cPipelineTwiceAsBig.workerStage().buildExecution(); final int cachedAfter = ComputeStepSyntaxElement.classCacheSize(); final String message = String.format("unexpected cache size, cachedAfter: %d, cachedBefore: %d", cachedAfter, cachedBefore); @@ -654,12 +654,12 @@ public void compilerBenchmark() throws Exception { final CompiledPipeline testCompiledPipeline = new CompiledPipeline(testPipelineIR, pluginFactory); final long compilationBaseline = time(ChronoUnit.MILLIS, () -> { - final CompiledPipeline.CompiledExecution compiledExecution = baselineCompiledPipeline.buildExecution(); + final CompiledPipeline.CompiledExecution compiledExecution = baselineCompiledPipeline.workerStage().buildExecution(); compiledExecution.compute(RubyUtil.RUBY.newArray(testEvent), false, false); }); final long compilationTest = time(ChronoUnit.MILLIS, () -> { - final CompiledPipeline.CompiledExecution compiledExecution = testCompiledPipeline.buildExecution(); + final CompiledPipeline.CompiledExecution compiledExecution = testCompiledPipeline.workerStage().buildExecution(); compiledExecution.compute(RubyUtil.RUBY.newArray(testEvent), false, false); }); diff --git a/logstash-core/src/test/java/org/logstash/config/ir/EventConditionTest.java b/logstash-core/src/test/java/org/logstash/config/ir/EventConditionTest.java index c7130b7e430..911a58001bd 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/EventConditionTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/EventConditionTest.java @@ -135,7 +135,7 @@ public void testInclusionWithFieldInField() throws Exception { Collections.singletonMap("mockfilter", () -> IDENTITY_FILTER), Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution().compute(inputBatch, false, false); + ).workerStage().buildExecution().compute(inputBatch, false, false); final RubyEvent[] outputEvents = EVENT_SINKS.get(runId).toArray(new RubyEvent[0]); assertThat(outputEvents.length, is(3)); @@ -179,6 +179,7 @@ private void testConditionWithConstantValue(String condition, int expectedMatche Collections.singletonMap("mockfilter", () -> IDENTITY_FILTER), Collections.singletonMap("mockoutput", mockOutputSupplier()) )) + .workerStage() .buildExecution() .compute(RubyUtil.RUBY.newArray(RubyEvent.newRubyEvent(RubyUtil.RUBY)), false, false); @@ -224,7 +225,7 @@ public void testConditionWithSecretStoreVariable() throws InvalidIRException { Collections.emptyMap(), // no filters Collections.singletonMap("mockoutput", mockOutputSupplier()) ) - ).buildExecution().compute(inputBatch, false, false); + ).workerStage().buildExecution().compute(inputBatch, false, false); final RubyEvent[] outputEvents = EVENT_SINKS.get(runId).toArray(new RubyEvent[0]); assertThat(outputEvents.length, is(1));