Skip to content

Commit f25c5e4

Browse files
committed
Move ConversantMedia disruptor support to new module (#2914)
Moves `DisruptorBlockingQueue` to a separate module. Includes peer review (cf. #2914).
1 parent 5a5224e commit f25c5e4

File tree

33 files changed

+702
-121
lines changed

33 files changed

+702
-121
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
This file is here to activate the `plugin-processing` Maven profile.

log4j-conversant/pom.xml

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to you under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
19+
<modelVersion>4.0.0</modelVersion>
20+
<parent>
21+
<groupId>org.apache.logging.log4j</groupId>
22+
<artifactId>log4j</artifactId>
23+
<version>${revision}</version>
24+
<relativePath>../log4j-parent</relativePath>
25+
</parent>
26+
27+
<artifactId>log4j-conversant</artifactId>
28+
<name>Apache Log4j Conversant Disruptor-based supplements</name>
29+
<description>Provides ConversantMedia Disruptor-based data structure implementations for the Apache Log4j.</description>
30+
31+
<properties>
32+
<conversant.disruptor.version>1.2.21</conversant.disruptor.version>
33+
</properties>
34+
35+
<dependencyManagement>
36+
<dependencies>
37+
38+
<dependency>
39+
<groupId>com.conversantmedia</groupId>
40+
<artifactId>disruptor</artifactId>
41+
<version>${conversant.disruptor.version}</version>
42+
</dependency>
43+
44+
</dependencies>
45+
</dependencyManagement>
46+
47+
<dependencies>
48+
49+
<dependency>
50+
<groupId>org.jspecify</groupId>
51+
<artifactId>jspecify</artifactId>
52+
<scope>provided</scope>
53+
</dependency>
54+
55+
<dependency>
56+
<groupId>com.conversantmedia</groupId>
57+
<artifactId>disruptor</artifactId>
58+
</dependency>
59+
60+
<dependency>
61+
<groupId>org.apache.logging.log4j</groupId>
62+
<artifactId>log4j-core</artifactId>
63+
</dependency>
64+
65+
<dependency>
66+
<groupId>org.apache.logging.log4j</groupId>
67+
<artifactId>log4j-plugins</artifactId>
68+
</dependency>
69+
70+
<dependency>
71+
<groupId>org.assertj</groupId>
72+
<artifactId>assertj-core</artifactId>
73+
<scope>test</scope>
74+
</dependency>
75+
76+
<dependency>
77+
<groupId>org.junit.jupiter</groupId>
78+
<artifactId>junit-jupiter-api</artifactId>
79+
<scope>test</scope>
80+
</dependency>
81+
82+
<dependency>
83+
<groupId>org.apache.logging.log4j</groupId>
84+
<artifactId>log4j-core-test</artifactId>
85+
<scope>test</scope>
86+
</dependency>
87+
88+
</dependencies>
89+
90+
</project>

log4j-core/src/main/java/org/apache/logging/log4j/core/async/DisruptorBlockingQueueFactory.java log4j-conversant/src/main/java/org/apache/logging/log4j/conversant/DisruptorBlockingQueueFactory.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,25 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.logging.log4j.core.async;
17+
package org.apache.logging.log4j.conversant;
1818

1919
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
2020
import com.conversantmedia.util.concurrent.SpinPolicy;
2121
import java.util.concurrent.BlockingQueue;
22+
import org.apache.logging.log4j.core.async.BlockingQueueFactory;
2223
import org.apache.logging.log4j.plugins.Configurable;
2324
import org.apache.logging.log4j.plugins.Plugin;
2425
import org.apache.logging.log4j.plugins.PluginAttribute;
2526
import org.apache.logging.log4j.plugins.PluginFactory;
2627

2728
/**
28-
* Factory for creating instances of {@link DisruptorBlockingQueue}.
29+
* A {@link BlockingQueueFactory} based on <a href="https://github.com/conversant/disruptor">Conversant Disruptor BlockingQueue</a>.
2930
*
30-
* @since 2.7
31+
* @since 3.0.0
3132
*/
3233
@Configurable(elementType = BlockingQueueFactory.ELEMENT_TYPE, printObject = true)
3334
@Plugin("DisruptorBlockingQueue")
34-
public class DisruptorBlockingQueueFactory implements BlockingQueueFactory {
35+
public final class DisruptorBlockingQueueFactory implements BlockingQueueFactory {
3536

3637
private final SpinPolicy spinPolicy;
3738

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.logging.log4j.conversant;
18+
19+
import static java.util.Objects.requireNonNull;
20+
21+
import aQute.bnd.annotation.spi.ServiceProvider;
22+
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
23+
import com.conversantmedia.util.concurrent.SpinPolicy;
24+
import java.util.Queue;
25+
import java.util.function.Consumer;
26+
import java.util.function.Supplier;
27+
import org.apache.logging.log4j.Logger;
28+
import org.apache.logging.log4j.core.util.Integers;
29+
import org.apache.logging.log4j.kit.env.PropertyEnvironment;
30+
import org.apache.logging.log4j.kit.recycler.Recycler;
31+
import org.apache.logging.log4j.kit.recycler.RecyclerFactory;
32+
import org.apache.logging.log4j.kit.recycler.RecyclerFactoryProvider;
33+
import org.apache.logging.log4j.kit.recycler.RecyclerProperties;
34+
import org.apache.logging.log4j.kit.recycler.support.AbstractRecycler;
35+
import org.apache.logging.log4j.status.StatusLogger;
36+
37+
/**
38+
* A {@link Recycler} factory provider implementation based on
39+
* <a href="https://github.com/conversant/disruptor>Conversant's Disruptor BlockingQueue</a>.
40+
*
41+
* @since 3.0.0
42+
*/
43+
@ServiceProvider(value = RecyclerFactoryProvider.class)
44+
public final class DisruptorRecyclerFactoryProvider implements RecyclerFactoryProvider {
45+
46+
private static final Logger LOGGER = StatusLogger.getLogger();
47+
48+
@Override
49+
public int getOrder() {
50+
return 600;
51+
}
52+
53+
@Override
54+
public String getName() {
55+
return "conversant-disruptor";
56+
}
57+
58+
@Override
59+
public RecyclerFactory createForEnvironment(final PropertyEnvironment environment) {
60+
requireNonNull(environment, "environment");
61+
return new DisruptorRecyclerFactory(
62+
environment.getProperty(RecyclerProperties.class),
63+
environment.getProperty(DisruptorRecyclerProperties.class));
64+
}
65+
66+
private static final class DisruptorRecyclerFactory implements RecyclerFactory {
67+
68+
/**
69+
* Minimum capacity of the disruptor
70+
*/
71+
private static final int MIN_CAPACITY = 8;
72+
73+
private final int capacity;
74+
private final SpinPolicy spinPolicy;
75+
76+
private DisruptorRecyclerFactory(
77+
final RecyclerProperties recyclerProperties, final DisruptorRecyclerProperties disruptorProperties) {
78+
this.capacity = validateCapacity(recyclerProperties.capacity());
79+
this.spinPolicy = disruptorProperties.spinPolicy();
80+
}
81+
82+
@Override
83+
public <V> Recycler<V> create(final Supplier<V> supplier, final Consumer<V> cleaner) {
84+
requireNonNull(supplier, "supplier");
85+
requireNonNull(cleaner, "cleaner");
86+
final DisruptorBlockingQueue<V> queue = new DisruptorBlockingQueue<>(capacity, spinPolicy);
87+
return new DisruptorRecycler<>(supplier, cleaner, queue);
88+
}
89+
90+
private static Integer validateCapacity(final int capacity) {
91+
if (capacity < MIN_CAPACITY) {
92+
LOGGER.warn(
93+
"Invalid DisruptorBlockingQueue capacity {}, using minimum size {}.", capacity, MIN_CAPACITY);
94+
return MIN_CAPACITY;
95+
}
96+
final int roundedCapacity = Integers.ceilingNextPowerOfTwo(capacity);
97+
if (capacity != roundedCapacity) {
98+
LOGGER.warn(
99+
"Invalid DisruptorBlockingQueue size {}, using rounded size {}.", capacity, roundedCapacity);
100+
}
101+
return roundedCapacity;
102+
}
103+
104+
private static final class DisruptorRecycler<V> extends AbstractRecycler<V> {
105+
106+
private final Consumer<V> cleaner;
107+
108+
private final Queue<V> queue;
109+
110+
private DisruptorRecycler(final Supplier<V> supplier, final Consumer<V> cleaner, final Queue<V> queue) {
111+
super(supplier);
112+
this.cleaner = cleaner;
113+
this.queue = queue;
114+
}
115+
116+
@Override
117+
public V acquire() {
118+
final V value = queue.poll();
119+
return value != null ? value : createInstance();
120+
}
121+
122+
@Override
123+
public void release(final V value) {
124+
requireNonNull(value, "value");
125+
cleaner.accept(value);
126+
queue.offer(value);
127+
}
128+
}
129+
}
130+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.logging.log4j.conversant;
18+
19+
import com.conversantmedia.util.concurrent.SpinPolicy;
20+
import org.apache.logging.log4j.kit.env.Log4jProperty;
21+
import org.jspecify.annotations.NullMarked;
22+
23+
@NullMarked
24+
@Log4jProperty(name = "recycler.conversant")
25+
public record DisruptorRecyclerProperties(@Log4jProperty(defaultValue = "WAITING") SpinPolicy spinPolicy) {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
@Export
18+
@Version("3.0.0")
19+
package org.apache.logging.log4j.conversant;
20+
21+
import org.osgi.annotation.bundle.Export;
22+
import org.osgi.annotation.versioning.Version;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.logging.log4j.conversant.test;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertNotNull;
23+
import static org.junit.jupiter.api.Assertions.assertTrue;
24+
25+
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
26+
import java.lang.reflect.Field;
27+
import java.util.List;
28+
import java.util.concurrent.BlockingQueue;
29+
import java.util.concurrent.TimeUnit;
30+
import org.apache.logging.log4j.LoggingException;
31+
import org.apache.logging.log4j.core.LoggerContext;
32+
import org.apache.logging.log4j.core.appender.AsyncAppender;
33+
import org.apache.logging.log4j.core.test.appender.ListAppender;
34+
import org.apache.logging.log4j.core.test.junit.LoggerContextSource;
35+
import org.apache.logging.log4j.spi.ExtendedLogger;
36+
import org.junit.jupiter.api.Test;
37+
38+
class DisruptorBlockingQueueFactoryTest {
39+
40+
private static void exceptionTest(final LoggerContext context) throws InterruptedException {
41+
final ExtendedLogger logger = context.getLogger(AsyncAppender.class);
42+
final Exception parent = new IllegalStateException("Test");
43+
final Throwable child = new LoggingException("This is a test", parent);
44+
logger.error("This is a test", child);
45+
final ListAppender appender = context.getConfiguration().getAppender("LIST");
46+
final List<String> messages;
47+
try {
48+
messages = appender.getMessages(1, 2, TimeUnit.SECONDS);
49+
} finally {
50+
appender.clear();
51+
}
52+
assertNotNull(messages);
53+
assertEquals(1, messages.size());
54+
assertTrue(messages.get(0).contains(parent.getClass().getName()));
55+
}
56+
57+
private static void rewriteTest(final LoggerContext context) throws InterruptedException {
58+
final ExtendedLogger logger = context.getLogger(AsyncAppender.class);
59+
logger.error("This is a test");
60+
logger.warn("Hello world!");
61+
final ListAppender appender = context.getConfiguration().getAppender("LIST");
62+
final List<String> messages;
63+
try {
64+
messages = appender.getMessages(2, 2, TimeUnit.SECONDS);
65+
} finally {
66+
appender.clear();
67+
}
68+
assertNotNull(messages);
69+
assertEquals(2, messages.size());
70+
assertEquals("This is a test", messages.get(0));
71+
assertEquals("Hello world!", messages.get(1));
72+
}
73+
74+
private static void assertConversantDisruptorIsUsed(final LoggerContext context) {
75+
final AsyncAppender appender = context.getConfiguration().getAppender("ASYNC");
76+
assertThat(appender).isNotNull();
77+
final BlockingQueue<?> queue = (BlockingQueue<?>) assertDoesNotThrow(() -> {
78+
Field queueField = AsyncAppender.class.getDeclaredField("queue");
79+
queueField.setAccessible(true);
80+
return queueField.get(appender);
81+
});
82+
assertThat(queue).isInstanceOf(DisruptorBlockingQueue.class);
83+
}
84+
85+
@Test
86+
@LoggerContextSource("DisruptorBlockingQueueFactoryTest.xml")
87+
public void testJcToolsBlockingQueue(final LoggerContext context) throws InterruptedException {
88+
assertConversantDisruptorIsUsed(context);
89+
rewriteTest(context);
90+
exceptionTest(context);
91+
}
92+
}

0 commit comments

Comments
 (0)