From 4236adbd9e64fe50c318c72fd08e1182d9b22a1c Mon Sep 17 00:00:00 2001 From: tommyskeff Date: Mon, 6 Jan 2025 17:53:33 +0000 Subject: [PATCH] documentation and small changes - added docs for `Promise` and `PromiseFactory` - removed outdated README docs - moved some common utilities to `PromiseUtil` - improved efficiency of result array resizing - added cancellation result to promise executors - changed visibility of `PromiseJoiner` to public, and made some method names more verbose - inlined `DeferredExecutionException` to inside `AbstractPromise` - inlined default promise implementation to inner class in the factory - removed necessity for base factories to provide a logger --- README.md | 123 +--- build.gradle | 3 +- .../futur/executor/ExecutorServiceImpl.java | 4 +- .../futur/executor/PromiseExecutor.java | 41 +- .../futur/executor/VirtualThreadImpl.java | 9 +- .../futur/function/ExceptionalConsumer.java | 14 +- .../futur/function/ExceptionalFunction.java | 16 +- .../futur/function/ExceptionalRunnable.java | 11 +- .../futur/function/ExceptionalSupplier.java | 14 +- .../futur/joiner/CompletionJoiner.java | 7 +- .../futur/joiner/MappedResultJoiner.java | 13 +- .../tommyjs/futur/joiner/PromiseJoiner.java | 45 +- .../tommyjs/futur/joiner/ResultJoiner.java | 12 +- .../dev/tommyjs/futur/joiner/VoidJoiner.java | 6 +- .../futur/promise/AbstractPromise.java | 78 +-- .../futur/promise/AbstractPromiseFactory.java | 90 +-- .../futur/promise/AsyncPromiseListener.java | 5 +- .../futur/promise/CompletablePromise.java | 11 + .../promise/DeferredExecutionException.java | 11 - .../dev/tommyjs/futur/promise/Promise.java | 397 ++++++++++- .../futur/promise/PromiseCompletion.java | 43 +- .../tommyjs/futur/promise/PromiseFactory.java | 625 +++++++++++++----- .../futur/promise/PromiseFactoryImpl.java | 11 +- .../tommyjs/futur/promise/PromiseImpl.java | 18 - .../futur/promise/PromiseListener.java | 9 +- .../ConcurrentResultArray.java | 12 +- .../dev/tommyjs/futur/util/PromiseUtil.java | 48 ++ 27 files changed, 1210 insertions(+), 466 deletions(-) delete mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/DeferredExecutionException.java delete mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseImpl.java rename futur-api/src/main/java/dev/tommyjs/futur/{joiner => util}/ConcurrentResultArray.java (64%) create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/util/PromiseUtil.java diff --git a/README.md b/README.md index 80ea3a6..1b09a96 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,8 @@ repositories { } dependencies { - compile 'dev.tommyjs:futur-api:2.1.3' - compile 'dev.tommyjs:futur-reactor:2.1.3' + compile 'dev.tommyjs:futur-api:2.4.0' + compile 'dev.tommyjs:futur-lazy:2.4.0' } ``` ### Gradle DSL @@ -26,8 +26,8 @@ repositories { } dependencies { - implementation("dev.tommyjs:futur-api:2.1.3") - implementation("dev.tommyjs:futur-reactor:2.1.3") + implementation("dev.tommyjs:futur-api:2.4.0") + implementation("dev.tommyjs:futur-lazy:2.4.0") } ``` ### Maven @@ -43,121 +43,12 @@ dependencies { dev.tommyjs futur-api - 2.1.3 + 2.4.0 dev.tommyjs - futur-reactor - 2.1.3 + futur-lazy + 2.4.0 ``` - - -## Getting started -Futur4J uses an underlying `Scheduler` instance to power both synchronous and asynchronous task execution. - -The library was originally built for use in Minecraft servers, and therefore has native support for a "main" thread with can be seamlessly switched to in a `Promise` chain (read more below). - -It is recommended, but not required, to manually create a `Scheduler`. If this is not done, a single-threaded default `Scheduler` will be used. -```java -// check if scheduler has already been loaded -if (!Schedulers.isLoaded()) { - Scheduler scheduler = ThreadPoolScheduler.create(6); // create a scheduler using an underlying thread pool (6 threads) - Schedulers.setScheduler(scheduler); // initialize the scheduling -} -``` - -The `futur-standalone` module has two scheduler implementations available. The `ExclusiveThreadPoolScheduler` operates one thread pool, and throws an exception when sync tasks are attempted to be executed. -The `ThreadPoolScheduler` uses a single-threaded "main" pool and a multi-threaded async pool. - -For Minecraft, Bukkit, Velocity and BungeeCord support is coming soon. Feel free to make PRs for other external library support as modules. - -### Using the scheduler -You can invoke tasks using the synchronous and asynchronous scheduler `Scheduler` methods. It is worth noting that exceptions in these tasks are swallowed silently, and it is therefore better to -use promise chains, or use the `Schedulers` utility class, which will wrap tasks in a try-catch and log to the standard error. - -```java -scheduler.runDelayedSync(() -> { - System.out.println("I was scheduled to execute 10 seconds later!"); -}, 10L, TimeUnit.SECONDS); - -Schedulers.runAsync(() -> { - throw new RuntimeException("I will be caught by the exception wrapper in the Schedulers class!"); -}); -``` - -### Reacting to task completions -You may have recieved some form of "future" instance (a task that will complete at an unspecified time in the future). This may be a future4j-native `Promise` instance, or another form of future. In the case of the latter, you may obtain a `Promise` - instance through the various available wrappers (e.g. `ReactorTransformer` for use with reactor core). - -You can then add listeners to the `Promise`, which will be executed asynchronously on promise "completion". A promise is deemed completed once the task has either concluded successfully or an exception has been thrown. You can access this information in a `PromiseCompletion`, -which is passed to promise listeners. Promise listeners should not throw exceptions, and will print a stack trace if they do to avoid silent failure. However, callbacks in promise chain methods are permitted to throw exceptions as they will be passed down the chain. -```java -Promise promise = doSomeAsyncProcessing(); -promise.addListener(ctx -> { - if (ctx.isError()) { - System.out.println("Error! Oh no!"); - } else { - String result = ctx.getResult(); - System.out.println(result); - } -}); -``` - -### Promise chains -`Promise` also contains convenience wrapper methods to avoid checking the completion for exceptions every time. These methods also return a new promise which will resolve once the original promise, and then handler callback have finished execution. We can use this feature to create -"promise chains", which are a sequence of promise handlers. These handlers are permitted to throw exceptions, which will be passed down promise chains until they are handled. If exceptions are not handled properly, they will be silently swallowed. It is recommend to append `logExceptions()` -to chains unless exceptions are handled explicitly with a custom listener. This will simply log exceptions in the promise chain to the standard error. - -You can also use the sync and async methods to choose which executor is chosen for task execution. The ability to seamlessly switch between executors comes in handy with applications where you must be on the "main thread" to perform non-threadsafe operations. - -`Promise.start()` can be used to create a "resolved" promise. This is useful when starting a promise chain or returning an already-completed promise (common in compose callbacks, mentioned below). - -```java -Promise.start().thenSupplyAsync(() -> { - // do some async processing - return "Hello World!"; -}).thenApplyAsync(input -> { - // convert string to upper case - return input.toUpperCase(); -}).thenConsumeSync(result -> { - // display result - System.out.println("Result: " + result); -}).logExceptions(); // log exceptions to the standard error -``` - -The promise chain methods follow Java convention of `Supplier`, `Consumer` and `Runnable` through the `thenSupply`, `thenConsume` and `thenRun` methods. There is also `thenApply` (which acts as a Java `Function`), and `thenCompose`, which is explained below. Futur4J has its own implementations of the Java concepts though to allow for -exceptions within handlers. - -The `thenCompose` method is similar to the concept in the Java `CompletableFuture` API. You are able to return another `Promise` within a promise handler, and a `Promise>` will be wrapped up into just a `Promise`. This is often useful when using an external library that returns some sort of "future" inside a handler. - -```java -String username = "abc123"; -mySuperSecretApi.usernameToId(username) // returns Promise - .thenComposeAsync(id -> { - return userManager.fetchFromDatabase(id); // returns Promise - }).thenConsumeSync(playerData -> { - System.out.println(username + " has " + playerData.getCoins() + " coins!"); - }).logExceptions(); -``` - -### Utility methods -The `Promises` utility class provides many helpful methods for working with promises and groups of promises. - -`Promises.combine(Promise, Promise)` combines two promises into one `Promise>`. - -`Promises.erase(Promise)` erases the type on a `Promise` instance and returns a `Promise`. This is also supported for lists of promises with `Promises.all(List)`, which will return a `Promise` representing the future whereby all promises have completed. - -If all promises are of identical type, you can use `Promises.combine(List>)` which will return one `Promise>`, similarly representing the future whereby all promises have completed. - -This can also be applied to key-value scenarios in the same way, where you can provide a mapper function to be applied to all elements and combine into a single promise. - - - - - -### Future wrappers -External libaries provide asynchronous ways to interact with their output in all shapes and sizes. Futur4J currently has wrappers for the following libraries/frameworks: -- Reactor Core (via futur-reactor) -- Java CompletableFuture (via `Promises.wrap` in futur-api) diff --git a/build.gradle b/build.gradle index 9b2d364..bf38bbf 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,7 @@ nexusPublishing { subprojects { group = 'dev.tommyjs' - version = '2.4' + version = '2.4.0' apply plugin: 'java-library' apply plugin: 'com.github.johnrengelman.shadow' @@ -43,6 +43,7 @@ subprojects { useJUnitPlatform() testLogging { exceptionFormat = 'full' + showStandardStreams = true } } } \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java index 52e2065..acefe38 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java @@ -25,8 +25,8 @@ class ExecutorServiceImpl implements PromiseExecutor> { } @Override - public void cancel(Future task) { - task.cancel(true); + public boolean cancel(Future task) { + return task.cancel(true); } } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java index 4ab7637..bb9fe03 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java @@ -8,26 +8,65 @@ import java.util.concurrent.TimeUnit; public interface PromiseExecutor { + /** + * Creates a new {@link PromiseExecutor} that runs tasks on virtual threads. + * @return the new executor + */ static PromiseExecutor virtualThreaded() { return new VirtualThreadImpl(); } + /** + * Creates a new {@link PromiseExecutor} that runs tasks on a single thread. + * @return the new executor + */ static PromiseExecutor singleThreaded() { return of(Executors.newSingleThreadScheduledExecutor()); } + /** + * Creates a new {@link PromiseExecutor} that runs tasks on multiple threads. + * @param threads the number of threads + * @return the new executor + */ static PromiseExecutor multiThreaded(int threads) { return of(Executors.newScheduledThreadPool(threads)); } + /** + * Creates a new {@link PromiseExecutor} that runs tasks on the given executor service. + * @param service the executor service + * @return the new executor + */ static PromiseExecutor of(@NotNull ScheduledExecutorService service) { return new ExecutorServiceImpl(service); } + /** + * Runs the given task. + * @param task the task + * @return the task + * @throws Exception if scheduling the task failed + */ T run(@NotNull Runnable task) throws Exception; + /** + * Runs the given task after the given delay. + * @param task the task + * @param delay the delay + * @param unit the time unit + * @return the task + * @throws Exception if scheduling the task failed + */ T run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) throws Exception; - void cancel(T task); + /** + * Cancels the given task if possible. This may interrupt the task mid-execution. + * + * @param task the task + * @return {@code true} if the task was cancelled. {@code false} if the task was already completed + * or could not be cancelled. + */ + boolean cancel(T task); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java index 95faa7c..397efc3 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java @@ -24,8 +24,13 @@ class VirtualThreadImpl implements PromiseExecutor { } @Override - public void cancel(Thread task) { - task.interrupt(); + public boolean cancel(Thread task) { + if (task.isAlive()) { + task.interrupt(); + return true; + } else { + return false; + } } } \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java index cb84eff..8d96919 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java @@ -1,8 +1,20 @@ package dev.tommyjs.futur.function; +/** + * Represents an operation that accepts a single input argument and returns no result, + * and may throw an exception. This is a functional interface whose functional method is {@link #accept(Object)}. + * + * @param the type of the input to the operation + */ @FunctionalInterface public interface ExceptionalConsumer { + /** + * Performs this operation on the given argument, potentially throwing an exception. + * + * @param value the input argument + * @throws Exception if unable to compute a result + */ void accept(T value) throws Exception; -} +} \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java index 9d32b45..403da2b 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java @@ -1,8 +1,22 @@ package dev.tommyjs.futur.function; +/** + * Represents a function that accepts one argument and produces a result, + * and may throw an exception. This is a functional interface whose functional method is {@link #apply(Object)}. + * + * @param the type of the input to the function + * @param the type of the result of the function + */ @FunctionalInterface public interface ExceptionalFunction { + /** + * Applies this function to the given argument, potentially throwing an exception. + * + * @param value the input argument + * @return the function result + * @throws Exception if unable to compute a result + */ V apply(K value) throws Exception; -} +} \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java index d426a9c..fc130cb 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java @@ -1,8 +1,17 @@ package dev.tommyjs.futur.function; +/** + * Represents a runnable task that may throw an exception. + * This is a functional interface whose functional method is {@link #run()}. + */ @FunctionalInterface public interface ExceptionalRunnable { + /** + * Performs this runnable task, potentially throwing an exception. + * + * @throws Exception if unable to complete the task + */ void run() throws Exception; -} +} \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java index 7e62218..ba39049 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java @@ -1,8 +1,20 @@ package dev.tommyjs.futur.function; +/** + * Represents a supplier of results that may throw an exception. + * This is a functional interface whose functional method is {@link #get()}. + * + * @param the type of results supplied by this supplier + */ @FunctionalInterface public interface ExceptionalSupplier { + /** + * Gets a result, potentially throwing an exception. + * + * @return a result + * @throws Exception if unable to supply a result + */ T get() throws Exception; -} +} \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java index 3f75d55..3165d05 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java @@ -3,6 +3,7 @@ package dev.tommyjs.futur.joiner; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseCompletion; import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.util.ConcurrentResultArray; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -24,18 +25,18 @@ public class CompletionJoiner extends PromiseJoiner, Void, Void, List } @Override - protected Void getKey(Promise value) { + protected Void getChildKey(Promise value) { return null; } @Override - protected @NotNull Promise getPromise(Promise value) { + protected @NotNull Promise getChildPromise(Promise value) { //noinspection unchecked return (Promise) value; } @Override - protected @Nullable Throwable onFinish(int index, Void key, @NotNull PromiseCompletion res) { + protected @Nullable Throwable onChildComplete(int index, Void key, @NotNull PromiseCompletion res) { results.set(index, res); return null; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java index 1145da9..fa61dbd 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java @@ -3,6 +3,7 @@ package dev.tommyjs.futur.joiner; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseCompletion; import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.util.ConcurrentResultArray; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -27,19 +28,22 @@ public class MappedResultJoiner extends PromiseJoiner> entry) { + protected K getChildKey(Map.Entry> entry) { return entry.getKey(); } @Override - protected @NotNull Promise getPromise(Map.Entry> entry) { + protected @NotNull Promise getChildPromise(Map.Entry> entry) { return entry.getValue(); } @Override - protected @Nullable Throwable onFinish(int index, K key, @NotNull PromiseCompletion res) { + protected @Nullable Throwable onChildComplete(int index, K key, @NotNull PromiseCompletion res) { if (res.isError()) { - if (exceptionHandler == null) return res.getException(); + if (exceptionHandler == null) { + return res.getException(); + } + exceptionHandler.accept(key, res.getException()); } @@ -54,6 +58,7 @@ public class MappedResultJoiner extends PromiseJoiner entry : list) { map.put(entry.getKey(), entry.getValue()); } + return map; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/PromiseJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/PromiseJoiner.java index 31319ba..39997c4 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/joiner/PromiseJoiner.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/PromiseJoiner.java @@ -1,6 +1,10 @@ package dev.tommyjs.futur.joiner; -import dev.tommyjs.futur.promise.*; +import dev.tommyjs.futur.promise.CompletablePromise; +import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseCompletion; +import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.util.PromiseUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -8,7 +12,7 @@ import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -abstract class PromiseJoiner { +public abstract class PromiseJoiner { private final CompletablePromise joined; @@ -16,15 +20,11 @@ abstract class PromiseJoiner { this.joined = factory.unresolved(); } - public @NotNull Promise joined() { - return joined; - } + protected abstract K getChildKey(V value); - protected abstract K getKey(V value); + protected abstract @NotNull Promise getChildPromise(V value); - protected abstract @NotNull Promise getPromise(V value); - - protected abstract @Nullable Throwable onFinish(int index, K key, @NotNull PromiseCompletion completion); + protected abstract @Nullable Throwable onChildComplete(int index, K key, @NotNull PromiseCompletion completion); protected abstract R getResult(); @@ -35,18 +35,19 @@ abstract class PromiseJoiner { int i = 0; do { V value = promises.next(); - Promise p = getPromise(value); + Promise p = getChildPromise(value); + if (link) { - AbstractPromise.cancelOnFinish(p, joined); + PromiseUtil.cancelOnComplete(joined, p); } if (!joined.isCompleted()) { count.incrementAndGet(); - K key = getKey(value); + K key = getChildKey(value); int index = i++; - p.addListener((res) -> { - Throwable e = onFinish(index, key, res); + p.addAsyncListener(res -> { + Throwable e = onChildComplete(index, key, res); if (e != null) { joined.completeExceptionally(e); } else if (count.decrementAndGet() == 0 && waiting.get()) { @@ -56,11 +57,17 @@ abstract class PromiseJoiner { } } while (promises.hasNext()); - count.updateAndGet((v) -> { - if (v == 0) joined.complete(getResult()); - else waiting.set(true); - return v; - }); + if (!joined.isCompleted()) { + count.updateAndGet(v -> { + if (v == 0) joined.complete(getResult()); + else waiting.set(true); + return v; + }); + } + } + + public @NotNull Promise joined() { + return joined; } } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java index d69ad33..28046fb 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java @@ -3,6 +3,7 @@ package dev.tommyjs.futur.joiner; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseCompletion; import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.util.ConcurrentResultArray; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -28,19 +29,22 @@ public class ResultJoiner extends PromiseJoiner, Void, T, List> } @Override - protected Void getKey(Promise value) { + protected Void getChildKey(Promise value) { return null; } @Override - protected @NotNull Promise getPromise(Promise value) { + protected @NotNull Promise getChildPromise(Promise value) { return value; } @Override - protected @Nullable Throwable onFinish(int index, Void key, @NotNull PromiseCompletion res) { + protected @Nullable Throwable onChildComplete(int index, Void key, @NotNull PromiseCompletion res) { if (res.isError()) { - if (exceptionHandler == null) return res.getException(); + if (exceptionHandler == null) { + return res.getException(); + } + exceptionHandler.accept(index, res.getException()); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/VoidJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/VoidJoiner.java index 8998956..195f9ab 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/joiner/VoidJoiner.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/VoidJoiner.java @@ -16,18 +16,18 @@ public class VoidJoiner extends PromiseJoiner, Void, Void, Void> { } @Override - protected Void getKey(Promise value) { + protected Void getChildKey(Promise value) { return null; } @Override - protected @NotNull Promise getPromise(Promise value) { + protected @NotNull Promise getChildPromise(Promise value) { //noinspection unchecked return (Promise) value; } @Override - protected @Nullable Throwable onFinish(int index, Void key, @NotNull PromiseCompletion completion) { + protected @Nullable Throwable onChildComplete(int index, Void key, @NotNull PromiseCompletion completion) { return completion.getException(); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 5e673c4..fe55aa1 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -4,6 +4,7 @@ import dev.tommyjs.futur.function.ExceptionalConsumer; import dev.tommyjs.futur.function.ExceptionalFunction; import dev.tommyjs.futur.function.ExceptionalRunnable; import dev.tommyjs.futur.function.ExceptionalSupplier; +import dev.tommyjs.futur.util.PromiseUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -18,18 +19,6 @@ import java.util.function.Consumer; public abstract class AbstractPromise implements CompletablePromise { - public static void propagateResult(Promise from, CompletablePromise to) { - from.addDirectListener(to::complete, to::completeExceptionally); - } - - public static void propagateCancel(Promise from, Promise to) { - from.onCancel(to::cancel); - } - - public static void cancelOnFinish(Promise toCancel, Promise toFinish) { - toFinish.addDirectListener(_ -> toCancel.cancel()); - } - private final AtomicReference>> listeners; private final AtomicReference> completion; private final CountDownLatch latch; @@ -40,6 +29,8 @@ public abstract class AbstractPromise implements CompletablePromise getFactory(); + private void runCompleter(@NotNull CompletablePromise promise, @NotNull ExceptionalRunnable completer) { try { completer.run(); @@ -62,8 +53,6 @@ public abstract class AbstractPromise implements CompletablePromise getFactory(); - protected @NotNull Logger getLogger() { return getFactory().getLogger(); } @@ -106,7 +95,7 @@ public abstract class AbstractPromise implements CompletablePromise fork() { CompletablePromise fork = getFactory().unresolved(); - propagateResult(this, fork); + PromiseUtil.propagateCompletion(this, fork); return fork; } @@ -139,7 +128,7 @@ public abstract class AbstractPromise implements CompletablePromise implements CompletablePromise implements CompletablePromise implements CompletablePromise implements CompletablePromise implements CompletablePromise task.get(), delay, unit); } - @Override - public @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference) { - return thenApplyAsync(result -> { - reference.set(result); - return result; - }); - } - @Override public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { CompletablePromise promise = getFactory().unresolved(); @@ -317,7 +298,7 @@ public abstract class AbstractPromise implements CompletablePromise implements CompletablePromise implements CompletablePromise thenPopulateReference(@NotNull AtomicReference reference) { + return thenApplyAsync(result -> { + reference.set(result); + return result; + }); + } + @Override public @NotNull Promise erase() { return thenSupply(() -> null); @@ -368,7 +357,7 @@ public abstract class AbstractPromise implements CompletablePromise addAsyncListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { - return addAsyncListener((res) -> { + return addAsyncListener(res -> { if (res.isSuccess()) { if (successListener != null) successListener.accept(res.getResult()); } else { @@ -384,7 +373,7 @@ public abstract class AbstractPromise implements CompletablePromise addDirectListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { - return addDirectListener((res) -> { + return addDirectListener(res -> { if (res.isSuccess()) { if (successListener != null) successListener.accept(res.getResult()); } else { @@ -414,6 +403,7 @@ public abstract class AbstractPromise implements CompletablePromise listener, PromiseCompletion res) { try { getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, res)); + } catch (RejectedExecutionException ignored) { } catch (Exception e) { getLogger().warn("Exception caught while running promise listener", e); } @@ -447,10 +437,9 @@ public abstract class AbstractPromise implements CompletablePromise @NotNull Promise onError(@NotNull Class clazz, @NotNull Consumer listener) { - return onError((e) -> { - if (clazz.isAssignableFrom(e.getClass())) { - getLogger().info("On Error {}", e.getClass()); + public @NotNull Promise onError(@NotNull Class type, @NotNull Consumer listener) { + return onError(e -> { + if (type.isAssignableFrom(e.getClass())) { //noinspection unchecked listener.accept((E) e); } @@ -551,4 +540,7 @@ public abstract class AbstractPromise implements CompletablePromise implements PromiseFactory { + public abstract @NotNull Logger getLogger(); + public abstract @NotNull PromiseExecutor getSyncExecutor(); public abstract @NotNull PromiseExecutor getAsyncExecutor(); + @Override + public @NotNull Promise resolve(T value) { + CompletablePromise promise = unresolved(); + promise.complete(value); + return promise; + } + + @Override + public @NotNull Promise error(@NotNull Throwable error) { + CompletablePromise promise = unresolved(); + promise.completeExceptionally(error); + return promise; + } + + @Override + public @NotNull Promise wrap(@NotNull CompletableFuture future) { + return wrap(future, future); + } + + private @NotNull Promise wrap(@NotNull CompletionStage completion, Future future) { + CompletablePromise promise = unresolved(); + completion.whenComplete((v, e) -> { + if (e != null) { + promise.completeExceptionally(e); + } else { + promise.complete(v); + } + }); + + promise.onCancel(_ -> future.cancel(true)); + return promise; + } + @Override public @NotNull Promise> combine( @NotNull Promise p1, @NotNull Promise p2, - boolean dontFork + boolean link ) { - return all(dontFork, p1, p2).thenApply((_) -> new AbstractMap.SimpleImmutableEntry<>( + return all(link, p1, p2).thenApply(_ -> new AbstractMap.SimpleImmutableEntry<>( Objects.requireNonNull(p1.getCompletion()).getResult(), Objects.requireNonNull(p2.getCompletion()).getResult() )); @@ -71,59 +108,26 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise race(@NotNull Iterator> promises, boolean link) { + public @NotNull Promise race(@NotNull Iterator> promises, boolean cancelLosers) { CompletablePromise promise = unresolved(); promises.forEachRemaining(p -> { - if (link) AbstractPromise.cancelOnFinish(p, promise); - if (!promise.isCompleted()) - AbstractPromise.propagateResult(p, promise); - }); - - return promise; - } - - @Override - public @NotNull Promise race(@NotNull Iterable> promises, boolean link) { - return race(promises.iterator(), link); - } - - @Override - public @NotNull Promise race(@NotNull Stream> promises, boolean link) { - return race(promises.iterator(), link); - } - - @Override - public @NotNull Promise wrap(@NotNull CompletableFuture future) { - return wrap(future, future); - } - - private @NotNull Promise wrap(@NotNull CompletionStage completion, Future future) { - CompletablePromise promise = unresolved(); - - completion.whenComplete((v, e) -> { - if (e != null) { - promise.completeExceptionally(e); - } else { - promise.complete(v); + if (cancelLosers) PromiseUtil.cancelOnComplete(promise, p); + if (!promise.isCompleted()) { + PromiseUtil.propagateCompletion(p, promise); } }); - promise.onCancel(_ -> future.cancel(true)); return promise; } @Override - public @NotNull Promise resolve(T value) { - CompletablePromise promise = unresolved(); - promise.complete(value); - return promise; + public @NotNull Promise race(@NotNull Iterable> promises, boolean cancelLosers) { + return race(promises.iterator(), cancelLosers); } @Override - public @NotNull Promise error(@NotNull Throwable error) { - CompletablePromise promise = unresolved(); - promise.completeExceptionally(error); - return promise; + public @NotNull Promise race(@NotNull Stream> promises, boolean cancelLosers) { + return race(promises.iterator(), cancelLosers); } } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java index 7e0343b..799b6be 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java @@ -1,5 +1,8 @@ package dev.tommyjs.futur.promise; +/** + * A listener for a {@link Promise} that is called when the promise is resolved. This listener is + * executed asynchronously by the {@link PromiseFactory} that created the completed promise. + */ public interface AsyncPromiseListener extends PromiseListener { - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletablePromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletablePromise.java index 101bc55..3baff14 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletablePromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletablePromise.java @@ -3,10 +3,21 @@ package dev.tommyjs.futur.promise; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +/** + * A {@link Promise} that can be completed. + */ public interface CompletablePromise extends Promise { + /** + * Completes the promise successfully with the given result. + * @param result the result + */ void complete(@Nullable T result); + /** + * Completes the promise exceptionally with the given exception. + * @param result the exception + */ void completeExceptionally(@NotNull Throwable result); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/DeferredExecutionException.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/DeferredExecutionException.java deleted file mode 100644 index c1f58f8..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/DeferredExecutionException.java +++ /dev/null @@ -1,11 +0,0 @@ -package dev.tommyjs.futur.promise; - -import java.util.concurrent.ExecutionException; - -class DeferredExecutionException extends ExecutionException { - - public DeferredExecutionException() { - super(); - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index 8883019..1cab047 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -12,171 +12,516 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +/** + *

