mirror of
https://github.com/tommyskeff/futur4j.git
synced 2026-01-17 23:16:01 +00:00
Merge pull request #9 from tommyskeff/feat/propagate-util
promise util with propagate cancellation options
This commit is contained in:
@@ -14,7 +14,7 @@ nexusPublishing {
|
|||||||
|
|
||||||
subprojects {
|
subprojects {
|
||||||
group = 'dev.tommyjs'
|
group = 'dev.tommyjs'
|
||||||
version = '2.2.0'
|
version = '2.3.0'
|
||||||
|
|
||||||
apply plugin: 'java'
|
apply plugin: 'java'
|
||||||
apply plugin: 'com.github.johnrengelman.shadow'
|
apply plugin: 'com.github.johnrengelman.shadow'
|
||||||
@@ -33,11 +33,11 @@ subprojects {
|
|||||||
implementation 'org.jetbrains:annotations:24.1.0'
|
implementation 'org.jetbrains:annotations:24.1.0'
|
||||||
implementation 'org.slf4j:slf4j-api:2.0.12'
|
implementation 'org.slf4j:slf4j-api:2.0.12'
|
||||||
compileOnly 'io.projectreactor:reactor-core:3.6.4'
|
compileOnly 'io.projectreactor:reactor-core:3.6.4'
|
||||||
|
compileOnly 'org.redisson:redisson:3.2.0'
|
||||||
|
|
||||||
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
|
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
|
||||||
testImplementation 'io.projectreactor:reactor-core:3.6.4'
|
testImplementation 'io.projectreactor:reactor-core:3.6.4'
|
||||||
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
|
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
|
||||||
testImplementation 'org.slf4j:slf4j-api:2.0.12'
|
|
||||||
testImplementation 'ch.qos.logback:logback-classic:1.5.3'
|
testImplementation 'ch.qos.logback:logback-classic:1.5.3'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,11 +3,11 @@ package dev.tommyjs.futur.executor;
|
|||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class DualPoolExecutor implements PromiseExecutor<ScheduledFuture<?>> {
|
public class DualPoolExecutor implements PromiseExecutor<Future<?>> {
|
||||||
|
|
||||||
private final @NotNull ScheduledExecutorService syncSvc;
|
private final @NotNull ScheduledExecutorService syncSvc;
|
||||||
private final @NotNull ScheduledExecutorService asyncSvc;
|
private final @NotNull ScheduledExecutorService asyncSvc;
|
||||||
@@ -22,17 +22,17 @@ public class DualPoolExecutor implements PromiseExecutor<ScheduledFuture<?>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ScheduledFuture<?> runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
|
public Future<?> runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
|
||||||
return syncSvc.schedule(task, delay, unit);
|
return syncSvc.schedule(task, delay, unit);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ScheduledFuture<?> runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
|
public Future<?> runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
|
||||||
return asyncSvc.schedule(task, delay, unit);
|
return asyncSvc.schedule(task, delay, unit);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel(ScheduledFuture<?> task) {
|
public void cancel(Future<?> task) {
|
||||||
task.cancel(true);
|
task.cancel(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -10,21 +10,44 @@ import org.jetbrains.annotations.Nullable;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
public abstract class AbstractPromise<T, F> implements Promise<T> {
|
public abstract class AbstractPromise<T, F> implements Promise<T> {
|
||||||
|
|
||||||
private final Collection<PromiseListener<T>> listeners;
|
private final AtomicReference<Collection<PromiseListener<T>>> listeners;
|
||||||
private final AtomicReference<PromiseCompletion<T>> completion;
|
private final AtomicReference<PromiseCompletion<T>> completion;
|
||||||
|
private final CountDownLatch latch;
|
||||||
|
private final ReentrantLock lock;
|
||||||
|
|
||||||
public AbstractPromise() {
|
public AbstractPromise() {
|
||||||
this.listeners = new ConcurrentLinkedQueue<>();
|
this.listeners = new AtomicReference<>();
|
||||||
this.completion = new AtomicReference<>();
|
this.completion = new AtomicReference<>();
|
||||||
|
this.latch = new CountDownLatch(1);
|
||||||
|
this.lock = new ReentrantLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static <V> void propagateResult(Promise<V> from, Promise<V> to) {
|
||||||
|
from.addDirectListener(to::complete, to::completeExceptionally);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void propagateCancel(Promise<?> from, Promise<?> to) {
|
||||||
|
from.onCancel(to::completeExceptionally);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <V> @NotNull Runnable createRunnable(T result, @NotNull Promise<V> promise, @NotNull ExceptionalFunction<T, V> task) {
|
||||||
|
return () -> {
|
||||||
|
if (promise.isCompleted()) return;
|
||||||
|
|
||||||
|
try {
|
||||||
|
V nextResult = task.apply(result);
|
||||||
|
promise.complete(nextResult);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
promise.completeExceptionally(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract @NotNull AbstractPromiseFactory<F> getFactory();
|
public abstract @NotNull AbstractPromiseFactory<F> getFactory();
|
||||||
@@ -39,24 +62,14 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public T join(long timeoutMillis) throws TimeoutException {
|
public T join(long timeoutMillis) throws TimeoutException {
|
||||||
PromiseCompletion<T> completion;
|
try {
|
||||||
long start = System.currentTimeMillis();
|
//noinspection ResultOfMethodCallIgnored
|
||||||
long remainingTimeout = timeoutMillis;
|
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
synchronized (this.completion) {
|
throw new RuntimeException(e);
|
||||||
completion = this.completion.get();
|
|
||||||
while (completion == null && remainingTimeout > 0) {
|
|
||||||
try {
|
|
||||||
this.completion.wait(remainingTimeout);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
completion = this.completion.get();
|
|
||||||
remainingTimeout = timeoutMillis - (System.currentTimeMillis() - start);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PromiseCompletion<T> completion = getCompletion();
|
||||||
if (completion == null)
|
if (completion == null)
|
||||||
throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms");
|
throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms");
|
||||||
|
|
||||||
@@ -115,7 +128,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task) {
|
public <V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task) {
|
||||||
Promise<V> promise = getFactory().unresolved();
|
Promise<V> promise = getFactory().unresolved();
|
||||||
addListener(
|
addDirectListener(
|
||||||
res -> {
|
res -> {
|
||||||
Runnable runnable = createRunnable(res, promise, task);
|
Runnable runnable = createRunnable(res, promise, task);
|
||||||
F future = getExecutor().runSync(runnable);
|
F future = getExecutor().runSync(runnable);
|
||||||
@@ -124,14 +137,14 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
promise::completeExceptionally
|
promise::completeExceptionally
|
||||||
);
|
);
|
||||||
|
|
||||||
addChild(promise);
|
propagateCancel(promise, this);
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
|
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
|
||||||
Promise<V> promise = getFactory().unresolved();
|
Promise<V> promise = getFactory().unresolved();
|
||||||
addListener(
|
addDirectListener(
|
||||||
res -> {
|
res -> {
|
||||||
Runnable runnable = createRunnable(res, promise, task);
|
Runnable runnable = createRunnable(res, promise, task);
|
||||||
F future = getExecutor().runSync(runnable, delay, unit);
|
F future = getExecutor().runSync(runnable, delay, unit);
|
||||||
@@ -140,22 +153,26 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
promise::completeExceptionally
|
promise::completeExceptionally
|
||||||
);
|
);
|
||||||
|
|
||||||
addChild(promise);
|
propagateCancel(promise, this);
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, @NotNull Promise<V>> task) {
|
public <V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
|
||||||
Promise<V> promise = getFactory().unresolved();
|
Promise<V> promise = getFactory().unresolved();
|
||||||
thenApplySync(task).addListener(
|
thenApplySync(task).addDirectListener(
|
||||||
nestedPromise -> {
|
nestedPromise -> {
|
||||||
nestedPromise.propagateResult(promise);
|
if (nestedPromise == null) {
|
||||||
nestedPromise.addChild(promise);
|
promise.complete(null);
|
||||||
|
} else {
|
||||||
|
propagateResult(nestedPromise, promise);
|
||||||
|
propagateCancel(promise, nestedPromise);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
promise::completeExceptionally
|
promise::completeExceptionally
|
||||||
);
|
);
|
||||||
|
|
||||||
addChild(promise);
|
propagateCancel(promise, this);
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -212,7 +229,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) {
|
public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) {
|
||||||
Promise<V> promise = getFactory().unresolved();
|
Promise<V> promise = getFactory().unresolved();
|
||||||
addListener(
|
addDirectListener(
|
||||||
(res) -> {
|
(res) -> {
|
||||||
Runnable runnable = createRunnable(res, promise, task);
|
Runnable runnable = createRunnable(res, promise, task);
|
||||||
F future = getExecutor().runAsync(runnable);
|
F future = getExecutor().runAsync(runnable);
|
||||||
@@ -221,14 +238,14 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
promise::completeExceptionally
|
promise::completeExceptionally
|
||||||
);
|
);
|
||||||
|
|
||||||
addChild(promise);
|
propagateCancel(promise, this);
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
|
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
|
||||||
Promise<V> promise = getFactory().unresolved();
|
Promise<V> promise = getFactory().unresolved();
|
||||||
addListener(
|
addDirectListener(
|
||||||
res -> {
|
res -> {
|
||||||
Runnable runnable = createRunnable(res, promise, task);
|
Runnable runnable = createRunnable(res, promise, task);
|
||||||
F future = getExecutor().runAsync(runnable, delay, unit);
|
F future = getExecutor().runAsync(runnable, delay, unit);
|
||||||
@@ -237,66 +254,42 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
promise::completeExceptionally
|
promise::completeExceptionally
|
||||||
);
|
);
|
||||||
|
|
||||||
addChild(promise);
|
propagateCancel(promise, this);
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
|
public <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
|
||||||
Promise<V> promise = getFactory().unresolved();
|
Promise<V> promise = getFactory().unresolved();
|
||||||
thenApplyAsync(task).addListener(
|
thenApplyAsync(task).addDirectListener(
|
||||||
nestedPromise -> {
|
nestedPromise -> {
|
||||||
nestedPromise.propagateResult(promise);
|
if (nestedPromise == null) {
|
||||||
nestedPromise.addChild(promise);
|
promise.complete(null);
|
||||||
|
} else {
|
||||||
|
propagateResult(nestedPromise, promise);
|
||||||
|
propagateCancel(promise, nestedPromise);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
promise::completeExceptionally
|
promise::completeExceptionally
|
||||||
);
|
);
|
||||||
|
|
||||||
addChild(promise);
|
propagateCancel(promise, this);
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
private <V> @NotNull Runnable createRunnable(T result, @NotNull Promise<V> promise, @NotNull ExceptionalFunction<T, V> task) {
|
@Override
|
||||||
return () -> {
|
public @NotNull Promise<Void> erase() {
|
||||||
if (promise.isCompleted()) return;
|
return thenSupplyAsync(() -> null);
|
||||||
|
|
||||||
try {
|
|
||||||
V nextResult = task.apply(result);
|
|
||||||
promise.complete(nextResult);
|
|
||||||
} catch (Throwable e) {
|
|
||||||
promise.completeExceptionally(e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> logExceptions(@NotNull String message) {
|
public @NotNull Promise<T> addAsyncListener(@NotNull AsyncPromiseListener<T> listener) {
|
||||||
return onError(e -> getLogger().error(message, e));
|
return addAnyListener(listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener) {
|
public @NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
|
||||||
synchronized (completion) {
|
return addAsyncListener((res) -> {
|
||||||
if (isCompleted()) {
|
|
||||||
getExecutor().runAsync(() -> {
|
|
||||||
try {
|
|
||||||
//noinspection ConstantConditions
|
|
||||||
listener.handle(getCompletion());
|
|
||||||
} catch (Exception e) {
|
|
||||||
getLogger().error("Exception caught in promise listener", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
getListeners().add(listener);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public @NotNull Promise<T> addListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
|
|
||||||
return addListener((res) -> {
|
|
||||||
if (res.isError()) {
|
if (res.isError()) {
|
||||||
if (errorListener != null) errorListener.accept(res.getException());
|
if (errorListener != null) errorListener.accept(res.getException());
|
||||||
} else {
|
} else {
|
||||||
@@ -305,14 +298,68 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public @NotNull Promise<T> addDirectListener(@NotNull PromiseListener<T> listener) {
|
||||||
|
return addAnyListener(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public @NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
|
||||||
|
return addDirectListener((res) -> {
|
||||||
|
if (res.isError()) {
|
||||||
|
if (errorListener != null) errorListener.accept(res.getException());
|
||||||
|
} else {
|
||||||
|
if (successListener != null) successListener.accept(res.getResult());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private @NotNull Promise<T> addAnyListener(PromiseListener<T> listener) {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
PromiseCompletion<T> completion = getCompletion();
|
||||||
|
if (completion != null) {
|
||||||
|
callListener(listener, completion);
|
||||||
|
} else {
|
||||||
|
listeners.compareAndSet(null, new ConcurrentLinkedQueue<>());
|
||||||
|
listeners.get().add(listener);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void callListener(PromiseListener<T> listener, PromiseCompletion<T> ctx) {
|
||||||
|
if (listener instanceof AsyncPromiseListener<T>) {
|
||||||
|
getExecutor().runAsync(() -> callListenerNow(listener, ctx));
|
||||||
|
} else {
|
||||||
|
callListenerNow(listener, ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void callListenerNow(PromiseListener<T> listener, PromiseCompletion<T> ctx) {
|
||||||
|
try {
|
||||||
|
listener.handle(ctx);
|
||||||
|
} catch (Exception e) {
|
||||||
|
getLogger().error("Exception caught in promise listener", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener) {
|
public @NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener) {
|
||||||
return addListener(listener, null);
|
return addAsyncListener(listener, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> onError(@NotNull Consumer<Throwable> listener) {
|
public @NotNull Promise<T> onError(@NotNull Consumer<Throwable> listener) {
|
||||||
return addListener(null, listener);
|
return addAsyncListener(null, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public @NotNull Promise<T> logExceptions(@NotNull String message) {
|
||||||
|
return onError(e -> getLogger().error(message, e));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -339,25 +386,23 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
|
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
|
||||||
F future = getExecutor().runAsync(() -> completeExceptionally(new TimeoutException("Promise stopped waiting after " + time + " " + unit)), time, unit);
|
F future = getExecutor().runAsync(() -> completeExceptionally(new TimeoutException("Promise stopped waiting after " + time + " " + unit)), time, unit);
|
||||||
return onError(e -> getExecutor().cancel(future));
|
return addListener((_v) -> getExecutor().cancel(future));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
|
private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
|
||||||
synchronized (completion) {
|
if (!setCompletion(ctx)) return;
|
||||||
if (!setCompletion(ctx)) return;
|
|
||||||
|
|
||||||
completion.notifyAll();
|
lock.lock();
|
||||||
getExecutor().runAsync(() -> {
|
try {
|
||||||
for (PromiseListener<T> listener : getListeners()) {
|
this.latch.countDown();
|
||||||
if (!ctx.isActive()) return;
|
Collection<PromiseListener<T>> listeners = this.listeners.get();
|
||||||
|
if (listeners != null) {
|
||||||
try {
|
for (PromiseListener<T> listener : listeners) {
|
||||||
listener.handle(ctx);
|
callListener(listener, ctx);
|
||||||
} catch (Exception e) {
|
|
||||||
getLogger().error("Exception caught in promise listener", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -366,22 +411,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addChild(@NotNull Promise<?> child) {
|
public void cancel(@Nullable String message) {
|
||||||
child.onCancel((e) -> this.cancel(e.getMessage()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void propagateResult(@NotNull Promise<T> target) {
|
|
||||||
addListener(target::complete, target::completeExceptionally);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cancel() {
|
|
||||||
completeExceptionally(new CancellationException());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cancel(@NotNull String message) {
|
|
||||||
completeExceptionally(new CancellationException(message));
|
completeExceptionally(new CancellationException(message));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -405,8 +435,16 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
|||||||
return completion.get();
|
return completion.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Collection<PromiseListener<T>> getListeners() {
|
@Override
|
||||||
return listeners;
|
public @NotNull CompletableFuture<T> toFuture() {
|
||||||
|
CompletableFuture<T> future = new CompletableFuture<>();
|
||||||
|
this.addDirectListener(future::complete, future::completeExceptionally);
|
||||||
|
future.whenComplete((res, e) -> {
|
||||||
|
if (e instanceof CancellationException) {
|
||||||
|
this.cancel();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,14 +3,15 @@ package dev.tommyjs.futur.promise;
|
|||||||
import dev.tommyjs.futur.executor.PromiseExecutor;
|
import dev.tommyjs.futur.executor.PromiseExecutor;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
import reactor.core.Disposable;
|
import org.redisson.api.RFuture;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.StreamSupport;
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
@@ -19,9 +20,9 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
public abstract @NotNull PromiseExecutor<F> getExecutor();
|
public abstract @NotNull PromiseExecutor<F> getExecutor();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
|
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2) {
|
||||||
List<Promise<?>> promises = List.of(p1, p2);
|
List<Promise<?>> promises = List.of(p1, p2);
|
||||||
return all(promises)
|
return all(propagateCancel, promises)
|
||||||
.thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>(
|
.thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>(
|
||||||
Objects.requireNonNull(p1.getCompletion()).getResult(),
|
Objects.requireNonNull(p1.getCompletion()).getResult(),
|
||||||
Objects.requireNonNull(p2.getCompletion()).getResult()
|
Objects.requireNonNull(p2.getCompletion()).getResult()
|
||||||
@@ -29,13 +30,17 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
|
public <K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
|
||||||
if (promises.isEmpty()) return resolve(Collections.emptyMap());
|
if (promises.isEmpty()) return resolve(Collections.emptyMap());
|
||||||
|
|
||||||
Map<K, V> map = new HashMap<>();
|
Map<K, V> map = new HashMap<>();
|
||||||
Promise<Map<K, V>> promise = unresolved();
|
Promise<Map<K, V>> promise = unresolved();
|
||||||
for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) {
|
for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) {
|
||||||
entry.getValue().addListener((ctx) -> {
|
if (propagateCancel) {
|
||||||
|
AbstractPromise.propagateCancel(promise, entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
entry.getValue().addDirectListener((ctx) -> {
|
||||||
synchronized (map) {
|
synchronized (map) {
|
||||||
if (ctx.getException() != null) {
|
if (ctx.getException() != null) {
|
||||||
if (exceptionHandler == null) {
|
if (exceptionHandler == null) {
|
||||||
@@ -59,12 +64,13 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable Consumer<Throwable> exceptionHandler) {
|
public <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
|
||||||
AtomicInteger index = new AtomicInteger();
|
AtomicInteger index = new AtomicInteger();
|
||||||
return this.combine(
|
return this.combine(
|
||||||
|
propagateCancel,
|
||||||
StreamSupport.stream(promises.spliterator(), false)
|
StreamSupport.stream(promises.spliterator(), false)
|
||||||
.collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)),
|
.collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)),
|
||||||
exceptionHandler != null ? (i, e) -> exceptionHandler.accept(e) : null
|
exceptionHandler
|
||||||
).thenApplyAsync(v ->
|
).thenApplyAsync(v ->
|
||||||
v.entrySet().stream()
|
v.entrySet().stream()
|
||||||
.sorted(Map.Entry.comparingByKey())
|
.sorted(Map.Entry.comparingByKey())
|
||||||
@@ -74,12 +80,7 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
|
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
|
||||||
return combine(promises, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
|
|
||||||
List<Promise<?>> promises = new ArrayList<>();
|
List<Promise<?>> promises = new ArrayList<>();
|
||||||
promiseIterable.iterator().forEachRemaining(promises::add);
|
promiseIterable.iterator().forEachRemaining(promises::add);
|
||||||
|
|
||||||
@@ -91,7 +92,13 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
|
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
int index = iter.nextIndex();
|
int index = iter.nextIndex();
|
||||||
iter.next().addListener((res) -> {
|
var p = iter.next();
|
||||||
|
|
||||||
|
if (propagateCancel) {
|
||||||
|
AbstractPromise.propagateCancel(promise, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
p.addDirectListener((res) -> {
|
||||||
synchronized (results) {
|
synchronized (results) {
|
||||||
results[index] = res;
|
results[index] = res;
|
||||||
if (Arrays.stream(results).allMatch(Objects::nonNull))
|
if (Arrays.stream(results).allMatch(Objects::nonNull))
|
||||||
@@ -104,7 +111,7 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
|
public @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
|
||||||
List<Promise<?>> promises = new ArrayList<>();
|
List<Promise<?>> promises = new ArrayList<>();
|
||||||
promiseIterable.iterator().forEachRemaining(promises::add);
|
promiseIterable.iterator().forEachRemaining(promises::add);
|
||||||
|
|
||||||
@@ -113,12 +120,14 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
Promise<Void> promise = unresolved();
|
Promise<Void> promise = unresolved();
|
||||||
|
|
||||||
for (Promise<?> p : promises) {
|
for (Promise<?> p : promises) {
|
||||||
p.addListener((res) -> {
|
if (propagateCancel) {
|
||||||
|
AbstractPromise.propagateCancel(promise, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
p.addDirectListener((res) -> {
|
||||||
if (res.getException() != null) {
|
if (res.getException() != null) {
|
||||||
promise.completeExceptionally(res.getException());
|
promise.completeExceptionally(res.getException());
|
||||||
}
|
} else if (completed.incrementAndGet() == promises.size()) {
|
||||||
|
|
||||||
if (completed.incrementAndGet() == promises.size()) {
|
|
||||||
promise.complete(null);
|
promise.complete(null);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -127,11 +136,37 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises) {
|
||||||
|
Promise<V> promise = unresolved();
|
||||||
|
for (Promise<V> p : promises) {
|
||||||
|
if (cancelRaceLosers) {
|
||||||
|
promise.addListener((res) -> p.cancel());
|
||||||
|
}
|
||||||
|
AbstractPromise.propagateResult(p, promise);
|
||||||
|
}
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> @NotNull Promise<T> wrapMono(@NotNull Mono<T> mono) {
|
||||||
|
return wrap(mono.toFuture());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
|
public <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
|
||||||
|
return wrap(future, future);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> @NotNull Promise<T> wrapRedisson(@NotNull RFuture<T> future) {
|
||||||
|
return wrap(future, future);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> @NotNull Promise<T> wrap(@NotNull CompletionStage<T> completion, Future<T> future) {
|
||||||
Promise<T> promise = unresolved();
|
Promise<T> promise = unresolved();
|
||||||
|
|
||||||
future.whenComplete((v, e) -> {
|
completion.whenComplete((v, e) -> {
|
||||||
if (e != null) {
|
if (e != null) {
|
||||||
promise.completeExceptionally(e);
|
promise.completeExceptionally(e);
|
||||||
} else {
|
} else {
|
||||||
@@ -143,14 +178,6 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono) {
|
|
||||||
Promise<T> promise = this.unresolved();
|
|
||||||
Disposable disposable = mono.subscribe(promise::complete, promise::completeExceptionally);
|
|
||||||
promise.onCancel((e) -> disposable.dispose());
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> @NotNull Promise<T> resolve(T value) {
|
public <T> @NotNull Promise<T> resolve(T value) {
|
||||||
Promise<T> promise = unresolved();
|
Promise<T> promise = unresolved();
|
||||||
@@ -165,17 +192,4 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
|
|||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
|
|
||||||
Promise<Void> promise = unresolved();
|
|
||||||
p.addListener(ctx -> {
|
|
||||||
if (ctx.getException() != null) {
|
|
||||||
promise.completeExceptionally(ctx.getException());
|
|
||||||
} else {
|
|
||||||
promise.complete(null);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,5 @@
|
|||||||
|
package dev.tommyjs.futur.promise;
|
||||||
|
|
||||||
|
public interface AsyncPromiseListener<T> extends PromiseListener<T> {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -8,6 +8,7 @@ import org.jetbrains.annotations.NotNull;
|
|||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
@@ -33,7 +34,7 @@ public interface Promise<T> {
|
|||||||
|
|
||||||
<V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit);
|
<V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit);
|
||||||
|
|
||||||
<V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, @NotNull Promise<V>> task);
|
<V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, Promise<V>> task);
|
||||||
|
|
||||||
@NotNull Promise<Void> thenRunAsync(@NotNull ExceptionalRunnable task);
|
@NotNull Promise<Void> thenRunAsync(@NotNull ExceptionalRunnable task);
|
||||||
|
|
||||||
@@ -55,15 +56,34 @@ public interface Promise<T> {
|
|||||||
|
|
||||||
<V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task);
|
<V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task);
|
||||||
|
|
||||||
@NotNull Promise<T> logExceptions(@NotNull String message);
|
@NotNull Promise<Void> erase();
|
||||||
|
|
||||||
default @NotNull Promise<T> logExceptions() {
|
default @NotNull Promise<T> logExceptions() {
|
||||||
return logExceptions("Exception caught in promise chain");
|
return logExceptions("Exception caught in promise chain");
|
||||||
}
|
}
|
||||||
|
|
||||||
@NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener);
|
@NotNull Promise<T> logExceptions(@NotNull String message);
|
||||||
|
|
||||||
@NotNull Promise<T> addListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
|
/**
|
||||||
|
* @apiNote Direct listeners run on the same thread as the completion.
|
||||||
|
*/
|
||||||
|
@NotNull Promise<T> addDirectListener(@NotNull PromiseListener<T> listener);
|
||||||
|
|
||||||
|
@NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @apiNote Async listeners are run in parallel.
|
||||||
|
*/
|
||||||
|
@NotNull Promise<T> addAsyncListener(@NotNull AsyncPromiseListener<T> listener);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @apiNote Same as addAsyncListener.
|
||||||
|
*/
|
||||||
|
default @NotNull Promise<T> addListener(@NotNull AsyncPromiseListener<T> listener) {
|
||||||
|
return addAsyncListener(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
|
||||||
|
|
||||||
@NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener);
|
@NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener);
|
||||||
|
|
||||||
@@ -73,9 +93,15 @@ public interface Promise<T> {
|
|||||||
|
|
||||||
@NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener);
|
@NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Use maxWaitTime instead
|
||||||
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
|
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Use maxWaitTime instead
|
||||||
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
default @NotNull Promise<T> timeout(long ms) {
|
default @NotNull Promise<T> timeout(long ms) {
|
||||||
return timeout(ms, TimeUnit.MILLISECONDS);
|
return timeout(ms, TimeUnit.MILLISECONDS);
|
||||||
@@ -87,10 +113,6 @@ public interface Promise<T> {
|
|||||||
return maxWaitTime(ms, TimeUnit.MILLISECONDS);
|
return maxWaitTime(ms, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addChild(@NotNull Promise<?> child);
|
|
||||||
|
|
||||||
void propagateResult(@NotNull Promise<T> target);
|
|
||||||
|
|
||||||
void cancel(@Nullable String reason);
|
void cancel(@Nullable String reason);
|
||||||
|
|
||||||
default void cancel() {
|
default void cancel() {
|
||||||
@@ -107,4 +129,6 @@ public interface Promise<T> {
|
|||||||
|
|
||||||
boolean isCompleted();
|
boolean isCompleted();
|
||||||
|
|
||||||
|
@NotNull CompletableFuture<T> toFuture();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,29 +9,17 @@ public class PromiseCompletion<T> {
|
|||||||
|
|
||||||
private @Nullable T result;
|
private @Nullable T result;
|
||||||
private @Nullable Throwable exception;
|
private @Nullable Throwable exception;
|
||||||
private boolean active;
|
|
||||||
|
|
||||||
public PromiseCompletion(@Nullable T result) {
|
public PromiseCompletion(@Nullable T result) {
|
||||||
this.result = result;
|
this.result = result;
|
||||||
this.active = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public PromiseCompletion(@NotNull Throwable exception) {
|
public PromiseCompletion(@NotNull Throwable exception) {
|
||||||
this.exception = exception;
|
this.exception = exception;
|
||||||
this.active = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public PromiseCompletion() {
|
public PromiseCompletion() {
|
||||||
this.result = null;
|
this.result = null;
|
||||||
this.active = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void markHandled() {
|
|
||||||
this.active = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isActive() {
|
|
||||||
return active;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isError() {
|
public boolean isError() {
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package dev.tommyjs.futur.promise;
|
|||||||
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
import org.redisson.api.RFuture;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
@@ -10,7 +11,6 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
public interface PromiseFactory {
|
public interface PromiseFactory {
|
||||||
|
|
||||||
@@ -18,44 +18,89 @@ public interface PromiseFactory {
|
|||||||
|
|
||||||
<T> @NotNull Promise<T> unresolved();
|
<T> @NotNull Promise<T> unresolved();
|
||||||
|
|
||||||
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2);
|
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2);
|
||||||
|
|
||||||
<K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler);
|
default <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
|
||||||
|
return combine(false, p1, p2);
|
||||||
|
}
|
||||||
|
|
||||||
|
<K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler);
|
||||||
|
|
||||||
|
default <K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises) {
|
||||||
|
return combine(propagateCancel, promises, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
|
||||||
|
return combine(false, promises, exceptionHandler);
|
||||||
|
}
|
||||||
|
|
||||||
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
|
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
|
||||||
return combine(promises, null);
|
return combine(promises, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
<V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable Consumer<Throwable> exceptionHandler);
|
<V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler);
|
||||||
|
|
||||||
|
default <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises) {
|
||||||
|
return combine(propagateCancel, promises, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
|
||||||
|
return combine(false, promises, exceptionHandler);
|
||||||
|
}
|
||||||
|
|
||||||
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
|
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
|
||||||
return combine(promises, null);
|
return combine(promises, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable);
|
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
|
||||||
|
|
||||||
|
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
|
||||||
|
return allSettled(false, promiseIterable);
|
||||||
|
}
|
||||||
|
|
||||||
|
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
|
||||||
|
return allSettled(propagateCancel, Arrays.asList(promiseArray));
|
||||||
|
}
|
||||||
|
|
||||||
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
|
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
|
||||||
return allSettled(Arrays.asList(promiseArray));
|
return allSettled(false, promiseArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
@NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable);
|
@NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
|
||||||
|
|
||||||
|
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
|
||||||
|
return all(false, promiseIterable);
|
||||||
|
}
|
||||||
|
|
||||||
|
default @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
|
||||||
|
return all(propagateCancel, Arrays.asList(promiseArray));
|
||||||
|
}
|
||||||
|
|
||||||
default @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
|
default @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
|
||||||
return all(Arrays.asList(promiseArray));
|
return all(false, promiseArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @apiNote Even with cancelRaceLosers, it is not guaranteed that only one promise will complete.
|
||||||
|
*/
|
||||||
|
<V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises);
|
||||||
|
|
||||||
|
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises) {
|
||||||
|
return race(false, promises);
|
||||||
|
}
|
||||||
|
|
||||||
|
<T> @NotNull Promise<T> wrapMono(@NotNull Mono<T> mono);
|
||||||
|
|
||||||
|
<T> @NotNull Promise<T> wrapRedisson(@NotNull RFuture<T> future);
|
||||||
|
|
||||||
<T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future);
|
<T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future);
|
||||||
|
|
||||||
<T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono);
|
|
||||||
|
|
||||||
<T> @NotNull Promise<T> resolve(T value);
|
|
||||||
|
|
||||||
default @NotNull Promise<Void> start() {
|
default @NotNull Promise<Void> start() {
|
||||||
return resolve(null);
|
return resolve(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<T> @NotNull Promise<T> resolve(T value);
|
||||||
|
|
||||||
<T> @NotNull Promise<T> error(@NotNull Throwable error);
|
<T> @NotNull Promise<T> error(@NotNull Throwable error);
|
||||||
|
|
||||||
@NotNull Promise<Void> erase(@NotNull Promise<?> p);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,38 +17,38 @@ import java.util.function.BiConsumer;
|
|||||||
@Deprecated
|
@Deprecated
|
||||||
public class Promises {
|
public class Promises {
|
||||||
|
|
||||||
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, PromiseFactory factory) {
|
|
||||||
return factory.combine(p1, p2);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
|
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
|
||||||
return combine(p1, p2, p1.getFactory());
|
return combine(p1, p2, p1.getFactory());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) {
|
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, PromiseFactory factory) {
|
||||||
return factory.combine(promises, exceptionHandler).timeout(timeout);
|
return factory.combine(p1, p2);
|
||||||
}
|
|
||||||
|
|
||||||
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
|
|
||||||
return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, PromiseFactory factory) {
|
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, PromiseFactory factory) {
|
||||||
return combine(promises, timeout, true, factory);
|
return combine(promises, timeout, true, factory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
|
||||||
|
return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) {
|
||||||
|
return factory.combine(promises, exceptionHandler).timeout(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, PromiseFactory factory) {
|
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, PromiseFactory factory) {
|
||||||
return combine(promises, 1500L, true, factory);
|
return combine(promises, 1500L, true, factory);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
|
|
||||||
return factory.combine(promises, strict ? null : (_v) -> {}).timeout(timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, PromiseFactory factory) {
|
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, PromiseFactory factory) {
|
||||||
return combine(promises, timeout, true, factory);
|
return combine(promises, timeout, true, factory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
|
||||||
|
return factory.combine(promises, strict ? null : (_i, _v) -> {}).timeout(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, PromiseFactory factory) {
|
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, PromiseFactory factory) {
|
||||||
return combine(promises, 1500L, true, factory);
|
return combine(promises, 1500L, true, factory);
|
||||||
}
|
}
|
||||||
@@ -57,6 +57,10 @@ public class Promises {
|
|||||||
return factory.all(promises);
|
return factory.all(promises);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, PromiseFactory factory) {
|
||||||
|
return combine(keys, mapper, timeout, true, factory);
|
||||||
|
}
|
||||||
|
|
||||||
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, boolean strict, PromiseFactory factory) {
|
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, boolean strict, PromiseFactory factory) {
|
||||||
Map<K, Promise<V>> promises = new HashMap<>();
|
Map<K, Promise<V>> promises = new HashMap<>();
|
||||||
for (K key : keys) {
|
for (K key : keys) {
|
||||||
@@ -67,22 +71,18 @@ public class Promises {
|
|||||||
return combine(promises, timeout, strict, factory);
|
return combine(promises, timeout, strict, factory);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, PromiseFactory factory) {
|
|
||||||
return combine(keys, mapper, timeout, true, factory);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, PromiseFactory factory) {
|
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, PromiseFactory factory) {
|
||||||
return combine(keys, mapper, 1500L, true, factory);
|
return combine(keys, mapper, 1500L, true, factory);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p, PromiseFactory factory) {
|
|
||||||
return factory.erase(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
|
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
|
||||||
return erase(p, p.getFactory());
|
return erase(p, p.getFactory());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p, PromiseFactory factory) {
|
||||||
|
return p.erase();
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future, PromiseFactory factory) {
|
public static <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future, PromiseFactory factory) {
|
||||||
return factory.wrap(future);
|
return factory.wrap(future);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,40 +9,161 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
public final class PromiseTests {
|
public final class PromiseTests {
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(PromiseTests.class);
|
private final Logger logger = LoggerFactory.getLogger(PromiseTests.class);
|
||||||
private final PromiseExecutor<ScheduledFuture<?>> executor = SinglePoolExecutor.create(1);
|
private final PromiseExecutor<Future<?>> executor = SinglePoolExecutor.create(5);
|
||||||
private final PromiseFactory pfac = new SimplePromiseFactory<>(executor, logger);
|
private final PromiseFactory pfac = new SimplePromiseFactory<>(executor, logger);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testMono() {
|
public void testMono() {
|
||||||
Exception value = new Exception("Test Error");
|
Exception value = new Exception("Test Error");
|
||||||
|
|
||||||
var error = pfac.wrap(Mono.error(value));
|
var error = pfac.wrapMono(Mono.error(value));
|
||||||
assert Objects.requireNonNull(error.getCompletion()).isError();
|
assert Objects.requireNonNull(error.getCompletion()).isError();
|
||||||
assert error.getCompletion().getException() == value;
|
assert error.getCompletion().getException() == value;
|
||||||
|
|
||||||
var resolved = pfac.wrap(Mono.just(value));
|
var resolved = pfac.wrapMono(Mono.just(value));
|
||||||
assert !Objects.requireNonNull(resolved.getCompletion()).isError();
|
assert !Objects.requireNonNull(resolved.getCompletion()).isError();
|
||||||
assert resolved.getCompletion().getResult() == value;
|
assert resolved.getCompletion().getResult() == value;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testErrorCancellation() throws InterruptedException {
|
public void testErrorCancellation() throws InterruptedException {
|
||||||
var finish = new AtomicBoolean();
|
var finished = new AtomicBoolean();
|
||||||
pfac.start()
|
pfac.start()
|
||||||
.thenRunDelayedAsync(() -> finish.set(true), 50, TimeUnit.MILLISECONDS)
|
.thenRunDelayedAsync(() -> finished.set(true), 50, TimeUnit.MILLISECONDS)
|
||||||
.thenRunAsync(() -> {})
|
.thenRunAsync(() -> {})
|
||||||
.cancel();
|
.cancel();
|
||||||
|
|
||||||
Thread.sleep(100L);
|
Thread.sleep(100L);
|
||||||
assert !finish.get();
|
assert !finished.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testToFuture() throws InterruptedException {
|
||||||
|
assert pfac.resolve(true).toFuture().getNow(false);
|
||||||
|
assert pfac.error(new Exception("Test")).toFuture().isCompletedExceptionally();
|
||||||
|
|
||||||
|
var finished = new AtomicBoolean();
|
||||||
|
pfac.start()
|
||||||
|
.thenRunDelayedAsync(() -> finished.set(true), 50, TimeUnit.MILLISECONDS)
|
||||||
|
.toFuture()
|
||||||
|
.cancel(true);
|
||||||
|
|
||||||
|
Thread.sleep(100L);
|
||||||
|
assert !finished.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombineUtil() throws TimeoutException {
|
||||||
|
pfac.all(
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
.join(100L);
|
||||||
|
|
||||||
|
pfac.allSettled(
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
.join(100L);
|
||||||
|
|
||||||
|
pfac.combine(
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
.join(100L);
|
||||||
|
|
||||||
|
pfac.combine(
|
||||||
|
List.of(
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> {}, 49, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> {}, 51, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.join(100L);
|
||||||
|
|
||||||
|
pfac.combine(
|
||||||
|
Map.of(
|
||||||
|
"a", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
|
||||||
|
"b", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.join(100L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombineUtilPropagation() throws InterruptedException {
|
||||||
|
var finished1 = new AtomicBoolean();
|
||||||
|
pfac.all(
|
||||||
|
true,
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
.cancel();
|
||||||
|
|
||||||
|
var finished2 = new AtomicBoolean();
|
||||||
|
pfac.allSettled(
|
||||||
|
true,
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
.cancel();
|
||||||
|
|
||||||
|
var finished3 = new AtomicBoolean();
|
||||||
|
pfac.combine(
|
||||||
|
true,
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
.cancel();
|
||||||
|
|
||||||
|
var finished4 = new AtomicBoolean();
|
||||||
|
pfac.combine(
|
||||||
|
true,
|
||||||
|
List.of(
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.cancel();
|
||||||
|
|
||||||
|
var finished5 = new AtomicBoolean();
|
||||||
|
pfac.combine(
|
||||||
|
true,
|
||||||
|
Map.of(
|
||||||
|
"a", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS),
|
||||||
|
"b", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.cancel();
|
||||||
|
|
||||||
|
Thread.sleep(100L);
|
||||||
|
assert !finished1.get();
|
||||||
|
assert !finished2.get();
|
||||||
|
assert !finished3.get();
|
||||||
|
assert !finished4.get();
|
||||||
|
assert !finished5.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRace() throws TimeoutException {
|
||||||
|
assert pfac.race(
|
||||||
|
List.of(
|
||||||
|
pfac.start().thenSupplyDelayedAsync(() -> true, 150, TimeUnit.MILLISECONDS),
|
||||||
|
pfac.start().thenSupplyDelayedAsync(() -> false, 200, TimeUnit.MILLISECONDS)
|
||||||
|
)
|
||||||
|
).join(300L);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import dev.tommyjs.futur.promise.PromiseCompletion;
|
|||||||
import dev.tommyjs.futur.promise.PromiseFactory;
|
import dev.tommyjs.futur.promise.PromiseFactory;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.jetbrains.annotations.Nullable;
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
import org.redisson.api.RFuture;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
@@ -12,7 +13,6 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
public final class PromiseUtil {
|
public final class PromiseUtil {
|
||||||
|
|
||||||
@@ -30,64 +30,108 @@ public final class PromiseUtil {
|
|||||||
return pfac.unresolved();
|
return pfac.unresolved();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull <K, V> Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2) {
|
||||||
|
return pfac.combine(propagateCancel, p1, p2);
|
||||||
|
}
|
||||||
|
|
||||||
public static @NotNull <K, V> Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
|
public static @NotNull <K, V> Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
|
||||||
return pfac.combine(p1, p2);
|
return pfac.combine(p1, p2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull <K, V> Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
|
||||||
|
return pfac.combine(propagateCancel, promises, exceptionHandler);
|
||||||
|
}
|
||||||
|
|
||||||
public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
|
public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
|
||||||
return pfac.combine(promises, exceptionHandler);
|
return pfac.combine(promises, exceptionHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull <K, V> Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises) {
|
||||||
|
return pfac.combine(propagateCancel, promises);
|
||||||
|
}
|
||||||
|
|
||||||
public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
|
public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
|
||||||
return pfac.combine(promises);
|
return pfac.combine(promises);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable Consumer<Throwable> exceptionHandler) {
|
public static @NotNull <V> Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
|
||||||
|
return pfac.combine(propagateCancel, promises, exceptionHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
|
||||||
return pfac.combine(promises, exceptionHandler);
|
return pfac.combine(promises, exceptionHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull <V> Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises) {
|
||||||
|
return pfac.combine(propagateCancel, promises);
|
||||||
|
}
|
||||||
|
|
||||||
public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
|
public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
|
||||||
return pfac.combine(promises);
|
return pfac.combine(promises);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
|
||||||
|
return pfac.allSettled(propagateCancel, promiseIterable);
|
||||||
|
}
|
||||||
|
|
||||||
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
|
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
|
||||||
return pfac.allSettled(promiseIterable);
|
return pfac.allSettled(promiseIterable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
|
||||||
|
return pfac.allSettled(propagateCancel, promiseArray);
|
||||||
|
}
|
||||||
|
|
||||||
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
|
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
|
||||||
return pfac.allSettled(promiseArray);
|
return pfac.allSettled(promiseArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
|
||||||
|
return pfac.all(propagateCancel, promiseIterable);
|
||||||
|
}
|
||||||
|
|
||||||
public static @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
|
public static @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
|
||||||
return pfac.all(promiseIterable);
|
return pfac.all(promiseIterable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
|
||||||
|
return pfac.all(propagateCancel, promiseArray);
|
||||||
|
}
|
||||||
|
|
||||||
public static @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
|
public static @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
|
||||||
return pfac.all(promiseArray);
|
return pfac.all(promiseArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises) {
|
||||||
|
return pfac.race(promises);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises) {
|
||||||
|
return pfac.race(cancelRaceLosers, promises);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static @NotNull <T> Promise<T> wrapMono(@NotNull Mono<T> mono) {
|
||||||
|
return pfac.wrapMono(mono);
|
||||||
|
}
|
||||||
|
|
||||||
public static @NotNull <T> Promise<T> wrap(@NotNull CompletableFuture<T> future) {
|
public static @NotNull <T> Promise<T> wrap(@NotNull CompletableFuture<T> future) {
|
||||||
return pfac.wrap(future);
|
return pfac.wrap(future);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static @NotNull <T> Promise<T> wrap(@NotNull Mono<T> mono) {
|
public static @NotNull <T> Promise<T> wrapRedisson(@NotNull RFuture<T> future) {
|
||||||
return pfac.wrap(mono);
|
return pfac.wrapRedisson(future);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static @NotNull <T> Promise<T> resolve(T value) {
|
public static @NotNull <T> Promise<T> resolve(T value) {
|
||||||
return pfac.resolve(value);
|
return pfac.resolve(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static @NotNull <T> Promise<T> error(@NotNull Throwable error) {
|
|
||||||
return pfac.error(error);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
|
|
||||||
return pfac.erase(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static @NotNull Promise<Void> start() {
|
public static @NotNull Promise<Void> start() {
|
||||||
return pfac.start();
|
return pfac.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static @NotNull <T> Promise<T> error(@NotNull Throwable error) {
|
||||||
|
return pfac.error(error);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,9 +9,9 @@ import org.jetbrains.annotations.NotNull;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
public final class StaticPromiseFactory extends AbstractPromiseFactory<ScheduledFuture<?>> {
|
public final class StaticPromiseFactory extends AbstractPromiseFactory<Future<?>> {
|
||||||
|
|
||||||
public final static StaticPromiseFactory INSTANCE = new StaticPromiseFactory();
|
public final static StaticPromiseFactory INSTANCE = new StaticPromiseFactory();
|
||||||
private final static @NotNull SinglePoolExecutor EXECUTOR = SinglePoolExecutor.create(1);
|
private final static @NotNull SinglePoolExecutor EXECUTOR = SinglePoolExecutor.create(1);
|
||||||
@@ -21,18 +21,18 @@ public final class StaticPromiseFactory extends AbstractPromiseFactory<Scheduled
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public @NotNull <T> Promise<T> unresolved() {
|
|
||||||
return new SimplePromise<>(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Logger getLogger() {
|
public @NotNull Logger getLogger() {
|
||||||
return LOGGER;
|
return LOGGER;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull PromiseExecutor<ScheduledFuture<?>> getExecutor() {
|
public @NotNull <T> Promise<T> unresolved() {
|
||||||
|
return new SimplePromise<>(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public @NotNull PromiseExecutor<Future<?>> getExecutor() {
|
||||||
return EXECUTOR;
|
return EXECUTOR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user