release version 2.4.0

Merge pull request #10 from tommyskeff/release/2.4
This commit is contained in:
Tommy
2025-01-10 21:40:57 +00:00
committed by tommyskeff
48 changed files with 3061 additions and 1181 deletions

View File

@@ -1,39 +0,0 @@
package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DualPoolExecutor implements PromiseExecutor<Future<?>> {
private final @NotNull ScheduledExecutorService syncSvc;
private final @NotNull ScheduledExecutorService asyncSvc;
public DualPoolExecutor(@NotNull ScheduledExecutorService syncSvc, @NotNull ScheduledExecutorService asyncSvc) {
this.syncSvc = syncSvc;
this.asyncSvc = asyncSvc;
}
public static @NotNull DualPoolExecutor create(int asyncPoolSize) {
return new DualPoolExecutor(Executors.newSingleThreadScheduledExecutor(), Executors.newScheduledThreadPool(asyncPoolSize));
}
@Override
public Future<?> runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return syncSvc.schedule(task, delay, unit);
}
@Override
public Future<?> runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return asyncSvc.schedule(task, delay, unit);
}
@Override
public void cancel(Future<?> task) {
task.cancel(true);
}
}

View File

@@ -0,0 +1,32 @@
package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
class ExecutorServiceImpl implements PromiseExecutor<Future<?>> {
private final ScheduledExecutorService service;
public ExecutorServiceImpl(@NotNull ScheduledExecutorService service) {
this.service = service;
}
@Override
public Future<?> run(@NotNull Runnable task) {
return service.submit(task);
}
@Override
public Future<?> run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return service.schedule(task, delay, unit);
}
@Override
public boolean cancel(Future<?> task) {
return task.cancel(true);
}
}

View File

@@ -2,22 +2,80 @@ package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* An executor that can run tasks and schedule tasks to run in the future.
*/
public interface PromiseExecutor<T> {
T runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
T runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
default T runSync(@NotNull Runnable task) {
return runSync(task, 0L, TimeUnit.MILLISECONDS);
/**
* Creates a new {@link PromiseExecutor} that runs tasks on virtual threads.
*
* @return the new executor
*/
static PromiseExecutor<?> virtualThreaded() {
return new VirtualThreadImpl();
}
default T runAsync(@NotNull Runnable task) {
return runAsync(task, 0L, TimeUnit.MILLISECONDS);
/**
* Creates a new {@link PromiseExecutor} that runs tasks on a single thread.
*
* @return the new executor
*/
static PromiseExecutor<?> singleThreaded() {
return of(Executors.newSingleThreadScheduledExecutor());
}
void cancel(T task);
/**
* Creates a new {@link PromiseExecutor} that runs tasks on multiple threads.
*
* @param threads the number of threads
* @return the new executor
*/
static PromiseExecutor<?> multiThreaded(int threads) {
return of(Executors.newScheduledThreadPool(threads));
}
/**
* Creates a new {@link PromiseExecutor} that runs tasks on the given executor service.
*
* @param service the executor service
* @return the new executor
*/
static PromiseExecutor<?> of(@NotNull ScheduledExecutorService service) {
return new ExecutorServiceImpl(service);
}
/**
* Runs the given task.
*
* @param task the task
* @return the task
* @throws Exception if scheduling the task failed
*/
T run(@NotNull Runnable task) throws Exception;
/**
* Runs the given task after the given delay.
*
* @param task the task
* @param delay the delay
* @param unit the time unit
* @return the task
* @throws Exception if scheduling the task failed
*/
T run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) throws Exception;
/**
* Cancels the given task if possible. This may interrupt the task mid-execution.
*
* @param task the task
* @return {@code true} if the task was cancelled. {@code false} if the task was already completed
* or could not be cancelled.
*/
boolean cancel(T task);
}

View File

@@ -1,18 +0,0 @@
package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class SinglePoolExecutor extends DualPoolExecutor {
public SinglePoolExecutor(@NotNull ScheduledExecutorService service) {
super(service, service);
}
public static @NotNull SinglePoolExecutor create(int threadPoolSize) {
return new SinglePoolExecutor(Executors.newScheduledThreadPool(threadPoolSize));
}
}

View File

@@ -0,0 +1,36 @@
package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.TimeUnit;
class VirtualThreadImpl implements PromiseExecutor<Thread> {
@Override
public Thread run(@NotNull Runnable task) {
return Thread.ofVirtual().start(task);
}
@Override
public Thread run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return Thread.ofVirtual().start(() -> {
try {
Thread.sleep(unit.toMillis(delay));
} catch (InterruptedException e) {
return;
}
task.run();
});
}
@Override
public boolean cancel(Thread task) {
if (task.isAlive()) {
task.interrupt();
return true;
} else {
return false;
}
}
}

View File

@@ -1,8 +1,20 @@
package dev.tommyjs.futur.function;
/**
* Represents an operation that accepts a single input argument and returns no result,
* and may throw an exception. This is a functional interface whose functional method is {@link #accept(Object)}.
*
* @param <T> the type of the input to the operation
*/
@FunctionalInterface
public interface ExceptionalConsumer<T> {
void accept(T value) throws Throwable;
/**
* Performs this operation on the given argument, potentially throwing an exception.
*
* @param value the input argument
* @throws Exception if unable to compute a result
*/
void accept(T value) throws Exception;
}
}

View File

@@ -1,8 +1,22 @@
package dev.tommyjs.futur.function;
/**
* Represents a function that accepts one argument and produces a result,
* and may throw an exception. This is a functional interface whose functional method is {@link #apply(Object)}.
*
* @param <K> the type of the input to the function
* @param <V> the type of the result of the function
*/
@FunctionalInterface
public interface ExceptionalFunction<K, V> {
V apply(K value) throws Throwable;
/**
* Applies this function to the given argument, potentially throwing an exception.
*
* @param value the input argument
* @return the function result
* @throws Exception if unable to compute a result
*/
V apply(K value) throws Exception;
}
}

View File

@@ -1,8 +1,17 @@
package dev.tommyjs.futur.function;
/**
* Represents a runnable task that may throw an exception.
* This is a functional interface whose functional method is {@link #run()}.
*/
@FunctionalInterface
public interface ExceptionalRunnable {
void run() throws Throwable;
/**
* Performs this runnable task, potentially throwing an exception.
*
* @throws Exception if unable to complete the task
*/
void run() throws Exception;
}
}

View File

@@ -1,8 +1,20 @@
package dev.tommyjs.futur.function;
/**
* Represents a supplier of results that may throw an exception.
* This is a functional interface whose functional method is {@link #get()}.
*
* @param <T> the type of results supplied by this supplier
*/
@FunctionalInterface
public interface ExceptionalSupplier<T> {
T get() throws Throwable;
/**
* Gets a result, potentially throwing an exception.
*
* @return a result
* @throws Exception if unable to supply a result
*/
T get() throws Exception;
}
}

View File

@@ -1,27 +0,0 @@
package dev.tommyjs.futur.impl;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.promise.AbstractPromise;
import dev.tommyjs.futur.promise.AbstractPromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
public class SimplePromise<T, F> extends AbstractPromise<T, F> {
private final @NotNull AbstractPromiseFactory<F> factory;
public SimplePromise(@NotNull AbstractPromiseFactory<F> factory) {
this.factory = factory;
}
@Deprecated
public SimplePromise(@NotNull PromiseExecutor<F> executor, @NotNull Logger logger, @NotNull AbstractPromiseFactory<F> factory) {
this(factory);
}
@Override
public @NotNull AbstractPromiseFactory<F> getFactory() {
return factory;
}
}

View File

@@ -1,34 +0,0 @@
package dev.tommyjs.futur.impl;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.promise.AbstractPromiseFactory;
import dev.tommyjs.futur.promise.Promise;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
public class SimplePromiseFactory<F> extends AbstractPromiseFactory<F> {
private final PromiseExecutor<F> executor;
private final Logger logger;
public SimplePromiseFactory(PromiseExecutor<F> executor, Logger logger) {
this.executor = executor;
this.logger = logger;
}
@Override
public @NotNull <T> Promise<T> unresolved() {
return new SimplePromise<>(this);
}
@Override
public @NotNull Logger getLogger() {
return logger;
}
@Override
public @NotNull PromiseExecutor<F> getExecutor() {
return executor;
}
}

View File

@@ -0,0 +1,47 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.util.ConcurrentResultArray;
import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
import java.util.List;
public class CompletionJoiner extends PromiseJoiner<Promise<?>, Void, Void, List<PromiseCompletion<?>>> {
private final ConcurrentResultArray<PromiseCompletion<?>> results;
public CompletionJoiner(
@NotNull PromiseFactory factory,
@NotNull Iterator<Promise<?>> promises,
int expectedSize
) {
super(factory);
results = new ConcurrentResultArray<>(expectedSize);
join(promises);
}
@Override
protected Void getChildKey(Promise<?> value) {
return null;
}
@Override
protected @NotNull Promise<Void> getChildPromise(Promise<?> value) {
//noinspection unchecked
return (Promise<Void>) value;
}
@Override
protected void onChildComplete(int index, Void key, @NotNull PromiseCompletion<Void> res) {
results.set(index, res);
}
@Override
protected List<PromiseCompletion<?>> getResult() {
return results.toList();
}
}

View File

