Skip to content

Commit

Permalink
STARCH-537 work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
mwfyffeiu committed Jan 15, 2024
1 parent 0df1cd1 commit cddb455
Show file tree
Hide file tree
Showing 10 changed files with 714 additions and 175 deletions.
103 changes: 77 additions & 26 deletions base/src/main/java/edu/iu/IuAsynchronousSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
*/
package edu.iu;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.time.Instant;
import java.util.Queue;
Expand Down Expand Up @@ -210,9 +212,9 @@ public int characteristics() {

}

private class Subscriber implements Spliterator<T>, IuAsynchronousSubscription<T> {
private class Subscriber implements Consumer<T>, Spliterator<T>, IuAsynchronousSubscription<T> {
private final Stream<T> stream;
// private volatile Source<T> source;
private final Runnable cancel;
private final Queue<SourceSplit> children = new ConcurrentLinkedDeque<>();
private final Queue<T> accepted = new ConcurrentLinkedQueue<>();
private volatile Spliterator<T> delegate;
Expand All @@ -227,10 +229,27 @@ private Subscriber() {
if (delegate.estimateSize() > 0)
this.delegate = delegate;

subscribers.offer(this);
final var ref = new WeakReference<Consumer<T>>(this);
listeners.add(ref);
cancel = () -> listeners.remove(ref);

stream = StreamSupport.stream(this, false).onClose(this::close);
}

@Override
public void accept(T t) {
synchronized (this) {
if (canAccept())
accepted.offer(t);
else {
bootstrapPipe();
pipe.accept(t);
}
acceptedCount++;
this.notifyAll();
}
}

@Override
public Spliterator<T> trySplit() {
final var delegate = this.delegate;
Expand Down Expand Up @@ -395,7 +414,7 @@ public synchronized void error(Throwable e) {

@Override
public synchronized void close() {
subscribers.remove(this);
cancel.run();

if (pipe != null)
pipe.close();
Expand Down Expand Up @@ -480,24 +499,11 @@ private synchronized void bootstrapPipe() {
}
}

private void accept(T t) {
synchronized (this) {
if (canAccept())
accepted.offer(t);
else {
bootstrapPipe();
pipe.accept(t);
}
acceptedCount++;
this.notifyAll();
}

}

}

private final Supplier<Spliterator<T>> initialSplitSupplier;
private final Queue<Subscriber> subscribers = new ConcurrentLinkedQueue<>();
private final Queue<Reference<Consumer<T>>> listeners = new ConcurrentLinkedQueue<>();
// private final Queue<Subscriber> subscribers = new ConcurrentLinkedQueue<>();
private boolean closed;

/**
Expand All @@ -510,6 +516,27 @@ public IuAsynchronousSubject(Supplier<Spliterator<T>> initialSplitSupplier) {
this.initialSplitSupplier = initialSplitSupplier;
}

/**
* Registers a <strong>listener</strong>.
*
* @param listener {@link Consumer}, will be provided all values available on
* the subject, in order, before return. After return,
* {@link Consumer#accept(Object)} will be invoked inline each
* time a new value is {@link #accept(Object) accepted} by the
* <strong>subject</strong>.
* @return thunk for canceling future calls to {@link Consumer#accept(Object)}
* on the listener.
*/
public Runnable listen(Consumer<T> listener) {
final var split = initialSplitSupplier.get();
while (split.tryAdvance(listener))
;

final var ref = new WeakReference<>(listener);
listeners.add(ref);
return () -> listeners.remove(ref);
}

/**
* <strong>Subscribes</strong> to a {@link Stream} that supplies all values
* available without blocking then blocks until new values are available or the
Expand Down Expand Up @@ -542,7 +569,15 @@ public synchronized void accept(T value) {
if (closed)
throw new IllegalStateException("closed");

subscribers.forEach(subscriber -> subscriber.accept(value));
final var i = listeners.iterator();
while (i.hasNext()) {
final var ref = i.next();
final var listener = ref.get();
if (listener == null)
i.remove();
else
listener.accept(value);
}
}

/**
Expand All @@ -568,8 +603,13 @@ public synchronized void close() {
closed = true;

Throwable e = null;
while (!subscribers.isEmpty())
e = IuException.suppress(e, () -> subscribers.poll().close());
while (!listeners.isEmpty())
e = IuException.suppress(e, () -> {
final var ref = listeners.poll();
final var listener = ref.get();
if (listener instanceof AutoCloseable closeableListener)
closeableListener.close();
});

if (e != null)
throw IuException.unchecked(e);
Expand All @@ -579,13 +619,24 @@ public synchronized void close() {
* Reports a fatal error to all <strong>subscribers</strong> and {@link #close()
* closes} the <strong>subject</strong>.
*
* @param e fatal error
* @param error fatal error
*/
public synchronized void error(Throwable e) {
while (!subscribers.isEmpty())
IuException.suppress(e, () -> subscribers.poll().error(e));
public synchronized void error(final Throwable error) {
Throwable e = null;
while (!listeners.isEmpty())
e = IuException.suppress(e, () -> {
final var ref = listeners.poll();
final var listener = ref.get();
if (listener instanceof Subscriber subscriber)
subscriber.error(error);
else if (listener instanceof AutoCloseable closeableListener)
closeableListener.close();
});

closed = true;

if (e != null)
throw IuException.unchecked(e);
}

}
18 changes: 13 additions & 5 deletions base/src/main/java/edu/iu/IuVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* Implements a basic visitor pattern for tracking disparate uniform instances
Expand Down Expand Up @@ -96,15 +98,12 @@ public Spliterator<T> trySplit() {

@Override
public long estimateSize() {
if (elementSpliterator.hasCharacteristics(SIZED))
return elementSpliterator.estimateSize();
else
return elements.size();
return elementSpliterator.estimateSize();
}

@Override
public int characteristics() {
return elementSpliterator.characteristics() | SIZED;
return elementSpliterator.characteristics();
}
}

Expand Down Expand Up @@ -197,6 +196,15 @@ public void clear(T element) {
}
}

/**
* Gets a {@link Stream} of non-cleared references.
*
* @return {@link Stream}
*/
public Stream<T> stream() {
return StreamSupport.stream(new ElementSplitter(elements.spliterator()), false);
}

/**
* Gets a {@link IuAsynchronousSubject} originated by non-cleared references to
* accepted elements.
Expand Down
108 changes: 107 additions & 1 deletion base/src/test/java/edu/iu/AsynchronousSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.time.Duration;
Expand Down Expand Up @@ -399,7 +401,7 @@ public void testCloseThrowsErrorFromPipe() {
@Test
public void testCloseStreamUnsubscribes() throws Throwable {
final var q = new ConcurrentLinkedQueue<String>();
final var f = IuAsynchronousSubject.class.getDeclaredField("subscribers");
final var f = IuAsynchronousSubject.class.getDeclaredField("listeners");
f.setAccessible(true);
try (final var subject = new IuAsynchronousSubject<>(q::spliterator)) {
final Queue<?> subscribers = (Queue<?>) f.get(subject);
Expand Down Expand Up @@ -447,9 +449,22 @@ public void testSubscriberVolume() throws Throwable {
});
}

class Counter implements Consumer<Object> {
int count;

@Override
public void accept(Object t) {
count++;
}
}
final var counter = new Counter();
subject.listen(counter);

generator.await();
subject.close();

assertEquals(1000, counter.count);

while (!toAwait.isEmpty())
toAwait.pop().run();
}
Expand Down Expand Up @@ -746,4 +761,95 @@ public void testPauseAndExpire() throws Throwable {
}
}

@SuppressWarnings("unchecked")
@Test
public void testUnlisten() throws Throwable {
try (final var subject = new IuAsynchronousSubject<>(Spliterators::emptySpliterator)) {
final var l = mock(Consumer.class);
final var c = subject.listen(l);
subject.accept(new Object());
verify(l).accept(any());
c.run();
subject.accept(new Object());
verify(l).accept(any()); // not twice
}
}

@SuppressWarnings("unchecked")
@Test
public void testGCClearsListeners() throws Throwable {
final var o = List.of(new Object());
final var f = IuAsynchronousSubject.class.getDeclaredField("listeners");
f.setAccessible(true);
try (final var subject = new IuAsynchronousSubject<>(o::spliterator)) {
final Queue<?> listeners = (Queue<?>) f.get(subject);
var l = mock(Consumer.class);
subject.listen(l);
assertEquals(1, listeners.size());
subject.accept(new Object());
verify(l, times(2)).accept(any());
l = null;
System.gc();
Thread.sleep(100L);
subject.accept(new Object());
assertEquals(0, listeners.size());
}
}

@Test
public void testCloseableListener() throws Throwable {
class CloseableListener implements Consumer<Object>, AutoCloseable {
@Override
public void close() throws Exception {
}

@Override
public void accept(Object t) {
}
}

final var l = spy(new CloseableListener());
{
final var subject = new IuAsynchronousSubject<>(Spliterators::emptySpliterator);
subject.listen(l);
subject.close();
verify(l).close();
}

try (final var subject = new IuAsynchronousSubject<>(Spliterators::emptySpliterator)) {
subject.listen(l);
subject.error(new RuntimeException());
}
}

@SuppressWarnings("unchecked")
@Test
public void testCloseableListenerErrors() throws Throwable {
final var e = new RuntimeException();
class CloseableListener implements Consumer<Object>, AutoCloseable {
@Override
public void close() {
throw e;
}

@Override
public void accept(Object t) {
}
}
final var l = spy(new CloseableListener());

{
final var subject = new IuAsynchronousSubject<>(Spliterators::emptySpliterator);
subject.listen(l);
assertSame(e, assertThrows(RuntimeException.class, subject::close));
}

try (final var subject = new IuAsynchronousSubject<>(Spliterators::emptySpliterator)) {
subject.listen(l);
final var l2 = mock(Consumer.class);
subject.listen(l2);
assertSame(e, assertThrows(RuntimeException.class, () -> subject.error(new Exception())));
}
}

}
26 changes: 26 additions & 0 deletions base/src/test/java/edu/iu/IuVisitorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package edu.iu;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -122,6 +123,31 @@ public void testPrunesClearedRefs() throws InterruptedException {
verify(f).apply(null);
}

@Test
public void testStreamClearsRefs() throws Throwable {
final var visitor = new IuVisitor<Object>();
final var o = new Object();
visitor.accept(o);
visitor.accept(new Object());
assertEquals(2L, visitor.stream().count());
System.gc();
Thread.sleep(100L);
assertEquals(1L, visitor.stream().count());
assertSame(o, visitor.stream().findAny().get());
}

@Test
public void testStreamOfOriginalElements() throws Throwable {
final var visitor = new IuVisitor<Object>();
final var control = Collections.synchronizedList(new ArrayList<Object>());
for (var i = 0; i < 1000; i++) {
final var o = new Object();
visitor.accept(o);
control.add(o);
}
assertEquals(1000L, visitor.stream().parallel().count());
}

@Test
public void testSubject() throws Throwable {
final var visitor = new IuVisitor<Object>();
Expand Down
Loading

0 comments on commit cddb455

Please sign in to comment.