Skip to content

Commit ea1cae5

Browse files
committed
.
1 parent 830f991 commit ea1cae5

File tree

2 files changed

+47
-7
lines changed

2 files changed

+47
-7
lines changed

Diff for: src/main/java/disruptor/Disruptor2.java

+38-6
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@
1010
import com.lmax.disruptor.RingBuffer;
1111
import com.lmax.disruptor.Sequence;
1212
import com.lmax.disruptor.SequenceBarrier;
13+
import com.lmax.disruptor.TimeoutException;
1314
import com.lmax.disruptor.WaitStrategy;
1415
import com.lmax.disruptor.dsl.Disruptor;
1516
import com.lmax.disruptor.dsl.ExceptionHandlerWrapper;
1617
import com.lmax.disruptor.dsl.ProducerType;
1718
import java.util.concurrent.Executor;
19+
import java.util.concurrent.ExecutorService;
1820
import java.util.concurrent.ThreadFactory;
21+
import java.util.concurrent.TimeUnit;
1922
import java.util.concurrent.atomic.AtomicBoolean;
2023

2124
public class Disruptor2<T> extends Disruptor<T> {
@@ -30,9 +33,39 @@ public Disruptor2(EventFactory eventFactory, int ringBufferSize, ThreadFactory t
3033
executor = new BasicExecutor(threadFactory);
3134
}
3235

33-
public Disruptor2(EventFactory eventFactory, int ringBufferSize, ThreadFactory threadFactory, ProducerType producerType, WaitStrategy waitStrategy) {
36+
public Disruptor2(EventFactory eventFactory, int ringBufferSize, ThreadFactory threadFactory, ProducerType producerType, WaitStrategy waitStrategy, ExecutorService executorService) {
3437
super(eventFactory, ringBufferSize, threadFactory, producerType, waitStrategy);
35-
executor = new BasicExecutor(threadFactory);
38+
executor = executorService;
39+
}
40+
private boolean hasBacklog()
41+
{
42+
final long cursor = getRingBuffer().getCursor();
43+
44+
return consumerRepository.hasBacklog(cursor, false);
45+
}
46+
public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException
47+
{
48+
final long timeOutAt = System.nanoTime() + timeUnit.toNanos(timeout);
49+
while (hasBacklog())
50+
{
51+
if (timeout >= 0 && System.nanoTime() > timeOutAt)
52+
{
53+
throw TimeoutException.INSTANCE;
54+
}
55+
// Busy spin
56+
}
57+
halt();
58+
}
59+
60+
/**
61+
* Calls {@link com.lmax.disruptor.EventProcessor#halt()} on all of the event processors created via this disruptor.
62+
*/
63+
public void halt()
64+
{
65+
for (final ConsumerInfo consumerInfo : consumerRepository)
66+
{
67+
consumerInfo.halt();
68+
}
3669
}
3770

3871
/**
@@ -53,10 +86,9 @@ public RingBuffer<T> start() {
5386

5487
return getRingBuffer();
5588
}
56-
private void checkOnlyStartedOnce()
57-
{
58-
if (!started.compareAndSet(false, true))
59-
{
89+
90+
private void checkOnlyStartedOnce() {
91+
if (!started.compareAndSet(false, true)) {
6092
throw new IllegalStateException("Disruptor.start() must only be called once.");
6193
}
6294
}

Diff for: src/main/java/disruptor/Test.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
import com.lmax.disruptor.ExceptionHandler;
66
import com.lmax.disruptor.WaitStrategy;
77
import com.lmax.disruptor.dsl.ProducerType;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
810
import java.util.concurrent.ThreadFactory;
911

1012
public class Test {
1113

1214
public static void main(String[] args) {
15+
ExecutorService executorService = Executors.newCachedThreadPool();
1316
// 对象工厂。
1417
final EventFactory<User> eventFactory = new EventFactory<User>() {
1518
@Override
@@ -26,7 +29,7 @@ public User newInstance() {
2629
public Thread newThread(Runnable r) {
2730
return new Thread(r, "test");
2831
}
29-
}, producerType, waitStrategy);
32+
}, producerType, waitStrategy, executorService);
3033
// 添加异常处理。
3134
final ExceptionHandler<User> errorHandler = new ExceptionHandler<>() {
3235
@Override
@@ -60,5 +63,10 @@ public void handleOnShutdownException(Throwable ignore) {
6063
eventTranslator.setName("" + i);
6164
disruptor.publishEvent(eventTranslator);
6265
}
66+
disruptor.shutdown();
67+
68+
executorService.shutdownNow();
69+
70+
System.out.println("xxxxxxxx");
6371
}
6472
}

0 commit comments

Comments
 (0)