From b84e8268be489a4cb5282d6465cb7030a2e379ae Mon Sep 17 00:00:00 2001 From: Jan Supol Date: Tue, 26 Feb 2019 17:46:12 +0100 Subject: [PATCH] Build core-common module on JDK 11. Possible to create multirelease jar when built with JDK 8 and then with JDK 11 without "clean". Signed-off-by: Jan Supol --- core-common/pom.xml | 392 +++++++++++++++++- .../jersey/internal/util/JerseyPublisher.java | 12 +- .../internal/jsr166/SubmissionPublisher.java | 2 +- .../jsr166/SubmissionPublisherFactory.java | 53 +++ .../internal/jsr166/UnsafeAccessor.java | 0 .../internal/jsr166/SubmissionPublisher.java | 387 +++++++++++++++++ .../jsr166/SubmissionPublisherFactory.java | 90 ++++ .../jersey/internal/jsr166/Flow.java | 2 + .../jsr166/SubmittableFlowPublisher.java | 292 +++++++++++++ .../jersey/internal/jsr166/package-info.java | 0 .../internal/util/JerseyPublisherTest.java | 13 +- .../src/test/resources/surefire.policy | 11 +- pom.xml | 20 +- 13 files changed, 1254 insertions(+), 20 deletions(-) rename core-common/src/main/{java => java8}/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java (99%) create mode 100644 core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/SubmissionPublisherFactory.java rename core-common/src/main/{java => java8}/org/glassfish/jersey/internal/jsr166/UnsafeAccessor.java (100%) create mode 100644 core-common/src/main/java9/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java create mode 100644 core-common/src/main/java9/org/glassfish/jersey/internal/jsr166/SubmissionPublisherFactory.java rename core-common/src/main/{java => jsr166}/org/glassfish/jersey/internal/jsr166/Flow.java (99%) create mode 100644 core-common/src/main/jsr166/org/glassfish/jersey/internal/jsr166/SubmittableFlowPublisher.java rename core-common/src/main/{java => jsr166}/org/glassfish/jersey/internal/jsr166/package-info.java (100%) diff --git a/core-common/pom.xml b/core-common/pom.xml index bb0c03195b..8185bd8ccd 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -131,7 +131,6 @@ * org.glassfish.jersey.*;version=${project.version} - org.glassfish.jersey.osgi true @@ -176,11 +175,6 @@ jakarta.annotation jakarta.annotation-api - - jakarta.activation - jakarta.activation-api - ${jakarta.activation.version} - org.osgi org.osgi.core @@ -213,6 +207,388 @@ + + jdk8 + + 1.8 + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + generate-sources + + add-source + + + + src/main/jsr166 + src/main/java8 + + + + + + + maven-antrun-plugin + + + com.sun + tools + 1.8.0 + system + ${java.home}/../lib/tools.jar + + + + + validate + + run + + + + Building for JDK8 + + + + + + compile-1-jsr166 + process-resources + + + + + + + run + + + + + + compile-2-java8 + process-resources + + + + + + + + run + + + + + + + + + + jdk9+ + + [9,12) + + + + com.sun.activation + jakarta.activation + ${jakarta.activation.version} + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + validate + + run + + + + Building for JDK9+ + + + + + compile-1-jsr166 + process-resources + + + + + + + run + + + + + compile-2-java9 + process-resources + + + + + + + + run + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + compile-0-addsources + process-sources + + add-source + + + + src/main/jsr166 + src/main/java9 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + false + + + default-compile + + + 9 + + + + base-compile + + compile + + + + + module-info.java + + + + + + + + + [1.8,9) + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-resources-plugin + true + + + copy-jdk9-classes-to-meta-inf + prepare-package + + copy-resources + + + ${project.build.outputDirectory}/META-INF/versions/9 + + + ${java9.build.outputDirectory} + + + + + + + + + + ${project.build.directory}/classes-java9 + + + + copyJDK8FilesToMultiReleaseJar + + + + target/classes-java8/org/glassfish/jersey/internal/jsr166/UnsafeAccessor.class + + [9,12) + + + + + org.apache.felix + maven-bundle-plugin + true + true + + + true + + + + + maven-clean-plugin + + + + remove-jdk9-jsr166-sources + initialize + + clean + + + true + + + ${project.build.directory}/generated-sources/rsrc-gen/org/glassfish/jersey/internal/jsr166 + + + + + + remove-jdk9-jsr166-META-INF-sources + initialize + + clean + + + true + + + ${project.build.directory}/generated-sources/rsrc-gen/META-INF + + + + + + remove-jdk9-jsr166-classes + prepare-package + + clean + + + true + + + ${project.build.outputDirectory}/org/glassfish/jersey/internal/jsr166 + + *.class + + + Flow*.class + SubmittableFlowPublisher.class + package-info.class + + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + true + + + copy-jdk8-classes-ouputDirectory + prepare-package + + copy-resources + + + ${project.build.outputDirectory} + + + ${java8.build.outputDirectory} + + + + + + copy-jdk8-sources + prepare-package + + copy-resources + + + ${project.build.directory}/generated-sources/rsrc-gen/org/glassfish/jersey/internal/jsr166 + + + ${java8.sourceDirectory}/org/glassfish/jersey/internal/jsr166 + + + + + + copy-jdk9-sources + prepare-package + + copy-resources + + + ${project.build.directory}/generated-sources/rsrc-gen/META-INF/versions/9/org/glassfish/jersey/internal/jsr166 + + + ${java9.sourceDirectory}/org/glassfish/jersey/internal/jsr166 + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.1 + + + attach-sources + package + + jar-no-fork + + + + org/glassfish/jersey/internal/jsr166/Jdk9SubmissionPublisher.java + + + + + + + + + securityOff @@ -254,6 +630,10 @@ -Djava.security.manager -Djava.security.policy=${project.build.directory}/test-classes/surefire.policy + ${project.basedir}/src/main/jsr166 + ${project.build.directory}/classes-java8 + ${project.basedir}/src/main/java8 + ${project.basedir}/src/main/java9 diff --git a/core-common/src/main/java/org/glassfish/jersey/internal/util/JerseyPublisher.java b/core-common/src/main/java/org/glassfish/jersey/internal/util/JerseyPublisher.java index 44b346883e..becb368acf 100644 --- a/core-common/src/main/java/org/glassfish/jersey/internal/util/JerseyPublisher.java +++ b/core-common/src/main/java/org/glassfish/jersey/internal/util/JerseyPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -27,19 +27,21 @@ import org.glassfish.jersey.internal.LocalizationMessages; import org.glassfish.jersey.internal.jsr166.Flow; import org.glassfish.jersey.internal.jsr166.SubmissionPublisher; +import org.glassfish.jersey.internal.jsr166.SubmissionPublisherFactory; +import org.glassfish.jersey.internal.jsr166.SubmittableFlowPublisher; /** * Implementation of {@link Flow.Publisher} corresponding to reactive streams specification. *

- * Delegates to {@link SubmissionPublisher} repackaged from jsr166. + * Delegates to {@link SubmissionPublisher} repackaged from jsr166 on JDK 8 or to JDK {@code SubmissionPublisher}. * * @author Adam Lindenthal (adam.lindenthal at oracle.com) */ public class JerseyPublisher implements Flow.Publisher { private static final int DEFAULT_BUFFER_CAPACITY = 256; - private SubmissionPublisher submissionPublisher = new SubmissionPublisher<>(); + private SubmittableFlowPublisher submissionPublisher = SubmissionPublisherFactory.createSubmissionPublisher(); private final PublisherStrategy strategy; @@ -90,7 +92,7 @@ public JerseyPublisher(final Executor executor) { */ public JerseyPublisher(final Executor executor, final PublisherStrategy strategy) { this.strategy = strategy; - submissionPublisher = new SubmissionPublisher<>(executor, DEFAULT_BUFFER_CAPACITY); + submissionPublisher = SubmissionPublisherFactory.createSubmissionPublisher(executor, DEFAULT_BUFFER_CAPACITY); } @@ -128,7 +130,7 @@ public JerseyPublisher(final int maxBufferCapacity) { */ public JerseyPublisher(final Executor executor, final int maxBufferCapacity, PublisherStrategy strategy) { this.strategy = strategy; - submissionPublisher = new SubmissionPublisher<>(executor, maxBufferCapacity); + submissionPublisher = SubmissionPublisherFactory.createSubmissionPublisher(executor, maxBufferCapacity); } @Override diff --git a/core-common/src/main/java/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java b/core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java similarity index 99% rename from core-common/src/main/java/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java rename to core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java index 7ca48eed26..6eb4cb5740 100644 --- a/core-common/src/main/java/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java +++ b/core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java @@ -134,7 +134,7 @@ * @author Doug Lea * @since 9 */ -public class SubmissionPublisher implements Flow.Publisher, +public class SubmissionPublisher implements Flow.Publisher, SubmittableFlowPublisher, AutoCloseable { /* * Most mechanics are handled by BufferedSubscription. This class diff --git a/core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/SubmissionPublisherFactory.java b/core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/SubmissionPublisherFactory.java new file mode 100644 index 0000000000..3729469126 --- /dev/null +++ b/core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/SubmissionPublisherFactory.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.internal.jsr166; + +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.function.BiConsumer; + +/** + * Factory creating JDK8 compatible SubmissionPublisher (Jdk8SubmissionPublisher) or JDK 9+ SubmissionPublisher + */ +public class SubmissionPublisherFactory { + + /** + * Creates a new SubmissionPublisher using the {@link + * ForkJoinPool#commonPool()} for async delivery to subscribers + * (unless it does not support a parallelism level of at least two, + * in which case, a new Thread is created to run each task), with + * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no + * handler for Subscriber exceptions in method {@link + * Flow.Subscriber#onNext(Object) onNext}. + */ + public static SubmittableFlowPublisher createSubmissionPublisher() { + return new SubmissionPublisher(); + } + + public static SubmittableFlowPublisher createSubmissionPublisher(Executor executor, + int maxBufferCapacity) { + return new SubmissionPublisher(executor, maxBufferCapacity); + } + + public static SubmittableFlowPublisher createSubmissionPublisher(Executor executor, + int maxBufferCapacity, + BiConsumer, + ? super Throwable> handler) { + return new SubmissionPublisher(executor, maxBufferCapacity, handler); + } + +} diff --git a/core-common/src/main/java/org/glassfish/jersey/internal/jsr166/UnsafeAccessor.java b/core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/UnsafeAccessor.java similarity index 100% rename from core-common/src/main/java/org/glassfish/jersey/internal/jsr166/UnsafeAccessor.java rename to core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/UnsafeAccessor.java diff --git a/core-common/src/main/java9/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java b/core-common/src/main/java9/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java new file mode 100644 index 0000000000..7a7ce4e424 --- /dev/null +++ b/core-common/src/main/java9/org/glassfish/jersey/internal/jsr166/SubmissionPublisher.java @@ -0,0 +1,387 @@ +/* + * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.internal.jsr166; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.BiPredicate; +import java.util.function.Consumer; + +/** + * A {@link Flow.Publisher} that asynchronously issues submitted + * (non-null) items to current subscribers until it is closed. Each + * current subscriber receives newly submitted items in the same order + * unless drops or exceptions are encountered. Using a + * SubmissionPublisher allows item generators to act as compliant reactive-streams + * Publishers relying on drop handling and/or blocking for flow + * control. + *

+ *

A SubmissionPublisher uses the {@link Executor} supplied in its + * constructor for delivery to subscribers. The best choice of + * Executor depends on expected usage. If the generator(s) of + * submitted items run in separate threads, and the number of + * subscribers can be estimated, consider using a {@link + * Executors#newFixedThreadPool}. Otherwise consider using the + * default, normally the {@link ForkJoinPool#commonPool}. + *

+ *

Buffering allows producers and consumers to transiently operate + * at different rates. Each subscriber uses an independent buffer. + * Buffers are created upon first use and expanded as needed up to the + * given maximum. (The enforced capacity may be rounded up to the + * nearest power of two and/or bounded by the largest value supported + * by this implementation.) Invocations of {@link + * Flow.Subscription#request(long) request} do not directly result in + * buffer expansion, but risk saturation if unfilled requests exceed + * the maximum capacity. The default value of {@link + * Flow#defaultBufferSize()} may provide a useful starting point for + * choosing a capacity based on expected rates, resources, and usages. + *

+ *

Publication methods support different policies about what to do + * when buffers are saturated. Method {@link #submit(Object) submit} + * blocks until resources are available. This is simplest, but least + * responsive. The {@code offer} methods may drop items (either + * immediately or with bounded timeout), but provide an opportunity to + * interpose a handler and then retry. + *

+ *

If any Subscriber method throws an exception, its subscription + * is cancelled. If a handler is supplied as a constructor argument, + * it is invoked before cancellation upon an exception in method + * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods + * {@link Flow.Subscriber#onSubscribe onSubscribe}, + * {@link Flow.Subscriber#onError(Throwable) onError} and + * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or + * handled before cancellation. If the supplied Executor throws + * {@link RejectedExecutionException} (or any other RuntimeException + * or Error) when attempting to execute a task, or a drop handler + * throws an exception when processing a dropped item, then the + * exception is rethrown. In these cases, not all subscribers will + * have been issued the published item. It is usually good practice to + * {@link #closeExceptionally closeExceptionally} in these cases. + *

+ *

Method {@link #consume(Consumer)} simplifies support for a + * common case in which the only action of a subscriber is to request + * and process all items using a supplied function. + *

+ *

This class may also serve as a convenient base for subclasses + * that generate items, and use the methods in this class to publish + * them. For example here is a class that periodically publishes the + * items generated from a supplier. (In practice you might add methods + * to independently start and stop generation, to share Executors + * among publishers, and so on, or use a SubmissionPublisher as a + * component rather than a superclass.) + *

+ *

 {@code
+ * class PeriodicPublisher extends SubmissionPublisher {
+ *   final ScheduledFuture periodicTask;
+ *   final ScheduledExecutorService scheduler;
+ *   PeriodicPublisher(Executor executor, int maxBufferCapacity,
+ *                     Supplier supplier,
+ *                     long period, TimeUnit unit) {
+ *     super(executor, maxBufferCapacity);
+ *     scheduler = new ScheduledThreadPoolExecutor(1);
+ *     periodicTask = scheduler.scheduleAtFixedRate(
+ *       () -> submit(supplier.get()), 0, period, unit);
+ *   }
+ *   public void close() {
+ *     periodicTask.cancel(false);
+ *     scheduler.shutdown();
+ *     super.close();
+ *   }
+ * }}
+ *

+ *

Here is an example of a {@link Flow.Processor} implementation. + * It uses single-step requests to its publisher for simplicity of + * illustration. A more adaptive version could monitor flow using the + * lag estimate returned from {@code submit}, along with other utility + * methods. + *

+ *

 {@code
+ * class TransformProcessor extends SubmissionPublisher
+ *   implements Flow.Processor {
+ *   final Function function;
+ *   Flow.Subscription subscription;
+ *   TransformProcessor(Executor executor, int maxBufferCapacity,
+ *                      Function function) {
+ *     super(executor, maxBufferCapacity);
+ *     this.function = function;
+ *   }
+ *   public void onSubscribe(Flow.Subscription subscription) {
+ *     (this.subscription = subscription).request(1);
+ *   }
+ *   public void onNext(S item) {
+ *     subscription.request(1);
+ *     submit(function.apply(item));
+ *   }
+ *   public void onError(Throwable ex) { closeExceptionally(ex); }
+ *   public void onComplete() { close(); }
+ * }}
+ * + * @param the published item type + * @author Doug Lea + * @since 9 + */ +public class SubmissionPublisher implements SubmittableFlowPublisher { + + private final java.util.concurrent.SubmissionPublisher publisher; + + /** + * Creates a new SubmissionPublisher using the given Executor for + * async delivery to subscribers, with the given maximum buffer size + * for each subscriber, and, if non-null, the given handler invoked + * when any Subscriber throws an exception in method {@link + * Flow.Subscriber#onNext(Object) onNext}. + * + * @param executor the executor to use for async delivery, + * supporting creation of at least one independent thread + * @param maxBufferCapacity the maximum capacity for each + * subscriber's buffer (the enforced capacity may be rounded up to + * the nearest power of two and/or bounded by the largest value + * supported by this implementation; method {@link #getMaxBufferCapacity} + * returns the actual value) + * @param handler if non-null, procedure to invoke upon exception + * thrown in method {@code onNext} + * @throws NullPointerException if executor is null + * @throws IllegalArgumentException if maxBufferCapacity not + * positive + */ + public SubmissionPublisher(Executor executor, int maxBufferCapacity, + BiConsumer, ? super Throwable> handler) { + publisher = new java.util.concurrent.SubmissionPublisher(executor, maxBufferCapacity, convertConsumer(handler)); + } + + /** + * Creates a new SubmissionPublisher using the given Executor for + * async delivery to subscribers, with the given maximum buffer size + * for each subscriber, and no handler for Subscriber exceptions in + * method {@link Flow.Subscriber#onNext(Object) onNext}. + * + * @param executor the executor to use for async delivery, + * supporting creation of at least one independent thread + * @param maxBufferCapacity the maximum capacity for each + * subscriber's buffer (the enforced capacity may be rounded up to + * the nearest power of two and/or bounded by the largest value + * supported by this implementation; method {@link #getMaxBufferCapacity} + * returns the actual value) + * @throws NullPointerException if executor is null + * @throws IllegalArgumentException if maxBufferCapacity not + * positive + */ + public SubmissionPublisher(Executor executor, int maxBufferCapacity) { + publisher = new java.util.concurrent.SubmissionPublisher(executor, maxBufferCapacity); + } + + /** + * Creates a new SubmissionPublisher using the {@link + * ForkJoinPool#commonPool()} for async delivery to subscribers + * (unless it does not support a parallelism level of at least two, + * in which case, a new Thread is created to run each task), with + * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no + * handler for Subscriber exceptions in method {@link + * Flow.Subscriber#onNext(Object) onNext}. + */ + public SubmissionPublisher() { + publisher = new java.util.concurrent.SubmissionPublisher(); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture consume(Consumer consumer) { + return publisher.consume(consumer); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + publisher.close(); + } + + /** + * {@inheritDoc} + */ + @Override + public void closeExceptionally(Throwable error) { + publisher.closeExceptionally(error); + } + + /** + * {@inheritDoc} + */ + @Override + public long estimateMinimumDemand() { + return publisher.estimateMinimumDemand(); + } + + /** + * {@inheritDoc} + */ + @Override + public int estimateMaximumLag() { + return publisher.estimateMaximumLag(); + } + + /** + * {@inheritDoc} + */ + @Override + public Throwable getClosedException() { + return publisher.getClosedException(); + } + + /** + * {@inheritDoc} + */ + @Override + public int getMaxBufferCapacity() { + return publisher.getMaxBufferCapacity(); + } + + /** + * {@inheritDoc} + */ + @Override + public int offer(T item, long timeout, TimeUnit unit, BiPredicate, ? super T> onDrop) { + return publisher.offer(item, timeout, unit, convertPredicate(onDrop)); + } + + /** + * {@inheritDoc} + */ + public int offer(T item, BiPredicate, ? super T> onDrop) { + return publisher.offer(item, convertPredicate(onDrop)); + } + + /** + * {@inheritDoc} + */ + @Override + public int submit(T item) { + return publisher.submit(item); + } + + /** + * {@inheritDoc} + */ + @Override + public void subscribe(Flow.Subscriber subscriber) { + publisher.subscribe(convertSubscriber(subscriber)); + } + + private static BiConsumer, ? super Throwable> convertConsumer( + BiConsumer, ? super Throwable> consumer) { + return new BiConsumer, Throwable>() { + @Override + public void accept(java.util.concurrent.Flow.Subscriber subscriber, Throwable throwable) { + consumer.accept(convertSubscriber(subscriber), throwable); + } + }; + } + + private static BiPredicate, ? super T> convertPredicate( + BiPredicate, ? super T> predicate) { + return new BiPredicate, T>() { + @Override + public boolean test(java.util.concurrent.Flow.Subscriber subscriber, T t) { + return predicate.test(convertSubscriber(subscriber), t); + } + }; + } + + private static Flow.Subscriber convertSubscriber(java.util.concurrent.Flow.Subscriber subscriber) { + return new Flow.Subscriber() { + + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscriber.onSubscribe(convertSubscription(subscription)); + } + + @Override + public void onNext(T item) { + subscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + subscriber.onError(throwable); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + }; + } + + private static java.util.concurrent.Flow.Subscriber convertSubscriber(Flow.Subscriber subscriber) { + return new java.util.concurrent.Flow.Subscriber() { + + @Override + public void onSubscribe(java.util.concurrent.Flow.Subscription subscription) { + subscriber.onSubscribe(convertSubscription(subscription)); + } + + @Override + public void onNext(T item) { + subscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + subscriber.onError(throwable); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + }; + } + + private static java.util.concurrent.Flow.Subscription convertSubscription(Flow.Subscription subscription) { + return new java.util.concurrent.Flow.Subscription() { + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + subscription.cancel(); + } + }; + } + + private static Flow.Subscription convertSubscription(java.util.concurrent.Flow.Subscription subscription) { + return new Flow.Subscription() { + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + subscription.cancel(); + } + }; + } +} diff --git a/core-common/src/main/java9/org/glassfish/jersey/internal/jsr166/SubmissionPublisherFactory.java b/core-common/src/main/java9/org/glassfish/jersey/internal/jsr166/SubmissionPublisherFactory.java new file mode 100644 index 0000000000..ecdc41a350 --- /dev/null +++ b/core-common/src/main/java9/org/glassfish/jersey/internal/jsr166/SubmissionPublisherFactory.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.internal.jsr166; + +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.function.BiConsumer; + +/** + * Factory creating JDK8 compatible SubmissionPublisher (Jdk8SubmissionPublisher) or JDK 9+ SubmissionPublisher + */ +public class SubmissionPublisherFactory { + + /** + * Creates a new SubmittableFlowPublisher using the {@link + * ForkJoinPool#commonPool()} for async delivery to subscribers + * (unless it does not support a parallelism level of at least two, + * in which case, a new Thread is created to run each task), with + * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no + * handler for Subscriber exceptions in method {@link + * Flow.Subscriber#onNext(Object) onNext}. + */ + public static SubmittableFlowPublisher createSubmissionPublisher() { + return new SubmissionPublisher(); + } + + /** + * Creates a new Jdk9SubmissionPublisher using the given Executor for + * async delivery to subscribers, with the given maximum buffer size + * for each subscriber, and no handler for Subscriber exceptions in + * method {@link Flow.Subscriber#onNext(Object) onNext}. + * + * @param executor the executor to use for async delivery, + * supporting creation of at least one independent thread + * @param maxBufferCapacity the maximum capacity for each + * subscriber's buffer (the enforced capacity may be rounded up to + * the nearest power of two and/or bounded by the largest value + * supported by this implementation; method {@link #getMaxBufferCapacity} + * returns the actual value) + * @throws NullPointerException if executor is null + * @throws IllegalArgumentException if maxBufferCapacity not + * positive + */ + public static SubmittableFlowPublisher createSubmissionPublisher(Executor executor, + int maxBufferCapacity) { + return new SubmissionPublisher(executor, maxBufferCapacity); + } + + /** + * Creates a new SubmittableFlowPublisher using the given Executor for + * async delivery to subscribers, with the given maximum buffer size + * for each subscriber, and, if non-null, the given handler invoked + * when any Subscriber throws an exception in method {@link + * Flow.Subscriber#onNext(Object) onNext}. + * + * @param executor the executor to use for async delivery, + * supporting creation of at least one independent thread + * @param maxBufferCapacity the maximum capacity for each + * subscriber's buffer (the enforced capacity may be rounded up to + * the nearest power of two and/or bounded by the largest value + * supported by this implementation; method {@link #getMaxBufferCapacity} + * returns the actual value) + * @param handler if non-null, procedure to invoke upon exception + * thrown in method {@code onNext} + * @throws NullPointerException if executor is null + * @throws IllegalArgumentException if maxBufferCapacity not + * positive + */ + public static SubmittableFlowPublisher createSubmissionPublisher(Executor executor, + int maxBufferCapacity, + BiConsumer, + ? super Throwable> handler) { + return new SubmissionPublisher(executor, maxBufferCapacity, handler); + } + +} diff --git a/core-common/src/main/java/org/glassfish/jersey/internal/jsr166/Flow.java b/core-common/src/main/jsr166/org/glassfish/jersey/internal/jsr166/Flow.java similarity index 99% rename from core-common/src/main/java/org/glassfish/jersey/internal/jsr166/Flow.java rename to core-common/src/main/jsr166/org/glassfish/jersey/internal/jsr166/Flow.java index fe64d88dd6..5e04b817d2 100644 --- a/core-common/src/main/java/org/glassfish/jersey/internal/jsr166/Flow.java +++ b/core-common/src/main/jsr166/org/glassfish/jersey/internal/jsr166/Flow.java @@ -6,6 +6,8 @@ package org.glassfish.jersey.internal.jsr166; +import java.util.concurrent.Executor; + /** * Interrelated interfaces and static methods for establishing * flow-controlled components in which {@link Publisher Publishers} diff --git a/core-common/src/main/jsr166/org/glassfish/jersey/internal/jsr166/SubmittableFlowPublisher.java b/core-common/src/main/jsr166/org/glassfish/jersey/internal/jsr166/SubmittableFlowPublisher.java new file mode 100644 index 0000000000..0978463a52 --- /dev/null +++ b/core-common/src/main/jsr166/org/glassfish/jersey/internal/jsr166/SubmittableFlowPublisher.java @@ -0,0 +1,292 @@ +/* + * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.internal.jsr166; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.RejectedExecutionException; + + +/** + * A {@link Flow.Publisher} that asynchronously issues submitted + * (non-null) items to current subscribers until it is closed. Each + * current subscriber receives newly submitted items in the same order + * unless drops or exceptions are encountered. Using a + * SubmittableFlowPublisher allows item generators to act as compliant reactive-streams + * Publishers relying on drop handling and/or blocking for flow + * control. + *

+ *

An implementation of SubmittableFlowPublisher uses the {@link Executor} supplied in its + * constructor for delivery to subscribers. The best choice of + * Executor depends on expected usage. If the generator(s) of + * submitted items run in separate threads, and the number of + * subscribers can be estimated, consider using a {@link + * Executors#newFixedThreadPool}. Otherwise consider using the + * default, normally the {@link ForkJoinPool#commonPool}. + *

+ *

Buffering allows producers and consumers to transiently operate + * at different rates. Each subscriber uses an independent buffer. + * Buffers are created upon first use and expanded as needed up to the + * given maximum. (The enforced capacity may be rounded up to the + * nearest power of two and/or bounded by the largest value supported + * by this implementation.) Invocations of {@link + * Flow.Subscription#request(long) request} do not directly result in + * buffer expansion, but risk saturation if unfilled requests exceed + * the maximum capacity. The default value of {@link + * Flow#defaultBufferSize()} may provide a useful starting point for + * choosing a capacity based on expected rates, resources, and usages. + *

+ *

Publication methods support different policies about what to do + * when buffers are saturated. Method {@link #submit(Object) submit} + * blocks until resources are available. This is simplest, but least + * responsive. The {@code offer} methods may drop items (either + * immediately or with bounded timeout), but provide an opportunity to + * interpose a handler and then retry. + *

+ *

If any Subscriber method throws an exception, its subscription + * is cancelled. If a handler is supplied as a constructor argument, + * it is invoked before cancellation upon an exception in method + * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods + * {@link Flow.Subscriber#onSubscribe onSubscribe}, + * {@link Flow.Subscriber#onError(Throwable) onError} and + * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or + * handled before cancellation. If the supplied Executor throws + * {@link RejectedExecutionException} (or any other RuntimeException + * or Error) when attempting to execute a task, or a drop handler + * throws an exception when processing a dropped item, then the + * exception is rethrown. In these cases, not all subscribers will + * have been issued the published item. It is usually good practice to + * {@link #closeExceptionally closeExceptionally} in these cases. + *

+ *

Method {@link #consume(Consumer)} simplifies support for a + * common case in which the only action of a subscriber is to request + * and process all items using a supplied function. + * + * @param the published item type + */ +public interface SubmittableFlowPublisher extends Flow.Publisher, AutoCloseable { + + /** + * Processes all published items using the given Consumer function. + * Returns a CompletableFuture that is completed normally when this + * publisher signals {@link Flow.Subscriber#onComplete() + * onComplete}, or completed exceptionally upon any error, or an + * exception is thrown by the Consumer, or the returned + * CompletableFuture is cancelled, in which case no further items + * are processed. + * + * @param consumer the function applied to each onNext item + * @return a CompletableFuture that is completed normally + * when the publisher signals onComplete, and exceptionally + * upon any error or cancellation + * @throws NullPointerException if consumer is null + */ + CompletableFuture consume(final Consumer consumer); + + /** + * Unless already closed, issues {@link + * Flow.Subscriber#onComplete() onComplete} signals to current + * subscribers, and disallows subsequent attempts to publish. + * Upon return, this method does NOT guarantee that all + * subscribers have yet completed. + */ + void close(); + + /** + * Unless already closed, issues {@link + * Flow.Subscriber#onError(Throwable) onError} signals to current + * subscribers with the given error, and disallows subsequent + * attempts to publish. Future subscribers also receive the given + * error. Upon return, this method does NOT guarantee + * that all subscribers have yet completed. + * + * @param error the {@code onError} argument sent to subscribers + * @throws NullPointerException if error is null + */ + void closeExceptionally(final Throwable error); + + /** + * Returns an estimate of the minimum number of items requested + * (via {@link Flow.Subscription#request(long) request}) but not + * yet produced, among all current subscribers. + * + * @return the estimate, or zero if no subscribers + */ + long estimateMinimumDemand(); + + /** + * Returns an estimate of the maximum number of items produced but + * not yet consumed among all current subscribers. + * + * @return the estimate + */ + int estimateMaximumLag(); + + /** + * Returns the exception associated with {@link + * #closeExceptionally(Throwable) closeExceptionally}, or null if + * not closed or if closed normally. + * + * @return the exception, or null if none + */ + Throwable getClosedException(); + + /** + * Returns the maximum per-subscriber buffer capacity. + * + * @return the maximum per-subscriber buffer capacity + */ + int getMaxBufferCapacity(); + + /** + * Publishes the given item, if possible, to each current subscriber + * by asynchronously invoking its {@link + * Flow.Subscriber#onNext(Object) onNext} method, blocking while + * resources for any subscription are unavailable, up to the + * specified timeout or until the caller thread is interrupted, at + * which point the given handler (if non-null) is invoked, and if it + * returns true, retried once. (The drop handler may distinguish + * timeouts from interrupts by checking whether the current thread + * is interrupted.) Other calls to methods in this class by other + * threads are blocked while the handler is invoked. Unless + * recovery is assured, options are usually limited to logging the + * error and/or issuing an {@link Flow.Subscriber#onError(Throwable) + * onError} signal to the subscriber. + *

+ *

This method returns a status indicator: If negative, it + * represents the (negative) number of drops (failed attempts to + * issue the item to a subscriber). Otherwise it is an estimate of + * the maximum lag (number of items submitted but not yet + * consumed) among all current subscribers. This value is at least + * one (accounting for this submitted item) if there are any + * subscribers, else zero. + *

+ *

If the Executor for this publisher throws a + * RejectedExecutionException (or any other RuntimeException or + * Error) when attempting to asynchronously notify subscribers, or + * the drop handler throws an exception when processing a dropped + * item, then this exception is rethrown. + * + * @param item the (non-null) item to publish + * @param timeout how long to wait for resources for any subscriber + * before giving up, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @param onDrop if non-null, the handler invoked upon a drop to a + * subscriber, with arguments of the subscriber and item; if it + * returns true, an offer is re-attempted (once) + * @return if negative, the (negative) number of drops; otherwise + * an estimate of maximum lag + * @throws IllegalStateException if closed + * @throws NullPointerException if item is null + * @throws RejectedExecutionException if thrown by Executor + */ + int offer(T item, long timeout, TimeUnit unit, BiPredicate, ? super T> onDrop); + + /** + * Publishes the given item, if possible, to each current subscriber + * by asynchronously invoking its {@link + * Flow.Subscriber#onNext(Object) onNext} method. The item may be + * dropped by one or more subscribers if resource limits are + * exceeded, in which case the given handler (if non-null) is + * invoked, and if it returns true, retried once. Other calls to + * methods in this class by other threads are blocked while the + * handler is invoked. Unless recovery is assured, options are + * usually limited to logging the error and/or issuing an {@link + * Flow.Subscriber#onError(Throwable) onError} signal to the + * subscriber. + *

+ *

This method returns a status indicator: If negative, it + * represents the (negative) number of drops (failed attempts to + * issue the item to a subscriber). Otherwise it is an estimate of + * the maximum lag (number of items submitted but not yet + * consumed) among all current subscribers. This value is at least + * one (accounting for this submitted item) if there are any + * subscribers, else zero. + *

+ *

If the Executor for this publisher throws a + * RejectedExecutionException (or any other RuntimeException or + * Error) when attempting to asynchronously notify subscribers, or + * the drop handler throws an exception when processing a dropped + * item, then this exception is rethrown. + * + * @param item the (non-null) item to publish + * @param onDrop if non-null, the handler invoked upon a drop to a + * subscriber, with arguments of the subscriber and item; if it + * returns true, an offer is re-attempted (once) + * @return if negative, the (negative) number of drops; otherwise + * an estimate of maximum lag + * @throws IllegalStateException if closed + * @throws NullPointerException if item is null + * @throws RejectedExecutionException if thrown by Executor + */ + int offer(T item, BiPredicate, ? super T> onDrop); + + /** + * Publishes the given item to each current subscriber by + * asynchronously invoking its {@link Flow.Subscriber#onNext(Object) + * onNext} method, blocking uninterruptibly while resources for any + * subscriber are unavailable. This method returns an estimate of + * the maximum lag (number of items submitted but not yet consumed) + * among all current subscribers. This value is at least one + * (accounting for this submitted item) if there are any + * subscribers, else zero. + *

+ *

If the Executor for this publisher throws a + * RejectedExecutionException (or any other RuntimeException or + * Error) when attempting to asynchronously notify subscribers, + * then this exception is rethrown, in which case not all + * subscribers will have been issued this item. + * + * @param item the (non-null) item to publish + * @return the estimated maximum lag among subscribers + * @throws IllegalStateException if closed + * @throws NullPointerException if item is null + * @throws RejectedExecutionException if thrown by Executor + */ + int submit(T item); + + /** + * Adds the given Subscriber unless already subscribed. If already + * subscribed, the Subscriber's {@link + * Flow.Subscriber#onError(Throwable) onError} method is invoked on + * the existing subscription with an {@link IllegalStateException}. + * Otherwise, upon success, the Subscriber's {@link + * Flow.Subscriber#onSubscribe onSubscribe} method is invoked + * asynchronously with a new {@link Flow.Subscription}. If {@link + * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the + * subscription is cancelled. Otherwise, if this SubmittableFlowPublisher + * was closed exceptionally, then the subscriber's {@link + * Flow.Subscriber#onError onError} method is invoked with the + * corresponding exception, or if closed without exception, the + * subscriber's {@link Flow.Subscriber#onComplete() onComplete} + * method is invoked. Subscribers may enable receiving items by + * invoking the {@link Flow.Subscription#request(long) request} + * method of the new Subscription, and may unsubscribe by invoking + * its {@link Flow.Subscription#cancel() cancel} method. + * + * @param subscriber the subscriber + * @throws NullPointerException if subscriber is null + */ + void subscribe(Flow.Subscriber subscriber); +} diff --git a/core-common/src/main/java/org/glassfish/jersey/internal/jsr166/package-info.java b/core-common/src/main/jsr166/org/glassfish/jersey/internal/jsr166/package-info.java similarity index 100% rename from core-common/src/main/java/org/glassfish/jersey/internal/jsr166/package-info.java rename to core-common/src/main/jsr166/org/glassfish/jersey/internal/jsr166/package-info.java diff --git a/core-common/src/test/java/org/glassfish/jersey/internal/util/JerseyPublisherTest.java b/core-common/src/test/java/org/glassfish/jersey/internal/util/JerseyPublisherTest.java index 13a42c7d63..1c216ae21a 100644 --- a/core-common/src/test/java/org/glassfish/jersey/internal/util/JerseyPublisherTest.java +++ b/core-common/src/test/java/org/glassfish/jersey/internal/util/JerseyPublisherTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -111,11 +111,13 @@ public void test() throws InterruptedException { @Test public void testNonBlocking() throws InterruptedException { final int MSG_COUNT = 300; + final int DATA_COUNT = Flow.defaultBufferSize() + 2; + final int WAIT_TIME = 20 * DATA_COUNT; final JerseyPublisher publisher = new JerseyPublisher<>(); final CountDownLatch openLatchActive = new CountDownLatch(1); - final CountDownLatch writeLatch = new CountDownLatch(MSG_COUNT); + final CountDownLatch writeLatch = new CountDownLatch(DATA_COUNT); final CountDownLatch closeLatch = new CountDownLatch(1); final CountDownLatch openLatchDead = new CountDownLatch(1); @@ -147,9 +149,10 @@ public void testNonBlocking() throws InterruptedException { publisher.publish("MSG-" + i); }, 0, 10, TimeUnit.MILLISECONDS); - assertTrue(writeLatch.await(6000, TimeUnit.MILLISECONDS)); + writeLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS); + assertTrue(writeLatch.getCount() <= 1); - assertEquals(MSG_COUNT, activeSubscriber.getReceivedData().size()); + assertTrue(DATA_COUNT - activeSubscriber.getReceivedData().size() <= 1); assertEquals(0, deadSubscriber.getReceivedData().size()); assertFalse(activeSubscriber.hasError()); @@ -157,7 +160,7 @@ public void testNonBlocking() throws InterruptedException { publisher.close(); - assertTrue(closeLatch.await(6000, TimeUnit.MILLISECONDS)); + assertTrue(closeLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS)); assertTrue(activeSubscriber.isCompleted()); assertFalse(deadSubscriber.isCompleted()); } diff --git a/core-common/src/test/resources/surefire.policy b/core-common/src/test/resources/surefire.policy index f181fe41b1..77fa02af3b 100644 --- a/core-common/src/test/resources/surefire.policy +++ b/core-common/src/test/resources/surefire.policy @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2014, 2019 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -47,3 +47,12 @@ grant codebase "file:${project.build.directory}/classes/-" { permission java.lang.RuntimePermission "accessClassInPackage.sun.misc.*"; permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; }; + +// JDK 11 permissions +grant { + permission com.sun.tools.attach.AttachPermission "createAttachProvider"; + permission com.sun.tools.attach.AttachPermission "attachVirtualMachine"; + permission java.lang.RuntimePermission "loadLibrary.attach"; + permission java.lang.RuntimePermission "manageProcess"; + permission java.lang.RuntimePermission "accessClassInPackage.jdk.internal.misc"; +}; diff --git a/pom.xml b/pom.xml index 51d9d8f508..5b3b2daaf8 100644 --- a/pom.xml +++ b/pom.xml @@ -302,7 +302,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.1 + 3.8.0 true ${java.version} @@ -403,12 +403,27 @@ org.apache.maven.plugins maven-surefire-plugin - 2.18.1 + 3.0.0-M3 -Xmx${surefire.maxmem.argline}m -Dfile.encoding=UTF8 ${surefire.security.argline} ${surefire.coverage.argline} ${skip.tests} + + + org.apache.maven.surefire + surefire-logger-api + 3.0.0-M3 + + true + + + org.apache.maven.surefire + surefire-api + 3.0.0-M3 + true + + org.apache.maven.plugins @@ -1053,6 +1068,7 @@ true true true + **/module-info.java