@@ -0,0 +1,51 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.util.ConcurrentResultArray;
import org.jetbrains.annotations.NotNull;
import java.util.*;
public class MappedResultJoiner<K, V> extends PromiseJoiner<Map.Entry<K, Promise<V>>, K, V, Map<K, V>> {
private final @NotNull ConcurrentResultArray<Map.Entry<K, V>> results;
public MappedResultJoiner(
@NotNull PromiseFactory factory,
@NotNull Iterator<Map.Entry<K, Promise<V>>> promises,
int expectedSize
) {
super(factory);
this.results = new ConcurrentResultArray<>(expectedSize);
join(promises);
}
@Override
protected K getChildKey(Map.Entry<K, Promise<V>> entry) {
return entry.getKey();
}
@Override
protected @NotNull Promise<V> getChildPromise(Map.Entry<K, Promise<V>> entry) {
return entry.getValue();
}
@Override
protected void onChildComplete(int index, K key, @NotNull PromiseCompletion<V> res) {
results.set(index, new AbstractMap.SimpleImmutableEntry<>(key, res.getResult()));
}
@Override
protected Map<K, V> getResult() {
List<Map.Entry<K, V>> list = results.toList();
Map<K, V> map = new HashMap<>(list.size());
for (Map.Entry<K, V> entry : list) {
map.put(entry.getKey(), entry.getValue());
}
return map;
}
}

View File

@@ -0,0 +1,69 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.CompletablePromise;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class PromiseJoiner<T, Key, Value, Result> {
private final CompletablePromise<Result> joined;
protected PromiseJoiner(@NotNull PromiseFactory factory) {
this.joined = factory.unresolved();
}
protected abstract Key getChildKey(T value);
protected abstract @NotNull Promise<Value> getChildPromise(T value);
protected abstract void onChildComplete(int index, Key key, @NotNull PromiseCompletion<Value> completion);
protected abstract Result getResult();
protected void join(@NotNull Iterator<T> promises) {
AtomicInteger count = new AtomicInteger();
int i = 0;
do {
if (joined.isCompleted()) {
promises.forEachRemaining(v -> getChildPromise(v).cancel());
return;
}
T value = promises.next();
Promise<Value> p = getChildPromise(value);
if (!p.isCompleted()) {
PromiseUtil.cancelOnComplete(joined, p);
}
count.incrementAndGet();
Key key = getChildKey(value);
int index = i++;
p.addAsyncListener(res -> {
onChildComplete(index, key, res);
if (res.isError()) {
assert res.getException() != null;
joined.completeExceptionally(res.getException());
} else if (count.decrementAndGet() == -1) {
joined.complete(getResult());
}
});
} while (promises.hasNext());
if (count.decrementAndGet() == -1) {
joined.complete(getResult());
}
}
public @NotNull Promise<Result> joined() {
return joined;
}
}

View File

@@ -0,0 +1,46 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.util.ConcurrentResultArray;
import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
import java.util.List;
public class ResultJoiner<T> extends PromiseJoiner<Promise<T>, Void, T, List<T>> {
private final ConcurrentResultArray<T> results;
public ResultJoiner(
@NotNull PromiseFactory factory,
@NotNull Iterator<Promise<T>> promises,
int expectedSize
) {
super(factory);
this.results = new ConcurrentResultArray<>(expectedSize);
join(promises);
}
@Override
protected Void getChildKey(Promise<T> value) {
return null;
}
@Override
protected @NotNull Promise<T> getChildPromise(Promise<T> value) {
return value;
}
@Override
protected void onChildComplete(int index, Void key, @NotNull PromiseCompletion<T> res) {
results.set(index, res.getResult());
}
@Override
protected List<T> getResult() {
return results.toList();
}
}

View File

@@ -0,0 +1,38 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
public class VoidJoiner extends PromiseJoiner<Promise<?>, Void, Void, Void> {
public VoidJoiner(@NotNull PromiseFactory factory, @NotNull Iterator<Promise<?>> promises) {
super(factory);
join(promises);
}
@Override
protected Void getChildKey(Promise<?> value) {
return null;
}
@Override
protected @NotNull Promise<Void> getChildPromise(Promise<?> value) {
//noinspection unchecked
return (Promise<Void>) value;
}
@Override
protected void onChildComplete(int index, Void key, @NotNull PromiseCompletion<Void> completion) {
}
@Override
protected Void getResult() {
return null;
}
}

View File

