1+ //===----------------------------------------------------------------------===//
2+ //
3+ // This source file is part of the Swift.org open source project
4+ //
5+ // Copyright (c) 2025 Apple Inc. and the Swift.org project authors
6+ // Licensed under Apache License v2.0
7+ //
8+ // See LICENSE.txt for license information
9+ // See CONTRIBUTORS.txt for the list of Swift.org project authors
10+ //
11+ // SPDX-License-Identifier: Apache-2.0
12+ //
13+ //===----------------------------------------------------------------------===//
14+
15+ package org .swift .swiftkit .core ;
16+
17+ import java .util .Deque ;
18+ import java .util .concurrent .*;
19+ import java .util .concurrent .atomic .AtomicReference ;
20+ import java .util .function .Function ;
21+
22+ /**
23+ * A simple completable {@link Future} for platforms that do not support {@link java.util.concurrent.CompletableFuture},
24+ * e.g. before Java 8, and/or before Android 23.
25+ * <p>
26+ * Prefer using the {@link CompletableFuture} for bridging Swift asynchronous functions, i.e. use the {@code completableFuture}
27+ * mode in {@code swift-java jextract}.
28+ *
29+ * @param <T> The result type
30+ */
31+ public final class SimpleCompletableFuture <T > implements Future <T > {
32+ // Marker object used to indicate the Future has not yet been completed.
33+ private static final Object PENDING = new Object ();
34+ private static final Object NULL = new Object ();
35+ private final AtomicReference <Object > result = new AtomicReference <>(PENDING );
36+
37+ private final Deque <Runnable > callbacks = new ConcurrentLinkedDeque <>();
38+
39+ /**
40+ * Wrapper type we use to indicate that a recorded result was a failure (recorded using {@link SimpleCompletableFuture#completeExceptionally(Throwable)}.
41+ * Since no-one else can instantiate this type, we know for sure that a recorded CompletedExceptionally indicates a failure.
42+ */
43+ static final class CompletedExceptionally {
44+ private final Throwable exception ;
45+
46+ private CompletedExceptionally (Throwable exception ) {
47+ this .exception = exception ;
48+ }
49+ }
50+
51+ /**
52+ * Returns a new future that, when this stage completes
53+ * normally, is executed with this stage's result as the argument
54+ * to the supplied function.
55+ *
56+ * <p>This method is analogous to
57+ * {@link java.util.Optional#map Optional.map} and
58+ * {@link java.util.stream.Stream#map Stream.map}.
59+ *
60+ * @return the new Future
61+ */
62+ public <U > Future <U > thenApply (Function <? super T , ? extends U > fn ) {
63+ SimpleCompletableFuture <U > newFuture = new SimpleCompletableFuture <>();
64+ addCallback (() -> {
65+ Object observed = this .result .get ();
66+ if (observed instanceof CompletedExceptionally ) {
67+ newFuture .completeExceptionally (((CompletedExceptionally ) observed ).exception );
68+ } else {
69+ try {
70+ // We're guaranteed that an observed result is of type T.
71+ // noinspection unchecked
72+ U newResult = fn .apply (observed == NULL ? null : (T ) observed );
73+ newFuture .complete (newResult );
74+ } catch (Throwable t ) {
75+ newFuture .completeExceptionally (t );
76+ }
77+ }
78+ });
79+ return newFuture ;
80+ }
81+
82+ /**
83+ * If not already completed, sets the value returned by {@link #get()} and
84+ * related methods to the given value.
85+ *
86+ * @param value the result value
87+ * @return {@code true} if this invocation caused this CompletableFuture
88+ * to transition to a completed state, else {@code false}
89+ */
90+ public boolean complete (T value ) {
91+ if (result .compareAndSet (PENDING , value == null ? NULL : value )) {
92+ synchronized (result ) {
93+ result .notifyAll ();
94+ }
95+ runCallbacks ();
96+ return true ;
97+ }
98+
99+ return false ;
100+ }
101+
102+ /**
103+ * If not already completed, causes invocations of {@link #get()}
104+ * and related methods to throw the given exception.
105+ *
106+ * @param ex the exception
107+ * @return {@code true} if this invocation caused this CompletableFuture
108+ * to transition to a completed state, else {@code false}
109+ */
110+ public boolean completeExceptionally (Throwable ex ) {
111+ if (result .compareAndSet (PENDING , new CompletedExceptionally (ex ))) {
112+ synchronized (result ) {
113+ result .notifyAll ();
114+ }
115+ runCallbacks ();
116+ return true ;
117+ }
118+
119+ return false ;
120+ }
121+
122+ private void runCallbacks () {
123+ // This is a pretty naive implementation; even if we enter this by racing a thenApply,
124+ // with a completion; we're using a concurrent deque so we won't happen to trigger a callback twice.
125+ Runnable callback ;
126+ while ((callback = callbacks .pollFirst ()) != null ) {
127+ callback .run ();
128+ }
129+ }
130+
131+ @ Override
132+ public boolean cancel (boolean mayInterruptIfRunning ) {
133+ // TODO: If we're representing a Swift Task computation with this future,
134+ // we could trigger a Task.cancel() from here
135+ return false ;
136+ }
137+
138+ @ Override
139+ public boolean isCancelled () {
140+ return false ;
141+ }
142+
143+ @ Override
144+ public boolean isDone () {
145+ return this .result .get () != PENDING ;
146+ }
147+
148+ @ Override
149+ public T get () throws InterruptedException , ExecutionException {
150+ Object observed ;
151+ // If PENDING check fails immediately, we have no need to take the result lock at all.
152+ while ((observed = result .get ()) == PENDING ) {
153+ synchronized (result ) {
154+ if (result .get () == PENDING ) {
155+ result .wait ();
156+ }
157+ }
158+ }
159+
160+ return getReturn (observed );
161+ }
162+
163+ @ Override
164+ public T get (long timeout , TimeUnit unit ) throws InterruptedException , ExecutionException , TimeoutException {
165+ Object observed ;
166+
167+ // Fast path: are we already completed and don't need to do any waiting?
168+ if ((observed = result .get ()) != PENDING ) {
169+ return get ();
170+ }
171+
172+ long nanos = unit .toNanos (timeout );
173+ synchronized (result ) {
174+ if (!isDone ()) {
175+ if (nanos <= 0 ) {
176+ throw new TimeoutException ();
177+ }
178+ long deadline = System .nanoTime () + nanos ;
179+ while (!isDone ()) {
180+ nanos = deadline - System .nanoTime ();
181+ if (nanos <= 0L ) {
182+ throw new TimeoutException ();
183+ }
184+ result .wait (nanos / 1000000 , (int ) (nanos % 1000000 ));
185+ }
186+ }
187+ }
188+
189+ // Seems we broke out of the wait loop, let's trigger the 'get()' implementation
190+ observed = result .get ();
191+ if (observed == PENDING ) {
192+ throw new ExecutionException ("Unexpectedly finished wait-loop while future was not completed, this is a bug." , null );
193+ }
194+ return getReturn (observed );
195+ }
196+
197+ private T getReturn (Object observed ) throws ExecutionException {
198+ if (observed instanceof CompletedExceptionally ) {
199+ // We observed a failure, unwrap and throw it
200+ Throwable exception = ((CompletedExceptionally ) observed ).exception ;
201+ if (exception instanceof CancellationException ) {
202+ throw (CancellationException ) exception ;
203+ }
204+ throw new ExecutionException (exception );
205+ } else if (observed == NULL ) {
206+ return null ;
207+ } else {
208+ // We're guaranteed that we only allowed registering completions of type `T`
209+ // noinspection unchecked
210+ return (T ) observed ;
211+ }
212+ }
213+
214+ private void addCallback (Runnable action ) {
215+ callbacks .add (action );
216+ if (isDone ()) {
217+ // This may race, but we don't care since triggering the callbacks is going to be at-most-once
218+ // by means of using the concurrent deque as our list of callbacks.
219+ runCallbacks ();
220+ }
221+ }
222+
223+ }
0 commit comments