optimizations, more comfortable PromiseFactory api and support virtual threaded executors

This commit is contained in:
WhatCats
2025-01-06 14:06:39 +01:00
parent 18d334a530
commit 9e392c91ba
39 changed files with 1205 additions and 833 deletions

View File

@@ -1,39 +0,0 @@
package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DualPoolExecutor implements PromiseExecutor<Future<?>> {
private final @NotNull ScheduledExecutorService syncSvc;
private final @NotNull ScheduledExecutorService asyncSvc;
public DualPoolExecutor(@NotNull ScheduledExecutorService syncSvc, @NotNull ScheduledExecutorService asyncSvc) {
this.syncSvc = syncSvc;
this.asyncSvc = asyncSvc;
}
public static @NotNull DualPoolExecutor create(int asyncPoolSize) {
return new DualPoolExecutor(Executors.newSingleThreadScheduledExecutor(), Executors.newScheduledThreadPool(asyncPoolSize));
}
@Override
public Future<?> runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return syncSvc.schedule(task, delay, unit);
}
@Override
public Future<?> runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return asyncSvc.schedule(task, delay, unit);
}
@Override
public void cancel(Future<?> task) {
task.cancel(true);
}
}

View File

@@ -0,0 +1,32 @@
package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
class ExecutorServiceImpl implements PromiseExecutor<Future<?>> {
private final ScheduledExecutorService service;
public ExecutorServiceImpl(@NotNull ScheduledExecutorService service) {
this.service = service;
}
@Override
public Future<?> run(@NotNull Runnable task) {
return service.submit(task);
}
@Override
public Future<?> run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return service.schedule(task, delay, unit);
}
@Override
public void cancel(Future<?> task) {
task.cancel(true);
}
}

View File

@@ -2,22 +2,32 @@ package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public interface PromiseExecutor<T> {
T runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
T runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
default T runSync(@NotNull Runnable task) {
return runSync(task, 0L, TimeUnit.MILLISECONDS);
static PromiseExecutor<?> virtualThreaded() {
return new VirtualThreadImpl();
}
default T runAsync(@NotNull Runnable task) {
return runAsync(task, 0L, TimeUnit.MILLISECONDS);
static PromiseExecutor<?> singleThreaded() {
return of(Executors.newSingleThreadScheduledExecutor());
}
static PromiseExecutor<?> multiThreaded(int threads) {
return of(Executors.newScheduledThreadPool(threads));
}
static PromiseExecutor<?> of(@NotNull ScheduledExecutorService service) {
return new ExecutorServiceImpl(service);
}
T run(@NotNull Runnable task) throws Exception;
T run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) throws Exception;
void cancel(T task);
}

View File

@@ -1,18 +0,0 @@
package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class SinglePoolExecutor extends DualPoolExecutor {
public SinglePoolExecutor(@NotNull ScheduledExecutorService service) {
super(service, service);
}
public static @NotNull SinglePoolExecutor create(int threadPoolSize) {
return new SinglePoolExecutor(Executors.newScheduledThreadPool(threadPoolSize));
}
}

View File

@@ -0,0 +1,31 @@
package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.TimeUnit;
class VirtualThreadImpl implements PromiseExecutor<Thread> {
@Override
public Thread run(@NotNull Runnable task) {
return Thread.ofVirtual().start(task);
}
@Override
public Thread run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return Thread.ofVirtual().start(() -> {
try {
Thread.sleep(unit.toMillis(delay));
} catch (InterruptedException e) {
return;
}
task.run();
});
}
@Override
public void cancel(Thread task) {
task.interrupt();
}
}

View File

@@ -3,6 +3,6 @@ package dev.tommyjs.futur.function;
@FunctionalInterface
public interface ExceptionalConsumer<T> {
void accept(T value) throws Throwable;
void accept(T value) throws Exception;
}

View File

@@ -3,6 +3,6 @@ package dev.tommyjs.futur.function;
@FunctionalInterface
public interface ExceptionalFunction<K, V> {
V apply(K value) throws Throwable;
V apply(K value) throws Exception;
}

View File

@@ -3,6 +3,6 @@ package dev.tommyjs.futur.function;
@FunctionalInterface
public interface ExceptionalRunnable {
void run() throws Throwable;
void run() throws Exception;
}

View File

@@ -3,6 +3,6 @@ package dev.tommyjs.futur.function;
@FunctionalInterface
public interface ExceptionalSupplier<T> {
T get() throws Throwable;
T get() throws Exception;
}

View File

@@ -1,27 +0,0 @@
package dev.tommyjs.futur.impl;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.promise.AbstractPromise;
import dev.tommyjs.futur.promise.AbstractPromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
public class SimplePromise<T, F> extends AbstractPromise<T, F> {
private final @NotNull AbstractPromiseFactory<F> factory;
public SimplePromise(@NotNull AbstractPromiseFactory<F> factory) {
this.factory = factory;
}
@Deprecated
public SimplePromise(@NotNull PromiseExecutor<F> executor, @NotNull Logger logger, @NotNull AbstractPromiseFactory<F> factory) {
this(factory);
}
@Override
public @NotNull AbstractPromiseFactory<F> getFactory() {
return factory;
}
}

View File

@@ -1,34 +0,0 @@
package dev.tommyjs.futur.impl;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.promise.AbstractPromiseFactory;
import dev.tommyjs.futur.promise.Promise;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
public class SimplePromiseFactory<F> extends AbstractPromiseFactory<F> {
private final PromiseExecutor<F> executor;
private final Logger logger;
public SimplePromiseFactory(PromiseExecutor<F> executor, Logger logger) {
this.executor = executor;
this.logger = logger;
}
@Override
public @NotNull <T> Promise<T> unresolved() {
return new SimplePromise<>(this);
}
@Override
public @NotNull Logger getLogger() {
return logger;
}
@Override
public @NotNull PromiseExecutor<F> getExecutor() {
return executor;
}
}

