documentation and small changes

- added docs for `Promise` and  `PromiseFactory`
- removed outdated README docs
- moved some common utilities to `PromiseUtil`
- improved efficiency of result array resizing
- added cancellation result to promise executors
- changed visibility of `PromiseJoiner` to public, and made some method names more verbose
- inlined `DeferredExecutionException` to inside `AbstractPromise`
- inlined default promise implementation to inner class in the factory
- removed necessity for base factories to provide a logger
This commit is contained in:
tommyskeff
2025-01-06 17:53:33 +00:00
parent 9e392c91ba
commit 4236adbd9e
27 changed files with 1210 additions and 466 deletions

View File

@@ -25,8 +25,8 @@ class ExecutorServiceImpl implements PromiseExecutor<Future<?>> {
}
@Override
public void cancel(Future<?> task) {
task.cancel(true);
public boolean cancel(Future<?> task) {
return task.cancel(true);
}
}

View File

@@ -8,26 +8,65 @@ import java.util.concurrent.TimeUnit;
public interface PromiseExecutor<T> {
/**
* Creates a new {@link PromiseExecutor} that runs tasks on virtual threads.
* @return the new executor
*/
static PromiseExecutor<?> virtualThreaded() {
return new VirtualThreadImpl();
}
/**
* Creates a new {@link PromiseExecutor} that runs tasks on a single thread.
* @return the new executor
*/
static PromiseExecutor<?> singleThreaded() {
return of(Executors.newSingleThreadScheduledExecutor());
}
/**
* Creates a new {@link PromiseExecutor} that runs tasks on multiple threads.
* @param threads the number of threads
* @return the new executor
*/
static PromiseExecutor<?> multiThreaded(int threads) {
return of(Executors.newScheduledThreadPool(threads));
}
/**
* Creates a new {@link PromiseExecutor} that runs tasks on the given executor service.
* @param service the executor service
* @return the new executor
*/
static PromiseExecutor<?> of(@NotNull ScheduledExecutorService service) {
return new ExecutorServiceImpl(service);
}
/**
* Runs the given task.
* @param task the task
* @return the task
* @throws Exception if scheduling the task failed
*/
T run(@NotNull Runnable task) throws Exception;
/**
* Runs the given task after the given delay.
* @param task the task
* @param delay the delay
* @param unit the time unit
* @return the task
* @throws Exception if scheduling the task failed
*/
T run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) throws Exception;
void cancel(T task);
/**
* Cancels the given task if possible. This may interrupt the task mid-execution.
*
* @param task the task
* @return {@code true} if the task was cancelled. {@code false} if the task was already completed
* or could not be cancelled.
*/
boolean cancel(T task);
}

View File

@@ -24,8 +24,13 @@ class VirtualThreadImpl implements PromiseExecutor<Thread> {
}
@Override
public void cancel(Thread task) {
task.interrupt();
public boolean cancel(Thread task) {
if (task.isAlive()) {
task.interrupt();
return true;
} else {
return false;
}
}
}

View File

@@ -1,8 +1,20 @@
package dev.tommyjs.futur.function;
/**
* Represents an operation that accepts a single input argument and returns no result,
* and may throw an exception. This is a functional interface whose functional method is {@link #accept(Object)}.
*
* @param <T> the type of the input to the operation
*/
@FunctionalInterface
public interface ExceptionalConsumer<T> {
/**
* Performs this operation on the given argument, potentially throwing an exception.
*
* @param value the input argument
* @throws Exception if unable to compute a result
*/
void accept(T value) throws Exception;
}
}

View File

@@ -1,8 +1,22 @@
package dev.tommyjs.futur.function;
/**
* Represents a function that accepts one argument and produces a result,
* and may throw an exception. This is a functional interface whose functional method is {@link #apply(Object)}.
*
* @param <K> the type of the input to the function
* @param <V> the type of the result of the function
*/
@FunctionalInterface
public interface ExceptionalFunction<K, V> {
/**
* Applies this function to the given argument, potentially throwing an exception.
*
* @param value the input argument
* @return the function result
* @throws Exception if unable to compute a result
*/
V apply(K value) throws Exception;
}
}

View File

@@ -1,8 +1,17 @@
package dev.tommyjs.futur.function;
/**
* Represents a runnable task that may throw an exception.
* This is a functional interface whose functional method is {@link #run()}.
*/
@FunctionalInterface
public interface ExceptionalRunnable {
/**
* Performs this runnable task, potentially throwing an exception.
*
* @throws Exception if unable to complete the task
*/
void run() throws Exception;
}
}

View File

@@ -1,8 +1,20 @@
package dev.tommyjs.futur.function;
/**
* Represents a supplier of results that may throw an exception.
* This is a functional interface whose functional method is {@link #get()}.
*
* @param <T> the type of results supplied by this supplier
*/
@FunctionalInterface
public interface ExceptionalSupplier<T> {
/**
* Gets a result, potentially throwing an exception.
*
* @return a result
* @throws Exception if unable to supply a result
*/
T get() throws Exception;
}
}

View File