@@ -1,111 +1,133 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.function.ExceptionalConsumer;
import dev.tommyjs.futur.function.ExceptionalFunction;
import dev.tommyjs.futur.function.ExceptionalRunnable;
import dev.tommyjs.futur.function.ExceptionalSupplier;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
public abstract class AbstractPromise<T, F> implements Promise<T> {
public abstract class AbstractPromise<T, FS, FA> implements Promise<T> {
private Collection<PromiseListener<T>> listeners;
private final AtomicReference<PromiseCompletion<T>> completion;
private final CountDownLatch latch;
private final Lock lock;
public abstract @NotNull AbstractPromiseFactory<FS, FA> getFactory();
public AbstractPromise() {
this.completion = new AtomicReference<>();
this.latch = new CountDownLatch(1);
this.lock = new ReentrantLock();
}
protected static <V> void propagateResult(Promise<V> from, Promise<V> to) {
from.addDirectListener(to::complete, to::completeExceptionally);
}
protected static void propagateCancel(Promise<?> from, Promise<?> to) {
from.onCancel(to::completeExceptionally);
}
private <V> @NotNull Runnable createRunnable(T result, @NotNull Promise<V> promise, @NotNull ExceptionalFunction<T, V> task) {
return () -> {
if (promise.isCompleted()) return;
try {
V nextResult = task.apply(result);
promise.complete(nextResult);
} catch (Throwable e) {
promise.completeExceptionally(e);
}
};
}
public abstract @NotNull AbstractPromiseFactory<F> getFactory();
protected @NotNull PromiseExecutor<F> getExecutor() {
return getFactory().getExecutor();
}
protected abstract @NotNull Promise<T> addAnyListener(@NotNull PromiseListener<T> listener);
protected @NotNull Logger getLogger() {
return getFactory().getLogger();
}
@Override
public T awaitInterruptibly() throws InterruptedException {
this.latch.await();
return joinCompletion(Objects.requireNonNull(getCompletion()));
}
@Override
public T awaitInterruptibly(long timeoutMillis) throws TimeoutException, InterruptedException {
boolean success = this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
if (!success) {
throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms");
protected void callListener(@NotNull PromiseListener<T> listener, @NotNull PromiseCompletion<T> cmp) {
if (listener instanceof AsyncPromiseListener) {
callListenerAsync(listener, cmp);
} else {
callListenerNow(listener, cmp);
}
return joinCompletion(Objects.requireNonNull(getCompletion()));
}
@Override
public T await() {
protected <V> V supplySafe(@NotNull ExceptionalSupplier<V> supplier, @NotNull Function<Throwable, V> handler) {
try {
return awaitInterruptibly();
} catch (InterruptedException e) {
throw new RuntimeException(e);
return supplier.get();
} catch (Error error) {
// Rethrow error so the Thread can shut down
throw error;
} catch (Throwable e) {
return handler.apply(e);
}
}
protected void runSafe(@NotNull ExceptionalRunnable runnable, @NotNull Consumer<Throwable> handler) {
try {
runnable.run();
} catch (Error error) {
handler.accept(error);
// Rethrow error so the Thread can shut down
throw error;
} catch (Throwable e) {
handler.accept(e);
}
}
protected void runCompleter(@NotNull CompletablePromise<?> promise, @NotNull ExceptionalRunnable completer) {
runSafe(completer, promise::completeExceptionally);
}
protected <V> V useCompletion(Supplier<V> unresolved, Function<T, V> completed, Function<Throwable, V> failed) {
PromiseCompletion<T> completion = getCompletion();
if (completion == null) return unresolved.get();
else if (completion.isSuccess()) return completed.apply(completion.getResult());
else return failed.apply(completion.getException());
}
protected <V> @NotNull Runnable createCompleter(T result, @NotNull CompletablePromise<V> promise,
@NotNull ExceptionalFunction<T, V> completer) {
return () -> {
if (!promise.isCompleted()) {
runCompleter(promise, () -> promise.complete(completer.apply(result)));
}
};
}
protected <V> @NotNull CompletablePromise<V> createLinked() {
CompletablePromise<V> promise = getFactory().unresolved();
PromiseUtil.propagateCancel(promise, this);
return promise;
}
protected void callListenerAsync(PromiseListener<T> listener, PromiseCompletion<T> res) {
try {
getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, res));
} catch (RejectedExecutionException ignored) {
} catch (Exception e) {
getLogger().warn("Exception caught while running promise listener", e);
}
}
protected void callListenerNow(PromiseListener<T> listener, PromiseCompletion<T> res) {
runSafe(() -> listener.handle(res), e -> getLogger().error("Exception caught in promise listener", e));
}
protected void callListenerAsyncLastResort(PromiseListener<T> listener, PromiseCompletion<T> completion) {
try {
getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, completion));
} catch (Throwable ignored) {
}
}
protected T joinCompletionChecked() throws ExecutionException {
PromiseCompletion<T> completion = getCompletion();
assert completion != null;
if (completion.isSuccess()) return completion.getResult();
throw new ExecutionException(completion.getException());
}
protected T joinCompletionUnchecked() {
PromiseCompletion<T> completion = getCompletion();
assert completion != null;
if (completion.isSuccess()) return completion.getResult();
throw new CompletionException(completion.getException());
}
@Override
public T await(long timeoutMillis) throws TimeoutException {
try {
return awaitInterruptibly(timeoutMillis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public @NotNull Promise<T> fork() {
if (isCompleted()) return this;
private T joinCompletion(PromiseCompletion<T> completion) {
if (completion.isError())
throw new RuntimeException(completion.getException());
return completion.getResult();
CompletablePromise<T> fork = getFactory().unresolved();
PromiseUtil.propagateCompletion(this, fork);
return fork;
}
@Override
public @NotNull Promise<Void> thenRun(@NotNull ExceptionalRunnable task) {
return thenApply(result -> {
return thenApply(_ -> {
task.run();
return null;
});
@@ -121,43 +143,71 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenSupply(@NotNull ExceptionalSupplier<V> task) {
return thenApply(result -> task.get());
return thenApply(_ -> task.get());
}
@Override
public <V> @NotNull Promise<V> thenApply(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved();
addDirectListener(
res -> createRunnable(res, promise, task).run(),
promise::completeExceptionally
);
return useCompletion(
() -> {
CompletablePromise<V> promise = createLinked();
addDirectListener(
res -> createCompleter(res, promise, task).run(),
promise::completeExceptionally
);
propagateCancel(promise, this);
return promise;
return promise;
},
result -> supplySafe(
() -> getFactory().resolve(task.apply(result)),
getFactory()::error
),
getFactory()::error
);
}
@Override
public <V> @NotNull Promise<V> thenCompose(@NotNull ExceptionalFunction<T, Promise<V>> task) {
Promise<V> promise = getFactory().unresolved();
thenApply(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
promise.complete(null);
} else {
propagateResult(nestedPromise, promise);
propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
return useCompletion(
() -> {
CompletablePromise<V> promise = createLinked();
thenApply(task).addDirectListener(
result -> {
if (result == null) {
promise.complete(null);
} else {
PromiseUtil.propagateCompletion(result, promise);
PromiseUtil.propagateCancel(promise, result);
}
},
promise::completeExceptionally
);
propagateCancel(promise, this);
return promise;
return promise;
},
result -> supplySafe(
() -> {
Promise<V> nested = task.apply(result);
if (nested == null) {
return getFactory().resolve(null);
} else if (nested.isCompleted()) {
return nested;
} else {
CompletablePromise<V> promise = createLinked();
PromiseUtil.propagateCompletion(nested, promise);
PromiseUtil.propagateCancel(promise, nested);
return promise;
}
},
getFactory()::error
),
getFactory()::error
);
}
@Override
public @NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task) {
return thenApplySync(result -> {
return thenApplySync(_ -> {
task.run();
return null;
});
@@ -165,7 +215,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedSync(result -> {
return thenApplyDelayedSync(_ -> {
task.run();
return null;
}, delay, unit);
@@ -189,76 +239,65 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenSupplySync(@NotNull ExceptionalSupplier<V> task) {
return thenApplySync(result -> task.get());
return thenApplySync(_ -> task.get());
}
@Override
public <V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedSync(result -> task.get(), delay, unit);
return thenApplyDelayedSync(_ -> task.get(), delay, unit);
}
@Override
public <V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = createLinked();
addDirectListener(
res -> {
try {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runSync(runnable);
promise.onCancel((e) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
promise.completeExceptionally(e);
}
},
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FS future = getFactory().getSyncExecutor().run(runnable);
promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
propagateCancel(promise, this);
return promise;
}
@Override
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = createLinked();
addDirectListener(
res -> {
try {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runSync(runnable, delay, unit);
promise.onCancel((e) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
promise.completeExceptionally(e);
}
},
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FS future = getFactory().getSyncExecutor().run(runnable, delay, unit);
promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
propagateCancel(promise, this);
return promise;
}
@Override
public <V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = createLinked();
thenApplySync(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
promise.complete(null);
} else {
propagateResult(nestedPromise, promise);
propagateCancel(promise, nestedPromise);
PromiseUtil.propagateCompletion(nestedPromise, promise);
PromiseUtil.propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
propagateCancel(promise, this);
return promise;
}
@Override
public @NotNull Promise<Void> thenRunAsync(@NotNull ExceptionalRunnable task) {
return thenApplyAsync(result -> {
return thenApplyAsync(_ -> {
task.run();
return null;
});
@@ -266,7 +305,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<Void> thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedAsync(result -> {
return thenApplyDelayedAsync(_ -> {
task.run();
return null;
}, delay, unit);
@@ -290,84 +329,73 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenSupplyAsync(@NotNull ExceptionalSupplier<V> task) {
return thenApplyAsync(result -> task.get());
return thenApplyAsync(_ -> task.get());
}
@Override
public <V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedAsync(result -> task.get(), delay, unit);
return thenApplyDelayedAsync(_ -> task.get(), delay, unit);
}
@Override
public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) {
CompletablePromise<V> promise = createLinked();
addDirectListener(
(res) -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FA future = getFactory().getAsyncExecutor().run(runnable);
promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
return promise;
}
@Override
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
CompletablePromise<V> promise = createLinked();
addDirectListener(
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FA future = getFactory().getAsyncExecutor().run(runnable, delay, unit);
promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
return promise;
}
@Override
public <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
CompletablePromise<V> promise = createLinked();
thenApplyAsync(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
promise.complete(null);
} else {
PromiseUtil.propagateCompletion(nestedPromise, promise);
PromiseUtil.propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
return promise;
}
@Override
public @NotNull Promise<T> thenPopulateReference(@NotNull AtomicReference<T> reference) {
return thenApplyAsync((result) -> {
return thenApply(result -> {
reference.set(result);
return result;
});
}
@Override
public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved();
addDirectListener(
(res) -> {
try {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runAsync(runnable);
promise.onCancel((e) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
promise.completeExceptionally(e);
}
},
promise::completeExceptionally
);
propagateCancel(promise, this);
return promise;
}
@Override
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
Promise<V> promise = getFactory().unresolved();
addDirectListener(
res -> {
try {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runAsync(runnable, delay, unit);
promise.onCancel((e) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
promise.completeExceptionally(e);
}
},
promise::completeExceptionally
);
propagateCancel(promise, this);
return promise;
}
@Override
public <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
Promise<V> promise = getFactory().unresolved();
thenApplyAsync(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
promise.complete(null);
} else {
propagateResult(nestedPromise, promise);
propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
propagateCancel(promise, this);
return promise;
}
@Override
public @NotNull Promise<Void> erase() {
return thenSupplyAsync(() -> null);
return thenSupply(() -> null);
}
@Override
@@ -377,11 +405,11 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
return addAsyncListener((res) -> {
if (res.isError()) {
if (errorListener != null) errorListener.accept(res.getException());
} else {
return addAsyncListener(res -> {
if (res.isSuccess()) {
if (successListener != null) successListener.accept(res.getResult());
} else {
if (errorListener != null) errorListener.accept(res.getException());
}
});
}
@@ -393,54 +421,15 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
return addDirectListener((res) -> {
if (res.isError()) {
if (errorListener != null) errorListener.accept(res.getException());
} else {
return addDirectListener(res -> {
if (res.isSuccess()) {
if (successListener != null) successListener.accept(res.getResult());
} else {
if (errorListener != null) errorListener.accept(res.getException());
}
});
}
private @NotNull Promise<T> addAnyListener(PromiseListener<T> listener) {
PromiseCompletion<T> completion;
lock.lock();
try {
completion = getCompletion();
if (completion == null) {
if (listeners == null) listeners = new LinkedList<>();
listeners.add(listener);
return this;
}
} finally {
lock.unlock();
}
callListener(listener, completion);
return this;
}
private void callListener(PromiseListener<T> listener, PromiseCompletion<T> ctx) {
if (listener instanceof AsyncPromiseListener) {
try {
getExecutor().runAsync(() -> callListenerNow(listener, ctx));
} catch (RejectedExecutionException ignored) {
}
} else {
callListenerNow(listener, ctx);
}
}
private void callListenerNow(PromiseListener<T> listener, PromiseCompletion<T> ctx) {
try {
listener.handle(ctx);
} catch (Exception e) {
getLogger().error("Exception caught in promise listener", e);
}
}
@Override
public @NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener) {
return addAsyncListener(listener, null);
@@ -453,13 +442,14 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<T> logExceptions(@NotNull String message) {
return onError(e -> getLogger().error(message, e));
Exception wrapper = new DeferredExecutionException();
return onError(e -> getLogger().error(message, wrapper.initCause(e)));
}
@Override
public <E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> clazz, @NotNull Consumer<E> listener) {
return onError((e) -> {
if (clazz.isAssignableFrom(e.getClass())) {
public <E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> type, @NotNull Consumer<E> listener) {
return onError(e -> {
if (type.isAssignableFrom(e.getClass())) {
//noinspection unchecked
listener.accept((E) e);
}
@@ -471,79 +461,50 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
return onError(CancellationException.class, listener);
}
@Deprecated
@Override
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) {
return maxWaitTime(time, unit);
public @NotNull Promise<T> orDefault(@Nullable T defaultValue) {
return orDefault(_ -> defaultValue);
}
@Override
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
try {
Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit);
F future = getExecutor().runAsync(() -> completeExceptionally(e), time, unit);
return addDirectListener((_v) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
completeExceptionally(e);
return this;
}
}
private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
lock.lock();
try {
if (!setCompletion(ctx)) return;
this.latch.countDown();
if (listeners != null) {
for (PromiseListener<T> listener : listeners) {
callListener(listener, ctx);
}
}
} finally {
lock.unlock();
}
}
private boolean setCompletion(PromiseCompletion<T> completion) {
return this.completion.compareAndSet(null, completion);
public @NotNull Promise<T> orDefault(@NotNull ExceptionalSupplier<T> supplier) {
return orDefault(_ -> supplier.get());
}
@Override
public void cancel(@Nullable String message) {
completeExceptionally(new CancellationException(message));
}
@Override
public void complete(@Nullable T result) {
handleCompletion(new PromiseCompletion<>(result));
}
@Override
public void completeExceptionally(@NotNull Throwable result) {
handleCompletion(new PromiseCompletion<>(result));
}
@Override
public boolean isCompleted() {
return completion.get() != null;
}
@Override
public @Nullable PromiseCompletion<T> getCompletion() {
return completion.get();
public @NotNull Promise<T> orDefault(@NotNull ExceptionalFunction<Throwable, T> function) {
return useCompletion(
() -> {
CompletablePromise<T> promise = createLinked();
addDirectListener(promise::complete, e -> runCompleter(promise, () -> promise.complete(function.apply(e))));
return promise;
},
getFactory()::resolve,
getFactory()::error
);
}
@Override
public @NotNull CompletableFuture<T> toFuture() {
CompletableFuture<T> future = new CompletableFuture<>();
this.addDirectListener(future::complete, future::completeExceptionally);
future.whenComplete((res, e) -> {
if (e instanceof CancellationException) {
this.cancel();
}
});
return future;
return useCompletion(
() -> {
CompletableFuture<T> future = new CompletableFuture<>();
addDirectListener(future::complete, future::completeExceptionally);
future.whenComplete((_, e) -> {
if (e instanceof CancellationException) {
cancel();
}
});
return future;
},
CompletableFuture::completedFuture,
CompletableFuture::failedFuture
);
}
private static class DeferredExecutionException extends ExecutionException {
}
}

View File

@@ -1,159 +1,30 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.joiner.CompletionJoiner;
import dev.tommyjs.futur.joiner.MappedResultJoiner;
import dev.tommyjs.futur.joiner.ResultJoiner;
import dev.tommyjs.futur.joiner.VoidJoiner;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
public abstract class AbstractPromiseFactory<FS, FA> implements PromiseFactory {
public abstract @NotNull PromiseExecutor<F> getExecutor();
public abstract @NotNull Logger getLogger();
public abstract @NotNull PromiseExecutor<FS> getSyncExecutor();
public abstract @NotNull PromiseExecutor<FA> getAsyncExecutor();
@Override
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2) {
List<Promise<?>> promises = List.of(p1, p2);
return all(propagateCancel, promises)
.thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>(
Objects.requireNonNull(p1.getCompletion()).getResult(),
Objects.requireNonNull(p2.getCompletion()).getResult()
));
}
@Override
public <K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
if (promises.isEmpty()) return resolve(Collections.emptyMap());
Map<K, V> map = new HashMap<>();
Promise<Map<K, V>> promise = unresolved();
for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) {
if (propagateCancel) {
AbstractPromise.propagateCancel(promise, entry.getValue());
}
entry.getValue().addDirectListener((ctx) -> {
synchronized (map) {
if (ctx.getException() != null) {
if (exceptionHandler == null) {
promise.completeExceptionally(ctx.getException());
} else {
exceptionHandler.accept(entry.getKey(), ctx.getException());
map.put(entry.getKey(), null);
}
} else {
map.put(entry.getKey(), ctx.getResult());
}
if (map.size() == promises.size()) {
promise.complete(map);
}
}
});
}
return promise;
}
@Override
public <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
AtomicInteger index = new AtomicInteger();
return this.combine(
propagateCancel,
StreamSupport.stream(promises.spliterator(), false)
.collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)),
exceptionHandler
).thenApplyAsync(v ->
v.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toList())
);
}
@Override
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
List<Promise<?>> promises = new ArrayList<>();
promiseIterable.iterator().forEachRemaining(promises::add);
if (promises.isEmpty()) return resolve(Collections.emptyList());
PromiseCompletion<?>[] results = new PromiseCompletion<?>[promises.size()];
Promise<List<PromiseCompletion<?>>> promise = unresolved();
var iter = promises.listIterator();
while (iter.hasNext()) {
int index = iter.nextIndex();
var p = iter.next();
if (propagateCancel) {
AbstractPromise.propagateCancel(promise, p);
}
p.addDirectListener((res) -> {
synchronized (results) {
results[index] = res;
if (Arrays.stream(results).allMatch(Objects::nonNull))
promise.complete(Arrays.asList(results));
}
});
}
return promise;
}
@Override
public @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
List<Promise<?>> promises = new ArrayList<>();
promiseIterable.iterator().forEachRemaining(promises::add);
if (promises.isEmpty()) return resolve(null);
AtomicInteger completed = new AtomicInteger();
Promise<Void> promise = unresolved();
for (Promise<?> p : promises) {
if (propagateCancel) {
AbstractPromise.propagateCancel(promise, p);
}
p.addDirectListener((res) -> {
if (res.getException() != null) {
promise.completeExceptionally(res.getException());
} else if (completed.incrementAndGet() == promises.size()) {
promise.complete(null);
}
});
}
return promise;
}
@Override
public <V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises) {
Promise<V> promise = unresolved();
for (Promise<V> p : promises) {
if (cancelRaceLosers) {
promise.addListener((res) -> p.cancel());
}
AbstractPromise.propagateResult(p, promise);
}
return promise;
}
@Override
public <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
return wrap(future, future);
}
private <T> @NotNull Promise<T> wrap(@NotNull CompletionStage<T> completion, Future<T> future) {
Promise<T> promise = unresolved();
public <T> @NotNull Promise<T> wrap(@NotNull CompletionStage<T> completion, @Nullable Future<T> future) {
CompletablePromise<T> promise = unresolved();
completion.whenComplete((v, e) -> {
if (e != null) {
promise.completeExceptionally(e);
@@ -162,21 +33,73 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
}
});
promise.onCancel((e) -> future.cancel(true));
if (future != null) {
promise.onCancel(_ -> future.cancel(true));
}
return promise;
}
@Override
public <T> @NotNull Promise<T> resolve(T value) {
Promise<T> promise = unresolved();
promise.complete(value);
return promise;
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(
@NotNull Promise<K> p1, @NotNull Promise<V> p2
) {
return all(p1, p2).thenApply(_ -> new AbstractMap.SimpleImmutableEntry<>(
Objects.requireNonNull(p1.getCompletion()).getResult(),
Objects.requireNonNull(p2.getCompletion()).getResult()
));
}
@Override
public <T> @NotNull Promise<T> error(@NotNull Throwable error) {
Promise<T> promise = unresolved();
promise.completeExceptionally(error);
public @NotNull <K, V> Promise<Map<K, V>> combineMapped(
@NotNull Iterator<Map.Entry<K, Promise<V>>> promises,
int expectedSize
) {
if (!promises.hasNext()) return resolve(Collections.emptyMap());
return new MappedResultJoiner<>(this, promises, expectedSize).joined();
}
@Override
public <V> @NotNull Promise<List<V>> combine(
@NotNull Iterator<Promise<V>> promises,
int expectedSize
) {
if (!promises.hasNext()) return resolve(Collections.emptyList());
return new ResultJoiner<>(this, promises, expectedSize).joined();
}
@Override
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(
@NotNull Iterator<Promise<?>> promises,
int expectedSize
) {
if (!promises.hasNext()) return resolve(Collections.emptyList());
return new CompletionJoiner(this, promises, expectedSize).joined();
}
@Override
public @NotNull Promise<Void> all(@NotNull Iterator<Promise<?>> promises) {
if (!promises.hasNext()) return resolve(null);
return new VoidJoiner(this, promises).joined();
}
@Override
public <V> @NotNull Promise<V> race(
@NotNull Iterator<Promise<V>> promises,
boolean ignoreErrors
) {
CompletablePromise<V> promise = unresolved();
while (promises.hasNext()) {
if (promise.isCompleted()) {
promises.forEachRemaining(Promise::cancel);
break;
}
Promise<V> p = promises.next();
PromiseUtil.cancelOnComplete(promise, p);
p.addDirectListener(promise::complete, ignoreErrors ? null : promise::completeExceptionally);
}
return promise;
}

View File

@@ -1,5 +1,8 @@
package dev.tommyjs.futur.promise;
/**
* A listener for a {@link Promise} that is called when the promise is resolved. This listener is
* executed asynchronously by the {@link PromiseFactory} that created the completed promise.
*/
public interface AsyncPromiseListener<T> extends PromiseListener<T> {
}

View File

@@ -0,0 +1,193 @@
package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
@SuppressWarnings({"FieldMayBeFinal"})
public abstract class BasePromise<T, FS, FA> extends AbstractPromise<T, FS, FA> implements CompletablePromise<T> {
private static final VarHandle COMPLETION_HANDLE;
private static final VarHandle LISTENERS_HANDLE;
static {
try {
MethodHandles.Lookup lookup = MethodHandles.lookup();
COMPLETION_HANDLE = lookup.findVarHandle(BasePromise.class, "completion", PromiseCompletion.class);
LISTENERS_HANDLE = lookup.findVarHandle(BasePromise.class, "listeners", Collection.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
private final Sync sync;
private volatile PromiseCompletion<T> completion;
@SuppressWarnings("FieldMayBeFinal")
private volatile Collection<PromiseListener<T>> listeners;
@SuppressWarnings("unchecked")
public BasePromise() {
this.sync = new Sync();
this.completion = null;
this.listeners = Collections.EMPTY_LIST;
}
protected void handleCompletion(@NotNull PromiseCompletion<T> cmp) {
if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) return;
sync.releaseShared(1);
callListeners(cmp);
}
protected Promise<T> completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit) {
runCompleter(this, () -> {
FA future = getFactory().getAsyncExecutor().run(() -> completeExceptionally(e), delay, unit);
addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
});
return this;
}
@SuppressWarnings("unchecked")
protected void callListeners(@NotNull PromiseCompletion<T> cmp) {
Iterator<PromiseListener<T>> iter = ((Iterable<PromiseListener<T>>) LISTENERS_HANDLE.getAndSet(this, null)).iterator();
try {
while (iter.hasNext()) {
callListener(iter.next(), cmp);
}
} finally {
iter.forEachRemaining(v -> callListenerAsyncLastResort(v, cmp));
}
}
@Override
protected @NotNull Promise<T> addAnyListener(@NotNull PromiseListener<T> listener) {
Collection<PromiseListener<T>> prev = listeners, next = null;
for (boolean haveNext = false; ; ) {
if (!haveNext) {
next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev;
if (next != null) next.add(listener);
}
if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next))
break;
haveNext = (prev == (prev = listeners));
}
if (next == null) {
callListener(listener, Objects.requireNonNull(getCompletion()));
}
return this;
}
@Override
public T get() throws InterruptedException, ExecutionException {
if (!isCompleted()) {
sync.acquireSharedInterruptibly(1);
}
return joinCompletionChecked();
}
@Override
public T get(long time, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!isCompleted()) {
boolean success = sync.tryAcquireSharedNanos(1, unit.toNanos(time));
if (!success) {
throw new TimeoutException("Promise stopped waiting after " + time + " " + unit);
}
}
return joinCompletionChecked();
}
@Override
public T await() {
if (!isCompleted()) {
try {
sync.acquireSharedInterruptibly(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return joinCompletionUnchecked();
}
@Override
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) {
Exception e = new CancellationException("Promise timed out after " + time + " " + unit.toString().toLowerCase());
return completeExceptionallyDelayed(e, time, unit);
}
@Override
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit.toString().toLowerCase());
return completeExceptionallyDelayed(e, time, unit);
}
@Override
public void cancel(@NotNull CancellationException e) {
completeExceptionally(e);
}
@Override
public void complete(@Nullable T result) {
handleCompletion(new PromiseCompletion<>(result));
}
@Override
public void completeExceptionally(@NotNull Throwable result) {
handleCompletion(new PromiseCompletion<>(result));
}
@Override
public boolean isCompleted() {
return completion != null;
}
@Override
public @Nullable PromiseCompletion<T> getCompletion() {
return completion;
}
private static final class Sync extends AbstractQueuedSynchronizer {
private Sync() {
setState(1);
}
@Override
protected int tryAcquireShared(int acquires) {
return getState() == 0 ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int releases) {
int c1, c2;
do {
c1 = getState();
if (c1 == 0) {
return false;
}
c2 = c1 - 1;
} while (!compareAndSetState(c1, c2));
return c2 == 0;
}
}
}

View File

@@ -0,0 +1,25 @@
package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
* A {@link Promise} that can be completed.
*/
public interface CompletablePromise<T> extends Promise<T> {
/**
* Completes the promise successfully with the given result.
*
* @param result the result
*/
void complete(@Nullable T result);
/**
* Completes the promise exceptionally with the given exception.
*
* @param result the exception
*/
void completeExceptionally(@NotNull Throwable result);
}

View File

@@ -0,0 +1,72 @@
package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public abstract class CompletedPromise<T, FS, FA> extends AbstractPromise<T, FS, FA> {
private static final PromiseCompletion<?> EMPTY = new PromiseCompletion<>();
private final @NotNull PromiseCompletion<T> completion;
public CompletedPromise(@NotNull PromiseCompletion<T> completion) {
this.completion = completion;
}
@SuppressWarnings("unchecked")
public CompletedPromise() {
this((PromiseCompletion<T>) EMPTY);
}
@Override
protected @NotNull Promise<T> addAnyListener(@NotNull PromiseListener<T> listener) {
callListener(listener, completion);
return this;
}
@Override
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) {
// Promise is already completed so can't time out
return this;
}
@Override
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
// Promise is already completed so can't time out
return this;
}
@Override
public void cancel(@NotNull CancellationException exception) {
// Promise is already completed so can't be cancelled
}
@Override
public T get() throws ExecutionException {
return joinCompletionChecked();
}
@Override
public T get(long timeout, @NotNull TimeUnit unit) throws ExecutionException {
return joinCompletionChecked();
}
@Override
public T await() {
return joinCompletionUnchecked();
}
@Override
public @NotNull PromiseCompletion<T> getCompletion() {
return completion;
}
@Override
public boolean isCompleted() {
return true;
}
}

View File

@@ -8,157 +8,603 @@ import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* <p>
* A promise represents the result of an asynchronous computation. A promise will transition from a
* pending state to a completed state at most once, but may remain in a pending state indefinitely.
* </p>
*
* <p>
* Promises are created by a {@link PromiseFactory} and support chaining operations to be executed
* upon completion. These operations can be synchronous or asynchronous, and can be composed in a
* variety of ways. Promises can be listened to for completions, either with a result or with an
* exception. Promises can be cancelled, which will propagate a cancellation signal through the
* chain, but a promise can also be forked, which will prevent propagation of cancellations.
* </p>
*
* @see #cancel()
* @see #fork()
*/
public interface Promise<T> {
PromiseFactory getFactory();
/**
* Returns the factory that created this promise. This factory can be used to create new promises.
*/
@NotNull PromiseFactory getFactory();
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes. Cancelling the returned promise will cancel this promise, and
* consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRun(@NotNull ExceptionalRunnable task);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes and will be passed the result of this promise. Cancelling the returned
* promise will cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsume(@NotNull ExceptionalConsumer<T> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes, and will supply a value to the next promise in the chain. Cancelling
* the returned promise will cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupply(@NotNull ExceptionalSupplier<V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes, and will apply the specified function to the result of this promise
* in order to supply a value to the next promise in the chain. Cancelling the returned promise will
* cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApply(@NotNull ExceptionalFunction<T, V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* when this promise completes, and will compose the next promise in the chainfrom the result of
* this promise. Cancelling the returned promise will cancel this promise, and consequently any
* previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes, once this promise and the promise returned by
* the task are complete, with the result of the task promise
*/
<V> @NotNull Promise<V> thenCompose(@NotNull ExceptionalFunction<T, Promise<V>> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* sync executor of the factory that created this promise, immediately after this promise completes.
* Cancelling the returned promise will cancel this promise, and consequently any previous promises
* in the chain.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* sync executor of the factory that created this promise, after the specified delay after this
* promise completes. Cancelling the returned promise will cancel this promise, and consequently
* any previous promises in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* sync executor of the factory that created this promise immediately after this promise completes,
* and will be passed the result of this promise. Cancelling the returned promise will cancel this
* promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsumeSync(@NotNull ExceptionalConsumer<T> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* sync executor of the factory that created this promise after the specified delay after this
* promise completes, and will be passed the result of this promise. Cancelling the returned promise
* will cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsumeDelayedSync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed immediately
* by the sync executor of the factory that created this promise when this promise completes, and
* will supply a value to the next promise in the chain. Cancelling the returned promise will cancel
* this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupplySync(@NotNull ExceptionalSupplier<V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the sync
* executor of the factory that created this promise after the specified delay after this promise
* completes, and will supply a value to the next promise in the chain. Cancelling the returned promise
* will cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the sync
* executor of the factory that created this promise immediately after this promise completes, and
* will apply the specified function to the result of this promise in order to supply a value to the
* next promise in the chain. Cancelling the returned promise will cancel this promise, and consequently
* any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the sync
* executor of the factory that created this promise after the specified delay after this promise
* completes, and will apply the specified function to the result of this promise in order to supply
* a value to the next promise in the chain. Cancelling the returned promise will cancel this promise,
* and consequently any previous promises in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the sync
* executor of the factory that created this promise immediately after this promise completes, and
* will compose the next promise in the chain from the result of this promise. Cancelling the returned
* promise will cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes, once this promise and the promise returned by the task are
* complete, with the result of the task promise
*/
<V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, Promise<V>> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise, immediately after this promise completes.
* Cancelling the returned promise will cancel this promise, and consequently any previous promises
* in the chain.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRunAsync(@NotNull ExceptionalRunnable task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise after the specified delay after this
* promise completes. Cancelling the returned promise will cancel this promise, and consequently
* any previous promises in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise immediately after this promise completes,
* and will be passed the result of this promise. Cancelling the returned promise will cancel this
* promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsumeAsync(@NotNull ExceptionalConsumer<T> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise after the specified delay after this
* promise completes, and will be passed the result of this promise. Cancelling the returned promise
* will cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes after the task is executed
*/
@NotNull Promise<Void> thenConsumeDelayedAsync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the
* async executor of the factory that created this promise immediately after this promise completes,
* and will supply a value to the next promise in the chain. Cancelling the returned promise will
* cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupplyAsync(@NotNull ExceptionalSupplier<V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the async
* executor of the factory that created this promise after the specified delay after this promise
* completes, and will supply a value to the next promise in the chain. Cancelling the returned promise
* will cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit);
@NotNull Promise<T> thenPopulateReference(@NotNull AtomicReference<T> reference);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the async
* executor of the factory that created this promise immediately after this promise completes, and
* will apply the specified function to the result of this promise in order to supply a value to the
* next promise in the chain. Cancelling the returned promise will cancel this promise, and consequently
* any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the async
* executor of the factory that created this promise after the specified delay after this promise
* completes, and will apply the specified function to the result of this promise in order to supply
* a value to the next promise in the chain. Cancelling the returned promise will cancel this promise,
* and consequently any previous promises in the chain.
*
* @param task the task to execute
* @param delay the amount of time to wait before executing the task
* @param unit the time unit of the delay
* @return a new promise that completes, after the task is executed, with the task result
*/
<V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit);
/**
* Chains a task to be executed after this promise completes. The task will be executed by the async
* executor of the factory that created this promise immediately after this promise completes, and
* will compose the next promise in the chain from the result of this promise. Cancelling the returned
* promise will cancel this promise, and consequently any previous promises in the chain.
*
* @param task the task to execute
* @return a new promise that completes, once this promise and the promise returned by the task are
* complete, with the result of the task promise
*/
<V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task);
/**
* Adds a listener to this promise that will populate the specified reference with the result of this
* promise upon successful completion. The reference will not be populated if this promise completes
* exceptionally.
*
* @param reference the reference to populate
* @return continuation of the promise chain
*/
@NotNull Promise<T> thenPopulateReference(@NotNull AtomicReference<T> reference);
/**
* Returns a promise backed by this promise that will complete with {@code null} if this promise
* completes successfully, or with the exception if this promise completes exceptionally. Cancelling
* the returned promise will cancel this promise, and consequently any previous promises in the chain.
*/
@NotNull Promise<Void> erase();
/**
* Logs any exceptions that occur in the promise chain with the specified message. The stack trace
* will be captured immediately when invoking this method, and logged alongside an exception if
* encountered, to allow for easier debugging.
*
* @param message the message to log
* @return continuation of the promise chain
*/
@NotNull Promise<T> logExceptions(@NotNull String message);
/**
* Logs any exceptions that occur in the promise chain. The stack trace will be captured immediately
* when invoking this method, and logged alongside an exception if encountered, to allow for easier
* debugging.
*
* @return continuation of the promise chain
*/
default @NotNull Promise<T> logExceptions() {
return logExceptions("Exception caught in promise chain");
}
@NotNull Promise<T> logExceptions(@NotNull String message);
/**
* @apiNote Direct listeners run on the same thread as the completion.
* Adds a listener to this promise that will be executed immediately when this promise completes,
* on the same thread as the completion call.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> addDirectListener(@NotNull PromiseListener<T> listener);
/**
* Adds a listener to this promise that will be executed immediately when this promise completes,
* on the same thread as the completion call. One of {@code successHandler} and {@code errorHandler} will be
* called when the promise completes successfully or exceptionally, respectively.
*
* @param successHandler the function to call on success
* @param errorHandler the function to call on error
* @return continuation of the promise chain
*/
@NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
/**
* @apiNote Async listeners are run in parallel.
* Adds a listener to this promise that will be executed immediately when this promise completes,
* by the async executor of the factory that created this promise.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> addAsyncListener(@NotNull AsyncPromiseListener<T> listener);
/**
* @apiNote Same as addAsyncListener.
* Adds a listener to this promise that will be executed immediately when this promise completes.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
default @NotNull Promise<T> addListener(@NotNull AsyncPromiseListener<T> listener) {
return addAsyncListener(listener);
}
/**
* Adds a listener to this promise that will be executed immediately when this promise completes,
* by the async executor of the factory that created this promise. One of {@code successHandler} and
* {@code errorHandler} will be called when the promise completes successfully or exceptionally, respectively.
*
* @param successHandler the function to call on success
* @param errorHandler the function to call on error
*/
@NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
/**
* Adds a listener to this promise that will be called if the promise is completed successfully.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener);
/**
* Adds a listener to this promise that will be called if the promise is completed exceptionally.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> onError(@NotNull Consumer<Throwable> listener);
<E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> clazz, @NotNull Consumer<E> listener);
/**
* Adds a listener to this promise that will be called if the promise is completed exceptionally
* with an exception of the specified type.
*
* @param listener the listener to add
* @param type the class of the exception to listen for
* @return continuation of the promise chain
*/
<E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> type, @NotNull Consumer<E> listener);
/**
* Adds a listener to this promise that will be called if the promise is cancelled.
*
* @param listener the listener to add
* @return continuation of the promise chain
*/
@NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener);
/**
* @deprecated Use maxWaitTime instead
* Creates a new promise that will always complete successfully - either with the result of this
* promise, or with the specified default value if this promise completes exceptionally. Cancelling
* the returned promise will cancel this promise, and consequently any previous promises in the chain.
*
* @param defaultValue the default value to complete the promise with if this promise completes exceptionally
* @return a new promise that completes with the result of this promise, or with the default value if this
* promise completes exceptionally
*/
@NotNull Promise<T> orDefault(@Nullable T defaultValue);
/**
* Creates a new promise that will attempt to always complete successfully - either with the result
* of this promise, or with the result of the specified supplier if this promise completes exceptionally.
* If an exception is encountered while executing the supplier, the promise will complete exceptionally
* with that exception. Cancelling the returned promise will cancel this promise, and consequently any
* previous promises in the chain.
*
* @param supplier the supplier to complete the promise with if this promise completes exceptionally
* @return a new promise that completes with the result of this promise, or with the result of the
* supplier if this promise completes exceptionally
*/
@NotNull Promise<T> orDefault(@NotNull ExceptionalSupplier<T> supplier);
/**
* Creates a new promise that will attempt to always complete successfully - either with the result
* of this promise, or with the result of the specified function if this promise completes
* exceptionally. If an exception is encountered while executing the function, the promise will
* complete exceptionally with that exception. Cancelling the returned promise will cancel this
* promise, and consequently any previous promises in the chain.
*
* @param function the function to complete the promise with if this promise completes exceptionally
* @return a new promise that completes with the result of this promise, or with the result of the
* function if this promise completes exceptionally
*/
@NotNull Promise<T> orDefault(@NotNull ExceptionalFunction<Throwable, T> function);
/**
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with a {@link CancellationException}.
*
* @param time the amount of time to wait before cancelling the promise
* @param unit the time unit of the delay
* @return continuation of the promise chain
*/
@Deprecated
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
/**
* @deprecated Use maxWaitTime instead
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with a {@link CancellationException}.
*
* @param ms the amount of time to wait before cancelling the promise (in milliseconds)
* @return continuation of the promise chain
*/
@Deprecated
default @NotNull Promise<T> timeout(long ms) {
return timeout(ms, TimeUnit.MILLISECONDS);
}
/**
* Times out the promise if not already completed after the specified timeout. This will result
* in an exceptional completion with a {@link TimeoutException}. This will not result in the
* promise being cancelled.
*
* @param time the amount of time to wait before timing out the promise
* @param unit the time unit of the delay
* @return continuation of the promise chain
*/
@NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit);
/**
* Times out the promise if not already completed after the specified timeout. This will result
* in an exceptional completion with a {@link TimeoutException}. This will not result in the
* promise being cancelled.
*
* @param ms the amount of time to wait before timing out the promise (in milliseconds)
* @return continuation of the promise chain
*/
default @NotNull Promise<T> maxWaitTime(long ms) {
return maxWaitTime(ms, TimeUnit.MILLISECONDS);
}
void cancel(@Nullable String reason);
/**
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with the specified cancellation.
*
* @param exception the cancellation exception to complete the promise with
*/
void cancel(@NotNull CancellationException exception);
default void cancel() {
cancel(null);
/**
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with a {@link CancellationException}.
*
* @param reason the reason for the cancellation
*/
default void cancel(@NotNull String reason) {
cancel(new CancellationException(reason));
}
void complete(@Nullable T result);
void completeExceptionally(@NotNull Throwable result);
/**
* Cancels the promise if not already completed after the specified timeout. This will result in
* an exceptional completion with a {@link CancellationException}.
*/
default void cancel() {
cancel(new CancellationException());
}
/**
* Blocks until this promise has completed, and then returns its result. This method will throw
* checked exceptions if the promise completes exceptionally or the thread is interrupted.
*
* @return the result of the promise
* @throws CancellationException if the promise was cancelled
* @throws ExecutionException if the promise completed exceptionally
* @throws InterruptedException if the current thread was interrupted while waiting
*/
@Blocking
T awaitInterruptibly() throws InterruptedException;
T get() throws InterruptedException, ExecutionException;
/**
* Blocks until either this promise has completed or the timeout has been exceeded, and then
* returns its result, if available. This method will throw checked exceptions if the promise
* completes exceptionally or the thread is interrupted, or the timeout is exceeded.
*
* @return the result of the promise
* @throws CancellationException if the promise was cancelled
* @throws ExecutionException if the promise completed exceptionally
* @throws InterruptedException if the current thread was interrupted while waiting
* @throws TimeoutException if the timeout was exceeded
*/
@Blocking
T awaitInterruptibly(long timeout) throws TimeoutException, InterruptedException;
T get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
/**
* Blocks until this promise has completed, and then returns its result. This method is similar
* to {@link #get()}, but will throw unchecked exceptions instead of checked exceptions if the
* promise completes exceptionally or the thread is interrupted.
*
* @return the result of the promise
* @throws CancellationException if the promise was cancelled
* @throws CompletionException if the promise completed exceptionally
*/
@Blocking
T await();
@Blocking
T await(long timeout) throws TimeoutException;
/**
* Returns a new promise, backed by this promise, that will not propagate cancellations. This means
* that if the returned promise is cancelled, the cancellation will not be propagated to this promise,
* and consequently any previous promises in the chain.
*
* @return continuation the promise chain that will not propagate cancellations
*/
@NotNull Promise<T> fork();
/**
* @deprecated Use await instead.
* Returns the current completion state of this promise. If the promise has not completed, this method
* will return {@code null}.
*
* @return the completion state of this promise, or {@code null} if the promise has not completed
*/
@Blocking
@Deprecated
default T join(long timeout) throws TimeoutException {
return await(timeout);
};
@Nullable PromiseCompletion<T> getCompletion();
/**
* Returns whether this promise has completed.
*
* @return {@code true} if the promise has completed, {@code false} otherwise
*/
boolean isCompleted();
/**
* Converts this promise to a {@link CompletableFuture}. The returned future will complete with the
* result of this promise when it completes.
*
* @return a future that will complete with the result of this promise
*/
@NotNull CompletableFuture<T> toFuture();
}

View File

@@ -5,35 +5,85 @@ import org.jetbrains.annotations.Nullable;
import java.util.concurrent.CancellationException;
/**
* Represents the result of a {@link Promise}, containing either an optional result or an exception.
*/
public class PromiseCompletion<T> {
private @Nullable T result;
private @Nullable Throwable exception;
/**
* Creates a new successful completion.
*
* @param result the result
*/
public PromiseCompletion(@Nullable T result) {
this.result = result;
}
/**
* Creates a new exceptional completion.
*
* @param exception the exception
*/
public PromiseCompletion(@NotNull Throwable exception) {
this.exception = exception;
}
/**
* Creates a new successful completion with a result of {@code null}.
*/
public PromiseCompletion() {
this.result = null;
this((T) null);
}
/**
* Checks if the completion was successful.
*
* @return {@code true} if the completion was successful, {@code false} otherwise
*/
public boolean isSuccess() {
return exception == null;
}
/**
* Checks if the completion was exceptional.
*
* @return {@code true} if the completion was exceptional, {@code false} otherwise
*/
public boolean isError() {
return getException() != null;
return exception != null;
}
/**
* Checks if the completion was cancelled.
*
* @return {@code true} if the completion was cancelled, {@code false} otherwise
*/
public boolean wasCancelled() {
return exception instanceof CancellationException;
}
@Deprecated
public boolean wasCanceled() {
return getException() instanceof CancellationException;
return wasCancelled();
}
/**
* Gets the result of the completion.
*
* @return the result, or {@code null} if the completion was exceptional
*/
public @Nullable T getResult() {
return result;
}
/**
* Gets the exception of the completion.
*
* @return the exception, or {@code null} if the completion was successful
*/
public @Nullable Throwable getException() {
return exception;
}

View File

@@ -1,100 +1,502 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* A factory for creating and combining promises.
*/
@SuppressWarnings("unchecked")
public interface PromiseFactory {
@NotNull Logger getLogger();
<T> @NotNull Promise<T> unresolved();
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2);
default <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return combine(false, p1, p2);
}
<K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler);
default <K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises) {
return combine(propagateCancel, promises, null);
}
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
return combine(false, promises, exceptionHandler);
}
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return combine(promises, null);
}
<V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler);
default <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises) {
return combine(propagateCancel, promises, null);
}
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
return combine(false, promises, exceptionHandler);
}
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return combine(promises, null);
}
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
return allSettled(false, promiseIterable);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return allSettled(propagateCancel, Arrays.asList(promiseArray));
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
return allSettled(false, promiseArray);
}
@NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
return all(false, promiseIterable);
}
default @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return all(propagateCancel, Arrays.asList(promiseArray));
}
default @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
return all(false, promiseArray);
/**
* Creates a new {@link PromiseFactory} with the given logger and executors.
*
* @param logger the logger
* @param syncExecutor the synchronous executor
* @param asyncExecutor the asynchronous executor
* @return the new promise factory
*/
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor<?> syncExecutor,
@NotNull PromiseExecutor<?> asyncExecutor) {
return new PromiseFactoryImpl<>(logger, syncExecutor, asyncExecutor);
}
/**
* @apiNote Even with cancelRaceLosers, it is not guaranteed that only one promise will complete.
* Creates a new {@link PromiseFactory} with the given logger and dual executor.
*
* @param logger the logger
* @param executor the executor
* @return the new promise factory
*/
<V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises);
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor<?> executor) {
return new PromiseFactoryImpl<>(logger, executor, executor);
}
/**
* Creates a new {@link PromiseFactory} with the given logger and executor.
*
* @param logger the logger
* @param executor the executor
* @return the new promise factory
*/
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull ScheduledExecutorService executor) {
return of(logger, PromiseExecutor.of(executor));
}
/**
* Creates a new uncompleted promise.
*
* @return the new promise
*/
<T> @NotNull CompletablePromise<T> unresolved();
/**
* Creates a new promise, completed with the given value.
*
* @param value the value to complete the promise with
* @return the new promise
*/
<T> @NotNull Promise<T> resolve(T value);
/**
* Creates a new promise, completed with {@code null}. This method is often useful for starting
* promise chains.
*
* @return the new promise
*/
@NotNull Promise<Void> start();
/**
* Creates a new promise, completed exceptionally with the given error.
*
* @param error the error to complete the promise with
* @return the new promise
*/
<T> @NotNull Promise<T> error(@NotNull Throwable error);
/**
* Creates a new promise backed by the given completion and future.
* The promise will be completed upon completion of the {@link CompletionStage}
* and the {@link Future} will be cancelled upon cancellation of the promise.
*
* @param completion the completion stage to wrap
* @param future the future to wrap
* @return the new promise
*/
<T> @NotNull Promise<T> wrap(@NotNull CompletionStage<T> completion, @Nullable Future<T> future);
/**
* Creates a new promise backed by the given future.
* The promise will be completed upon completion of the {@link CompletableFuture}
* and the {@link CompletableFuture} will be cancelled upon cancellation of the promise.
*
* @param future the future to wrap
* @return the new promise
*/
default <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
return wrap(future, future);
};
/**
* Combines two promises into a single promise that resolves when both promises are completed.
* If either input promise completes exceptionally, the other promise will be cancelled
* and the output promise will complete exceptionally.
*
* @param p1 the first promise
* @param p2 the second promise
* @return the combined promise
*/
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2);
/**
* Combines key-value pairs of promises into a single promise that completes
* when all promises are completed, with the results mapped by their keys.
* If any promise completes exceptionally, the other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @param expectedSize the expected size of the iterator (used for optimization)
* @return the combined promise
*/
<K, V> @NotNull Promise<Map<K, V>> combineMapped(@NotNull Iterator<Map.Entry<K, Promise<V>>> promises,
int expectedSize);
/**
* Combines key-value pairs of promises into a single promise that completes
* when all promises are completed, with the results mapped by their keys.
* If any promise completes exceptionally, the other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combineMapped(@NotNull Spliterator<Map.Entry<K, Promise<V>>> promises) {
return combineMapped(Spliterators.iterator(promises), PromiseUtil.estimateSize(promises));
}
/**
* Combines key-value pairs of promises into a single promise that completes
* when all promises are completed, with the results mapped by their keys.
* If any promise completes exceptionally, the other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combineMapped(@NotNull Stream<Map.Entry<K, Promise<V>>> promises) {
return combineMapped(promises.spliterator());
}
/**
* Combines key-value pairs of promises into a single promise that completes
* when all promises are completed, with the results mapped by their keys.
* If any promise completes exceptionally, the other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combineMapped(@NotNull Iterable<Map.Entry<K, Promise<V>>> promises) {
return combineMapped(promises.spliterator());
}
/**
* Combines key-value pairs of promises into a single promise that completes
* when all promises are completed, with the results mapped by their keys.
* If any promise completes exceptionally, the other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combineMapped(@NotNull Map.Entry<K, Promise<V>>... promises) {
return combineMapped(Arrays.spliterator(promises));
}
/**
* Combines key-value pairs of promises into a single promise that completes
* when all promises are completed, with the results mapped by their keys.
* If any promise completes exceptionally, the other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combineMapped(@NotNull Map<K, Promise<V>> promises) {
return combineMapped(promises.entrySet().iterator(), promises.size());
}
/**
* Combines key-value pairs of promises into a single promise that completes
* when all promises are completed, with the results mapped by their keys.
* If any promise completes exceptionally, the other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param keys the keys to map to promises
* @param mapper the function to map keys to promises
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combineMapped(@NotNull Stream<K> keys,
@NotNull Function<K, Promise<V>> mapper) {
return combineMapped(keys.map(k -> new AbstractMap.SimpleImmutableEntry<>(k, mapper.apply(k))));
}
/**
* Combines key-value pairs of promises into a single promise that completes
* when all promises are completed, with the results mapped by their keys.
* If any promise completes exceptionally, the other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param keys the keys to map to promises
* @param mapper the function to map keys to promises
* @return the combined promise
*/
default <K, V> @NotNull Promise<Map<K, V>> combineMapped(@NotNull Iterable<K> keys,
@NotNull Function<K, Promise<V>> mapper) {
return combineMapped(StreamSupport.stream(keys.spliterator(), false), mapper);
}
/**
* @deprecated Use combineMapped instead.
*/
@Deprecated
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return combineMapped(promises);
}
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of results in the original order.
* If any promise completes exceptionally, all other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @param expectedSize the expected size of the iterator (used for optimization)
* @return the combined promise
*/
<V> @NotNull Promise<List<V>> combine(@NotNull Iterator<Promise<V>> promises, int expectedSize);
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of results in the original order.
* If any promise completes exceptionally, all other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Spliterator<Promise<V>> promises) {
return combine(Spliterators.iterator(promises), PromiseUtil.estimateSize(promises));
}
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of results in the original order.
* If any promise completes exceptionally, all other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Stream<Promise<V>> promises) {
return combine(promises.spliterator());
}
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of results in the original order.
* If any promise completes exceptionally, all other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return combine(promises.spliterator());
}
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of results in the original order.
* If any promise completes exceptionally, all other promises will be cancelled
* and the combined promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<List<V>> combine(@NotNull Promise<V>... promises) {
return combine(Arrays.spliterator(promises));
}
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of completions in the original order.
*
* @param promises the input promises
* @param expectedSize the expected size of the iterator (used for optimization)
* @return the combined promise
*/
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterator<Promise<?>> promises,
int expectedSize);
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of completions in the original order.
*
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Spliterator<Promise<?>> promises) {
return allSettled(Spliterators.iterator(promises), PromiseUtil.estimateSize(promises));
}
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of completions in the original order.
*
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Stream<Promise<?>> promises) {
return allSettled(promises.spliterator());
}
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of completions in the original order.
*
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promises) {
return allSettled(promises.spliterator());
}
/**
* Combines multiple promises into a single promise that completes when all promises
* are completed, with a list of completions in the original order.
*
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promises) {
return allSettled(Arrays.spliterator(promises));
}
/**
* Combines multiple promises into a single promise that completes when all promises complete.
* If any promise completes exceptionally, all other promises will be cancelled
* and the output promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
@NotNull Promise<Void> all(@NotNull Iterator<Promise<?>> promises);
/**
* Combines multiple promises into a single promise that completes when all promises complete.
* If any promise completes exceptionally, all other promises will be cancelled
* and the output promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<Void> all(@NotNull Stream<Promise<?>> promises) {
return all(promises.iterator());
}
/**
* Combines multiple promises into a single promise that completes when all promises complete.
* If any promise completes exceptionally, all other promises will be cancelled
* and the output promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promises) {
return all(promises.iterator());
}
/**
* Combines multiple promises into a single promise that completes when all promises complete.
* If any promise completes exceptionally, all other promises will be cancelled
* and the output promise will complete exceptionally.
*
* @param promises the input promises
* @return the combined promise
*/
default @NotNull Promise<Void> all(@NotNull Promise<?>... promises) {
return all(Arrays.asList(promises).iterator());
}
/**
* Combines multiple promises into a single promise that completes when any promise is completed.
* If {@code ignoreErrors} is {@code false} and the first promise completed exceptionally, the
* combined promise will also complete exceptionally. Otherwise, the combined promise will wait for a
* successful completion or complete with {@code null} if all promises complete exceptionally.
* Additionally, if {@code cancelLosers} is {@code true}, the other promises will be cancelled
* once the combined promise is completed.
*
* @param promises the input promises
* @param ignoreErrors whether to ignore promises that complete exceptionally
* @return the combined promise
*/
<V> @NotNull Promise<V> race(@NotNull Iterator<Promise<V>> promises, boolean ignoreErrors);
/**
* Combines multiple promises into a single promise that completes when any promise is completed.
* If {@code ignoreErrors} is {@code false} and the first promise completed exceptionally, the
* combined promise will also complete exceptionally. Otherwise, the combined promise will wait for a
* successful completion or complete with {@code null} if all promises complete exceptionally.
* Additionally, The other promises will be cancelled once the combined promise is completed.
*
* @param promises the input promises
* @param ignoreErrors whether to ignore promises that complete exceptionally
* @return the combined promise
*/
default <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises, boolean ignoreErrors) {
return race(promises.iterator(), ignoreErrors);
}
/**
* Combines multiple promises into a single promise that completes when any promise is completed.
* If the first promise completed exceptionally, the combined promise will also complete exceptionally.
* Additionally, the other promises will be cancelled once the combined promise is completed.
*
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises) {
return race(promises, false);
}
/**
* Combines multiple promises into a single promise that completes when any promise is completed.
* If {@code ignoreErrors} is {@code false} and the first promise completed exceptionally, the
* combined promise will also complete exceptionally. Otherwise, the combined promise will wait for a
* successful completion or complete with {@code null} if all promises complete exceptionally.
* Additionally, The other promises will be cancelled once the combined promise is completed.
*
* @param promises the input promises
* @param ignoreErrors whether to ignore promises that complete exceptionally
* @return the combined promise
*/
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises, boolean ignoreErrors) {
return race(promises.iterator(), ignoreErrors);
}
/**
* Combines multiple promises into a single promise that completes when any promise is completed.
* If the first promise completed exceptionally, the combined promise will also complete exceptionally.
* Additionally, the other promises will be cancelled once the combined promise is completed.
*
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises) {
return race(promises, false);
}
/**
* Combines multiple promises into a single promise that completes when any promise is completed.
* If {@code ignoreErrors} is {@code false} and the first promise completed exceptionally, the
* combined promise will also complete exceptionally. Otherwise, the combined promise will wait for a
* successful completion or complete with {@code null} if all promises complete exceptionally.
* Additionally, The other promises will be cancelled once the combined promise is completed.
*
* @param promises the input promises
* @param ignoreErrors whether to ignore promises that complete exceptionally
* @return the combined promise
*/
default <V> @NotNull Promise<V> race(boolean ignoreErrors, @NotNull Promise<V>... promises) {
return race(Arrays.asList(promises), ignoreErrors);
}
/**
* Combines multiple promises into a single promise that completes when any promise is completed.
* If the first promise completed exceptionally, the combined promise will also complete exceptionally.
* Additionally, the other promises will be cancelled once the combined promise is completed.
*
* @param promises the input promises
* @return the combined promise
*/
default <V> @NotNull Promise<V> race(@NotNull Promise<V>... promises) {
return race(false, promises);
}
<T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future);
default @NotNull Promise<Void> start() {
return resolve(null);
}
<T> @NotNull Promise<T> resolve(T value);
<T> @NotNull Promise<T> error(@NotNull Throwable error);
}