View File

@@ -0,0 +1,48 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Iterator;
import java.util.List;
public class CompletionJoiner extends PromiseJoiner<Promise<?>, Void, Void, List<PromiseCompletion<?>>> {
private final ConcurrentResultArray<PromiseCompletion<?>> results;
public CompletionJoiner(
@NotNull PromiseFactory factory,
@NotNull Iterator<Promise<?>> promises,
int expectedSize, boolean link
) {
super(factory);
results = new ConcurrentResultArray<>(expectedSize);
join(promises, link);
}
@Override
protected Void getKey(Promise<?> value) {
return null;
}
@Override
protected @NotNull Promise<Void> getPromise(Promise<?> value) {
//noinspection unchecked
return (Promise<Void>) value;
}
@Override
protected @Nullable Throwable onFinish(int index, Void key, @NotNull PromiseCompletion<Void> res) {
results.set(index, res);
return null;
}
@Override
protected List<PromiseCompletion<?>> getResult() {
return results.toList();
}
}

View File

@@ -0,0 +1,32 @@
package dev.tommyjs.futur.joiner;
import org.jetbrains.annotations.NotNull;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
class ConcurrentResultArray<T> {
private final AtomicReference<T[]> ref;
public ConcurrentResultArray(int expectedSize) {
//noinspection unchecked
this.ref = new AtomicReference<>((T[]) new Object[expectedSize]);
}
public void set(int index, T element) {
ref.updateAndGet(array -> {
if (array.length <= index)
return Arrays.copyOf(array, index + 6);
array[index] = element;
return array;
});
}
public @NotNull List<T> toList() {
return Arrays.asList(ref.get());
}
}

View File

@@ -0,0 +1,60 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.*;
import java.util.function.BiConsumer;
public class MappedResultJoiner<K, V> extends PromiseJoiner<Map.Entry<K, Promise<V>>, K, V, Map<K, V>> {
private final @Nullable BiConsumer<K, Throwable> exceptionHandler;
private final @NotNull ConcurrentResultArray<Map.Entry<K, V>> results;
public MappedResultJoiner(
@NotNull PromiseFactory factory,
@NotNull Iterator<Map.Entry<K, Promise<V>>> promises,
@Nullable BiConsumer<K, Throwable> exceptionHandler,
int expectedSize, boolean link
) {
super(factory);
this.exceptionHandler = exceptionHandler;
this.results = new ConcurrentResultArray<>(expectedSize);
join(promises, link);
}
@Override
protected K getKey(Map.Entry<K, Promise<V>> entry) {
return entry.getKey();
}
@Override
protected @NotNull Promise<V> getPromise(Map.Entry<K, Promise<V>> entry) {
return entry.getValue();
}
@Override
protected @Nullable Throwable onFinish(int index, K key, @NotNull PromiseCompletion<V> res) {
if (res.isError()) {
if (exceptionHandler == null) return res.getException();
exceptionHandler.accept(key, res.getException());
}
results.set(index, new AbstractMap.SimpleImmutableEntry<>(key, res.getResult()));
return null;
}
@Override
protected Map<K, V> getResult() {
List<Map.Entry<K, V>> list = results.toList();
Map<K, V> map = new HashMap<>(list.size());
for (Map.Entry<K, V> entry : list) {
map.put(entry.getKey(), entry.getValue());
}
return map;
}
}

View File

@@ -0,0 +1,66 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
abstract class PromiseJoiner<V, K, T, R> {
private final CompletablePromise<R> joined;
protected PromiseJoiner(@NotNull PromiseFactory factory) {
this.joined = factory.unresolved();
}
public @NotNull Promise<R> joined() {
return joined;
}
protected abstract K getKey(V value);
protected abstract @NotNull Promise<T> getPromise(V value);
protected abstract @Nullable Throwable onFinish(int index, K key, @NotNull PromiseCompletion<T> completion);
protected abstract R getResult();
protected void join(@NotNull Iterator<V> promises, boolean link) {
AtomicBoolean waiting = new AtomicBoolean();
AtomicInteger count = new AtomicInteger();
int i = 0;
do {
V value = promises.next();
Promise<T> p = getPromise(value);
if (link) {
AbstractPromise.cancelOnFinish(p, joined);
}
if (!joined.isCompleted()) {
count.incrementAndGet();
K key = getKey(value);
int index = i++;
p.addListener((res) -> {
Throwable e = onFinish(index, key, res);
if (e != null) {
joined.completeExceptionally(e);
} else if (count.decrementAndGet() == 0 && waiting.get()) {
joined.complete(getResult());
}
});
}
} while (promises.hasNext());
count.updateAndGet((v) -> {
if (v == 0) joined.complete(getResult());
else waiting.set(true);
return v;
});
}
}

View File

@@ -0,0 +1,56 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
public class ResultJoiner<T> extends PromiseJoiner<Promise<T>, Void, T, List<T>> {
private final @Nullable BiConsumer<Integer, Throwable> exceptionHandler;
private final ConcurrentResultArray<T> results;
public ResultJoiner(
@NotNull PromiseFactory factory,
@NotNull Iterator<Promise<T>> promises,
@Nullable BiConsumer<Integer, Throwable> exceptionHandler,
int expectedSize, boolean link
) {
super(factory);
this.exceptionHandler = exceptionHandler;
this.results = new ConcurrentResultArray<>(expectedSize);
join(promises, link);
}
@Override
protected Void getKey(Promise<T> value) {
return null;
}
@Override
protected @NotNull Promise<T> getPromise(Promise<T> value) {
return value;
}
@Override
protected @Nullable Throwable onFinish(int index, Void key, @NotNull PromiseCompletion<T> res) {
if (res.isError()) {
if (exceptionHandler == null) return res.getException();
exceptionHandler.accept(index, res.getException());
}
results.set(index, res.getResult());
return null;
}
@Override
protected List<T> getResult() {
return results.toList();
}
}