@@ -3,6 +3,7 @@ package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.util.ConcurrentResultArray;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -24,18 +25,18 @@ public class CompletionJoiner extends PromiseJoiner<Promise<?>, Void, Void, List
}
@Override
protected Void getKey(Promise<?> value) {
protected Void getChildKey(Promise<?> value) {
return null;
}
@Override
protected @NotNull Promise<Void> getPromise(Promise<?> value) {
protected @NotNull Promise<Void> getChildPromise(Promise<?> value) {
//noinspection unchecked
return (Promise<Void>) value;
}
@Override
protected @Nullable Throwable onFinish(int index, Void key, @NotNull PromiseCompletion<Void> res) {
protected @Nullable Throwable onChildComplete(int index, Void key, @NotNull PromiseCompletion<Void> res) {
results.set(index, res);
return null;
}

View File

@@ -3,6 +3,7 @@ package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.util.ConcurrentResultArray;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -27,19 +28,22 @@ public class MappedResultJoiner<K, V> extends PromiseJoiner<Map.Entry<K, Promise
}
@Override
protected K getKey(Map.Entry<K, Promise<V>> entry) {
protected K getChildKey(Map.Entry<K, Promise<V>> entry) {
return entry.getKey();
}
@Override
protected @NotNull Promise<V> getPromise(Map.Entry<K, Promise<V>> entry) {
protected @NotNull Promise<V> getChildPromise(Map.Entry<K, Promise<V>> entry) {
return entry.getValue();
}
@Override
protected @Nullable Throwable onFinish(int index, K key, @NotNull PromiseCompletion<V> res) {
protected @Nullable Throwable onChildComplete(int index, K key, @NotNull PromiseCompletion<V> res) {
if (res.isError()) {
if (exceptionHandler == null) return res.getException();
if (exceptionHandler == null) {
return res.getException();
}
exceptionHandler.accept(key, res.getException());
}
@@ -54,6 +58,7 @@ public class MappedResultJoiner<K, V> extends PromiseJoiner<Map.Entry<K, Promise
for (Map.Entry<K, V> entry : list) {
map.put(entry.getKey(), entry.getValue());
}
return map;
}

View File

@@ -1,6 +1,10 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.*;
import dev.tommyjs.futur.promise.CompletablePromise;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -8,7 +12,7 @@ import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
abstract class PromiseJoiner<V, K, T, R> {
public abstract class PromiseJoiner<V, K, T, R> {
private final CompletablePromise<R> joined;
@@ -16,15 +20,11 @@ abstract class PromiseJoiner<V, K, T, R> {
this.joined = factory.unresolved();
}
public @NotNull Promise<R> joined() {
return joined;
}
protected abstract K getChildKey(V value);
protected abstract K getKey(V value);
protected abstract @NotNull Promise<T> getChildPromise(V value);
protected abstract @NotNull Promise<T> getPromise(V value);
protected abstract @Nullable Throwable onFinish(int index, K key, @NotNull PromiseCompletion<T> completion);
protected abstract @Nullable Throwable onChildComplete(int index, K key, @NotNull PromiseCompletion<T> completion);
protected abstract R getResult();
@@ -35,18 +35,19 @@ abstract class PromiseJoiner<V, K, T, R> {
int i = 0;
do {
V value = promises.next();
Promise<T> p = getPromise(value);
Promise<T> p = getChildPromise(value);
if (link) {
AbstractPromise.cancelOnFinish(p, joined);
PromiseUtil.cancelOnComplete(joined, p);
}
if (!joined.isCompleted()) {
count.incrementAndGet();
K key = getKey(value);
K key = getChildKey(value);
int index = i++;
p.addListener((res) -> {
Throwable e = onFinish(index, key, res);
p.addAsyncListener(res -> {
Throwable e = onChildComplete(index, key, res);
if (e != null) {
joined.completeExceptionally(e);
} else if (count.decrementAndGet() == 0 && waiting.get()) {
@@ -56,11 +57,17 @@ abstract class PromiseJoiner<V, K, T, R> {
}
} while (promises.hasNext());
count.updateAndGet((v) -> {
if (v == 0) joined.complete(getResult());
else waiting.set(true);
return v;
});
if (!joined.isCompleted()) {
count.updateAndGet(v -> {
if (v == 0) joined.complete(getResult());
else waiting.set(true);
return v;
});
}
}
public @NotNull Promise<R> joined() {
return joined;
}
}

View File

@@ -3,6 +3,7 @@ package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.util.ConcurrentResultArray;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -28,19 +29,22 @@ public class ResultJoiner<T> extends PromiseJoiner<Promise<T>, Void, T, List<T>>
}
@Override
protected Void getKey(Promise<T> value) {
protected Void getChildKey(Promise<T> value) {
return null;
}
@Override
protected @NotNull Promise<T> getPromise(Promise<T> value) {
protected @NotNull Promise<T> getChildPromise(Promise<T> value) {
return value;
}
@Override
protected @Nullable Throwable onFinish(int index, Void key, @NotNull PromiseCompletion<T> res) {
protected @Nullable Throwable onChildComplete(int index, Void key, @NotNull PromiseCompletion<T> res) {
if (res.isError()) {
if (exceptionHandler == null) return res.getException();
if (exceptionHandler == null) {
return res.getException();
}
exceptionHandler.accept(index, res.getException());
}

View File

@@ -16,18 +16,18 @@ public class VoidJoiner extends PromiseJoiner<Promise<?>, Void, Void, Void> {
}
@Override
protected Void getKey(Promise<?> value) {
protected Void getChildKey(Promise<?> value) {
return null;
}
@Override
protected @NotNull Promise<Void> getPromise(Promise<?> value) {
protected @NotNull Promise<Void> getChildPromise(Promise<?> value) {
//noinspection unchecked
return (Promise<Void>) value;
}
@Override
protected @Nullable Throwable onFinish(int index, Void key, @NotNull PromiseCompletion<Void> completion) {
protected @Nullable Throwable onChildComplete(int index, Void key, @NotNull PromiseCompletion<Void> completion) {
return completion.getException();
}

View File

@@ -4,6 +4,7 @@ import dev.tommyjs.futur.function.ExceptionalConsumer;
import dev.tommyjs.futur.function.ExceptionalFunction;
import dev.tommyjs.futur.function.ExceptionalRunnable;
import dev.tommyjs.futur.function.ExceptionalSupplier;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@@ -18,18 +19,6 @@ import java.util.function.Consumer;
public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T> {
public static <V> void propagateResult(Promise<V> from, CompletablePromise<V> to) {
from.addDirectListener(to::complete, to::completeExceptionally);
}
public static void propagateCancel(Promise<?> from, Promise<?> to) {
from.onCancel(to::cancel);
}
public static void cancelOnFinish(Promise<?> toCancel, Promise<?> toFinish) {
toFinish.addDirectListener(_ -> toCancel.cancel());
}
private final AtomicReference<Collection<PromiseListener<T>>> listeners;
private final AtomicReference<PromiseCompletion<T>> completion;
private final CountDownLatch latch;
@@ -40,6 +29,8 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
this.latch = new CountDownLatch(1);
}
public abstract @NotNull AbstractPromiseFactory<FS, FA> getFactory();
private void runCompleter(@NotNull CompletablePromise<?> promise, @NotNull ExceptionalRunnable completer) {
try {
completer.run();
@@ -62,8 +53,6 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
};
}
public abstract @NotNull AbstractPromiseFactory<FS, FA> getFactory();
protected @NotNull Logger getLogger() {
return getFactory().getLogger();
}
@@ -106,7 +95,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
@Override
public @NotNull Promise<T> fork() {
CompletablePromise<T> fork = getFactory().unresolved();
propagateResult(this, fork);
PromiseUtil.propagateCompletion(this, fork);
return fork;
}
@@ -139,7 +128,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
promise::completeExceptionally
);
propagateCancel(promise, this);
PromiseUtil.propagateCancel(promise, this);
return promise;
}
@@ -151,14 +140,14 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
if (nestedPromise == null) {
promise.complete(null);
} else {
propagateResult(nestedPromise, promise);
propagateCancel(promise, nestedPromise);
PromiseUtil.propagateCompletion(nestedPromise, promise);
PromiseUtil.propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
propagateCancel(promise, this);
PromiseUtil.propagateCancel(promise, this);
return promise;
}
@@ -216,7 +205,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
promise::completeExceptionally
);
propagateCancel(promise, this);
PromiseUtil.propagateCancel(promise, this);
return promise;
}
@@ -232,7 +221,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
promise::completeExceptionally
);
propagateCancel(promise, this);
PromiseUtil.propagateCancel(promise, this);
return promise;
}
@@ -244,14 +233,14 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
if (nestedPromise == null) {
promise.complete(null);
} else {
propagateResult(nestedPromise, promise);
propagateCancel(promise, nestedPromise);
PromiseUtil.propagateCompletion(nestedPromise, promise);
PromiseUtil.propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
propagateCancel(promise, this);
PromiseUtil.propagateCancel(promise, this);
return promise;
}
@@ -297,14 +286,6 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
return thenApplyDelayedAsync(_ -> task.get(), delay, unit);
}
@Override
public @NotNull Promise<T> thenPopulateReference(@NotNull AtomicReference<T> reference) {
return thenApplyAsync(result -> {
reference.set(result);
return result;
});
}
@Override
public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) {
CompletablePromise<V> promise = getFactory().unresolved();
@@ -317,7 +298,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
promise::completeExceptionally
);
propagateCancel(promise, this);
PromiseUtil.propagateCancel(promise, this);
return promise;
}
@@ -333,7 +314,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
promise::completeExceptionally
);
propagateCancel(promise, this);
PromiseUtil.propagateCancel(promise, this);
return promise;
}
@@ -345,17 +326,25 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
if (nestedPromise == null) {
promise.complete(null);
} else {
propagateResult(nestedPromise, promise);
propagateCancel(promise, nestedPromise);
PromiseUtil.propagateCompletion(nestedPromise, promise);
PromiseUtil.propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
propagateCancel(promise, this);
PromiseUtil.propagateCancel(promise, this);
return promise;
}
@Override
public @NotNull Promise<T> thenPopulateReference(@NotNull AtomicReference<T> reference) {
return thenApplyAsync(result -> {
reference.set(result);
return result;
});
}
@Override
public @NotNull Promise<Void> erase() {
return thenSupply(() -> null);
@@ -368,7 +357,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
@Override
public @NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
return addAsyncListener((res) -> {
return addAsyncListener(res -> {
if (res.isSuccess()) {
if (successListener != null) successListener.accept(res.getResult());
} else {
@@ -384,7 +373,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
@Override
public @NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
return addDirectListener((res) -> {
return addDirectListener(res -> {
if (res.isSuccess()) {
if (successListener != null) successListener.accept(res.getResult());
} else {
@@ -414,6 +403,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
private void callListenerAsync(PromiseListener<T> listener, PromiseCompletion<T> res) {
try {
getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, res));
} catch (RejectedExecutionException ignored) {
} catch (Exception e) {
getLogger().warn("Exception caught while running promise listener", e);
}
@@ -447,10 +437,9 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
}
@Override
public <E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> clazz, @NotNull Consumer<E> listener) {
return onError((e) -> {
if (clazz.isAssignableFrom(e.getClass())) {
getLogger().info("On Error {}", e.getClass());
public <E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> type, @NotNull Consumer<E> listener) {
return onError(e -> {
if (type.isAssignableFrom(e.getClass())) {
//noinspection unchecked
listener.accept((E) e);
}
@@ -551,4 +540,7 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
return future;
}
private static class DeferredExecutionException extends ExecutionException {
}
}

View File

@@ -5,8 +5,10 @@ import dev.tommyjs.futur.joiner.CompletionJoiner;
import dev.tommyjs.futur.joiner.MappedResultJoiner;
import dev.tommyjs.futur.joiner.ResultJoiner;
import dev.tommyjs.futur.joiner.VoidJoiner;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -17,17 +19,52 @@ import java.util.stream.Stream;
public abstract class AbstractPromiseFactory<FS, FA> implements PromiseFactory {
public abstract @NotNull Logger getLogger();
public abstract @NotNull PromiseExecutor<FS> getSyncExecutor();
public abstract @NotNull PromiseExecutor<FA> getAsyncExecutor();
@Override
public <T> @NotNull Promise<T> resolve(T value) {
CompletablePromise<T> promise = unresolved();
promise.complete(value);
return promise;
}
@Override
public <T> @NotNull Promise<T> error(@NotNull Throwable error) {
CompletablePromise<T> promise = unresolved();
promise.completeExceptionally(error);
return promise;
}
@Override
public <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
return wrap(future, future);
}
private <T> @NotNull Promise<T> wrap(@NotNull CompletionStage<T> completion, Future<T> future) {
CompletablePromise<T> promise = unresolved();
completion.whenComplete((v, e) -> {
if (e != null) {
promise.completeExceptionally(e);
} else {
promise.complete(v);
}
});
promise.onCancel(_ -> future.cancel(true));
return promise;
}
@Override
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(
@NotNull Promise<K> p1,
@NotNull Promise<V> p2,
boolean dontFork
boolean link
) {
return all(dontFork, p1, p2).thenApply((_) -> new AbstractMap.SimpleImmutableEntry<>(
return all(link, p1, p2).thenApply(_ -> new AbstractMap.SimpleImmutableEntry<>(
Objects.requireNonNull(p1.getCompletion()).getResult(),
Objects.requireNonNull(p2.getCompletion()).getResult()
));
@@ -71,59 +108,26 @@ public abstract class AbstractPromiseFactory<FS, FA> implements PromiseFactory {
}
@Override
public <V> @NotNull Promise<V> race(@NotNull Iterator<Promise<V>> promises, boolean link) {
public <V> @NotNull Promise<V> race(@NotNull Iterator<Promise<V>> promises, boolean cancelLosers) {
CompletablePromise<V> promise = unresolved();
promises.forEachRemaining(p -> {
if (link) AbstractPromise.cancelOnFinish(p, promise);
if (!promise.isCompleted())
AbstractPromise.propagateResult(p, promise);
});
return promise;
}
@Override
public <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises, boolean link) {
return race(promises.iterator(), link);
}
@Override
public <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises, boolean link) {
return race(promises.iterator(), link);
}
@Override
public <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
return wrap(future, future);
}
private <T> @NotNull Promise<T> wrap(@NotNull CompletionStage<T> completion, Future<T> future) {
CompletablePromise<T> promise = unresolved();
completion.whenComplete((v, e) -> {
if (e != null) {
promise.completeExceptionally(e);
} else {
promise.complete(v);
if (cancelLosers) PromiseUtil.cancelOnComplete(promise, p);
if (!promise.isCompleted()) {
PromiseUtil.propagateCompletion(p, promise);
}
});
promise.onCancel(_ -> future.cancel(true));
return promise;
}
@Override
public <T> @NotNull Promise<T> resolve(T value) {
CompletablePromise<T> promise = unresolved();
promise.complete(value);
return promise;
public <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises, boolean cancelLosers) {
return race(promises.iterator(), cancelLosers);
}
@Override
public <T> @NotNull Promise<T> error(@NotNull Throwable error) {
CompletablePromise<T> promise = unresolved();
promise.completeExceptionally(error);
return promise;
public <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises, boolean cancelLosers) {
return race(promises.iterator(), cancelLosers);
}
}

View File

@@ -1,5 +1,8 @@
package dev.tommyjs.futur.promise;
/**
* A listener for a {@link Promise} that is called when the promise is resolved. This listener is
* executed asynchronously by the {@link PromiseFactory} that created the completed promise.
*/
public interface AsyncPromiseListener<T> extends PromiseListener<T> {
}

View File

@@ -3,10 +3,21 @@ package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* A {@link Promise} that can be completed.
*/
public interface CompletablePromise<T> extends Promise<T> {
/**
* Completes the promise successfully with the given result.
* @param result the result
*/
void complete(@Nullable T result);
/**
* Completes the promise exceptionally with the given exception.
* @param result the exception
*/
void completeExceptionally(@NotNull Throwable result);
}

View File

@@ -1,11 +0,0 @@
package dev.tommyjs.futur.promise;
import java.util.concurrent.ExecutionException;
class DeferredExecutionException extends ExecutionException {
public DeferredExecutionException() {
super();
}
}

View File

@@ -12,171 +12,516 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* <p>
* A promise represents the result of an asynchronous computation. A promise will transition from a
* pending state to a completed state at most once, but may remain in a pending state indefinitely.
* </p>
*
* <p>
* Promises are created by a {@link PromiseFactory} and support chaining operations to be executed
* upon completion. These operations can be synchronous or asynchronous, and can be composed in a
* variety of ways. Promises can be listened to for completions, either with a result or with an
* exception. Promises can be cancelled, which will propagate a cancellation signal through the
* chain, but a promise can also be forked, which will prevent propagation of cancellations.
* </p>
*
* @see #cancel()
* @see #fork()
*/
public interface Promise<T> {
/**
* Returns the factory that created this promise. This factory can be used to create new promises.
*/
@NotNull PromiseFactory getFactory();
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRun(@NotNull ExceptionalRunnable task);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes and will be passed the result of this promise.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsume(@NotNull ExceptionalConsumer<T> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes, and will supply a value to the next promise in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupply(@NotNull ExceptionalSupplier<V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes, and will apply the specified function to the result of this promise
* in order to supply a value to the next promise in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApply(@NotNull ExceptionalFunction<T, V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes, and will compose the next promise in the chainfrom the result of
* this promise.
*
* @param task the task to execute
* @return a new promise that completes, once this promise and the promise returned by
* the task are complete, with the result of the task promise
*/
<V> @NotNull Promise<V> thenCompose(@NotNull ExceptionalFunction<T, Promise<V>> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* sync executor of the factory that created this promise, immediately after this promise completes.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* sync executor of the factory that created this promise, after the specified delay after this
* promise completes.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* sync executor of the factory that created this promise immediately after this promise completes,
* and will be passed the result of this promise.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsumeSync(@NotNull ExceptionalConsumer<T> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* sync executor of the factory that created this promise after the specified delay after this
* promise completes, and will be passed the result of this promise.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsumeDelayedSync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* by the sync executor of the factory that created this promise when this promise completes, and
* will supply a value to the next promise in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupplySync(@NotNull ExceptionalSupplier<V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the sync
* executor of the factory that created this promise after the specified delay after this promise
* completes, and will supply a value to the next promise in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the sync
* executor of the factory that created this promise immediately after this promise completes, and
* will apply the specified function to the result of this promise in order to supply a value to the
* next promise in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the sync
* executor of the factory that created this promise after the specified delay after this promise
* completes, and will apply the specified function to the result of this promise in order to supply
* a value to the next promise in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the sync
* executor of the factory that created this promise immediately after this promise completes, and
* will compose the next promise in the chain from the result of this promise.
*
* @param task the task to execute
* @return a new promise that completes, once this promise and the promise returned by the task are
* complete, with the result of the task promise
*/
<V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, Promise<V>> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise, immediately after this promise completes.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRunAsync(@NotNull ExceptionalRunnable task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise after the specified delay after this
* promise completes.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise immediately after this promise completes,
* and will be passed the result of this promise.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsumeAsync(@NotNull ExceptionalConsumer<T> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise after the specified delay after this
* promise completes, and will be passed the result of this promise.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsumeDelayedAsync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise immediately after this promise completes,
* and will supply a value to the next promise in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupplyAsync(@NotNull ExceptionalSupplier<V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the async
* executor of the factory that created this promise after the specified delay after this promise
* completes, and will supply a value to the next promise in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit);
@NotNull Promise<T> thenPopulateReference(@NotNull AtomicReference<T> reference);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the async
* executor of the factory that created this promise immediately after this promise completes, and
* will apply the specified function to the result of this promise in order to supply a value to the
* next promise in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the async
* executor of the factory that created this promise after the specified delay after this promise
* completes, and will apply the specified function to the result of this promise in order to supply
* a value to the next promise in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the async
* executor of the factory that created this promise immediately after this promise completes, and
* will compose the next promise in the chain from the result of this promise.
*
* @param task the task to execute
* @return a new promise that completes, once this promise and the promise returned by the task are
* complete, with the result of the task promise
*/
<V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task);
/**
* Adds a listener to this promise that will populate the specified reference with the result of this
* promise upon successful completion.
*
* @param reference the reference to populate
* @return continuation of the promise chain
*/
@NotNull Promise<T> thenPopulateReference(@NotNull AtomicReference<T> reference);
/**
* Returns a promise backed by this promise that will complete with {@code null} if this promise
* completes successfully, or with the exception if this promise completes exceptionally.
*/
@NotNull Promise<Void> erase();
/**
* Logs any exceptions that occur in the promise chain.
*
* @return continuation of the promise chain
*/
default @NotNull Promise<T> logExceptions() {
return logExceptions("Exception caught in promise chain");
}
/**
* Logs any exceptions that occur in the promise chain with the specified message.
*
* @param message the message to log
* @return continuation of the promise chain
*/
@NotNull Promise<T> logExceptions(@NotNull String message);
/**
* @apiNote Direct listeners run on the same thread as the completion.
* Adds a listener to this promise that will be executed immediately when this promise completes,
* on the same thread as the completion call.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> addDirectListener(@NotNull PromiseListener<T> listener);
/**
* @apiNote Direct listeners run on the same thread as the completion.
* Adds a listener to this promise that will be executed immediately when this promise completes,
* on the same thread as the completion call. One of {@code successHandler} and {@code errorHandler} will be
* called when the promise completes successfully or exceptionally, respectively.
*
* @param successHandler the function to call on success
* @param errorHandler the function to call on error
* @return continuation of the promise chain
*/
@NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
/**
* @apiNote Async listeners are run in parallel.
* Adds a listener to this promise that will be executed immediately when this promise completes,
* by the async executor of the factory that created this promise.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> addAsyncListener(@NotNull AsyncPromiseListener<T> listener);
/**
* @apiNote Same as addAsyncListener.
* Adds a listener to this promise that will be executed immediately when this promise completes.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
default @NotNull Promise<T> addListener(@NotNull AsyncPromiseListener<T> listener) {
return addAsyncListener(listener);
}
/**
* @apiNote Async listeners are run in parallel.
* Adds a listener to this promise that will be executed immediately when this promise completes,
* by the async executor of the factory that created this promise. One of {@code successHandler} and
* {@code errorHandler} will be called when the promise completes successfully or exceptionally, respectively.
*
* @param successHandler the function to call on success
* @param errorHandler the function to call on error
*/
@NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
/**
* Adds a listener to this promise that will be called if the promise is completed successfully.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener);
/**
* Adds a listener to this promise that will be called if the promise is completed exceptionally.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> onError(@NotNull Consumer<Throwable> listener);
<E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> clazz, @NotNull Consumer<E> listener);
/**
* Adds a listener to this promise that will be called if the promise is completed exceptionally
* with an exception of the specified type.
*
* @param listener the listener to add
* @param type the class of the exception to listen for
* @return continuation of the promise chain
*/
<E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> type, @NotNull Consumer<E> listener);
/**
* Adds a listener to this promise that will be called if the promise is cancelled.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener);
/**
* Cancels the promise with a TimeoutException after the specified time.
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with a {@link CancellationException}.
*
* @param time the amount of time to wait before cancelling the promise
* @param unit the time unit of the delay
* @return continuation of the promise chain
*/
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
/**
* Cancels the promise with a TimeoutException after the specified time.
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with a {@link CancellationException}.
* @param ms the amount of time to wait before cancelling the promise (in milliseconds)
* @return continuation of the promise chain
*/
default @NotNull Promise<T> timeout(long ms) {
return timeout(ms, TimeUnit.MILLISECONDS);
}
/**
* Completes the promise exceptionally with a TimeoutException after the specified time.
* Times out the promise if not already completed after the specified timeout. This will result
* in an exceptional completion with a {@link TimeoutException}. This will not result in the
* promise being cancelled.
*
* @param time the amount of time to wait before timing out the promise
* @param unit the time unit of the delay
* @return continuation of the promise chain
*/
@NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit);
/**
* Completes the promise exceptionally with a TimeoutException after the specified time.
* Times out the promise if not already completed after the specified timeout. This will result
* in an exceptional completion with a {@link TimeoutException}. This will not result in the
* promise being cancelled.
* @param ms the amount of time to wait before timing out the promise (in milliseconds)
* @return continuation of the promise chain
*/
default @NotNull Promise<T> maxWaitTime(long ms) {
return maxWaitTime(ms, TimeUnit.MILLISECONDS);
}
/**
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with the specified cancellation.
* @param exception the cancellation exception to complete the promise with
*/
void cancel(@NotNull CancellationException exception);
/**
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with a {@link CancellationException}.
* @param reason the reason for the cancellation
*/
default void cancel(@NotNull String reason) {
cancel(new CancellationException(reason));
};
}
/**
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with a {@link CancellationException}.
*/
default void cancel() {
cancel(new CancellationException());
}
/**
* Waits if necessary for this promise to complete, and then returns its result.
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this promise completed exceptionally
* Blocks until this promise has completed, and then returns its result.
* @throws CancellationException if the promise was cancelled
* @throws CompletionException if the promise completed exceptionally
* @return the result of the promise
*/
@Blocking
T await();
/**
* Waits if necessary for this promise to complete, and then returns its result.
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if this promise completed exceptionally
* @throws InterruptedException if the current thread was interrupted while waiting
* Blocks until this promise has completed, and then returns its result.
* @throws CancellationException if the promise was cancelled
* @throws ExecutionException if the promise completed exceptionally
* @throws InterruptedException if the current thread was interrupted while waiting
* @return the result of the promise
*/
@Blocking
T get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for this future to complete, and then returns its result, if available.
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if this promise completed exceptionally
* @throws InterruptedException if the current thread was interrupted while waiting
* @throws TimeoutException if the wait timed out
* Blocks until either this promise has completed or the timeout has been exceeded, and then
* returns its result, if available.
* @throws CancellationException if the promise was cancelled
* @throws ExecutionException if the promise completed exceptionally
* @throws InterruptedException if the current thread was interrupted while waiting
* @throws TimeoutException if the timeout was exceeded
* @return the result of the promise
*/
@Blocking
T get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
/**
* Stops this promise from propagating up cancellations.
* Returns a new promise, backed by this promise, that will not propagate cancellations. This means
* that if the returned promise is cancelled, the cancellation will not be propagated to this promise
* or any other promises that share this promise as a parent.
* @return continuation the promise chain that will not propagate cancellations
*/
@NotNull Promise<T> fork();
/**
* Returns the current completion state of this promise. If the promise has not completed, this method
* will return {@code null}.
* @return the completion state of this promise, or {@code null} if the promise has not completed
*/
@Nullable PromiseCompletion<T> getCompletion();
/**
* Returns whether this promise has completed.
* @return {@code true} if the promise has completed, {@code false} otherwise
*/
boolean isCompleted();
/**
* Converts this promise to a {@link CompletableFuture}. The returned future will complete with the
* result of this promise when it completes.
* @return a future that will complete with the result of this promise
*/
@NotNull CompletableFuture<T> toFuture();
}

View File

@@ -5,39 +5,78 @@ import org.jetbrains.annotations.Nullable;
import java.util.concurrent.CancellationException;
/**
* Represents the result of a {@link Promise}, containing either an optional result or an exception.
*/
public class PromiseCompletion<T> {
private @Nullable T result;
private @Nullable Throwable exception;
/**
* Creates a new successful completion.
* @param result the result
*/
public PromiseCompletion(@Nullable T result) {
this.result = result;
}
/**
* Creates a new exceptional completion.
* @param exception the exception
*/
public PromiseCompletion(@NotNull Throwable exception) {
this.exception = exception;
}
/**
* Creates a new successful completion with a result of {@code null}.
*/
public PromiseCompletion() {
this.result = null;
this((T) null);
}
/**
* Checks if the completion was successful.
* @return {@code true} if the completion was successful, {@code false} otherwise
*/
public boolean isSuccess() {
return exception == null;
}
/**
* Checks if the completion was exceptional.
* @return {@code true} if the completion was exceptional, {@code false} otherwise
*/
public boolean isError() {
return exception != null;
}
public boolean wasCanceled() {
/**
* Checks if the completion was cancelled.
* @return {@code true} if the completion was cancelled, {@code false} otherwise
*/
public boolean wasCancelled() {
return exception instanceof CancellationException;
}
@Deprecated
public boolean wasCanceled() {
return wasCancelled();
}
/**
* Gets the result of the completion.
* @return the result, or {@code null} if the completion was exceptional
*/
public @Nullable T getResult() {
return result;
}
/**
* Gets the exception of the completion.
* @return the exception, or {@code null} if the completion was successful
*/
public @Nullable Throwable getException() {
return exception;
}

View File

@@ -1,6 +1,7 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@@ -13,181 +14,491 @@ import java.util.stream.Stream;
public interface PromiseFactory {
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor<?> syncExecutor, @NotNull PromiseExecutor<?> asyncExecutor) {
/**
* Creates a new {@link PromiseFactory} with the given logger and executors.
* @param logger the logger
* @param syncExecutor the synchronous executor
* @param asyncExecutor the asynchronous executor
* @return the new promise factory
*/
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor<?> syncExecutor,
@NotNull PromiseExecutor<?> asyncExecutor) {
return new PromiseFactoryImpl<>(logger, syncExecutor, asyncExecutor);
}
/**
* Creates a new {@link PromiseFactory} with the given logger and dual executor.
* @param logger the logger
* @param executor the executor
* @return the new promise factory
*/
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor<?> executor) {
return new PromiseFactoryImpl<>(logger, executor, executor);
}
/**
* Creates a new {@link PromiseFactory} with the given logger and executor.
* @param logger the logger
* @param executor the executor
* @return the new promise factory
*/
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull ScheduledExecutorService executor) {
return of(logger, PromiseExecutor.of(executor));
}
private static int size(@NotNull Stream<?> stream) {
long estimate = stream.spliterator().estimateSize();
return estimate == Long.MAX_VALUE ? 10 : (int) estimate;
}
@NotNull Logger getLogger();
/**
* Creates a new uncompleted promise.
* @return the new promise
*/
<T> @NotNull CompletablePromise<T> unresolved();
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, boolean cancelOnError);
default <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return combine(p1, p2, true);
}
<K, V> @NotNull Promise<Map<K, V>> combine(
@NotNull Map<K, Promise<V>> promises,
@Nullable BiConsumer<K, Throwable> exceptionHandler,
boolean propagateCancel
);
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @NotNull BiConsumer<K, Throwable> exceptionHandler) {
return combine(promises, exceptionHandler, true);
}
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, boolean cancelOnError) {
return combine(promises, null, cancelOnError);
}
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return combine(promises, null, true);
}
<V> @NotNull Promise<List<V>> combine(
@NotNull Iterator<Promise<V>> promises, int expectedSize,
@Nullable BiConsumer<Integer, Throwable> exceptionHandler, boolean propagateCancel
);
default <V> @NotNull Promise<List<V>> combine(
@NotNull Collection<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler,
boolean propagateCancel
) {
return combine(promises.iterator(), promises.size(), exceptionHandler, propagateCancel);
}
default <V> @NotNull Promise<List<V>> combine(
@NotNull Collection<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler
) {
return combine(promises.iterator(), promises.size(), exceptionHandler, true);
}
default <V> @NotNull Promise<List<V>> combine(@NotNull Collection<Promise<V>> promises, boolean cancelOnError) {
return combine(promises.iterator(), promises.size(), null, cancelOnError);
}
default <V> @NotNull Promise<List<V>> combine(@NotNull Collection<Promise<V>> promises) {
return combine(promises.iterator(), promises.size(), null, true);
}
default <V> @NotNull Promise<List<V>> combine(
@NotNull Stream<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler,
boolean propagateCancel
) {
return combine(promises.iterator(), size(promises), exceptionHandler, propagateCancel);
}
default <V> @NotNull Promise<List<V>> combine(
@NotNull Stream<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler
) {
return combine(promises.iterator(), size(promises), exceptionHandler, true);
}
default <V> @NotNull Promise<List<V>> combine(@NotNull Stream<Promise<V>> promises, boolean cancelOnError) {
return combine(promises.iterator(), size(promises), null, cancelOnError);
}
default <V> @NotNull Promise<List<V>> combine(@NotNull Stream<Promise<V>> promises) {
return combine(promises.iterator(), size(promises), null, true);
}
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(
@NotNull Iterator<Promise<?>> promises, int estimatedSize, boolean propagateCancel);
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Collection<Promise<?>> promises, boolean propagateCancel) {
return allSettled(promises.iterator(), promises.size(), propagateCancel);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Collection<Promise<?>> promises) {
return allSettled(promises.iterator(), promises.size(), true);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Stream<Promise<?>> promises, boolean propagateCancel) {
return allSettled(promises.iterator(), size(promises), propagateCancel);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Stream<Promise<?>> promises) {
return allSettled(promises.iterator(), size(promises), true);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Promise<?>... promises) {
return allSettled(Arrays.asList(promises).iterator(), promises.length, propagateCancel);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promises) {
return allSettled(Arrays.asList(promises).iterator(), promises.length, true);
}
@NotNull Promise<Void> all(@NotNull Iterator<Promise<?>> promises, boolean cancelAllOnError);
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promises, boolean cancelAllOnError) {
return all(promises.iterator(), cancelAllOnError);
}
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promises) {
return all(promises.iterator(), true);
}
default @NotNull Promise<Void> all(@NotNull Stream<Promise<?>> promises, boolean cancelAllOnError) {
return all(promises.iterator(), cancelAllOnError);
}
default @NotNull Promise<Void> all(@NotNull Stream<Promise<?>> promises) {
return all(promises.iterator(), true);
}
default @NotNull Promise<Void> all(boolean cancelAllOnError, @NotNull Promise<?>... promises) {
return all(Arrays.asList(promises).iterator(), cancelAllOnError);
}
default @NotNull Promise<Void> all(@NotNull Promise<?>... promises) {
return all(Arrays.asList(promises).iterator(), true);
}
<V> @NotNull Promise<V> race(@NotNull Iterator<Promise<V>> promises, boolean cancelLosers);
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises, boolean cancelLosers) {
return race(promises.iterator(), cancelLosers);
}
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises) {
return race(promises.iterator(), true);
}
default <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises, boolean cancelLosers) {
return race(promises.iterator(), cancelLosers);
}
default <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises) {
return race(promises.iterator(), true);
}
<T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future);
/**
* Creates a new promise, completed with the given value.
* @param value the value to complete the promise with
* @return the new promise
*/
<T> @NotNull Promise<T> resolve(T value);
/**
* Creates a new promise, completed with {@code null}.
* @apiNote This method is often useful for starting promise chains.
* @return the new promise
*/
default @NotNull Promise<Void> start() {
return resolve(null);
}
<T> @NotNull Promise<T> resolve(T value);
/**
* Creates a new promise, completed exceptionally with the given error.
* @param error the error to complete the promise with
* @return the new promise
*/
<T> @NotNull Promise<T> error(@NotNull Throwable error);
/**
* Creates a new promise backed by the given future. The promise will be completed upon completion
* of the future.
* @param future the future to wrap
* @return the new promise
*/
<T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future);
/**
* Combines two promises into a single promise that completes when both promises complete. If
* {@code link} is {@code true} and either input promise completes exceptionally (including
* cancellation), the other promise will be cancelled and the output promise will complete
* exceptionally.
* @param p1 the first promise
* @param p2 the second promise
* @param link whether to cancel the other promise on error
* @return the combined promise
*/
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2,
boolean link);
/**
* Combines two promises into a single promise that completes when both promises complete. If either
* input promise completes exceptionally, the other promise will be cancelled and the output promise
* will complete exceptionally.
* @param p1 the first promise
* @param p2 the second promise
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return combine(p1, p2, true);
}
/**
* Combines key-value pairs of inputs to promises into a single promise that completes with key-value
* pairs of inputs to outputs when all promises complete. If {@code link} is {@code true}
* and any promise completes exceptionally, the other promises will be cancelled and the output
* promise will complete exceptionally. If an exception handler is present, promises that fail
* will not cause this behaviour, and instead the exception handler will be called with the key
* that failed and the exception.
* @param promises the input promises
* @param exceptionHandler the exception handler
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
<K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises,
@Nullable BiConsumer<K, Throwable> exceptionHandler,
boolean link);
/**
* Combines key-value pairs of inputs to promises into a single promise that completes with key-value
* pairs of inputs to outputs when all promises complete. If any promise completes exceptionally,
* the exception handler will be called with the key that failed and the exception. The output promise
* will always complete successfully regardless of whether input promises fail.
* @param promises the input promises
* @param exceptionHandler the exception handler
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises,
@NotNull BiConsumer<K, Throwable> exceptionHandler) {
return combine(promises, exceptionHandler, true);
}
/**
* Combines key-value pairs of inputs to promises into a single promise that completes with key-value
* pairs of inputs to outputs when all promises complete. If {@code link} is {@code true}
* and any promise completes exceptionally, the other promises will be cancelled and the output
* promise will complete exceptionally.
* @param promises the input promises
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, boolean link) {
return combine(promises, null, link);
}
/**
* Combines key-value pairs of inputs to promises into a single promise that completes with key-value
* pairs of inputs to outputs when all promises complete. If any promise completes exceptionally,
* the output promise will complete exceptionally.
* @param promises the input promises
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return combine(promises, null, true);
}
/**
* Combines an iterator of promises into a single promise that completes with a list of results when all
* promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all
* other promises will be cancelled and the output promise will complete exceptionally. If an exception
* handler is present, promises that fail will not cause this behaviour, and instead the exception
* handler will be called with the index that failed and the exception.
* @param promises the input promises
* @param exceptionHandler the exception handler
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
<V> @NotNull Promise<List<V>> combine(@NotNull Iterator<Promise<V>> promises, int expectedSize,
@Nullable BiConsumer<Integer, Throwable> exceptionHandler,
boolean link);
/**
* Combines a collection of promises into a single promise that completes with a list of results when all
* promises complete. If any promise completes exceptionally, the exception handler will be called with
* the index that failed and the exception. The output promise will always complete successfully regardless
* of whether input promises fail.
* @param promises the input promises
* @param exceptionHandler the exception handler
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Collection<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler,
boolean link) {
return combine(promises.iterator(), promises.size(), exceptionHandler, link);
}
/**
* Combines a collection of promises into a single promise that completes with a list of results when all
* promises complete. If any promise completes exceptionally, the exception handler will be called with
* the index that failed and the exception. The output promise will always complete successfully regardless
* of whether input promises fail.
* @param promises the input promises
* @param exceptionHandler the exception handler
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Collection<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler) {
return combine(promises.iterator(), promises.size(), exceptionHandler, true);
}
/**
* Combines a collection of promises into a single promise that completes with a list of results when all
* promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all
* other promises will be cancelled and the output promise will complete exceptionally.
* @param promises the input promises
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Collection<Promise<V>> promises, boolean link) {
return combine(promises.iterator(), promises.size(), null, link);
}
/**
* Combines a collection of promises into a single promise that completes with a list of results when all
* promises complete. If any promise completes exceptionally, the output promise will complete exceptionally.
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Collection<Promise<V>> promises) {
return combine(promises.iterator(), promises.size(), null, true);
}
/**
* Combines a stream of promises into a single promise that completes with a list of results when all
* promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all
* other promises will be cancelled and the output promise will complete exceptionally. If an exception
* handler is present, promises that fail will not cause this behaviour, and instead the exception
* handler will be called with the index that failed and the exception.
* @param promises the input promises
* @param exceptionHandler the exception handler
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Stream<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler,
boolean link) {
return combine(promises.iterator(), PromiseUtil.estimateSize(promises), exceptionHandler, link);
}
/**
* Combines a stream of promises into a single promise that completes with a list of results when all
* promises complete. If any promise completes exceptionally, the exception handler will be called with
* the index that failed and the exception. The output promise will always complete successfully regardless
* of whether input promises fail.
* @param promises the input promises
* @param exceptionHandler the exception handler
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Stream<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler) {
return combine(promises.iterator(), PromiseUtil.estimateSize(promises), exceptionHandler, true);
}
/**
* Combines a stream of promises into a single promise that completes with a list of results when all
* promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all
* other promises will be cancelled and the output promise will complete exceptionally.
* @param promises the input promises
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Stream<Promise<V>> promises, boolean link) {
return combine(promises.iterator(), PromiseUtil.estimateSize(promises), null, link);
}
/**
* Combines a stream of promises into a single promise that completes with a list of results when all
* promises complete. If any promise completes exceptionally, the output promise will complete exceptionally.
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Stream<Promise<V>> promises) {
return combine(promises.iterator(), PromiseUtil.estimateSize(promises), null, true);
}
/**
* Combines an iterator of promises into a single promise that completes with a list of results when all
* promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all
* other promises will be cancelled. The output promise will always complete successfully regardless
* of whether input promises fail.
* @param promises the input promises
* @param expectedSize the expected size of the list (used for optimization)
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterator<Promise<?>> promises,
int expectedSize, boolean link);
/**
* Combines a collection of promises into a single promise that completes with a list of results when all
* promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all
* other promises will be cancelled. The output promise will always complete successfully regardless
* of whether input promises fail.
* @param promises the input promises
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Collection<Promise<?>> promises,
boolean link) {
return allSettled(promises.iterator(), promises.size(), link);
}
/**
* Combines a collection of promises into a single promise that completes with a list of results when all
* promises complete. If any promise completes exceptionally, all other promises will be cancelled.
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Collection<Promise<?>> promises) {
return allSettled(promises.iterator(), promises.size(), true);
}
/**
* Combines a stream of promises into a single promise that completes with a list of results when all
* promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all
* other promises will be cancelled. The output promise will always complete successfully regardless
* of whether input promises fail.
* @param promises the input promises
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Stream<Promise<?>> promises,
boolean link) {
return allSettled(promises.iterator(), PromiseUtil.estimateSize(promises), link);
}
/**
* Combines a stream of promises into a single promise that completes with a list of results when all
* promises complete. If any promise completes exceptionally, all other promises will be cancelled.
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Stream<Promise<?>> promises) {
return allSettled(promises.iterator(), PromiseUtil.estimateSize(promises), true);
}
/**
* Combines an array of promises into a single promise that completes with a list of results when all
* promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all
* other promises will be cancelled. The output promise will always complete successfully regardless
* of whether input promises fail.
* @param link whether to cancel all promises on any exceptional completions
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean link,
@NotNull Promise<?>... promises) {
return allSettled(Arrays.asList(promises).iterator(), promises.length, link);
}
/**
* Combines an array of promises into a single promise that completes with a list of results when all
* promises complete. If any promise completes exceptionally, all other promises will be cancelled.
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promises) {
return allSettled(Arrays.asList(promises).iterator(), promises.length, true);
}
/**
* Combines an iterator of promises into a single promise that completes when all promises complete.
* If {@code link} is {@code true} and any promise completes exceptionally, all other promises will
* be cancelled and the output promise will complete exceptionally.
* @param promises the input promises
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
@NotNull Promise<Void> all(@NotNull Iterator<Promise<?>> promises, boolean link);
/**
* Combines an iterable of promises into a single promise that completes when all promises complete.
* If {@code link} is {@code true} and any promise completes exceptionally, all other promises will
* be cancelled and the output promise will complete exceptionally.
* @param promises the input promises
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promises, boolean link) {
return all(promises.iterator(), link);
}
/**
* Combines an iterable of promises into a single promise that completes when all promises complete.
* If any promise completes exceptionally, all other promises will be cancelled and the output
* promise will complete exceptionally.
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promises) {
return all(promises.iterator(), true);
}
/**
* Combines a stream of promises into a single promise that completes when all promises complete.
* If {@code link} is {@code true} and any promise completes exceptionally, all other promises will
* be cancelled and the output promise will complete exceptionally.
* @param promises the input promises
* @param link whether to cancel all promises on any exceptional completions
* @return the combined promise
*/
default @NotNull Promise<Void> all(@NotNull Stream<Promise<?>> promises, boolean link) {
return all(promises.iterator(), link);
}
/**
* Combines a stream of promises into a single promise that completes when all promises complete.
* If any promise completes exceptionally, all other promises will be cancelled and the output
* promise willcomplete exceptionally.
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<Void> all(@NotNull Stream<Promise<?>> promises) {
return all(promises.iterator(), true);
}
/**
* Combines an array of promises into a single promise that completes when all promises complete.
* If {@code link} is {@code true} and any promise completes exceptionally, all other promises will
* be cancelled
* and the output promise will complete exceptionally.
* @param link whether to cancel all promises on any exceptional completions
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<Void> all(boolean link, @NotNull Promise<?>... promises) {
return all(Arrays.asList(promises).iterator(), link);
}
/**
* Combines an array of promises into a single promise that completes when all promises complete.
* If any promise completes exceptionally, all other promises will be cancelled and the output
* promise will complete exceptionally.
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<Void> all(@NotNull Promise<?>... promises) {
return all(Arrays.asList(promises).iterator(), true);
}
/**
* Combines an iterator of promises into a single promise that completes when the first promise
* completes (successfully or exceptionally). If {@code cancelLosers} is {@code true}, all other
* promises will be cancelled when the first promise
* completes.
* @param promises the input promises
* @param cancelLosers whether to cancel the other promises when the first completes
* @return the combined promise
*/
<V> @NotNull Promise<V> race(@NotNull Iterator<Promise<V>> promises, boolean cancelLosers);
/**
* Combines an iterable of promises into a single promise that completes when the first promise
* completes (successfully or exceptionally). All other promises will be cancelled when the first
* promise completes.
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises, boolean cancelLosers) {
return race(promises.iterator(), cancelLosers);
}
/**
* Combines an iterable of promises into a single promise that completes when the first promise
* completes (successfully or exceptionally). All other promises will be cancelled when the first
* promise completes.
* @param promises the input promises
*/
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises) {
return race(promises.iterator(), true);
}
/**
* Combines a stream of promises into a single promise that completes when the first promise
* completes (successfully or exceptionally). If {@code cancelLosers} is {@code true}, all other
* promises will be cancelled when the first promise completes.
* @param promises the input promises
* @param cancelLosers whether to cancel the other promises when the first completes
* @return the combined promise
*/
default <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises, boolean cancelLosers) {
return race(promises.iterator(), cancelLosers);
}
/**
* Combines a stream of promises into a single promise that completes when the first promise
* completes (successfully or exceptionally). All other promises will be cancelled when the first
* promise completes.
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises) {
return race(promises.iterator(), true);
}
}

View File

@@ -22,7 +22,7 @@ public class PromiseFactoryImpl<FS, FA> extends AbstractPromiseFactory<FS, FA> {
@Override
public @NotNull <T> CompletablePromise<T> unresolved() {
return new PromiseImpl<>(this);
return new PromiseImpl<>();
}
@Override
@@ -40,4 +40,13 @@ public class PromiseFactoryImpl<FS, FA> extends AbstractPromiseFactory<FS, FA> {
return asyncExecutor;
}
public class PromiseImpl<T> extends AbstractPromise<T, FS, FA> {
@Override
public @NotNull AbstractPromiseFactory<FS, FA> getFactory() {
return PromiseFactoryImpl.this;
}
}
}

View File

@@ -1,18 +0,0 @@
package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
public class PromiseImpl<T, FS, FA> extends AbstractPromise<T, FS, FA> {
private final @NotNull AbstractPromiseFactory<FS, FA> factory;
public PromiseImpl(@NotNull AbstractPromiseFactory<FS, FA> factory) {
this.factory = factory;
}
@Override
public @NotNull AbstractPromiseFactory<FS, FA> getFactory() {
return factory;
}
}

View File

@@ -2,8 +2,15 @@ package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
/**
* A listener for a {@link Promise} that is called when the promise is resolved.
*/
public interface PromiseListener<T> {
void handle(@NotNull PromiseCompletion<T> ctx);
/**
* Handles the completion of the promise.
* @param completion the promise completion
*/
void handle(@NotNull PromiseCompletion<T> completion);
}

View File

@@ -1,4 +1,4 @@
package dev.tommyjs.futur.joiner;
package dev.tommyjs.futur.util;
import org.jetbrains.annotations.NotNull;
@@ -6,7 +6,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
class ConcurrentResultArray<T> {
public class ConcurrentResultArray<T> {
private static final float RESIZE_THRESHOLD = 0.75F;
private static final float RESIZE_FACTOR = 1.2F;
private final AtomicReference<T[]> ref;
@@ -17,8 +20,9 @@ class ConcurrentResultArray<T> {
public void set(int index, T element) {
ref.updateAndGet(array -> {
if (array.length <= index)
return Arrays.copyOf(array, index + 6);
if (array.length * RESIZE_THRESHOLD <= index) {
array = Arrays.copyOf(array, (int) (array.length * RESIZE_FACTOR));
}
array[index] = element;
return array;

View File

@@ -0,0 +1,48 @@
package dev.tommyjs.futur.util;
import dev.tommyjs.futur.promise.CompletablePromise;
import dev.tommyjs.futur.promise.Promise;
import org.jetbrains.annotations.NotNull;
import java.util.stream.Stream;
public class PromiseUtil {
/**
* Propagates the completion, once completed, of the given promise to the given promise.
* @param from the promise to propagate the completion from
* @param to the completable promise to propagate the completion to
*/
public static <V> void propagateCompletion(@NotNull Promise<V> from, @NotNull CompletablePromise<V> to) {
from.addDirectListener(to::complete, to::completeExceptionally);
}
/**
* Propagates the cancellation, once cancelled, of the given promise to the given promise.
* @param from the promise to propagate the cancellation from
* @param to the promise to propagate the cancellation to
*/
public static void propagateCancel(@NotNull Promise<?> from, @NotNull Promise<?> to) {
from.onCancel(to::cancel);
}
/**
* Cancels the given promise once the given promise is completed.
* @param from the promise to propagate the completion from
* @param to the promise to cancel upon completion
*/
public static void cancelOnComplete(@NotNull Promise<?> from, @NotNull Promise<?> to) {
from.addDirectListener(_ -> to.cancel());
}
/**
* Estimates the size of the given stream.
* @param stream the stream
* @return the estimated size
*/
public static int estimateSize(@NotNull Stream<?> stream) {
long estimate = stream.spliterator().estimateSize();
return estimate == Long.MAX_VALUE ? 10 : (int) estimate;
}
}