diff --git a/.gitignore b/.gitignore
index f35ca9e..918bf19 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
.gradle
build/
+node_modules/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
diff --git a/README.md b/README.md
index 80ea3a6..8e94b11 100644
--- a/README.md
+++ b/README.md
@@ -2,10 +2,9 @@
Futur4J is a powerful and intuitive open-source Java library that simplifies asynchronous task scheduling, inspired by the concept of JavaScript promises.
-**This documentation is outdated. Please don't read it.**
-
## Dependency
-The Futur4J project is composed of multiple modules. It is required to include the `futur-api` module, and the other modules depend on it at runtime, however the others are optional and dependent on your use case.
+The Futur4J project has a `futur-api` module that provides the core functionality, and a `futur-lazy` module that provides additional static versions of factory methods. It is
+recommended to use the main module for customization of logging and execution.
### Gradle
```gradle
repositories {
@@ -15,8 +14,8 @@ repositories {
}
dependencies {
- compile 'dev.tommyjs:futur-api:2.1.3'
- compile 'dev.tommyjs:futur-reactor:2.1.3'
+ compile 'dev.tommyjs:futur-api:2.4.0'
+ compile 'dev.tommyjs:futur-lazy:2.4.0'
}
```
### Gradle DSL
@@ -26,8 +25,8 @@ repositories {
}
dependencies {
- implementation("dev.tommyjs:futur-api:2.1.3")
- implementation("dev.tommyjs:futur-reactor:2.1.3")
+ implementation("dev.tommyjs:futur-api:2.4.0")
+ implementation("dev.tommyjs:futur-lazy:2.4.0")
}
```
### Maven
@@ -43,121 +42,12 @@ dependencies {
dev.tommyjs
futur-api
- 2.1.3
+ 2.4.0
dev.tommyjs
- futur-reactor
- 2.1.3
+ futur-lazy
+ 2.4.0
-```
-
-
-## Getting started
-Futur4J uses an underlying `Scheduler` instance to power both synchronous and asynchronous task execution.
-
-The library was originally built for use in Minecraft servers, and therefore has native support for a "main" thread with can be seamlessly switched to in a `Promise` chain (read more below).
-
-It is recommended, but not required, to manually create a `Scheduler`. If this is not done, a single-threaded default `Scheduler` will be used.
-```java
-// check if scheduler has already been loaded
-if (!Schedulers.isLoaded()) {
- Scheduler scheduler = ThreadPoolScheduler.create(6); // create a scheduler using an underlying thread pool (6 threads)
- Schedulers.setScheduler(scheduler); // initialize the scheduling
-}
-```
-
-The `futur-standalone` module has two scheduler implementations available. The `ExclusiveThreadPoolScheduler` operates one thread pool, and throws an exception when sync tasks are attempted to be executed.
-The `ThreadPoolScheduler` uses a single-threaded "main" pool and a multi-threaded async pool.
-
-For Minecraft, Bukkit, Velocity and BungeeCord support is coming soon. Feel free to make PRs for other external library support as modules.
-
-### Using the scheduler
-You can invoke tasks using the synchronous and asynchronous scheduler `Scheduler` methods. It is worth noting that exceptions in these tasks are swallowed silently, and it is therefore better to
-use promise chains, or use the `Schedulers` utility class, which will wrap tasks in a try-catch and log to the standard error.
-
-```java
-scheduler.runDelayedSync(() -> {
- System.out.println("I was scheduled to execute 10 seconds later!");
-}, 10L, TimeUnit.SECONDS);
-
-Schedulers.runAsync(() -> {
- throw new RuntimeException("I will be caught by the exception wrapper in the Schedulers class!");
-});
-```
-
-### Reacting to task completions
-You may have recieved some form of "future" instance (a task that will complete at an unspecified time in the future). This may be a future4j-native `Promise` instance, or another form of future. In the case of the latter, you may obtain a `Promise`
- instance through the various available wrappers (e.g. `ReactorTransformer` for use with reactor core).
-
-You can then add listeners to the `Promise`, which will be executed asynchronously on promise "completion". A promise is deemed completed once the task has either concluded successfully or an exception has been thrown. You can access this information in a `PromiseCompletion`,
-which is passed to promise listeners. Promise listeners should not throw exceptions, and will print a stack trace if they do to avoid silent failure. However, callbacks in promise chain methods are permitted to throw exceptions as they will be passed down the chain.
-```java
-Promise promise = doSomeAsyncProcessing();
-promise.addListener(ctx -> {
- if (ctx.isError()) {
- System.out.println("Error! Oh no!");
- } else {
- String result = ctx.getResult();
- System.out.println(result);
- }
-});
-```
-
-### Promise chains
-`Promise` also contains convenience wrapper methods to avoid checking the completion for exceptions every time. These methods also return a new promise which will resolve once the original promise, and then handler callback have finished execution. We can use this feature to create
-"promise chains", which are a sequence of promise handlers. These handlers are permitted to throw exceptions, which will be passed down promise chains until they are handled. If exceptions are not handled properly, they will be silently swallowed. It is recommend to append `logExceptions()`
-to chains unless exceptions are handled explicitly with a custom listener. This will simply log exceptions in the promise chain to the standard error.
-
-You can also use the sync and async methods to choose which executor is chosen for task execution. The ability to seamlessly switch between executors comes in handy with applications where you must be on the "main thread" to perform non-threadsafe operations.
-
-`Promise.start()` can be used to create a "resolved" promise. This is useful when starting a promise chain or returning an already-completed promise (common in compose callbacks, mentioned below).
-
-```java
-Promise.start().thenSupplyAsync(() -> {
- // do some async processing
- return "Hello World!";
-}).thenApplyAsync(input -> {
- // convert string to upper case
- return input.toUpperCase();
-}).thenConsumeSync(result -> {
- // display result
- System.out.println("Result: " + result);
-}).logExceptions(); // log exceptions to the standard error
-```
-
-The promise chain methods follow Java convention of `Supplier`, `Consumer` and `Runnable` through the `thenSupply`, `thenConsume` and `thenRun` methods. There is also `thenApply` (which acts as a Java `Function`), and `thenCompose`, which is explained below. Futur4J has its own implementations of the Java concepts though to allow for
-exceptions within handlers.
-
-The `thenCompose` method is similar to the concept in the Java `CompletableFuture` API. You are able to return another `Promise` within a promise handler, and a `Promise>` will be wrapped up into just a `Promise`. This is often useful when using an external library that returns some sort of "future" inside a handler.
-
-```java
-String username = "abc123";
-mySuperSecretApi.usernameToId(username) // returns Promise
- .thenComposeAsync(id -> {
- return userManager.fetchFromDatabase(id); // returns Promise
- }).thenConsumeSync(playerData -> {
- System.out.println(username + " has " + playerData.getCoins() + " coins!");
- }).logExceptions();
-```
-
-### Utility methods
-The `Promises` utility class provides many helpful methods for working with promises and groups of promises.
-
-`Promises.combine(Promise, Promise)` combines two promises into one `Promise>`.
-
-`Promises.erase(Promise)` erases the type on a `Promise` instance and returns a `Promise`. This is also supported for lists of promises with `Promises.all(List)`, which will return a `Promise` representing the future whereby all promises have completed.
-
-If all promises are of identical type, you can use `Promises.combine(List>)` which will return one `Promise>`, similarly representing the future whereby all promises have completed.
-
-This can also be applied to key-value scenarios in the same way, where you can provide a mapper function to be applied to all elements and combine into a single promise.
-
-
-
-
-
-### Future wrappers
-External libaries provide asynchronous ways to interact with their output in all shapes and sizes. Futur4J currently has wrappers for the following libraries/frameworks:
-- Reactor Core (via futur-reactor)
-- Java CompletableFuture (via `Promises.wrap` in futur-api)
+```
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index efb73d0..052bbc3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,7 +1,7 @@
plugins {
- id 'java'
+ id 'java-library'
id 'com.github.johnrengelman.shadow' version '8.1.1'
- id 'io.github.gradle-nexus.publish-plugin' version '1.3.0'
+ id 'io.github.gradle-nexus.publish-plugin' version '2.0.0'
}
nexusPublishing {
@@ -14,9 +14,9 @@ nexusPublishing {
subprojects {
group = 'dev.tommyjs'
- version = '2.3.4'
+ version = '2.4.0'
- apply plugin: 'java'
+ apply plugin: 'java-library'
apply plugin: 'com.github.johnrengelman.shadow'
tasks {
@@ -30,8 +30,9 @@ subprojects {
}
dependencies {
- implementation 'org.jetbrains:annotations:24.1.0'
+ compileOnly 'org.jetbrains:annotations:24.1.0'
implementation 'org.slf4j:slf4j-api:2.0.12'
+
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testImplementation 'io.projectreactor:reactor-core:3.6.4'
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
@@ -42,6 +43,13 @@ subprojects {
useJUnitPlatform()
testLogging {
exceptionFormat = 'full'
+ showStandardStreams = true
}
}
+
+ java {
+ sourceCompatibility = JavaVersion.VERSION_22
+ targetCompatibility = JavaVersion.VERSION_22
+ withSourcesJar()
+ }
}
\ No newline at end of file
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java
deleted file mode 100644
index df0d998..0000000
--- a/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package dev.tommyjs.futur.executor;
-
-import org.jetbrains.annotations.NotNull;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class DualPoolExecutor implements PromiseExecutor> {
-
- private final @NotNull ScheduledExecutorService syncSvc;
- private final @NotNull ScheduledExecutorService asyncSvc;
-
- public DualPoolExecutor(@NotNull ScheduledExecutorService syncSvc, @NotNull ScheduledExecutorService asyncSvc) {
- this.syncSvc = syncSvc;
- this.asyncSvc = asyncSvc;
- }
-
- public static @NotNull DualPoolExecutor create(int asyncPoolSize) {
- return new DualPoolExecutor(Executors.newSingleThreadScheduledExecutor(), Executors.newScheduledThreadPool(asyncPoolSize));
- }
-
- @Override
- public Future> runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
- return syncSvc.schedule(task, delay, unit);
- }
-
- @Override
- public Future> runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
- return asyncSvc.schedule(task, delay, unit);
- }
-
- @Override
- public void cancel(Future> task) {
- task.cancel(true);
- }
-
-}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java
new file mode 100644
index 0000000..acefe38
--- /dev/null
+++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java
@@ -0,0 +1,32 @@
+package dev.tommyjs.futur.executor;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+class ExecutorServiceImpl implements PromiseExecutor> {
+
+ private final ScheduledExecutorService service;
+
+ public ExecutorServiceImpl(@NotNull ScheduledExecutorService service) {
+ this.service = service;
+ }
+
+ @Override
+ public Future> run(@NotNull Runnable task) {
+ return service.submit(task);
+ }
+
+ @Override
+ public Future> run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
+ return service.schedule(task, delay, unit);
+ }
+
+ @Override
+ public boolean cancel(Future> task) {
+ return task.cancel(true);
+ }
+
+}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java
index caeaad8..1143ec2 100644
--- a/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java
+++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java
@@ -2,22 +2,80 @@ package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+/**
+ * An executor that can run tasks and schedule tasks to run in the future.
+ */
public interface PromiseExecutor {
- T runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
-
- T runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
-
- default T runSync(@NotNull Runnable task) {
- return runSync(task, 0L, TimeUnit.MILLISECONDS);
+ /**
+ * Creates a new {@link PromiseExecutor} that runs tasks on virtual threads.
+ *
+ * @return the new executor
+ */
+ static PromiseExecutor> virtualThreaded() {
+ return new VirtualThreadImpl();
}
- default T runAsync(@NotNull Runnable task) {
- return runAsync(task, 0L, TimeUnit.MILLISECONDS);
+ /**
+ * Creates a new {@link PromiseExecutor} that runs tasks on a single thread.
+ *
+ * @return the new executor
+ */
+ static PromiseExecutor> singleThreaded() {
+ return of(Executors.newSingleThreadScheduledExecutor());
}
- void cancel(T task);
+ /**
+ * Creates a new {@link PromiseExecutor} that runs tasks on multiple threads.
+ *
+ * @param threads the number of threads
+ * @return the new executor
+ */
+ static PromiseExecutor> multiThreaded(int threads) {
+ return of(Executors.newScheduledThreadPool(threads));
+ }
+
+ /**
+ * Creates a new {@link PromiseExecutor} that runs tasks on the given executor service.
+ *
+ * @param service the executor service
+ * @return the new executor
+ */
+ static PromiseExecutor> of(@NotNull ScheduledExecutorService service) {
+ return new ExecutorServiceImpl(service);
+ }
+
+ /**
+ * Runs the given task.
+ *
+ * @param task the task
+ * @return the task
+ * @throws Exception if scheduling the task failed
+ */
+ T run(@NotNull Runnable task) throws Exception;
+
+ /**
+ * Runs the given task after the given delay.
+ *
+ * @param task the task
+ * @param delay the delay
+ * @param unit the time unit
+ * @return the task
+ * @throws Exception if scheduling the task failed
+ */
+ T run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) throws Exception;
+
+ /**
+ * Cancels the given task if possible. This may interrupt the task mid-execution.
+ *
+ * @param task the task
+ * @return {@code true} if the task was cancelled. {@code false} if the task was already completed
+ * or could not be cancelled.
+ */
+ boolean cancel(T task);
}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/SinglePoolExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/SinglePoolExecutor.java
deleted file mode 100644
index c63dab6..0000000
--- a/futur-api/src/main/java/dev/tommyjs/futur/executor/SinglePoolExecutor.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package dev.tommyjs.futur.executor;
-
-import org.jetbrains.annotations.NotNull;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class SinglePoolExecutor extends DualPoolExecutor {
-
- public SinglePoolExecutor(@NotNull ScheduledExecutorService service) {
- super(service, service);
- }
-
- public static @NotNull SinglePoolExecutor create(int threadPoolSize) {
- return new SinglePoolExecutor(Executors.newScheduledThreadPool(threadPoolSize));
- }
-
-}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java
new file mode 100644
index 0000000..397efc3
--- /dev/null
+++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java
@@ -0,0 +1,36 @@
+package dev.tommyjs.futur.executor;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.TimeUnit;
+
+class VirtualThreadImpl implements PromiseExecutor {
+
+ @Override
+ public Thread run(@NotNull Runnable task) {
+ return Thread.ofVirtual().start(task);
+ }
+
+ @Override
+ public Thread run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
+ return Thread.ofVirtual().start(() -> {
+ try {
+ Thread.sleep(unit.toMillis(delay));
+ } catch (InterruptedException e) {
+ return;
+ }
+ task.run();
+ });
+ }
+
+ @Override
+ public boolean cancel(Thread task) {
+ if (task.isAlive()) {
+ task.interrupt();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java
index 3998e57..8d96919 100644
--- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java
+++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java
@@ -1,8 +1,20 @@
package dev.tommyjs.futur.function;
+/**
+ * Represents an operation that accepts a single input argument and returns no result,
+ * and may throw an exception. This is a functional interface whose functional method is {@link #accept(Object)}.
+ *
+ * @param the type of the input to the operation
+ */
@FunctionalInterface
public interface ExceptionalConsumer {
- void accept(T value) throws Throwable;
+ /**
+ * Performs this operation on the given argument, potentially throwing an exception.
+ *
+ * @param value the input argument
+ * @throws Exception if unable to compute a result
+ */
+ void accept(T value) throws Exception;
-}
+}
\ No newline at end of file
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java
index ebebf5f..403da2b 100644
--- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java
+++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java
@@ -1,8 +1,22 @@
package dev.tommyjs.futur.function;
+/**
+ * Represents a function that accepts one argument and produces a result,
+ * and may throw an exception. This is a functional interface whose functional method is {@link #apply(Object)}.
+ *
+ * @param the type of the input to the function
+ * @param the type of the result of the function
+ */
@FunctionalInterface
public interface ExceptionalFunction {
- V apply(K value) throws Throwable;
+ /**
+ * Applies this function to the given argument, potentially throwing an exception.
+ *
+ * @param value the input argument
+ * @return the function result
+ * @throws Exception if unable to compute a result
+ */
+ V apply(K value) throws Exception;
-}
+}
\ No newline at end of file
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java
index c4b8002..fc130cb 100644
--- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java
+++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java
@@ -1,8 +1,17 @@
package dev.tommyjs.futur.function;
+/**
+ * Represents a runnable task that may throw an exception.
+ * This is a functional interface whose functional method is {@link #run()}.
+ */
@FunctionalInterface
public interface ExceptionalRunnable {
- void run() throws Throwable;
+ /**
+ * Performs this runnable task, potentially throwing an exception.
+ *
+ * @throws Exception if unable to complete the task
+ */
+ void run() throws Exception;
-}
+}
\ No newline at end of file
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java
index e47f977..ba39049 100644
--- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java
+++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java
@@ -1,8 +1,20 @@
package dev.tommyjs.futur.function;
+/**
+ * Represents a supplier of results that may throw an exception.
+ * This is a functional interface whose functional method is {@link #get()}.
+ *
+ * @param the type of results supplied by this supplier
+ */
@FunctionalInterface
public interface ExceptionalSupplier {
- T get() throws Throwable;
+ /**
+ * Gets a result, potentially throwing an exception.
+ *
+ * @return a result
+ * @throws Exception if unable to supply a result
+ */
+ T get() throws Exception;
-}
+}
\ No newline at end of file
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java
deleted file mode 100644
index 8427c8a..0000000
--- a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package dev.tommyjs.futur.impl;
-
-import dev.tommyjs.futur.executor.PromiseExecutor;
-import dev.tommyjs.futur.promise.AbstractPromise;
-import dev.tommyjs.futur.promise.AbstractPromiseFactory;
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-
-public class SimplePromise extends AbstractPromise {
-
- private final @NotNull AbstractPromiseFactory factory;
-
- public SimplePromise(@NotNull AbstractPromiseFactory factory) {
- this.factory = factory;
- }
-
- @Deprecated
- public SimplePromise(@NotNull PromiseExecutor executor, @NotNull Logger logger, @NotNull AbstractPromiseFactory factory) {
- this(factory);
- }
-
- @Override
- public @NotNull AbstractPromiseFactory getFactory() {
- return factory;
- }
-
-}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java
deleted file mode 100644
index 1bc6ecb..0000000
--- a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package dev.tommyjs.futur.impl;
-
-import dev.tommyjs.futur.executor.PromiseExecutor;
-import dev.tommyjs.futur.promise.AbstractPromiseFactory;
-import dev.tommyjs.futur.promise.Promise;
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-
-public class SimplePromiseFactory extends AbstractPromiseFactory {
-
- private final PromiseExecutor executor;
- private final Logger logger;
-
- public SimplePromiseFactory(PromiseExecutor executor, Logger logger) {
- this.executor = executor;
- this.logger = logger;
- }
-
- @Override
- public @NotNull Promise unresolved() {
- return new SimplePromise<>(this);
- }
-
- @Override
- public @NotNull Logger getLogger() {
- return logger;
- }
-
- @Override
- public @NotNull PromiseExecutor getExecutor() {
- return executor;
- }
-
-}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java
new file mode 100644
index 0000000..82f9539
--- /dev/null
+++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java
@@ -0,0 +1,47 @@
+package dev.tommyjs.futur.joiner;
+
+import dev.tommyjs.futur.promise.Promise;
+import dev.tommyjs.futur.promise.PromiseCompletion;
+import dev.tommyjs.futur.promise.PromiseFactory;
+import dev.tommyjs.futur.util.ConcurrentResultArray;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class CompletionJoiner extends PromiseJoiner, Void, Void, List>> {
+
+ private final ConcurrentResultArray> results;
+
+ public CompletionJoiner(
+ @NotNull PromiseFactory factory,
+ @NotNull Iterator> promises,
+ int expectedSize
+ ) {
+ super(factory);
+ results = new ConcurrentResultArray<>(expectedSize);
+ join(promises);
+ }
+
+ @Override
+ protected Void getChildKey(Promise> value) {
+ return null;
+ }
+
+ @Override
+ protected @NotNull Promise getChildPromise(Promise> value) {
+ //noinspection unchecked
+ return (Promise) value;
+ }
+
+ @Override
+ protected void onChildComplete(int index, Void key, @NotNull PromiseCompletion res) {
+ results.set(index, res);
+ }
+
+ @Override
+ protected List> getResult() {
+ return results.toList();
+ }
+
+}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java
new file mode 100644
index 0000000..ee64b0e
--- /dev/null
+++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java
@@ -0,0 +1,51 @@
+package dev.tommyjs.futur.joiner;
+
+import dev.tommyjs.futur.promise.Promise;
+import dev.tommyjs.futur.promise.PromiseCompletion;
+import dev.tommyjs.futur.promise.PromiseFactory;
+import dev.tommyjs.futur.util.ConcurrentResultArray;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.*;
+
+public class MappedResultJoiner extends PromiseJoiner>, K, V, Map> {
+
+ private final @NotNull ConcurrentResultArray> results;
+
+ public MappedResultJoiner(
+ @NotNull PromiseFactory factory,
+ @NotNull Iterator>> promises,
+ int expectedSize
+ ) {
+ super(factory);
+ this.results = new ConcurrentResultArray<>(expectedSize);
+ join(promises);
+ }
+
+ @Override
+ protected K getChildKey(Map.Entry> entry) {
+ return entry.getKey();
+ }
+
+ @Override
+ protected @NotNull Promise getChildPromise(Map.Entry> entry) {
+ return entry.getValue();
+ }
+
+ @Override
+ protected void onChildComplete(int index, K key, @NotNull PromiseCompletion res) {
+ results.set(index, new AbstractMap.SimpleImmutableEntry<>(key, res.getResult()));
+ }
+
+ @Override
+ protected Map getResult() {
+ List> list = results.toList();
+ Map map = new HashMap<>(list.size());
+ for (Map.Entry entry : list) {
+ map.put(entry.getKey(), entry.getValue());
+ }
+
+ return map;
+ }
+
+}
\ No newline at end of file
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/PromiseJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/PromiseJoiner.java
new file mode 100644
index 0000000..56b465f
--- /dev/null
+++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/PromiseJoiner.java
@@ -0,0 +1,69 @@
+package dev.tommyjs.futur.joiner;
+
+import dev.tommyjs.futur.promise.CompletablePromise;
+import dev.tommyjs.futur.promise.Promise;
+import dev.tommyjs.futur.promise.PromiseCompletion;
+import dev.tommyjs.futur.promise.PromiseFactory;
+import dev.tommyjs.futur.util.PromiseUtil;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class PromiseJoiner {
+
+ private final CompletablePromise joined;
+
+ protected PromiseJoiner(@NotNull PromiseFactory factory) {
+ this.joined = factory.unresolved();
+ }
+
+ protected abstract Key getChildKey(T value);
+
+ protected abstract @NotNull Promise getChildPromise(T value);
+
+ protected abstract void onChildComplete(int index, Key key, @NotNull PromiseCompletion completion);
+
+ protected abstract Result getResult();
+
+ protected void join(@NotNull Iterator promises) {
+ AtomicInteger count = new AtomicInteger();
+
+ int i = 0;
+ do {
+ if (joined.isCompleted()) {
+ promises.forEachRemaining(v -> getChildPromise(v).cancel());
+ return;
+ }
+
+ T value = promises.next();
+ Promise p = getChildPromise(value);
+ if (!p.isCompleted()) {
+ PromiseUtil.cancelOnComplete(joined, p);
+ }
+
+ count.incrementAndGet();
+ Key key = getChildKey(value);
+ int index = i++;
+
+ p.addAsyncListener(res -> {
+ onChildComplete(index, key, res);
+ if (res.isError()) {
+ assert res.getException() != null;
+ joined.completeExceptionally(res.getException());
+ } else if (count.decrementAndGet() == -1) {
+ joined.complete(getResult());
+ }
+ });
+ } while (promises.hasNext());
+
+ if (count.decrementAndGet() == -1) {
+ joined.complete(getResult());
+ }
+ }
+
+ public @NotNull Promise joined() {
+ return joined;
+ }
+
+}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java
new file mode 100644
index 0000000..b4e1a2d
--- /dev/null
+++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java
@@ -0,0 +1,46 @@
+package dev.tommyjs.futur.joiner;
+
+import dev.tommyjs.futur.promise.Promise;
+import dev.tommyjs.futur.promise.PromiseCompletion;
+import dev.tommyjs.futur.promise.PromiseFactory;
+import dev.tommyjs.futur.util.ConcurrentResultArray;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class ResultJoiner extends PromiseJoiner, Void, T, List> {
+
+ private final ConcurrentResultArray results;
+
+ public ResultJoiner(
+ @NotNull PromiseFactory factory,
+ @NotNull Iterator> promises,
+ int expectedSize
+ ) {
+ super(factory);
+ this.results = new ConcurrentResultArray<>(expectedSize);
+ join(promises);
+ }
+
+ @Override
+ protected Void getChildKey(Promise value) {
+ return null;
+ }
+
+ @Override
+ protected @NotNull Promise getChildPromise(Promise value) {
+ return value;
+ }
+
+ @Override
+ protected void onChildComplete(int index, Void key, @NotNull PromiseCompletion res) {
+ results.set(index, res.getResult());
+ }
+
+ @Override
+ protected List getResult() {
+ return results.toList();
+ }
+
+}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/VoidJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/VoidJoiner.java
new file mode 100644
index 0000000..56a58d5
--- /dev/null
+++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/VoidJoiner.java
@@ -0,0 +1,38 @@
+package dev.tommyjs.futur.joiner;
+
+import dev.tommyjs.futur.promise.Promise;
+import dev.tommyjs.futur.promise.PromiseCompletion;
+import dev.tommyjs.futur.promise.PromiseFactory;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Iterator;
+
+public class VoidJoiner extends PromiseJoiner, Void, Void, Void> {
+
+ public VoidJoiner(@NotNull PromiseFactory factory, @NotNull Iterator> promises) {
+ super(factory);
+ join(promises);
+ }
+
+ @Override
+ protected Void getChildKey(Promise> value) {
+ return null;
+ }
+
+ @Override
+ protected @NotNull Promise getChildPromise(Promise> value) {
+ //noinspection unchecked
+ return (Promise) value;
+ }
+
+ @Override
+ protected void onChildComplete(int index, Void key, @NotNull PromiseCompletion completion) {
+
+ }
+
+ @Override
+ protected Void getResult() {
+ return null;
+ }
+
+}
diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java
index bb823eb..149df3c 100644
--- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java
+++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java
@@ -1,111 +1,133 @@
package dev.tommyjs.futur.promise;
-import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.function.ExceptionalConsumer;
import dev.tommyjs.futur.function.ExceptionalFunction;
import dev.tommyjs.futur.function.ExceptionalRunnable;
import dev.tommyjs.futur.function.ExceptionalSupplier;
+import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
-public abstract class AbstractPromise implements Promise {
+public abstract class AbstractPromise implements Promise {
- private Collection> listeners;
- private final AtomicReference> completion;
- private final CountDownLatch latch;
- private final Lock lock;
+ public abstract @NotNull AbstractPromiseFactory getFactory();
- public AbstractPromise() {
- this.completion = new AtomicReference<>();
- this.latch = new CountDownLatch(1);
- this.lock = new ReentrantLock();
- }
-
- protected static void propagateResult(Promise from, Promise to) {
- from.addDirectListener(to::complete, to::completeExceptionally);
- }
-
- protected static void propagateCancel(Promise> from, Promise> to) {
- from.onCancel(to::completeExceptionally);
- }
-
- private @NotNull Runnable createRunnable(T result, @NotNull Promise promise, @NotNull ExceptionalFunction task) {
- return () -> {
- if (promise.isCompleted()) return;
-
- try {
- V nextResult = task.apply(result);
- promise.complete(nextResult);
- } catch (Throwable e) {
- promise.completeExceptionally(e);
- }
- };
- }
-
- public abstract @NotNull AbstractPromiseFactory getFactory();
-
- protected @NotNull PromiseExecutor getExecutor() {
- return getFactory().getExecutor();
- }
+ protected abstract @NotNull Promise addAnyListener(@NotNull PromiseListener listener);
protected @NotNull Logger getLogger() {
return getFactory().getLogger();
}
- @Override
- public T awaitInterruptibly() throws InterruptedException {
- this.latch.await();
- return joinCompletion(Objects.requireNonNull(getCompletion()));
- }
-
- @Override
- public T awaitInterruptibly(long timeoutMillis) throws TimeoutException, InterruptedException {
- boolean success = this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
- if (!success) {
- throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms");
+ protected void callListener(@NotNull PromiseListener listener, @NotNull PromiseCompletion cmp) {
+ if (listener instanceof AsyncPromiseListener) {
+ callListenerAsync(listener, cmp);
+ } else {
+ callListenerNow(listener, cmp);
}
-
- return joinCompletion(Objects.requireNonNull(getCompletion()));
}
- @Override
- public T await() {
+ protected V supplySafe(@NotNull ExceptionalSupplier supplier, @NotNull Function handler) {
try {
- return awaitInterruptibly();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ return supplier.get();
+ } catch (Error error) {
+ // Rethrow error so the Thread can shut down
+ throw error;
+ } catch (Throwable e) {
+ return handler.apply(e);
}
}
+ protected void runSafe(@NotNull ExceptionalRunnable runnable, @NotNull Consumer handler) {
+ try {
+ runnable.run();
+ } catch (Error error) {
+ handler.accept(error);
+ // Rethrow error so the Thread can shut down
+ throw error;
+ } catch (Throwable e) {
+ handler.accept(e);
+ }
+ }
+
+ protected void runCompleter(@NotNull CompletablePromise> promise, @NotNull ExceptionalRunnable completer) {
+ runSafe(completer, promise::completeExceptionally);
+ }
+
+ protected V useCompletion(Supplier unresolved, Function completed, Function failed) {
+ PromiseCompletion completion = getCompletion();
+ if (completion == null) return unresolved.get();
+ else if (completion.isSuccess()) return completed.apply(completion.getResult());
+ else return failed.apply(completion.getException());
+ }
+
+ protected @NotNull Runnable createCompleter(T result, @NotNull CompletablePromise promise,
+ @NotNull ExceptionalFunction completer) {
+ return () -> {
+ if (!promise.isCompleted()) {
+ runCompleter(promise, () -> promise.complete(completer.apply(result)));
+ }
+ };
+ }
+
+ protected @NotNull CompletablePromise createLinked() {
+ CompletablePromise promise = getFactory().unresolved();
+ PromiseUtil.propagateCancel(promise, this);
+ return promise;
+ }
+
+ protected void callListenerAsync(PromiseListener listener, PromiseCompletion res) {
+ try {
+ getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, res));
+ } catch (RejectedExecutionException ignored) {
+ } catch (Exception e) {
+ getLogger().warn("Exception caught while running promise listener", e);
+ }
+ }
+
+ protected void callListenerNow(PromiseListener listener, PromiseCompletion res) {
+ runSafe(() -> listener.handle(res), e -> getLogger().error("Exception caught in promise listener", e));
+ }
+
+ protected void callListenerAsyncLastResort(PromiseListener listener, PromiseCompletion completion) {
+ try {
+ getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, completion));
+ } catch (Throwable ignored) {
+ }
+ }
+
+ protected T joinCompletionChecked() throws ExecutionException {
+ PromiseCompletion completion = getCompletion();
+ assert completion != null;
+ if (completion.isSuccess()) return completion.getResult();
+ throw new ExecutionException(completion.getException());
+ }
+
+ protected T joinCompletionUnchecked() {
+ PromiseCompletion completion = getCompletion();
+ assert completion != null;
+ if (completion.isSuccess()) return completion.getResult();
+ throw new CompletionException(completion.getException());
+ }
+
@Override
- public T await(long timeoutMillis) throws TimeoutException {
- try {
- return awaitInterruptibly(timeoutMillis);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
+ public @NotNull Promise fork() {
+ if (isCompleted()) return this;
- private T joinCompletion(PromiseCompletion completion) {
- if (completion.isError())
- throw new RuntimeException(completion.getException());
-
- return completion.getResult();
+ CompletablePromise fork = getFactory().unresolved();
+ PromiseUtil.propagateCompletion(this, fork);
+ return fork;
}
@Override
public @NotNull Promise thenRun(@NotNull ExceptionalRunnable task) {
- return thenApply(result -> {
+ return thenApply(_ -> {
task.run();
return null;
});
@@ -121,43 +143,71 @@ public abstract class AbstractPromise implements Promise {
@Override
public @NotNull Promise thenSupply(@NotNull ExceptionalSupplier task) {
- return thenApply(result -> task.get());
+ return thenApply(_ -> task.get());
}
@Override
public @NotNull Promise thenApply(@NotNull ExceptionalFunction task) {
- Promise promise = getFactory().unresolved();
- addDirectListener(
- res -> createRunnable(res, promise, task).run(),
- promise::completeExceptionally
- );
+ return useCompletion(
+ () -> {
+ CompletablePromise promise = createLinked();
+ addDirectListener(
+ res -> createCompleter(res, promise, task).run(),
+ promise::completeExceptionally
+ );
- propagateCancel(promise, this);
- return promise;
+ return promise;
+ },
+ result -> supplySafe(
+ () -> getFactory().resolve(task.apply(result)),
+ getFactory()::error
+ ),
+ getFactory()::error
+ );
}
@Override
public @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task) {
- Promise promise = getFactory().unresolved();
- thenApply(task).addDirectListener(
- nestedPromise -> {
- if (nestedPromise == null) {
- promise.complete(null);
- } else {
- propagateResult(nestedPromise, promise);
- propagateCancel(promise, nestedPromise);
- }
- },
- promise::completeExceptionally
- );
+ return useCompletion(
+ () -> {
+ CompletablePromise promise = createLinked();
+ thenApply(task).addDirectListener(
+ result -> {
+ if (result == null) {
+ promise.complete(null);
+ } else {
+ PromiseUtil.propagateCompletion(result, promise);
+ PromiseUtil.propagateCancel(promise, result);
+ }
+ },
+ promise::completeExceptionally
+ );
- propagateCancel(promise, this);
- return promise;
+ return promise;
+ },
+ result -> supplySafe(
+ () -> {
+ Promise nested = task.apply(result);
+ if (nested == null) {
+ return getFactory().resolve(null);
+ } else if (nested.isCompleted()) {
+ return nested;
+ } else {
+ CompletablePromise promise = createLinked();
+ PromiseUtil.propagateCompletion(nested, promise);
+ PromiseUtil.propagateCancel(promise, nested);
+ return promise;
+ }
+ },
+ getFactory()::error
+ ),
+ getFactory()::error
+ );
}
@Override
public @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task) {
- return thenApplySync(result -> {
+ return thenApplySync(_ -> {
task.run();
return null;
});
@@ -165,7 +215,7 @@ public abstract class AbstractPromise implements Promise {
@Override
public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
- return thenApplyDelayedSync(result -> {
+ return thenApplyDelayedSync(_ -> {
task.run();
return null;
}, delay, unit);
@@ -189,76 +239,65 @@ public abstract class AbstractPromise implements Promise {
@Override
public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) {
- return thenApplySync(result -> task.get());
+ return thenApplySync(_ -> task.get());
}
@Override
public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) {
- return thenApplyDelayedSync(result -> task.get(), delay, unit);
+ return thenApplyDelayedSync(_ -> task.get(), delay, unit);
}
@Override
public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) {
- Promise promise = getFactory().unresolved();
+ CompletablePromise promise = createLinked();
addDirectListener(
- res -> {
- try {
- Runnable runnable = createRunnable(res, promise, task);
- F future = getExecutor().runSync(runnable);
- promise.onCancel((e) -> getExecutor().cancel(future));
- } catch (RejectedExecutionException e) {
- promise.completeExceptionally(e);
- }
- },
+ res -> runCompleter(promise, () -> {
+ Runnable runnable = createCompleter(res, promise, task);
+ FS future = getFactory().getSyncExecutor().run(runnable);
+ promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future));
+ }),
promise::completeExceptionally
);
- propagateCancel(promise, this);
return promise;
}
@Override
public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) {
- Promise promise = getFactory().unresolved();
+ CompletablePromise promise = createLinked();
addDirectListener(
- res -> {
- try {
- Runnable runnable = createRunnable(res, promise, task);
- F future = getExecutor().runSync(runnable, delay, unit);
- promise.onCancel((e) -> getExecutor().cancel(future));
- } catch (RejectedExecutionException e) {
- promise.completeExceptionally(e);
- }
- },
+ res -> runCompleter(promise, () -> {
+ Runnable runnable = createCompleter(res, promise, task);
+ FS future = getFactory().getSyncExecutor().run(runnable, delay, unit);
+ promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future));
+ }),
promise::completeExceptionally
);
- propagateCancel(promise, this);
return promise;
}
@Override
public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) {
- Promise promise = getFactory().unresolved();
+ CompletablePromise promise = createLinked();
thenApplySync(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
promise.complete(null);
} else {
- propagateResult(nestedPromise, promise);
- propagateCancel(promise, nestedPromise);
+ PromiseUtil.propagateCompletion(nestedPromise, promise);
+ PromiseUtil.propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
- propagateCancel(promise, this);
return promise;
}
@Override
public @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task) {
- return thenApplyAsync(result -> {
+ return thenApplyAsync(_ -> {
task.run();
return null;
});
@@ -266,7 +305,7 @@ public abstract class AbstractPromise implements Promise {
@Override
public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
- return thenApplyDelayedAsync(result -> {
+ return thenApplyDelayedAsync(_ -> {
task.run();
return null;
}, delay, unit);
@@ -290,84 +329,73 @@ public abstract class AbstractPromise implements Promise {
@Override
public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) {
- return thenApplyAsync(result -> task.get());
+ return thenApplyAsync(_ -> task.get());
}
@Override
public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) {
- return thenApplyDelayedAsync(result -> task.get(), delay, unit);
+ return thenApplyDelayedAsync(_ -> task.get(), delay, unit);
+ }
+
+ @Override
+ public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) {
+ CompletablePromise promise = createLinked();
+ addDirectListener(
+ (res) -> runCompleter(promise, () -> {
+ Runnable runnable = createCompleter(res, promise, task);
+ FA future = getFactory().getAsyncExecutor().run(runnable);
+ promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
+ }),
+ promise::completeExceptionally
+ );
+
+ return promise;
+ }
+
+ @Override
+ public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) {
+ CompletablePromise promise = createLinked();
+ addDirectListener(
+ res -> runCompleter(promise, () -> {
+ Runnable runnable = createCompleter(res, promise, task);
+ FA future = getFactory().getAsyncExecutor().run(runnable, delay, unit);
+ promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
+ }),
+ promise::completeExceptionally
+ );
+
+ return promise;
+ }
+
+ @Override
+ public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) {
+ CompletablePromise promise = createLinked();
+ thenApplyAsync(task).addDirectListener(
+ nestedPromise -> {
+ if (nestedPromise == null) {
+ promise.complete(null);
+ } else {
+ PromiseUtil.propagateCompletion(nestedPromise, promise);
+ PromiseUtil.propagateCancel(promise, nestedPromise);
+ }
+ },
+ promise::completeExceptionally
+ );
+
+ return promise;
}
@Override
public @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference) {
- return thenApplyAsync((result) -> {
+ return thenApply(result -> {
reference.set(result);
return result;
});
}
- @Override
- public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) {
- Promise promise = getFactory().unresolved();
- addDirectListener(
- (res) -> {
- try {
- Runnable runnable = createRunnable(res, promise, task);
- F future = getExecutor().runAsync(runnable);
- promise.onCancel((e) -> getExecutor().cancel(future));
- } catch (RejectedExecutionException e) {
- promise.completeExceptionally(e);
- }
- },
- promise::completeExceptionally
- );
-
- propagateCancel(promise, this);
- return promise;
- }
-
- @Override
- public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) {
- Promise promise = getFactory().unresolved();
- addDirectListener(
- res -> {
- try {
- Runnable runnable = createRunnable(res, promise, task);
- F future = getExecutor().runAsync(runnable, delay, unit);
- promise.onCancel((e) -> getExecutor().cancel(future));
- } catch (RejectedExecutionException e) {
- promise.completeExceptionally(e);
- }
- },
- promise::completeExceptionally
- );
-
- propagateCancel(promise, this);
- return promise;
- }
-
- @Override
- public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) {
- Promise promise = getFactory().unresolved();
- thenApplyAsync(task).addDirectListener(
- nestedPromise -> {
- if (nestedPromise == null) {
- promise.complete(null);
- } else {
- propagateResult(nestedPromise, promise);
- propagateCancel(promise, nestedPromise);
- }
- },
- promise::completeExceptionally
- );
-
- propagateCancel(promise, this);
- return promise;
- }
-
@Override
public @NotNull Promise erase() {
- return thenSupplyAsync(() -> null);
+ return thenSupply(() -> null);
}
@Override
@@ -377,11 +405,11 @@ public abstract class AbstractPromise implements Promise {
@Override
public @NotNull Promise addAsyncListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) {
- return addAsyncListener((res) -> {
- if (res.isError()) {
- if (errorListener != null) errorListener.accept(res.getException());
- } else {
+ return addAsyncListener(res -> {
+ if (res.isSuccess()) {
if (successListener != null) successListener.accept(res.getResult());
+ } else {
+ if (errorListener != null) errorListener.accept(res.getException());
}
});
}
@@ -393,54 +421,15 @@ public abstract class AbstractPromise implements Promise {
@Override
public @NotNull Promise addDirectListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) {
- return addDirectListener((res) -> {
- if (res.isError()) {
- if (errorListener != null) errorListener.accept(res.getException());
- } else {
+ return addDirectListener(res -> {
+ if (res.isSuccess()) {
if (successListener != null) successListener.accept(res.getResult());
+ } else {
+ if (errorListener != null) errorListener.accept(res.getException());
}
});
}
- private @NotNull Promise addAnyListener(PromiseListener listener) {
- PromiseCompletion completion;
-
- lock.lock();
- try {
- completion = getCompletion();
- if (completion == null) {
- if (listeners == null) listeners = new LinkedList<>();
- listeners.add(listener);
- return this;
- }
- } finally {
- lock.unlock();
- }
-
- callListener(listener, completion);
- return this;
- }
-
- private void callListener(PromiseListener listener, PromiseCompletion ctx) {
- if (listener instanceof AsyncPromiseListener) {
- try {
- getExecutor().runAsync(() -> callListenerNow(listener, ctx));
- } catch (RejectedExecutionException ignored) {
-
- }
- } else {
- callListenerNow(listener, ctx);
- }
- }
-
- private void callListenerNow(PromiseListener listener, PromiseCompletion ctx) {
- try {
- listener.handle(ctx);
- } catch (Exception e) {
- getLogger().error("Exception caught in promise listener", e);
- }
- }
-
@Override
public @NotNull Promise onSuccess(@NotNull Consumer listener) {
return addAsyncListener(listener, null);
@@ -453,13 +442,14 @@ public abstract class AbstractPromise implements Promise {
@Override
public @NotNull Promise logExceptions(@NotNull String message) {
- return onError(e -> getLogger().error(message, e));
+ Exception wrapper = new DeferredExecutionException();
+ return onError(e -> getLogger().error(message, wrapper.initCause(e)));
}
@Override
- public @NotNull Promise onError(@NotNull Class clazz, @NotNull Consumer listener) {
- return onError((e) -> {
- if (clazz.isAssignableFrom(e.getClass())) {
+ public @NotNull Promise onError(@NotNull Class type, @NotNull Consumer listener) {
+ return onError(e -> {
+ if (type.isAssignableFrom(e.getClass())) {
//noinspection unchecked
listener.accept((E) e);
}
@@ -471,79 +461,50 @@ public abstract class AbstractPromise implements Promise {
return onError(CancellationException.class, listener);
}
- @Deprecated
@Override
- public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) {
- return maxWaitTime(time, unit);
+ public @NotNull Promise orDefault(@Nullable T defaultValue) {
+ return orDefault(_ -> defaultValue);
}
@Override
- public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) {
- try {
- Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit);
- F future = getExecutor().runAsync(() -> completeExceptionally(e), time, unit);
- return addDirectListener((_v) -> getExecutor().cancel(future));
- } catch (RejectedExecutionException e) {
- completeExceptionally(e);
- return this;
- }
- }
-
- private void handleCompletion(@NotNull PromiseCompletion ctx) {
- lock.lock();
- try {
- if (!setCompletion(ctx)) return;
-
- this.latch.countDown();
- if (listeners != null) {
- for (PromiseListener listener : listeners) {
- callListener(listener, ctx);
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- private boolean setCompletion(PromiseCompletion completion) {
- return this.completion.compareAndSet(null, completion);
+ public @NotNull Promise orDefault(@NotNull ExceptionalSupplier supplier) {
+ return orDefault(_ -> supplier.get());
}
@Override
- public void cancel(@Nullable String message) {
- completeExceptionally(new CancellationException(message));
- }
-
- @Override
- public void complete(@Nullable T result) {
- handleCompletion(new PromiseCompletion<>(result));
- }
-
- @Override
- public void completeExceptionally(@NotNull Throwable result) {
- handleCompletion(new PromiseCompletion<>(result));
- }
-
- @Override
- public boolean isCompleted() {
- return completion.get() != null;
- }
-
- @Override
- public @Nullable PromiseCompletion getCompletion() {
- return completion.get();
+ public @NotNull Promise orDefault(@NotNull ExceptionalFunction function) {
+ return useCompletion(
+ () -> {
+ CompletablePromise promise = createLinked();
+ addDirectListener(promise::complete, e -> runCompleter(promise, () -> promise.complete(function.apply(e))));
+ return promise;
+ },
+ getFactory()::resolve,
+ getFactory()::error
+ );
}
@Override
public @NotNull CompletableFuture