+ * A promise represents the result of an asynchronous computation. A promise will transition from a + * pending state to a completed state at most once, but may remain in a pending state indefinitely. + *

+ * + *

+ * Promises are created by a {@link PromiseFactory} and support chaining operations to be executed + * upon completion. These operations can be synchronous or asynchronous, and can be composed in a + * variety of ways. Promises can be listened to for completions, either with a result or with an + * exception. Promises can be cancelled, which will propagate a cancellation signal through the + * chain, but a promise can also be forked, which will prevent propagation of cancellations. + *

+ * + * @see #cancel() + * @see #fork() + */ public interface Promise { + /** + * Returns the factory that created this promise. This factory can be used to create new promises. + */ @NotNull PromiseFactory getFactory(); + /** + * Chains a task to be executed after this promise completes. The task will be executed immediately + * when this promise completes. + * + * @param task the task to execute + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenRun(@NotNull ExceptionalRunnable task); + /** + * Chains a task to be executed after this promise completes. The task will be executed immediately + * when this promise completes and will be passed the result of this promise. + * + * @param task the task to execute + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenConsume(@NotNull ExceptionalConsumer task); + /** + * Chains a task to be executed after this promise completes. The task will be executed immediately + * when this promise completes, and will supply a value to the next promise in the chain. + * + * @param task the task to execute + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenSupply(@NotNull ExceptionalSupplier task); + /** + * Chains a task to be executed after this promise completes. The task will be executed immediately + * when this promise completes, and will apply the specified function to the result of this promise + * in order to supply a value to the next promise in the chain. + * + * @param task the task to execute + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenApply(@NotNull ExceptionalFunction task); + /** + * Chains a task to be executed after this promise completes. The task will be executed immediately + * when this promise completes, and will compose the next promise in the chainfrom the result of + * this promise. + * + * @param task the task to execute + * @return a new promise that completes, once this promise and the promise returned by + * the task are complete, with the result of the task promise + */ @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the + * sync executor of the factory that created this promise, immediately after this promise completes. + * + * @param task the task to execute + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the + * sync executor of the factory that created this promise, after the specified delay after this + * promise completes. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the + * sync executor of the factory that created this promise immediately after this promise completes, + * and will be passed the result of this promise. + * + * @param task the task to execute + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the + * sync executor of the factory that created this promise after the specified delay after this + * promise completes, and will be passed the result of this promise. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit); + /** + * Chains a task to be executed after this promise completes. The task will be executed immediately + * by the sync executor of the factory that created this promise when this promise completes, and + * will supply a value to the next promise in the chain. + * + * @param task the task to execute + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the sync + * executor of the factory that created this promise after the specified delay after this promise + * completes, and will supply a value to the next promise in the chain. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the sync + * executor of the factory that created this promise immediately after this promise completes, and + * will apply the specified function to the result of this promise in order to supply a value to the + * next promise in the chain. + * + * @param task the task to execute + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the sync + * executor of the factory that created this promise after the specified delay after this promise + * completes, and will apply the specified function to the result of this promise in order to supply + * a value to the next promise in the chain. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the sync + * executor of the factory that created this promise immediately after this promise completes, and + * will compose the next promise in the chain from the result of this promise. + * + * @param task the task to execute + * @return a new promise that completes, once this promise and the promise returned by the task are + * complete, with the result of the task promise + */ @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the + * async executor of the factory that created this promise, immediately after this promise completes. + * + * @param task the task to execute + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the + * async executor of the factory that created this promise after the specified delay after this + * promise completes. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the + * async executor of the factory that created this promise immediately after this promise completes, + * and will be passed the result of this promise. + * + * @param task the task to execute + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the + * async executor of the factory that created this promise after the specified delay after this + * promise completes, and will be passed the result of this promise. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes after the task is executed + */ @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the + * async executor of the factory that created this promise immediately after this promise completes, + * and will supply a value to the next promise in the chain. + * + * @param task the task to execute + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the async + * executor of the factory that created this promise after the specified delay after this promise + * completes, and will supply a value to the next promise in the chain. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit); - @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference); - + /** + * Chains a task to be executed after this promise completes. The task will be executed by the async + * executor of the factory that created this promise immediately after this promise completes, and + * will apply the specified function to the result of this promise in order to supply a value to the + * next promise in the chain. + * + * @param task the task to execute + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the async + * executor of the factory that created this promise after the specified delay after this promise + * completes, and will apply the specified function to the result of this promise in order to supply + * a value to the next promise in the chain. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes, after the task is executed, with the task result + */ @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); + /** + * Chains a task to be executed after this promise completes. The task will be executed by the async + * executor of the factory that created this promise immediately after this promise completes, and + * will compose the next promise in the chain from the result of this promise. + * + * @param task the task to execute + * @return a new promise that completes, once this promise and the promise returned by the task are + * complete, with the result of the task promise + */ @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task); + /** + * Adds a listener to this promise that will populate the specified reference with the result of this + * promise upon successful completion. + * + * @param reference the reference to populate + * @return continuation of the promise chain + */ + @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference); + + /** + * Returns a promise backed by this promise that will complete with {@code null} if this promise + * completes successfully, or with the exception if this promise completes exceptionally. + */ @NotNull Promise erase(); + /** + * Logs any exceptions that occur in the promise chain. + * + * @return continuation of the promise chain + */ default @NotNull Promise logExceptions() { return logExceptions("Exception caught in promise chain"); } + /** + * Logs any exceptions that occur in the promise chain with the specified message. + * + * @param message the message to log + * @return continuation of the promise chain + */ @NotNull Promise logExceptions(@NotNull String message); /** - * @apiNote Direct listeners run on the same thread as the completion. + * Adds a listener to this promise that will be executed immediately when this promise completes, + * on the same thread as the completion call. + * + * @param listener the listener to add + * @return continuation of the promise chain */ @NotNull Promise addDirectListener(@NotNull PromiseListener listener); /** - * @apiNote Direct listeners run on the same thread as the completion. + * Adds a listener to this promise that will be executed immediately when this promise completes, + * on the same thread as the completion call. One of {@code successHandler} and {@code errorHandler} will be + * called when the promise completes successfully or exceptionally, respectively. + * + * @param successHandler the function to call on success + * @param errorHandler the function to call on error + * @return continuation of the promise chain */ @NotNull Promise addDirectListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); /** - * @apiNote Async listeners are run in parallel. + * Adds a listener to this promise that will be executed immediately when this promise completes, + * by the async executor of the factory that created this promise. + * + * @param listener the listener to add + * @return continuation of the promise chain */ @NotNull Promise addAsyncListener(@NotNull AsyncPromiseListener listener); /** - * @apiNote Same as addAsyncListener. + * Adds a listener to this promise that will be executed immediately when this promise completes. + * + * @param listener the listener to add + * @return continuation of the promise chain */ default @NotNull Promise addListener(@NotNull AsyncPromiseListener listener) { return addAsyncListener(listener); } /** - * @apiNote Async listeners are run in parallel. + * Adds a listener to this promise that will be executed immediately when this promise completes, + * by the async executor of the factory that created this promise. One of {@code successHandler} and + * {@code errorHandler} will be called when the promise completes successfully or exceptionally, respectively. + * + * @param successHandler the function to call on success + * @param errorHandler the function to call on error */ @NotNull Promise addAsyncListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); + /** + * Adds a listener to this promise that will be called if the promise is completed successfully. + * + * @param listener the listener to add + * @return continuation of the promise chain + */ @NotNull Promise onSuccess(@NotNull Consumer listener); + /** + * Adds a listener to this promise that will be called if the promise is completed exceptionally. + * + * @param listener the listener to add + * @return continuation of the promise chain + */ @NotNull Promise onError(@NotNull Consumer listener); - @NotNull Promise onError(@NotNull Class clazz, @NotNull Consumer listener); + /** + * Adds a listener to this promise that will be called if the promise is completed exceptionally + * with an exception of the specified type. + * + * @param listener the listener to add + * @param type the class of the exception to listen for + * @return continuation of the promise chain + */ + @NotNull Promise onError(@NotNull Class type, @NotNull Consumer listener); + /** + * Adds a listener to this promise that will be called if the promise is cancelled. + * + * @param listener the listener to add + * @return continuation of the promise chain + */ @NotNull Promise onCancel(@NotNull Consumer listener); /** - * Cancels the promise with a TimeoutException after the specified time. + * Cancels the promise if not already completed after the specified timeout. This will result in + * an exceptional completion with a {@link CancellationException}. + * + * @param time the amount of time to wait before cancelling the promise + * @param unit the time unit of the delay + * @return continuation of the promise chain */ @NotNull Promise timeout(long time, @NotNull TimeUnit unit); /** - * Cancels the promise with a TimeoutException after the specified time. + * Cancels the promise if not already completed after the specified timeout. This will result in + * an exceptional completion with a {@link CancellationException}. + * @param ms the amount of time to wait before cancelling the promise (in milliseconds) + * @return continuation of the promise chain */ default @NotNull Promise timeout(long ms) { return timeout(ms, TimeUnit.MILLISECONDS); } /** - * Completes the promise exceptionally with a TimeoutException after the specified time. + * Times out the promise if not already completed after the specified timeout. This will result + * in an exceptional completion with a {@link TimeoutException}. This will not result in the + * promise being cancelled. + * + * @param time the amount of time to wait before timing out the promise + * @param unit the time unit of the delay + * @return continuation of the promise chain */ @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit); /** - * Completes the promise exceptionally with a TimeoutException after the specified time. + * Times out the promise if not already completed after the specified timeout. This will result + * in an exceptional completion with a {@link TimeoutException}. This will not result in the + * promise being cancelled. + * @param ms the amount of time to wait before timing out the promise (in milliseconds) + * @return continuation of the promise chain */ default @NotNull Promise maxWaitTime(long ms) { return maxWaitTime(ms, TimeUnit.MILLISECONDS); } + /** + * Cancels the promise if not already completed after the specified timeout. This will result in + * an exceptional completion with the specified cancellation. + * @param exception the cancellation exception to complete the promise with + */ void cancel(@NotNull CancellationException exception); + /** + * Cancels the promise if not already completed after the specified timeout. This will result in + * an exceptional completion with a {@link CancellationException}. + * @param reason the reason for the cancellation + */ default void cancel(@NotNull String reason) { cancel(new CancellationException(reason)); - }; + } + /** + * Cancels the promise if not already completed after the specified timeout. This will result in + * an exceptional completion with a {@link CancellationException}. + */ default void cancel() { cancel(new CancellationException()); } /** - * Waits if necessary for this promise to complete, and then returns its result. - * @throws CancellationException if the computation was cancelled - * @throws CompletionException if this promise completed exceptionally + * Blocks until this promise has completed, and then returns its result. + * @throws CancellationException if the promise was cancelled + * @throws CompletionException if the promise completed exceptionally + * @return the result of the promise */ @Blocking T await(); /** - * Waits if necessary for this promise to complete, and then returns its result. - * @throws CancellationException if the computation was cancelled - * @throws ExecutionException if this promise completed exceptionally - * @throws InterruptedException if the current thread was interrupted while waiting + * Blocks until this promise has completed, and then returns its result. + * @throws CancellationException if the promise was cancelled + * @throws ExecutionException if the promise completed exceptionally + * @throws InterruptedException if the current thread was interrupted while waiting + * @return the result of the promise */ @Blocking T get() throws InterruptedException, ExecutionException; /** - * Waits if necessary for at most the given time for this future to complete, and then returns its result, if available. - * @throws CancellationException if the computation was cancelled - * @throws ExecutionException if this promise completed exceptionally - * @throws InterruptedException if the current thread was interrupted while waiting - * @throws TimeoutException if the wait timed out + * Blocks until either this promise has completed or the timeout has been exceeded, and then + * returns its result, if available. + * @throws CancellationException if the promise was cancelled + * @throws ExecutionException if the promise completed exceptionally + * @throws InterruptedException if the current thread was interrupted while waiting + * @throws TimeoutException if the timeout was exceeded + * @return the result of the promise */ @Blocking T get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; /** - * Stops this promise from propagating up cancellations. + * Returns a new promise, backed by this promise, that will not propagate cancellations. This means + * that if the returned promise is cancelled, the cancellation will not be propagated to this promise + * or any other promises that share this promise as a parent. + * @return continuation the promise chain that will not propagate cancellations */ @NotNull Promise fork(); + /** + * Returns the current completion state of this promise. If the promise has not completed, this method + * will return {@code null}. + * @return the completion state of this promise, or {@code null} if the promise has not completed + */ @Nullable PromiseCompletion getCompletion(); + /** + * Returns whether this promise has completed. + * @return {@code true} if the promise has completed, {@code false} otherwise + */ boolean isCompleted(); + /** + * Converts this promise to a {@link CompletableFuture}. The returned future will complete with the + * result of this promise when it completes. + * @return a future that will complete with the result of this promise + */ @NotNull CompletableFuture toFuture(); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java index 865b66c..2af014f 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java @@ -5,39 +5,78 @@ import org.jetbrains.annotations.Nullable; import java.util.concurrent.CancellationException; +/** + * Represents the result of a {@link Promise}, containing either an optional result or an exception. + */ public class PromiseCompletion { private @Nullable T result; private @Nullable Throwable exception; + /** + * Creates a new successful completion. + * @param result the result + */ public PromiseCompletion(@Nullable T result) { this.result = result; } + /** + * Creates a new exceptional completion. + * @param exception the exception + */ public PromiseCompletion(@NotNull Throwable exception) { this.exception = exception; } + /** + * Creates a new successful completion with a result of {@code null}. + */ public PromiseCompletion() { - this.result = null; + this((T) null); } + /** + * Checks if the completion was successful. + * @return {@code true} if the completion was successful, {@code false} otherwise + */ public boolean isSuccess() { return exception == null; } + /** + * Checks if the completion was exceptional. + * @return {@code true} if the completion was exceptional, {@code false} otherwise + */ public boolean isError() { return exception != null; } - public boolean wasCanceled() { + /** + * Checks if the completion was cancelled. + * @return {@code true} if the completion was cancelled, {@code false} otherwise + */ + public boolean wasCancelled() { return exception instanceof CancellationException; } + @Deprecated + public boolean wasCanceled() { + return wasCancelled(); + } + + /** + * Gets the result of the completion. + * @return the result, or {@code null} if the completion was exceptional + */ public @Nullable T getResult() { return result; } + /** + * Gets the exception of the completion. + * @return the exception, or {@code null} if the completion was successful + */ public @Nullable Throwable getException() { return exception; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index 80d6963..8aac38f 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -1,6 +1,7 @@ package dev.tommyjs.futur.promise; import dev.tommyjs.futur.executor.PromiseExecutor; +import dev.tommyjs.futur.util.PromiseUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -13,181 +14,491 @@ import java.util.stream.Stream; public interface PromiseFactory { - static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor syncExecutor, @NotNull PromiseExecutor asyncExecutor) { + /** + * Creates a new {@link PromiseFactory} with the given logger and executors. + * @param logger the logger + * @param syncExecutor the synchronous executor + * @param asyncExecutor the asynchronous executor + * @return the new promise factory + */ + static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor syncExecutor, + @NotNull PromiseExecutor asyncExecutor) { return new PromiseFactoryImpl<>(logger, syncExecutor, asyncExecutor); } + /** + * Creates a new {@link PromiseFactory} with the given logger and dual executor. + * @param logger the logger + * @param executor the executor + * @return the new promise factory + */ static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor executor) { return new PromiseFactoryImpl<>(logger, executor, executor); } + /** + * Creates a new {@link PromiseFactory} with the given logger and executor. + * @param logger the logger + * @param executor the executor + * @return the new promise factory + */ static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull ScheduledExecutorService executor) { return of(logger, PromiseExecutor.of(executor)); } - private static int size(@NotNull Stream stream) { - long estimate = stream.spliterator().estimateSize(); - return estimate == Long.MAX_VALUE ? 10 : (int) estimate; - } - - @NotNull Logger getLogger(); - + /** + * Creates a new uncompleted promise. + * @return the new promise + */ @NotNull CompletablePromise unresolved(); - @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, boolean cancelOnError); - - default @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { - return combine(p1, p2, true); - } - - @NotNull Promise> combine( - @NotNull Map> promises, - @Nullable BiConsumer exceptionHandler, - boolean propagateCancel - ); - - default @NotNull Promise> combine(@NotNull Map> promises, @NotNull BiConsumer exceptionHandler) { - return combine(promises, exceptionHandler, true); - } - - default @NotNull Promise> combine(@NotNull Map> promises, boolean cancelOnError) { - return combine(promises, null, cancelOnError); - } - - default @NotNull Promise> combine(@NotNull Map> promises) { - return combine(promises, null, true); - } - - @NotNull Promise> combine( - @NotNull Iterator> promises, int expectedSize, - @Nullable BiConsumer exceptionHandler, boolean propagateCancel - ); - - default @NotNull Promise> combine( - @NotNull Collection> promises, - @NotNull BiConsumer exceptionHandler, - boolean propagateCancel - ) { - return combine(promises.iterator(), promises.size(), exceptionHandler, propagateCancel); - } - - default @NotNull Promise> combine( - @NotNull Collection> promises, - @NotNull BiConsumer exceptionHandler - ) { - return combine(promises.iterator(), promises.size(), exceptionHandler, true); - } - - default @NotNull Promise> combine(@NotNull Collection> promises, boolean cancelOnError) { - return combine(promises.iterator(), promises.size(), null, cancelOnError); - } - - default @NotNull Promise> combine(@NotNull Collection> promises) { - return combine(promises.iterator(), promises.size(), null, true); - } - - default @NotNull Promise> combine( - @NotNull Stream> promises, - @NotNull BiConsumer exceptionHandler, - boolean propagateCancel - ) { - return combine(promises.iterator(), size(promises), exceptionHandler, propagateCancel); - } - - default @NotNull Promise> combine( - @NotNull Stream> promises, - @NotNull BiConsumer exceptionHandler - ) { - return combine(promises.iterator(), size(promises), exceptionHandler, true); - } - - default @NotNull Promise> combine(@NotNull Stream> promises, boolean cancelOnError) { - return combine(promises.iterator(), size(promises), null, cancelOnError); - } - - default @NotNull Promise> combine(@NotNull Stream> promises) { - return combine(promises.iterator(), size(promises), null, true); - } - - @NotNull Promise>> allSettled( - @NotNull Iterator> promises, int estimatedSize, boolean propagateCancel); - - default @NotNull Promise>> allSettled(@NotNull Collection> promises, boolean propagateCancel) { - return allSettled(promises.iterator(), promises.size(), propagateCancel); - } - - default @NotNull Promise>> allSettled(@NotNull Collection> promises) { - return allSettled(promises.iterator(), promises.size(), true); - } - - default @NotNull Promise>> allSettled(@NotNull Stream> promises, boolean propagateCancel) { - return allSettled(promises.iterator(), size(promises), propagateCancel); - } - - default @NotNull Promise>> allSettled(@NotNull Stream> promises) { - return allSettled(promises.iterator(), size(promises), true); - } - - default @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Promise... promises) { - return allSettled(Arrays.asList(promises).iterator(), promises.length, propagateCancel); - } - - default @NotNull Promise>> allSettled(@NotNull Promise... promises) { - return allSettled(Arrays.asList(promises).iterator(), promises.length, true); - } - - @NotNull Promise all(@NotNull Iterator> promises, boolean cancelAllOnError); - - default @NotNull Promise all(@NotNull Iterable> promises, boolean cancelAllOnError) { - return all(promises.iterator(), cancelAllOnError); - } - - default @NotNull Promise all(@NotNull Iterable> promises) { - return all(promises.iterator(), true); - } - - default @NotNull Promise all(@NotNull Stream> promises, boolean cancelAllOnError) { - return all(promises.iterator(), cancelAllOnError); - } - - default @NotNull Promise all(@NotNull Stream> promises) { - return all(promises.iterator(), true); - } - - default @NotNull Promise all(boolean cancelAllOnError, @NotNull Promise... promises) { - return all(Arrays.asList(promises).iterator(), cancelAllOnError); - } - - default @NotNull Promise all(@NotNull Promise... promises) { - return all(Arrays.asList(promises).iterator(), true); - } - - @NotNull Promise race(@NotNull Iterator> promises, boolean cancelLosers); - - default @NotNull Promise race(@NotNull Iterable> promises, boolean cancelLosers) { - return race(promises.iterator(), cancelLosers); - } - - default @NotNull Promise race(@NotNull Iterable> promises) { - return race(promises.iterator(), true); - } - - default @NotNull Promise race(@NotNull Stream> promises, boolean cancelLosers) { - return race(promises.iterator(), cancelLosers); - } - - default @NotNull Promise race(@NotNull Stream> promises) { - return race(promises.iterator(), true); - } - - @NotNull Promise wrap(@NotNull CompletableFuture future); + /** + * Creates a new promise, completed with the given value. + * @param value the value to complete the promise with + * @return the new promise + */ + @NotNull Promise resolve(T value); + /** + * Creates a new promise, completed with {@code null}. + * @apiNote This method is often useful for starting promise chains. + * @return the new promise + */ default @NotNull Promise start() { return resolve(null); } - @NotNull Promise resolve(T value); - + /** + * Creates a new promise, completed exceptionally with the given error. + * @param error the error to complete the promise with + * @return the new promise + */ @NotNull Promise error(@NotNull Throwable error); + /** + * Creates a new promise backed by the given future. The promise will be completed upon completion + * of the future. + * @param future the future to wrap + * @return the new promise + */ + @NotNull Promise wrap(@NotNull CompletableFuture future); + + /** + * Combines two promises into a single promise that completes when both promises complete. If + * {@code link} is {@code true} and either input promise completes exceptionally (including + * cancellation), the other promise will be cancelled and the output promise will complete + * exceptionally. + * @param p1 the first promise + * @param p2 the second promise + * @param link whether to cancel the other promise on error + * @return the combined promise + */ + @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, + boolean link); + + /** + * Combines two promises into a single promise that completes when both promises complete. If either + * input promise completes exceptionally, the other promise will be cancelled and the output promise + * will complete exceptionally. + * @param p1 the first promise + * @param p2 the second promise + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { + return combine(p1, p2, true); + } + + /** + * Combines key-value pairs of inputs to promises into a single promise that completes with key-value + * pairs of inputs to outputs when all promises complete. If {@code link} is {@code true} + * and any promise completes exceptionally, the other promises will be cancelled and the output + * promise will complete exceptionally. If an exception handler is present, promises that fail + * will not cause this behaviour, and instead the exception handler will be called with the key + * that failed and the exception. + * @param promises the input promises + * @param exceptionHandler the exception handler + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + @NotNull Promise> combine(@NotNull Map> promises, + @Nullable BiConsumer exceptionHandler, + boolean link); + + /** + * Combines key-value pairs of inputs to promises into a single promise that completes with key-value + * pairs of inputs to outputs when all promises complete. If any promise completes exceptionally, + * the exception handler will be called with the key that failed and the exception. The output promise + * will always complete successfully regardless of whether input promises fail. + * @param promises the input promises + * @param exceptionHandler the exception handler + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Map> promises, + @NotNull BiConsumer exceptionHandler) { + return combine(promises, exceptionHandler, true); + } + + /** + * Combines key-value pairs of inputs to promises into a single promise that completes with key-value + * pairs of inputs to outputs when all promises complete. If {@code link} is {@code true} + * and any promise completes exceptionally, the other promises will be cancelled and the output + * promise will complete exceptionally. + * @param promises the input promises + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Map> promises, boolean link) { + return combine(promises, null, link); + } + + /** + * Combines key-value pairs of inputs to promises into a single promise that completes with key-value + * pairs of inputs to outputs when all promises complete. If any promise completes exceptionally, + * the output promise will complete exceptionally. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Map> promises) { + return combine(promises, null, true); + } + + /** + * Combines an iterator of promises into a single promise that completes with a list of results when all + * promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all + * other promises will be cancelled and the output promise will complete exceptionally. If an exception + * handler is present, promises that fail will not cause this behaviour, and instead the exception + * handler will be called with the index that failed and the exception. + * @param promises the input promises + * @param exceptionHandler the exception handler + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + @NotNull Promise> combine(@NotNull Iterator> promises, int expectedSize, + @Nullable BiConsumer exceptionHandler, + boolean link); + + /** + * Combines a collection of promises into a single promise that completes with a list of results when all + * promises complete. If any promise completes exceptionally, the exception handler will be called with + * the index that failed and the exception. The output promise will always complete successfully regardless + * of whether input promises fail. + * @param promises the input promises + * @param exceptionHandler the exception handler + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Collection> promises, + @NotNull BiConsumer exceptionHandler, + boolean link) { + return combine(promises.iterator(), promises.size(), exceptionHandler, link); + } + + /** + * Combines a collection of promises into a single promise that completes with a list of results when all + * promises complete. If any promise completes exceptionally, the exception handler will be called with + * the index that failed and the exception. The output promise will always complete successfully regardless + * of whether input promises fail. + * @param promises the input promises + * @param exceptionHandler the exception handler + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Collection> promises, + @NotNull BiConsumer exceptionHandler) { + return combine(promises.iterator(), promises.size(), exceptionHandler, true); + } + + /** + * Combines a collection of promises into a single promise that completes with a list of results when all + * promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all + * other promises will be cancelled and the output promise will complete exceptionally. + * @param promises the input promises + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Collection> promises, boolean link) { + return combine(promises.iterator(), promises.size(), null, link); + } + + /** + * Combines a collection of promises into a single promise that completes with a list of results when all + * promises complete. If any promise completes exceptionally, the output promise will complete exceptionally. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Collection> promises) { + return combine(promises.iterator(), promises.size(), null, true); + } + + /** + * Combines a stream of promises into a single promise that completes with a list of results when all + * promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all + * other promises will be cancelled and the output promise will complete exceptionally. If an exception + * handler is present, promises that fail will not cause this behaviour, and instead the exception + * handler will be called with the index that failed and the exception. + * @param promises the input promises + * @param exceptionHandler the exception handler + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Stream> promises, + @NotNull BiConsumer exceptionHandler, + boolean link) { + return combine(promises.iterator(), PromiseUtil.estimateSize(promises), exceptionHandler, link); + } + + /** + * Combines a stream of promises into a single promise that completes with a list of results when all + * promises complete. If any promise completes exceptionally, the exception handler will be called with + * the index that failed and the exception. The output promise will always complete successfully regardless + * of whether input promises fail. + * @param promises the input promises + * @param exceptionHandler the exception handler + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Stream> promises, + @NotNull BiConsumer exceptionHandler) { + return combine(promises.iterator(), PromiseUtil.estimateSize(promises), exceptionHandler, true); + } + + /** + * Combines a stream of promises into a single promise that completes with a list of results when all + * promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all + * other promises will be cancelled and the output promise will complete exceptionally. + * @param promises the input promises + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Stream> promises, boolean link) { + return combine(promises.iterator(), PromiseUtil.estimateSize(promises), null, link); + } + + /** + * Combines a stream of promises into a single promise that completes with a list of results when all + * promises complete. If any promise completes exceptionally, the output promise will complete exceptionally. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise> combine(@NotNull Stream> promises) { + return combine(promises.iterator(), PromiseUtil.estimateSize(promises), null, true); + } + + /** + * Combines an iterator of promises into a single promise that completes with a list of results when all + * promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all + * other promises will be cancelled. The output promise will always complete successfully regardless + * of whether input promises fail. + * @param promises the input promises + * @param expectedSize the expected size of the list (used for optimization) + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + @NotNull Promise>> allSettled(@NotNull Iterator> promises, + int expectedSize, boolean link); + + /** + * Combines a collection of promises into a single promise that completes with a list of results when all + * promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all + * other promises will be cancelled. The output promise will always complete successfully regardless + * of whether input promises fail. + * @param promises the input promises + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + default @NotNull Promise>> allSettled(@NotNull Collection> promises, + boolean link) { + return allSettled(promises.iterator(), promises.size(), link); + } + + /** + * Combines a collection of promises into a single promise that completes with a list of results when all + * promises complete. If any promise completes exceptionally, all other promises will be cancelled. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise>> allSettled(@NotNull Collection> promises) { + return allSettled(promises.iterator(), promises.size(), true); + } + + /** + * Combines a stream of promises into a single promise that completes with a list of results when all + * promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all + * other promises will be cancelled. The output promise will always complete successfully regardless + * of whether input promises fail. + * @param promises the input promises + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + default @NotNull Promise>> allSettled(@NotNull Stream> promises, + boolean link) { + return allSettled(promises.iterator(), PromiseUtil.estimateSize(promises), link); + } + + /** + * Combines a stream of promises into a single promise that completes with a list of results when all + * promises complete. If any promise completes exceptionally, all other promises will be cancelled. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise>> allSettled(@NotNull Stream> promises) { + return allSettled(promises.iterator(), PromiseUtil.estimateSize(promises), true); + } + + /** + * Combines an array of promises into a single promise that completes with a list of results when all + * promises complete. If {@code link} is {@code true} and any promise completes exceptionally, all + * other promises will be cancelled. The output promise will always complete successfully regardless + * of whether input promises fail. + * @param link whether to cancel all promises on any exceptional completions + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise>> allSettled(boolean link, + @NotNull Promise... promises) { + return allSettled(Arrays.asList(promises).iterator(), promises.length, link); + } + + /** + * Combines an array of promises into a single promise that completes with a list of results when all + * promises complete. If any promise completes exceptionally, all other promises will be cancelled. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise>> allSettled(@NotNull Promise... promises) { + return allSettled(Arrays.asList(promises).iterator(), promises.length, true); + } + + /** + * Combines an iterator of promises into a single promise that completes when all promises complete. + * If {@code link} is {@code true} and any promise completes exceptionally, all other promises will + * be cancelled and the output promise will complete exceptionally. + * @param promises the input promises + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + @NotNull Promise all(@NotNull Iterator> promises, boolean link); + + /** + * Combines an iterable of promises into a single promise that completes when all promises complete. + * If {@code link} is {@code true} and any promise completes exceptionally, all other promises will + * be cancelled and the output promise will complete exceptionally. + * @param promises the input promises + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + default @NotNull Promise all(@NotNull Iterable> promises, boolean link) { + return all(promises.iterator(), link); + } + + /** + * Combines an iterable of promises into a single promise that completes when all promises complete. + * If any promise completes exceptionally, all other promises will be cancelled and the output + * promise will complete exceptionally. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise all(@NotNull Iterable> promises) { + return all(promises.iterator(), true); + } + + /** + * Combines a stream of promises into a single promise that completes when all promises complete. + * If {@code link} is {@code true} and any promise completes exceptionally, all other promises will + * be cancelled and the output promise will complete exceptionally. + * @param promises the input promises + * @param link whether to cancel all promises on any exceptional completions + * @return the combined promise + */ + default @NotNull Promise all(@NotNull Stream> promises, boolean link) { + return all(promises.iterator(), link); + } + + /** + * Combines a stream of promises into a single promise that completes when all promises complete. + * If any promise completes exceptionally, all other promises will be cancelled and the output + * promise willcomplete exceptionally. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise all(@NotNull Stream> promises) { + return all(promises.iterator(), true); + } + + /** + * Combines an array of promises into a single promise that completes when all promises complete. + * If {@code link} is {@code true} and any promise completes exceptionally, all other promises will + * be cancelled + * and the output promise will complete exceptionally. + * @param link whether to cancel all promises on any exceptional completions + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise all(boolean link, @NotNull Promise... promises) { + return all(Arrays.asList(promises).iterator(), link); + } + + /** + * Combines an array of promises into a single promise that completes when all promises complete. + * If any promise completes exceptionally, all other promises will be cancelled and the output + * promise will complete exceptionally. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise all(@NotNull Promise... promises) { + return all(Arrays.asList(promises).iterator(), true); + } + + /** + * Combines an iterator of promises into a single promise that completes when the first promise + * completes (successfully or exceptionally). If {@code cancelLosers} is {@code true}, all other + * promises will be cancelled when the first promise + * completes. + * @param promises the input promises + * @param cancelLosers whether to cancel the other promises when the first completes + * @return the combined promise + */ + @NotNull Promise race(@NotNull Iterator> promises, boolean cancelLosers); + + /** + * Combines an iterable of promises into a single promise that completes when the first promise + * completes (successfully or exceptionally). All other promises will be cancelled when the first + * promise completes. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise race(@NotNull Iterable> promises, boolean cancelLosers) { + return race(promises.iterator(), cancelLosers); + } + + /** + * Combines an iterable of promises into a single promise that completes when the first promise + * completes (successfully or exceptionally). All other promises will be cancelled when the first + * promise completes. + * @param promises the input promises + */ + default @NotNull Promise race(@NotNull Iterable> promises) { + return race(promises.iterator(), true); + } + + /** + * Combines a stream of promises into a single promise that completes when the first promise + * completes (successfully or exceptionally). If {@code cancelLosers} is {@code true}, all other + * promises will be cancelled when the first promise completes. + * @param promises the input promises + * @param cancelLosers whether to cancel the other promises when the first completes + * @return the combined promise + */ + default @NotNull Promise race(@NotNull Stream> promises, boolean cancelLosers) { + return race(promises.iterator(), cancelLosers); + } + + /** + * Combines a stream of promises into a single promise that completes when the first promise + * completes (successfully or exceptionally). All other promises will be cancelled when the first + * promise completes. + * @param promises the input promises + * @return the combined promise + */ + default @NotNull Promise race(@NotNull Stream> promises) { + return race(promises.iterator(), true); + } + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java index 7feaf36..f5a2153 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java @@ -22,7 +22,7 @@ public class PromiseFactoryImpl extends AbstractPromiseFactory { @Override public @NotNull CompletablePromise unresolved() { - return new PromiseImpl<>(this); + return new PromiseImpl<>(); } @Override @@ -40,4 +40,13 @@ public class PromiseFactoryImpl extends AbstractPromiseFactory { return asyncExecutor; } + public class PromiseImpl extends AbstractPromise { + + @Override + public @NotNull AbstractPromiseFactory getFactory() { + return PromiseFactoryImpl.this; + } + + } + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseImpl.java deleted file mode 100644 index ad752dc..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseImpl.java +++ /dev/null @@ -1,18 +0,0 @@ -package dev.tommyjs.futur.promise; - -import org.jetbrains.annotations.NotNull; - -public class PromiseImpl extends AbstractPromise { - - private final @NotNull AbstractPromiseFactory factory; - - public PromiseImpl(@NotNull AbstractPromiseFactory factory) { - this.factory = factory; - } - - @Override - public @NotNull AbstractPromiseFactory getFactory() { - return factory; - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseListener.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseListener.java index 7ec1a26..85b09af 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseListener.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseListener.java @@ -2,8 +2,15 @@ package dev.tommyjs.futur.promise; import org.jetbrains.annotations.NotNull; +/** + * A listener for a {@link Promise} that is called when the promise is resolved. + */ public interface PromiseListener { - void handle(@NotNull PromiseCompletion ctx); + /** + * Handles the completion of the promise. + * @param completion the promise completion + */ + void handle(@NotNull PromiseCompletion completion); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/ConcurrentResultArray.java b/futur-api/src/main/java/dev/tommyjs/futur/util/ConcurrentResultArray.java similarity index 64% rename from futur-api/src/main/java/dev/tommyjs/futur/joiner/ConcurrentResultArray.java rename to futur-api/src/main/java/dev/tommyjs/futur/util/ConcurrentResultArray.java index af34f33..ec678a8 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/joiner/ConcurrentResultArray.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/util/ConcurrentResultArray.java @@ -1,4 +1,4 @@ -package dev.tommyjs.futur.joiner; +package dev.tommyjs.futur.util; import org.jetbrains.annotations.NotNull; @@ -6,7 +6,10 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicReference; -class ConcurrentResultArray { +public class ConcurrentResultArray { + + private static final float RESIZE_THRESHOLD = 0.75F; + private static final float RESIZE_FACTOR = 1.2F; private final AtomicReference ref; @@ -17,8 +20,9 @@ class ConcurrentResultArray { public void set(int index, T element) { ref.updateAndGet(array -> { - if (array.length <= index) - return Arrays.copyOf(array, index + 6); + if (array.length * RESIZE_THRESHOLD <= index) { + array = Arrays.copyOf(array, (int) (array.length * RESIZE_FACTOR)); + } array[index] = element; return array; diff --git a/futur-api/src/main/java/dev/tommyjs/futur/util/PromiseUtil.java b/futur-api/src/main/java/dev/tommyjs/futur/util/PromiseUtil.java new file mode 100644 index 0000000..152c67c --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/util/PromiseUtil.java @@ -0,0 +1,48 @@ +package dev.tommyjs.futur.util; + +import dev.tommyjs.futur.promise.CompletablePromise; +import dev.tommyjs.futur.promise.Promise; +import org.jetbrains.annotations.NotNull; + +import java.util.stream.Stream; + +public class PromiseUtil { + + /** + * Propagates the completion, once completed, of the given promise to the given promise. + * @param from the promise to propagate the completion from + * @param to the completable promise to propagate the completion to + */ + public static void propagateCompletion(@NotNull Promise from, @NotNull CompletablePromise to) { + from.addDirectListener(to::complete, to::completeExceptionally); + } + + /** + * Propagates the cancellation, once cancelled, of the given promise to the given promise. + * @param from the promise to propagate the cancellation from + * @param to the promise to propagate the cancellation to + */ + public static void propagateCancel(@NotNull Promise from, @NotNull Promise to) { + from.onCancel(to::cancel); + } + + /** + * Cancels the given promise once the given promise is completed. + * @param from the promise to propagate the completion from + * @param to the promise to cancel upon completion + */ + public static void cancelOnComplete(@NotNull Promise from, @NotNull Promise to) { + from.addDirectListener(_ -> to.cancel()); + } + + /** + * Estimates the size of the given stream. + * @param stream the stream + * @return the estimated size + */ + public static int estimateSize(@NotNull Stream stream) { + long estimate = stream.spliterator().estimateSize(); + return estimate == Long.MAX_VALUE ? 10 : (int) estimate; + } + +}