View File

@@ -0,0 +1,89 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
public class PromiseFactoryImpl<FS, FA> extends AbstractPromiseFactory<FS, FA> {
private final @NotNull Logger logger;
private final @NotNull PromiseExecutor<FS> syncExecutor;
private final @NotNull PromiseExecutor<FA> asyncExecutor;
public PromiseFactoryImpl(
@NotNull Logger logger,
@NotNull PromiseExecutor<FS> syncExecutor,
@NotNull PromiseExecutor<FA> asyncExecutor
) {
this.logger = logger;
this.syncExecutor = syncExecutor;
this.asyncExecutor = asyncExecutor;
}
@Override
public @NotNull <T> CompletablePromise<T> unresolved() {
return new PromiseImpl<>();
}
@Override
public @NotNull <T> Promise<T> resolve(T value) {
return new CompletedPromiseImpl<>(value);
}
@Override
public @NotNull Promise<Void> start() {
return new CompletedPromiseImpl<>();
}
@Override
public @NotNull <T> Promise<T> error(@NotNull Throwable error) {
return new CompletedPromiseImpl<>(error);
}
@Override
public @NotNull Logger getLogger() {
return logger;
}
@Override
public @NotNull PromiseExecutor<FS> getSyncExecutor() {
return syncExecutor;
}
@Override
public @NotNull PromiseExecutor<FA> getAsyncExecutor() {
return asyncExecutor;
}
private class PromiseImpl<T> extends BasePromise<T, FS, FA> {
@Override
public @NotNull AbstractPromiseFactory<FS, FA> getFactory() {
return PromiseFactoryImpl.this;
}
}
private class CompletedPromiseImpl<T> extends CompletedPromise<T, FS, FA> {
public CompletedPromiseImpl(@Nullable T result) {
super(new PromiseCompletion<>(result));
}
public CompletedPromiseImpl(@NotNull Throwable exception) {
super(new PromiseCompletion<>(exception));
}
public CompletedPromiseImpl() {
super();
}
@Override
public @NotNull AbstractPromiseFactory<FS, FA> getFactory() {
return PromiseFactoryImpl.this;
}
}
}