View File

@@ -0,0 +1,39 @@
package dev.tommyjs.futur.joiner;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Iterator;
public class VoidJoiner extends PromiseJoiner<Promise<?>, Void, Void, Void> {
public VoidJoiner(@NotNull PromiseFactory factory, @NotNull Iterator<Promise<?>> promises, boolean link) {
super(factory);
join(promises, link);
}
@Override
protected Void getKey(Promise<?> value) {
return null;
}
@Override
protected @NotNull Promise<Void> getPromise(Promise<?> value) {
//noinspection unchecked
return (Promise<Void>) value;
}
@Override
protected @Nullable Throwable onFinish(int index, Void key, @NotNull PromiseCompletion<Void> completion) {
return completion.getException();
}
@Override
protected Void getResult() {
return null;
}
}

View File

@@ -1,6 +1,5 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.function.ExceptionalConsumer;
import dev.tommyjs.futur.function.ExceptionalFunction;
import dev.tommyjs.futur.function.ExceptionalRunnable;
@@ -10,102 +9,110 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
public abstract class AbstractPromise<T, F> implements Promise<T> {
public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T> {
private Collection<PromiseListener<T>> listeners;
private final AtomicReference<PromiseCompletion<T>> completion;
private final CountDownLatch latch;
private final Lock lock;
public AbstractPromise() {
this.completion = new AtomicReference<>();
this.latch = new CountDownLatch(1);
this.lock = new ReentrantLock();
}
protected static <V> void propagateResult(Promise<V> from, Promise<V> to) {
public static <V> void propagateResult(Promise<V> from, CompletablePromise<V> to) {
from.addDirectListener(to::complete, to::completeExceptionally);
}
protected static void propagateCancel(Promise<?> from, Promise<?> to) {
from.onCancel(to::completeExceptionally);
public static void propagateCancel(Promise<?> from, Promise<?> to) {
from.onCancel(to::cancel);
}
private <V> @NotNull Runnable createRunnable(T result, @NotNull Promise<V> promise, @NotNull ExceptionalFunction<T, V> task) {
public static void cancelOnFinish(Promise<?> toCancel, Promise<?> toFinish) {
toFinish.addDirectListener(_ -> toCancel.cancel());
}
private final AtomicReference<Collection<PromiseListener<T>>> listeners;
private final AtomicReference<PromiseCompletion<T>> completion;
private final CountDownLatch latch;
public AbstractPromise() {
this.listeners = new AtomicReference<>(Collections.emptyList());
this.completion = new AtomicReference<>();
this.latch = new CountDownLatch(1);
}
private void runCompleter(@NotNull CompletablePromise<?> promise, @NotNull ExceptionalRunnable completer) {
try {
completer.run();
} catch (Error e) {
promise.completeExceptionally(e);
throw e;
} catch (Throwable e) {
promise.completeExceptionally(e);
}
}
private <V> @NotNull Runnable createCompleter(
T result,
@NotNull CompletablePromise<V> promise,
@NotNull ExceptionalFunction<T, V> completer
) {
return () -> {
if (promise.isCompleted()) return;
try {
V nextResult = task.apply(result);
promise.complete(nextResult);
} catch (Throwable e) {
promise.completeExceptionally(e);
}
runCompleter(promise, () -> promise.complete(completer.apply(result)));
};
}
public abstract @NotNull AbstractPromiseFactory<F> getFactory();
protected @NotNull PromiseExecutor<F> getExecutor() {
return getFactory().getExecutor();
}
public abstract @NotNull AbstractPromiseFactory<FS, FA> getFactory();
protected @NotNull Logger getLogger() {
return getFactory().getLogger();
}
@Override
public T awaitInterruptibly() throws InterruptedException {
public T get() throws InterruptedException, ExecutionException {
this.latch.await();
return joinCompletion(Objects.requireNonNull(getCompletion()));
return joinCompletion();
}
@Override
public T awaitInterruptibly(long timeoutMillis) throws TimeoutException, InterruptedException {
boolean success = this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
public T get(long time, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
boolean success = this.latch.await(time, unit);
if (!success) {
throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms");
throw new TimeoutException("Promise stopped waiting after " + time + " " + unit);
}
return joinCompletion(Objects.requireNonNull(getCompletion()));
return joinCompletion();
}
@Override
public T await() {
try {
return awaitInterruptibly();
this.latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
PromiseCompletion<T> completion = Objects.requireNonNull(getCompletion());
if (completion.isSuccess()) return completion.getResult();
throw new CompletionException(completion.getException());
}
private T joinCompletion() throws ExecutionException {
PromiseCompletion<T> completion = Objects.requireNonNull(getCompletion());
if (completion.isSuccess()) return completion.getResult();
throw new ExecutionException(completion.getException());
}
@Override
public T await(long timeoutMillis) throws TimeoutException {
try {
return awaitInterruptibly(timeoutMillis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private T joinCompletion(PromiseCompletion<T> completion) {
if (completion.isError())
throw new RuntimeException(completion.getException());
return completion.getResult();
public @NotNull Promise<T> fork() {
CompletablePromise<T> fork = getFactory().unresolved();
propagateResult(this, fork);
return fork;
}
@Override
public @NotNull Promise<Void> thenRun(@NotNull ExceptionalRunnable task) {
return thenApply(result -> {
return thenApply(_ -> {
task.run();
return null;
});
@@ -121,14 +128,14 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenSupply(@NotNull ExceptionalSupplier<V> task) {
return thenApply(result -> task.get());
return thenApply(_ -> task.get());
}
@Override
public <V> @NotNull Promise<V> thenApply(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = getFactory().unresolved();
addDirectListener(
res -> createRunnable(res, promise, task).run(),
res -> createCompleter(res, promise, task).run(),
promise::completeExceptionally
);
@@ -138,7 +145,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenCompose(@NotNull ExceptionalFunction<T, Promise<V>> task) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = getFactory().unresolved();
thenApply(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
@@ -157,7 +164,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task) {
return thenApplySync(result -> {
return thenApplySync(_ -> {
task.run();
return null;
});
@@ -165,7 +172,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedSync(result -> {
return thenApplyDelayedSync(_ -> {
task.run();
return null;
}, delay, unit);
@@ -189,27 +196,23 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenSupplySync(@NotNull ExceptionalSupplier<V> task) {
return thenApplySync(result -> task.get());
return thenApplySync(_ -> task.get());
}
@Override
public <V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedSync(result -> task.get(), delay, unit);
return thenApplyDelayedSync(_ -> task.get(), delay, unit);
}
@Override
public <V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = getFactory().unresolved();
addDirectListener(
res -> {
try {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runSync(runnable);
promise.onCancel((e) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
promise.completeExceptionally(e);
}
},
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FS future = getFactory().getSyncExecutor().run(runnable);
promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
@@ -219,17 +222,13 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = getFactory().unresolved();
addDirectListener(
res -> {
try {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runSync(runnable, delay, unit);
promise.onCancel((e) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
promise.completeExceptionally(e);
}
},
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FS future = getFactory().getSyncExecutor().run(runnable, delay, unit);
promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
@@ -239,7 +238,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = getFactory().unresolved();
thenApplySync(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
@@ -258,7 +257,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<Void> thenRunAsync(@NotNull ExceptionalRunnable task) {
return thenApplyAsync(result -> {
return thenApplyAsync(_ -> {
task.run();
return null;
});
@@ -266,7 +265,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<Void> thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedAsync(result -> {
return thenApplyDelayedAsync(_ -> {
task.run();
return null;
}, delay, unit);
@@ -290,17 +289,17 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenSupplyAsync(@NotNull ExceptionalSupplier<V> task) {
return thenApplyAsync(result -> task.get());
return thenApplyAsync(_ -> task.get());
}
@Override
public <V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedAsync(result -> task.get(), delay, unit);
return thenApplyDelayedAsync(_ -> task.get(), delay, unit);
}
@Override
public @NotNull Promise<T> thenPopulateReference(@NotNull AtomicReference<T> reference) {
return thenApplyAsync((result) -> {
return thenApplyAsync(result -> {
reference.set(result);
return result;
});
@@ -308,17 +307,13 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = getFactory().unresolved();
addDirectListener(
(res) -> {
try {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runAsync(runnable);
promise.onCancel((e) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
promise.completeExceptionally(e);
}
},
(res) -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FA future = getFactory().getAsyncExecutor().run(runnable);
promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
@@ -328,17 +323,13 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = getFactory().unresolved();
addDirectListener(
res -> {
try {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runAsync(runnable, delay, unit);
promise.onCancel((e) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
promise.completeExceptionally(e);
}
},
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FA future = getFactory().getAsyncExecutor().run(runnable, delay, unit);
promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
@@ -348,7 +339,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
Promise<V> promise = getFactory().unresolved();
CompletablePromise<V> promise = getFactory().unresolved();
thenApplyAsync(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
@@ -367,7 +358,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<Void> erase() {
return thenSupplyAsync(() -> null);
return thenSupply(() -> null);
}
@Override
@@ -378,10 +369,10 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
return addAsyncListener((res) -> {
if (res.isError()) {
if (errorListener != null) errorListener.accept(res.getException());
} else {
if (res.isSuccess()) {
if (successListener != null) successListener.accept(res.getResult());
} else {
if (errorListener != null) errorListener.accept(res.getException());
}
});
}
@@ -394,49 +385,47 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
return addDirectListener((res) -> {
if (res.isError()) {
if (errorListener != null) errorListener.accept(res.getException());
} else {
if (res.isSuccess()) {
if (successListener != null) successListener.accept(res.getResult());
} else {
if (errorListener != null) errorListener.accept(res.getException());
}
});
}
private @NotNull Promise<T> addAnyListener(PromiseListener<T> listener) {
PromiseCompletion<T> completion;
Collection<PromiseListener<T>> res = listeners.updateAndGet(v -> {
if (v == Collections.EMPTY_LIST) v = new ConcurrentLinkedQueue<>();
if (v != null) v.add(listener);
return v;
});
lock.lock();
try {
completion = getCompletion();
if (completion == null) {
if (listeners == null) listeners = new LinkedList<>();
listeners.add(listener);
return this;
if (res == null) {
if (listener instanceof AsyncPromiseListener) {
callListenerAsync(listener, Objects.requireNonNull(getCompletion()));
} else {
callListenerNow(listener, Objects.requireNonNull(getCompletion()));
}
} finally {
lock.unlock();
}
callListener(listener, completion);
return this;
}
private void callListener(PromiseListener<T> listener, PromiseCompletion<T> ctx) {
if (listener instanceof AsyncPromiseListener) {
try {
getExecutor().runAsync(() -> callListenerNow(listener, ctx));
} catch (RejectedExecutionException ignored) {
}
} else {
callListenerNow(listener, ctx);
private void callListenerAsync(PromiseListener<T> listener, PromiseCompletion<T> res) {
try {
getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, res));
} catch (Exception e) {
getLogger().warn("Exception caught while running promise listener", e);
}
}
private void callListenerNow(PromiseListener<T> listener, PromiseCompletion<T> ctx) {
private void callListenerNow(PromiseListener<T> listener, PromiseCompletion<T> res) {
try {
listener.handle(ctx);
} catch (Exception e) {
listener.handle(res);
} catch (Error e) {
getLogger().error("Error caught in promise listener", e);
throw e;
} catch (Throwable e) {
getLogger().error("Exception caught in promise listener", e);
}
}
@@ -453,13 +442,15 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<T> logExceptions(@NotNull String message) {
return onError(e -> getLogger().error(message, e));
Exception wrapper = new DeferredExecutionException();
return onError(e -> getLogger().error(message, wrapper.initCause(e)));
}
@Override
public <E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> clazz, @NotNull Consumer<E> listener) {
return onError((e) -> {
if (clazz.isAssignableFrom(e.getClass())) {
getLogger().info("On Error {}", e.getClass());
//noinspection unchecked
listener.accept((E) e);
}
@@ -471,37 +462,51 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
return onError(CancellationException.class, listener);
}
@Deprecated
@Override
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) {
return maxWaitTime(time, unit);
Exception e = new CancellationException("Promise timed out after " + time + " " + unit);
return completeExceptionallyDelayed(e, time, unit);
}
@Override
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
try {
Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit);
F future = getExecutor().runAsync(() -> completeExceptionally(e), time, unit);
return addDirectListener((_v) -> getExecutor().cancel(future));
} catch (RejectedExecutionException e) {
completeExceptionally(e);
return this;
}
Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit);
return completeExceptionallyDelayed(e, time, unit);
}
private Promise<T> completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit) {
runCompleter(this, () -> {
FA future = getFactory().getAsyncExecutor().run(() -> completeExceptionally(e), delay, unit);
addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
});
return this;
}
private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
lock.lock();
try {
if (!setCompletion(ctx)) return;
if (!setCompletion(ctx)) return;
latch.countDown();
this.latch.countDown();
if (listeners != null) {
for (PromiseListener<T> listener : listeners) {
callListener(listener, ctx);
Iterator<PromiseListener<T>> iter = listeners.getAndSet(null).iterator();
while (iter.hasNext()) {
PromiseListener<T> listener = iter.next();
if (listener instanceof AsyncPromiseListener) {
callListenerAsync(listener, ctx);
} else {
try {
callListenerNow(listener, ctx);
} finally {
iter.forEachRemaining(v -> callListenerAsyncLastResort(v, ctx));
}
}
} finally {
lock.unlock();
}
}
private void callListenerAsyncLastResort(PromiseListener<T> listener, PromiseCompletion<T> ctx) {
try {
getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, ctx));
} catch (Throwable ignored) {
}
}
@@ -510,8 +515,8 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
}
@Override
public void cancel(@Nullable String message) {
completeExceptionally(new CancellationException(message));
public void cancel(@NotNull CancellationException e) {
completeExceptionally(e);
}
@Override
@@ -538,7 +543,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
public @NotNull CompletableFuture<T> toFuture() {
CompletableFuture<T> future = new CompletableFuture<>();
this.addDirectListener(future::complete, future::completeExceptionally);
future.whenComplete((res, e) -> {
future.whenComplete((_, e) -> {
if (e instanceof CancellationException) {
this.cancel();
}

View File

@@ -1,6 +1,10 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.joiner.CompletionJoiner;
import dev.tommyjs.futur.joiner.MappedResultJoiner;
import dev.tommyjs.futur.joiner.ResultJoiner;
import dev.tommyjs.futur.joiner.VoidJoiner;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -8,142 +12,84 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import java.util.stream.Stream;
public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
public abstract class AbstractPromiseFactory<FS, FA> implements PromiseFactory {
public abstract @NotNull PromiseExecutor<F> getExecutor();
public abstract @NotNull PromiseExecutor<FS> getSyncExecutor();
public abstract @NotNull PromiseExecutor<FA> getAsyncExecutor();
@Override
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2) {
List<Promise<?>> promises = List.of(p1, p2);
return all(propagateCancel, promises)
.thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>(
Objects.requireNonNull(p1.getCompletion()).getResult(),
Objects.requireNonNull(p2.getCompletion()).getResult()
));
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(
@NotNull Promise<K> p1,
@NotNull Promise<V> p2,
boolean dontFork
) {
return all(dontFork, p1, p2).thenApply((_) -> new AbstractMap.SimpleImmutableEntry<>(
Objects.requireNonNull(p1.getCompletion()).getResult(),
Objects.requireNonNull(p2.getCompletion()).getResult()
));
}
@Override
public <K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
public <K, V> @NotNull Promise<Map<K, V>> combine(
@NotNull Map<K, Promise<V>> promises,
@Nullable BiConsumer<K, Throwable> exceptionHandler,
boolean link
) {
if (promises.isEmpty()) return resolve(Collections.emptyMap());
return new MappedResultJoiner<>(this,
promises.entrySet().iterator(), exceptionHandler, promises.size(), link).joined();
}
Map<K, V> map = new HashMap<>();
Promise<Map<K, V>> promise = unresolved();
for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) {
if (propagateCancel) {
AbstractPromise.propagateCancel(promise, entry.getValue());
}
@Override
public <V> @NotNull Promise<List<V>> combine(
@NotNull Iterator<Promise<V>> promises, int expectedSize,
@Nullable BiConsumer<Integer, Throwable> exceptionHandler, boolean link
) {
if (!promises.hasNext()) return resolve(Collections.emptyList());
return new ResultJoiner<>(
this, promises, exceptionHandler, expectedSize, link).joined();
}
entry.getValue().addDirectListener((ctx) -> {
synchronized (map) {
if (ctx.getException() != null) {
if (exceptionHandler == null) {
promise.completeExceptionally(ctx.getException());
} else {
exceptionHandler.accept(entry.getKey(), ctx.getException());
map.put(entry.getKey(), null);
}
} else {
map.put(entry.getKey(), ctx.getResult());
}
@Override
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(
@NotNull Iterator<Promise<?>> promises,
int expectedSize,
boolean link
) {
if (!promises.hasNext()) return resolve(Collections.emptyList());
return new CompletionJoiner(this, promises, expectedSize, link).joined();
}
if (map.size() == promises.size()) {
promise.complete(map);
}
}
});
}
@Override
public @NotNull Promise<Void> all(@NotNull Iterator<Promise<?>> promises, boolean link) {
if (!promises.hasNext()) return resolve(null);
return new VoidJoiner(this, promises, link).joined();
}
@Override
public <V> @NotNull Promise<V> race(@NotNull Iterator<Promise<V>> promises, boolean link) {
CompletablePromise<V> promise = unresolved();
promises.forEachRemaining(p -> {
if (link) AbstractPromise.cancelOnFinish(p, promise);
if (!promise.isCompleted())
AbstractPromise.propagateResult(p, promise);
});
return promise;
}
@Override
public <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
AtomicInteger index = new AtomicInteger();
return this.combine(
propagateCancel,
StreamSupport.stream(promises.spliterator(), false)
.collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)),
exceptionHandler
).thenApplyAsync(v ->
v.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toList())
);
public <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises, boolean link) {
return race(promises.iterator(), link);
}
@Override
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
List<Promise<?>> promises = new ArrayList<>();
promiseIterable.iterator().forEachRemaining(promises::add);
if (promises.isEmpty()) return resolve(Collections.emptyList());
PromiseCompletion<?>[] results = new PromiseCompletion<?>[promises.size()];
Promise<List<PromiseCompletion<?>>> promise = unresolved();
var iter = promises.listIterator();
while (iter.hasNext()) {
int index = iter.nextIndex();
var p = iter.next();
if (propagateCancel) {
AbstractPromise.propagateCancel(promise, p);
}
p.addDirectListener((res) -> {
synchronized (results) {
results[index] = res;
if (Arrays.stream(results).allMatch(Objects::nonNull))
promise.complete(Arrays.asList(results));
}
});
}
return promise;
}
@Override
public @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
List<Promise<?>> promises = new ArrayList<>();
promiseIterable.iterator().forEachRemaining(promises::add);
if (promises.isEmpty()) return resolve(null);
AtomicInteger completed = new AtomicInteger();
Promise<Void> promise = unresolved();
for (Promise<?> p : promises) {
if (propagateCancel) {
AbstractPromise.propagateCancel(promise, p);
}
p.addDirectListener((res) -> {
if (res.getException() != null) {
promise.completeExceptionally(res.getException());
} else if (completed.incrementAndGet() == promises.size()) {
promise.complete(null);
}
});
}
return promise;
}
@Override
public <V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises) {
Promise<V> promise = unresolved();
for (Promise<V> p : promises) {
if (cancelRaceLosers) {
promise.addListener((res) -> p.cancel());
}
AbstractPromise.propagateResult(p, promise);
}
return promise;
public <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises, boolean link) {
return race(promises.iterator(), link);
}
@Override
@@ -152,7 +98,7 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
}
private <T> @NotNull Promise<T> wrap(@NotNull CompletionStage<T> completion, Future<T> future) {
Promise<T> promise = unresolved();
CompletablePromise<T> promise = unresolved();
completion.whenComplete((v, e) -> {
if (e != null) {
@@ -162,20 +108,20 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
}
});
promise.onCancel((e) -> future.cancel(true));
promise.onCancel(_ -> future.cancel(true));
return promise;
}
@Override
public <T> @NotNull Promise<T> resolve(T value) {
Promise<T> promise = unresolved();
CompletablePromise<T> promise = unresolved();
promise.complete(value);
return promise;
}
@Override
public <T> @NotNull Promise<T> error(@NotNull Throwable error) {
Promise<T> promise = unresolved();
CompletablePromise<T> promise = unresolved();
promise.completeExceptionally(error);
return promise;
}

View File

@@ -0,0 +1,12 @@
package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public interface CompletablePromise<T> extends Promise<T> {
void complete(@Nullable T result);
void completeExceptionally(@NotNull Throwable result);
}

View File

@@ -0,0 +1,11 @@
package dev.tommyjs.futur.promise;
import java.util.concurrent.ExecutionException;
class DeferredExecutionException extends ExecutionException {
public DeferredExecutionException() {
super();
}
}

View File

@@ -8,16 +8,13 @@ import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
public interface Promise<T> {
PromiseFactory getFactory();
@NotNull PromiseFactory getFactory();
@NotNull Promise<Void> thenRun(@NotNull ExceptionalRunnable task);
@@ -80,6 +77,9 @@ public interface Promise<T> {
*/
@NotNull Promise<T> addDirectListener(@NotNull PromiseListener<T> listener);
/**
* @apiNote Direct listeners run on the same thread as the completion.
*/
@NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
/**
@@ -94,6 +94,9 @@ public interface Promise<T> {
return addAsyncListener(listener);
}
/**
* @apiNote Async listeners are run in parallel.
*/
@NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
@NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener);
@@ -105,55 +108,70 @@ public interface Promise<T> {
@NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener);
/**
* @deprecated Use maxWaitTime instead
* Cancels the promise with a TimeoutException after the specified time.
*/
@Deprecated
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
/**
* @deprecated Use maxWaitTime instead
* Cancels the promise with a TimeoutException after the specified time.
*/
@Deprecated
default @NotNull Promise<T> timeout(long ms) {
return timeout(ms, TimeUnit.MILLISECONDS);
}
/**
* Completes the promise exceptionally with a TimeoutException after the specified time.
*/
@NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit);
/**
* Completes the promise exceptionally with a TimeoutException after the specified time.
*/
default @NotNull Promise<T> maxWaitTime(long ms) {
return maxWaitTime(ms, TimeUnit.MILLISECONDS);
}
void cancel(@Nullable String reason);
void cancel(@NotNull CancellationException exception);
default void cancel(@NotNull String reason) {
cancel(new CancellationException(reason));
};
default void cancel() {
cancel(null);
cancel(new CancellationException());
}
void complete(@Nullable T result);
void completeExceptionally(@NotNull Throwable result);
@Blocking
T awaitInterruptibly() throws InterruptedException;
@Blocking
T awaitInterruptibly(long timeout) throws TimeoutException, InterruptedException;
/**
* Waits if necessary for this promise to complete, and then returns its result.
* @throws CancellationException if the computation was cancelled
* @throws CompletionException if this promise completed exceptionally
*/
@Blocking
T await();
@Blocking
T await(long timeout) throws TimeoutException;
/**
* @deprecated Use await instead.
* Waits if necessary for this promise to complete, and then returns its result.
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if this promise completed exceptionally
* @throws InterruptedException if the current thread was interrupted while waiting
*/
@Blocking
@Deprecated
default T join(long timeout) throws TimeoutException {
return await(timeout);
};
T get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for this future to complete, and then returns its result, if available.
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if this promise completed exceptionally
* @throws InterruptedException if the current thread was interrupted while waiting
* @throws TimeoutException if the wait timed out
*/
@Blocking
T get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
/**
* Stops this promise from propagating up cancellations.
*/
@NotNull Promise<T> fork();
@Nullable PromiseCompletion<T> getCompletion();

View File

@@ -22,12 +22,16 @@ public class PromiseCompletion<T> {
this.result = null;
}
public boolean isSuccess() {
return exception == null;
}
public boolean isError() {
return getException() != null;
return exception != null;
}
public boolean wasCanceled() {
return getException() instanceof CancellationException;
return exception instanceof CancellationException;
}
public @Nullable T getResult() {

View File

@@ -1,90 +1,183 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
public interface PromiseFactory {
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor<?> syncExecutor, @NotNull PromiseExecutor<?> asyncExecutor) {
return new PromiseFactoryImpl<>(logger, syncExecutor, asyncExecutor);
}
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor<?> executor) {
return new PromiseFactoryImpl<>(logger, executor, executor);
}
static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull ScheduledExecutorService executor) {
return of(logger, PromiseExecutor.of(executor));
}
private static int size(@NotNull Stream<?> stream) {
long estimate = stream.spliterator().estimateSize();
return estimate == Long.MAX_VALUE ? 10 : (int) estimate;
}
@NotNull Logger getLogger();
<T> @NotNull Promise<T> unresolved();
<T> @NotNull CompletablePromise<T> unresolved();
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2);
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, boolean cancelOnError);
default <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return combine(false, p1, p2);
return combine(p1, p2, true);
}
<K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler);
<K, V> @NotNull Promise<Map<K, V>> combine(
@NotNull Map<K, Promise<V>> promises,
@Nullable BiConsumer<K, Throwable> exceptionHandler,
boolean propagateCancel
);
default <K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises) {
return combine(propagateCancel, promises, null);
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @NotNull BiConsumer<K, Throwable> exceptionHandler) {
return combine(promises, exceptionHandler, true);
}
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
return combine(false, promises, exceptionHandler);
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, boolean cancelOnError) {
return combine(promises, null, cancelOnError);
}
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return combine(promises, null);
return combine(promises, null, true);
}
<V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler);
<V> @NotNull Promise<List<V>> combine(
@NotNull Iterator<Promise<V>> promises, int expectedSize,
@Nullable BiConsumer<Integer, Throwable> exceptionHandler, boolean propagateCancel
);
default <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises) {
return combine(propagateCancel, promises, null);
default <V> @NotNull Promise<List<V>> combine(
@NotNull Collection<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler,
boolean propagateCancel
) {
return combine(promises.iterator(), promises.size(), exceptionHandler, propagateCancel);
}
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
return combine(false, promises, exceptionHandler);
default <V> @NotNull Promise<List<V>> combine(
@NotNull Collection<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler
) {
return combine(promises.iterator(), promises.size(), exceptionHandler, true);
}
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return combine(promises, null);
default <V> @NotNull Promise<List<V>> combine(@NotNull Collection<Promise<V>> promises, boolean cancelOnError) {
return combine(promises.iterator(), promises.size(), null, cancelOnError);
}
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
return allSettled(false, promiseIterable);
default <V> @NotNull Promise<List<V>> combine(@NotNull Collection<Promise<V>> promises) {
return combine(promises.iterator(), promises.size(), null, true);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return allSettled(propagateCancel, Arrays.asList(promiseArray));
default <V> @NotNull Promise<List<V>> combine(
@NotNull Stream<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler,
boolean propagateCancel
) {
return combine(promises.iterator(), size(promises), exceptionHandler, propagateCancel);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
return allSettled(false, promiseArray);
default <V> @NotNull Promise<List<V>> combine(
@NotNull Stream<Promise<V>> promises,
@NotNull BiConsumer<Integer, Throwable> exceptionHandler
) {
return combine(promises.iterator(), size(promises), exceptionHandler, true);
}
@NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
return all(false, promiseIterable);
default <V> @NotNull Promise<List<V>> combine(@NotNull Stream<Promise<V>> promises, boolean cancelOnError) {
return combine(promises.iterator(), size(promises), null, cancelOnError);
}
default @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return all(propagateCancel, Arrays.asList(promiseArray));
default <V> @NotNull Promise<List<V>> combine(@NotNull Stream<Promise<V>> promises) {
return combine(promises.iterator(), size(promises), null, true);
}
default @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
return all(false, promiseArray);
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(
@NotNull Iterator<Promise<?>> promises, int estimatedSize, boolean propagateCancel);
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Collection<Promise<?>> promises, boolean propagateCancel) {
return allSettled(promises.iterator(), promises.size(), propagateCancel);
}
/**
* @apiNote Even with cancelRaceLosers, it is not guaranteed that only one promise will complete.
*/
<V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises);
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Collection<Promise<?>> promises) {
return allSettled(promises.iterator(), promises.size(), true);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Stream<Promise<?>> promises, boolean propagateCancel) {
return allSettled(promises.iterator(), size(promises), propagateCancel);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Stream<Promise<?>> promises) {
return allSettled(promises.iterator(), size(promises), true);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Promise<?>... promises) {
return allSettled(Arrays.asList(promises).iterator(), promises.length, propagateCancel);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promises) {
return allSettled(Arrays.asList(promises).iterator(), promises.length, true);
}
@NotNull Promise<Void> all(@NotNull Iterator<Promise<?>> promises, boolean cancelAllOnError);
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promises, boolean cancelAllOnError) {
return all(promises.iterator(), cancelAllOnError);
}
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promises) {
return all(promises.iterator(), true);
}
default @NotNull Promise<Void> all(@NotNull Stream<Promise<?>> promises, boolean cancelAllOnError) {
return all(promises.iterator(), cancelAllOnError);
}
default @NotNull Promise<Void> all(@NotNull Stream<Promise<?>> promises) {
return all(promises.iterator(), true);
}
default @NotNull Promise<Void> all(boolean cancelAllOnError, @NotNull Promise<?>... promises) {
return all(Arrays.asList(promises).iterator(), cancelAllOnError);
}
default @NotNull Promise<Void> all(@NotNull Promise<?>... promises) {
return all(Arrays.asList(promises).iterator(), true);
}
<V> @NotNull Promise<V> race(@NotNull Iterator<Promise<V>> promises, boolean cancelLosers);
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises, boolean cancelLosers) {
return race(promises.iterator(), cancelLosers);
}
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises) {
return race(false, promises);
return race(promises.iterator(), true);
}
default <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises, boolean cancelLosers) {
return race(promises.iterator(), cancelLosers);
}
default <V> @NotNull Promise<V> race(@NotNull Stream<Promise<V>> promises) {
return race(promises.iterator(), true);
}
<T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future);

View File

@@ -0,0 +1,43 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
public class PromiseFactoryImpl<FS, FA> extends AbstractPromiseFactory<FS, FA> {
private final @NotNull Logger logger;
private final @NotNull PromiseExecutor<FS> syncExecutor;
private final @NotNull PromiseExecutor<FA> asyncExecutor;
public PromiseFactoryImpl(
@NotNull Logger logger,
@NotNull PromiseExecutor<FS> syncExecutor,
@NotNull PromiseExecutor<FA> asyncExecutor
) {
this.logger = logger;
this.syncExecutor = syncExecutor;
this.asyncExecutor = asyncExecutor;
}
@Override
public @NotNull <T> CompletablePromise<T> unresolved() {
return new PromiseImpl<>(this);
}
@Override
public @NotNull Logger getLogger() {
return logger;
}
@Override
public @NotNull PromiseExecutor<FS> getSyncExecutor() {
return syncExecutor;
}
@Override
public @NotNull PromiseExecutor<FA> getAsyncExecutor() {
return asyncExecutor;
}
}

View File

@@ -0,0 +1,18 @@
package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
public class PromiseImpl<T, FS, FA> extends AbstractPromise<T, FS, FA> {
private final @NotNull AbstractPromiseFactory<FS, FA> factory;
public PromiseImpl(@NotNull AbstractPromiseFactory<FS, FA> factory) {
this.factory = factory;
}
@Override
public @NotNull AbstractPromiseFactory<FS, FA> getFactory() {
return factory;
}
}

View File

@@ -1,90 +0,0 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.function.ExceptionalFunction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
/**
* @deprecated Use PromiseFactory instance methods instead.
*/
@Deprecated
public class Promises {
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return combine(p1, p2, p1.getFactory());
}
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, PromiseFactory factory) {
return factory.combine(p1, p2);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, PromiseFactory factory) {
return combine(promises, timeout, true, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) {
return factory.combine(promises, exceptionHandler).timeout(timeout);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, PromiseFactory factory) {
return combine(promises, 1500L, true, factory);
}
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, PromiseFactory factory) {
return combine(promises, timeout, true, factory);
}
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
return factory.combine(promises, strict ? null : (_i, _v) -> {}).timeout(timeout);
}
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, PromiseFactory factory) {
return combine(promises, 1500L, true, factory);
}
public static @NotNull Promise<Void> all(@NotNull List<Promise<?>> promises, PromiseFactory factory) {
return factory.all(promises);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, PromiseFactory factory) {
return combine(keys, mapper, timeout, true, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, boolean strict, PromiseFactory factory) {
Map<K, Promise<V>> promises = new HashMap<>();
for (K key : keys) {
Promise<V> promise = factory.resolve(key).thenApplyAsync(mapper);
promises.put(key, promise);
}
return combine(promises, timeout, strict, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, PromiseFactory factory) {
return combine(keys, mapper, 1500L, true, factory);
}
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
return erase(p, p.getFactory());
}
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p, PromiseFactory factory) {
return p.erase();
}
public static <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future, PromiseFactory factory) {
return factory.wrap(future);
}
}