Skip to content

Commit

Permalink
Merge branch 'feature/STARCH-711/type_introspection_bundle' into feat…
Browse files Browse the repository at this point in the history
…ure/STARCH-367/logging_integration
  • Loading branch information
mwfyffeiu committed Jan 13, 2024
2 parents 2f91c01 + 8223ac7 commit f7ca39b
Show file tree
Hide file tree
Showing 25 changed files with 4,389 additions and 236 deletions.
74 changes: 41 additions & 33 deletions base/src/main/java/edu/iu/IuAsynchronousSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,7 @@ public boolean tryAdvance(Consumer<? super T> action) {
this.delegate = null;

if (continueAdvance(action)) {
if (isExhausted() //
&& acceptedSize() == 0)
if (!canAdvance())
bootstrapPipe();

return true;
Expand Down Expand Up @@ -327,7 +326,7 @@ public synchronized long estimateSize() {
@Override
public int characteristics() {
if (pipedSplit == null)
if (!isClosedOrError())
if (!isClosed())
return CONCURRENT;
else
return IMMUTABLE | SIZED;
Expand All @@ -351,9 +350,7 @@ public long pause(long acceptedCount, Duration timeout) throws TimeoutException,
final var initCount = this.acceptedCount;
final var targetCount = initCount + acceptedCount;
IuObject.waitFor(this, //
() -> (pipe != null //
&& pipe.isClosed()) //
|| isClosedOrError() //
() -> isClosed() //
|| this.acceptedCount >= targetCount //
, expires);

Expand All @@ -365,7 +362,7 @@ public long pause(Instant expires) throws InterruptedException {
final var initCount = acceptedCount;

synchronized (this) {
while ((pipe == null || !pipe.isClosed()) && !isClosedOrError()) {
while (!isClosed()) {
final var now = Instant.now();
if (now.isBefore(expires)) {
final var waitFor = Duration.between(now, expires);
Expand All @@ -378,6 +375,14 @@ public long pause(Instant expires) throws InterruptedException {
return acceptedCount - initCount;
}

@Override
public synchronized boolean isClosed() {
if (pipe == null)
return closed || error != null;
else
return pipe.isClosed();
}

@Override
public synchronized void error(Throwable e) {
if (pipe != null)
Expand All @@ -400,9 +405,34 @@ public synchronized void close() {
this.notifyAll();
}

private boolean isClosedOrError() {
return closed //
|| error != null;
private boolean isExhausted() {
if (delegate != null)
return false;

return areChildrenExhausted();
}

private boolean canAdvance() {
return !isExhausted() || !accepted.isEmpty();
}

private boolean canAccept() {
return delegate != null || !accepted.isEmpty();
}

private boolean areChildrenExhausted() {
final var i = children.iterator();
while (i.hasNext())
if (i.next().delegate == null)
i.remove();
else
return false;

return true;
}

private int acceptedSize() {
return accepted.size();
}

private Consumer<? super T> cancelAcceptedValueAfterAction(Consumer<? super T> action) {
Expand Down Expand Up @@ -441,28 +471,6 @@ private void continueForEach(Consumer<? super T> action) {
}
}

private int acceptedSize() {
return accepted.size();
}

private boolean areChildrenExhausted() {
final var i = children.iterator();
while (i.hasNext())
if (i.next().delegate == null)
i.remove();
else
return false;

return true;
}

private boolean isExhausted() {
if (delegate != null)
return false;

return areChildrenExhausted();
}

private synchronized void bootstrapPipe() {
if (pipe == null //
&& error == null //
Expand All @@ -474,7 +482,7 @@ private synchronized void bootstrapPipe() {

private void accept(T t) {
synchronized (this) {
if (delegate != null || !accepted.isEmpty())
if (canAccept())
accepted.offer(t);
else {
bootstrapPipe();
Expand Down
177 changes: 94 additions & 83 deletions base/src/main/java/edu/iu/IuAsynchronousSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,86 +29,97 @@
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package edu.iu;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

/**
* Provides access to {@link IuAsynchronousSubject#subscribe() subscription}
* resources for an {@link IuAsynchronousSubject}.
*
* @param <T> value type
*/
public interface IuAsynchronousSubscription<T> extends AutoCloseable {

/**
* Gets a stream over all values, including those
* {@link IuAsynchronousSubject#accept(Object) accepted} after the subscription
* was created.
*
* @return {@link Stream}
*/
Stream<T> stream();

/**
* Gets an estimated number of values that may be advanced by the stream without
* blocking.
*
* @return available values
*/
long available();

/**
* Pauses execution on the current thread until new values are
* {@link IuAsynchronousSubject#accept(Object) accepted}.
*
* <p>
* This method has no effect on a subscription not yet backed by
* {@link IuAsynchronousPipe}.
* </p>
*
* @param acceptedCount count of newly accepted values to wait for; returns
* without delay if &lt;= 0
* @param timeout amount of time to wait; <em>should</em> be positive
*
* @return the actual number of values accepted while paused
* @throws TimeoutException if the timeout interval expires before
* {@code receivedCount} values are received
* @throws InterruptedException if the current thread is interrupted while
* waiting for values to be received
* @see IuAsynchronousPipe#pauseReceiver(int, Duration)
*/
long pause(long acceptedCount, Duration timeout) throws TimeoutException, InterruptedException;

/**
* Pauses execution until either a timeout interval expires or the subject is
* closed.
*
* <p>
* This method has no effect on a subscription not yet backed by
* {@link IuAsynchronousPipe}.
* </p>
*
* @param expires instant the timeout interval expires
*
* @return the number of values accepted onto the pipe while paused
* @throws InterruptedException if the current thread is interrupted while
* waiting for the pipe to close
* @see IuAsynchronousPipe#pauseReceiver(Instant)
*/
long pause(Instant expires) throws InterruptedException;

/**
* Reports an error that occurred that should terminate the subscription.
*
* @param e {@link Throwable}
*/
void error(Throwable e);

@Override
void close();

}
package edu.iu;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

/**
* Provides access to {@link IuAsynchronousSubject#subscribe() subscription}
* resources for an {@link IuAsynchronousSubject}.
*
* @param <T> value type
*/
public interface IuAsynchronousSubscription<T> extends AutoCloseable {

/**
* Gets a stream over all values, including those
* {@link IuAsynchronousSubject#accept(Object) accepted} after the subscription
* was created.
*
* @return {@link Stream}
*/
Stream<T> stream();

/**
* Determines whether or not the subscription is closed.
*
* <p>
* Once closed, all remaining values can be advanced without blocking.
* </p>
*
* @return true if close; else false
*/
boolean isClosed();

/**
* Gets an estimated number of values that may be advanced by the stream without
* blocking.
*
* @return available values
*/
long available();

/**
* Pauses execution on the current thread until new values are
* {@link IuAsynchronousSubject#accept(Object) accepted}.
*
* <p>
* This method has no effect on a subscription not yet backed by
* {@link IuAsynchronousPipe}.
* </p>
*
* @param acceptedCount count of newly accepted values to wait for; returns
* without delay if &lt;= 0
* @param timeout amount of time to wait; <em>should</em> be positive
*
* @return the actual number of values accepted while paused
* @throws TimeoutException if the timeout interval expires before
* {@code receivedCount} values are received
* @throws InterruptedException if the current thread is interrupted while
* waiting for values to be received
* @see IuAsynchronousPipe#pauseReceiver(int, Duration)
*/
long pause(long acceptedCount, Duration timeout) throws TimeoutException, InterruptedException;

/**
* Pauses execution until either a timeout interval expires or the subject is
* closed.
*
* <p>
* This method has no effect on a subscription not yet backed by
* {@link IuAsynchronousPipe}.
* </p>
*
* @param expires instant the timeout interval expires
*
* @return the number of values accepted onto the pipe while paused
* @throws InterruptedException if the current thread is interrupted while
* waiting for the pipe to close
* @see IuAsynchronousPipe#pauseReceiver(Instant)
*/
long pause(Instant expires) throws InterruptedException;

/**
* Reports an error that occurred that should terminate the subscription.
*
* @param e {@link Throwable}
*/
void error(Throwable e);

@Override
void close();

}
Loading

0 comments on commit f7ca39b

Please sign in to comment.