View File

@@ -2,8 +2,16 @@ package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
/**
* A listener for a {@link Promise} that is called when the promise is resolved.
*/
public interface PromiseListener<T> {
void handle(@NotNull PromiseCompletion<T> ctx);
/**
* Handles the completion of the promise.
*
* @param completion the promise completion
*/
void handle(@NotNull PromiseCompletion<T> completion);
}

View File

@@ -1,90 +0,0 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.function.ExceptionalFunction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
/**
* @deprecated Use PromiseFactory instance methods instead.
*/
@Deprecated
public class Promises {
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return combine(p1, p2, p1.getFactory());
}
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, PromiseFactory factory) {
return factory.combine(p1, p2);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, PromiseFactory factory) {
return combine(promises, timeout, true, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) {
return factory.combine(promises, exceptionHandler).timeout(timeout);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, PromiseFactory factory) {
return combine(promises, 1500L, true, factory);
}
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, PromiseFactory factory) {
return combine(promises, timeout, true, factory);
}
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
return factory.combine(promises, strict ? null : (_i, _v) -> {}).timeout(timeout);
}
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, PromiseFactory factory) {
return combine(promises, 1500L, true, factory);
}
public static @NotNull Promise<Void> all(@NotNull List<Promise<?>> promises, PromiseFactory factory) {
return factory.all(promises);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, PromiseFactory factory) {
return combine(keys, mapper, timeout, true, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, boolean strict, PromiseFactory factory) {
Map<K, Promise<V>> promises = new HashMap<>();
for (K key : keys) {
Promise<V> promise = factory.resolve(key).thenApplyAsync(mapper);
promises.put(key, promise);
}
return combine(promises, timeout, strict, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, PromiseFactory factory) {
return combine(keys, mapper, 1500L, true, factory);
}
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
return erase(p, p.getFactory());
}
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p, PromiseFactory factory) {
return p.erase();
}
public static <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future, PromiseFactory factory) {
return factory.wrap(future);
}
}

View File

@@ -0,0 +1,35 @@
package dev.tommyjs.futur.util;
import org.jetbrains.annotations.NotNull;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public class ConcurrentResultArray<T> {
private static final float RESIZE_FACTOR = 1.2F;
private final AtomicReference<T[]> ref;
public ConcurrentResultArray(int expectedSize) {
//noinspection unchecked
this.ref = new AtomicReference<>((T[]) new Object[expectedSize]);
}
public void set(int index, T element) {
ref.updateAndGet(array -> {
if (array.length <= index) {
array = Arrays.copyOf(array, (int) (array.length * RESIZE_FACTOR));
}
array[index] = element;
return array;
});
}
public @NotNull List<T> toList() {
return Arrays.asList(ref.get());
}
}

View File

@@ -0,0 +1,52 @@
package dev.tommyjs.futur.util;
import dev.tommyjs.futur.promise.CompletablePromise;
import dev.tommyjs.futur.promise.Promise;
import org.jetbrains.annotations.NotNull;
import java.util.Spliterator;
public class PromiseUtil {
/**
* Propagates the completion, once completed, of the given promise to the given promise.
*
* @param from the promise to propagate the completion from
* @param to the completable promise to propagate the completion to
*/
public static <V> void propagateCompletion(@NotNull Promise<V> from, @NotNull CompletablePromise<V> to) {
from.addDirectListener(to::complete, to::completeExceptionally);
}
/**
* Propagates the cancellation, once cancelled, of the given promise to the given promise.
*
* @param from the promise to propagate the cancellation from
* @param to the promise to propagate the cancellation to
*/
public static void propagateCancel(@NotNull Promise<?> from, @NotNull Promise<?> to) {
from.onCancel(to::cancel);
}
/**
* Cancels the given promise once the given promise is completed.
*
* @param from the promise to propagate the completion from
* @param to the promise to cancel upon completion
*/
public static void cancelOnComplete(@NotNull Promise<?> from, @NotNull Promise<?> to) {
from.addDirectListener(_ -> to.cancel());
}
/**
* Estimates the size of the given stream.
*
* @param stream the stream
* @return the estimated size
*/
public static int estimateSize(@NotNull Spliterator<?> stream) {
long estimate = stream.estimateSize();
return estimate == Long.MAX_VALUE ? 10 : (int) estimate;
}
}