promise util with propagate cancellation options

This commit is contained in:
WhatCats
2024-04-06 13:55:50 +02:00
parent 5bbcfdc9b3
commit 8ba023c04a
10 changed files with 330 additions and 142 deletions

View File

@@ -14,7 +14,7 @@ nexusPublishing {
subprojects { subprojects {
group = 'dev.tommyjs' group = 'dev.tommyjs'
version = '2.2.0' version = '2.3.0'
apply plugin: 'java' apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'com.github.johnrengelman.shadow'

View File

@@ -3,11 +3,11 @@ package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class DualPoolExecutor implements PromiseExecutor<ScheduledFuture<?>> { public class DualPoolExecutor implements PromiseExecutor<Future<?>> {
private final @NotNull ScheduledExecutorService syncSvc; private final @NotNull ScheduledExecutorService syncSvc;
private final @NotNull ScheduledExecutorService asyncSvc; private final @NotNull ScheduledExecutorService asyncSvc;
@@ -22,17 +22,17 @@ public class DualPoolExecutor implements PromiseExecutor<ScheduledFuture<?>> {
} }
@Override @Override
public ScheduledFuture<?> runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { public Future<?> runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return syncSvc.schedule(task, delay, unit); return syncSvc.schedule(task, delay, unit);
} }
@Override @Override
public ScheduledFuture<?> runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { public Future<?> runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return asyncSvc.schedule(task, delay, unit); return asyncSvc.schedule(task, delay, unit);
} }
@Override @Override
public void cancel(ScheduledFuture<?> task) { public void cancel(Future<?> task) {
task.cancel(true); task.cancel(true);
} }

View File

@@ -27,6 +27,27 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
this.completion = new AtomicReference<>(); this.completion = new AtomicReference<>();
} }
protected static <V> void propagateResult(Promise<V> from, Promise<V> to) {
from.addListener(to::complete, to::completeExceptionally);
}
protected static void propagateCancel(Promise<?> from, Promise<?> to) {
from.onCancel(to::completeExceptionally);
}
private <V> @NotNull Runnable createRunnable(T result, @NotNull Promise<V> promise, @NotNull ExceptionalFunction<T, V> task) {
return () -> {
if (promise.isCompleted()) return;
try {
V nextResult = task.apply(result);
promise.complete(nextResult);
} catch (Throwable e) {
promise.completeExceptionally(e);
}
};
}
public abstract @NotNull AbstractPromiseFactory<F> getFactory(); public abstract @NotNull AbstractPromiseFactory<F> getFactory();
protected @NotNull PromiseExecutor<F> getExecutor() { protected @NotNull PromiseExecutor<F> getExecutor() {
@@ -124,7 +145,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
promise::completeExceptionally promise::completeExceptionally
); );
addChild(promise); propagateCancel(promise, this);
return promise; return promise;
} }
@@ -140,7 +161,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
promise::completeExceptionally promise::completeExceptionally
); );
addChild(promise); propagateCancel(promise, this);
return promise; return promise;
} }
@@ -149,13 +170,13 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
Promise<V> promise = getFactory().unresolved(); Promise<V> promise = getFactory().unresolved();
thenApplySync(task).addListener( thenApplySync(task).addListener(
nestedPromise -> { nestedPromise -> {
nestedPromise.propagateResult(promise); propagateResult(nestedPromise, promise);
nestedPromise.addChild(promise); propagateCancel(promise, nestedPromise);
}, },
promise::completeExceptionally promise::completeExceptionally
); );
addChild(promise); propagateCancel(promise, this);
return promise; return promise;
} }
@@ -221,7 +242,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
promise::completeExceptionally promise::completeExceptionally
); );
addChild(promise); propagateCancel(promise, this);
return promise; return promise;
} }
@@ -237,7 +258,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
promise::completeExceptionally promise::completeExceptionally
); );
addChild(promise); propagateCancel(promise, this);
return promise; return promise;
} }
@@ -246,28 +267,21 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
Promise<V> promise = getFactory().unresolved(); Promise<V> promise = getFactory().unresolved();
thenApplyAsync(task).addListener( thenApplyAsync(task).addListener(
nestedPromise -> { nestedPromise -> {
nestedPromise.propagateResult(promise); propagateResult(nestedPromise, promise);
nestedPromise.addChild(promise); propagateCancel(promise, nestedPromise);
}, },
promise::completeExceptionally promise::completeExceptionally
); );
addChild(promise); propagateCancel(promise, this);
return promise; return promise;
} }
private <V> @NotNull Runnable createRunnable(T result, @NotNull Promise<V> promise, @NotNull ExceptionalFunction<T, V> task) { @Override
return () -> { public @NotNull Promise<Void> erase() {
if (promise.isCompleted()) return; return thenSupplyAsync(() -> null);
}
try {
V nextResult = task.apply(result);
promise.complete(nextResult);
} catch (Throwable e) {
promise.completeExceptionally(e);
}
};
}
@Override @Override
public @NotNull Promise<T> logExceptions(@NotNull String message) { public @NotNull Promise<T> logExceptions(@NotNull String message) {
@@ -365,16 +379,6 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
return this.completion.compareAndSet(null, completion); return this.completion.compareAndSet(null, completion);
} }
@Override
public void addChild(@NotNull Promise<?> child) {
child.onCancel((e) -> this.cancel(e.getMessage()));
}
@Override
public void propagateResult(@NotNull Promise<T> target) {
addListener(target::complete, target::completeExceptionally);
}
@Override @Override
public void cancel() { public void cancel() {
completeExceptionally(new CancellationException()); completeExceptionally(new CancellationException());

View File

@@ -3,14 +3,12 @@ package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor; import dev.tommyjs.futur.executor.PromiseExecutor;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.Disposable;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
@@ -19,9 +17,9 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
public abstract @NotNull PromiseExecutor<F> getExecutor(); public abstract @NotNull PromiseExecutor<F> getExecutor();
@Override @Override
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) { public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2) {
List<Promise<?>> promises = List.of(p1, p2); List<Promise<?>> promises = List.of(p1, p2);
return all(promises) return all(propagateCancel, promises)
.thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>( .thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>(
Objects.requireNonNull(p1.getCompletion()).getResult(), Objects.requireNonNull(p1.getCompletion()).getResult(),
Objects.requireNonNull(p2.getCompletion()).getResult() Objects.requireNonNull(p2.getCompletion()).getResult()
@@ -29,12 +27,16 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
} }
@Override @Override
public <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) { public <K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
if (promises.isEmpty()) return resolve(Collections.emptyMap()); if (promises.isEmpty()) return resolve(Collections.emptyMap());
Map<K, V> map = new HashMap<>(); Map<K, V> map = new HashMap<>();
Promise<Map<K, V>> promise = unresolved(); Promise<Map<K, V>> promise = unresolved();
for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) { for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) {
if (propagateCancel) {
AbstractPromise.propagateCancel(promise, entry.getValue());
}
entry.getValue().addListener((ctx) -> { entry.getValue().addListener((ctx) -> {
synchronized (map) { synchronized (map) {
if (ctx.getException() != null) { if (ctx.getException() != null) {
@@ -59,12 +61,13 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
} }
@Override @Override
public <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable Consumer<Throwable> exceptionHandler) { public <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
AtomicInteger index = new AtomicInteger(); AtomicInteger index = new AtomicInteger();
return this.combine( return this.combine(
propagateCancel,
StreamSupport.stream(promises.spliterator(), false) StreamSupport.stream(promises.spliterator(), false)
.collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)), .collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)),
exceptionHandler != null ? (i, e) -> exceptionHandler.accept(e) : null exceptionHandler
).thenApplyAsync(v -> ).thenApplyAsync(v ->
v.entrySet().stream() v.entrySet().stream()
.sorted(Map.Entry.comparingByKey()) .sorted(Map.Entry.comparingByKey())
@@ -74,12 +77,7 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
} }
@Override @Override
public <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) { public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
return combine(promises, null);
}
@Override
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
List<Promise<?>> promises = new ArrayList<>(); List<Promise<?>> promises = new ArrayList<>();
promiseIterable.iterator().forEachRemaining(promises::add); promiseIterable.iterator().forEachRemaining(promises::add);
@@ -91,7 +89,13 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
while (iter.hasNext()) { while (iter.hasNext()) {
int index = iter.nextIndex(); int index = iter.nextIndex();
iter.next().addListener((res) -> { var p = iter.next();
if (propagateCancel) {
AbstractPromise.propagateCancel(promise, p);
}
p.addListener((res) -> {
synchronized (results) { synchronized (results) {
results[index] = res; results[index] = res;
if (Arrays.stream(results).allMatch(Objects::nonNull)) if (Arrays.stream(results).allMatch(Objects::nonNull))
@@ -104,7 +108,7 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
} }
@Override @Override
public @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) { public @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
List<Promise<?>> promises = new ArrayList<>(); List<Promise<?>> promises = new ArrayList<>();
promiseIterable.iterator().forEachRemaining(promises::add); promiseIterable.iterator().forEachRemaining(promises::add);
@@ -113,12 +117,14 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
Promise<Void> promise = unresolved(); Promise<Void> promise = unresolved();
for (Promise<?> p : promises) { for (Promise<?> p : promises) {
if (propagateCancel) {
AbstractPromise.propagateCancel(promise, p);
}
p.addListener((res) -> { p.addListener((res) -> {
if (res.getException() != null) { if (res.getException() != null) {
promise.completeExceptionally(res.getException()); promise.completeExceptionally(res.getException());
} } else if (completed.incrementAndGet() == promises.size()) {
if (completed.incrementAndGet() == promises.size()) {
promise.complete(null); promise.complete(null);
} }
}); });
@@ -127,6 +133,18 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
return promise; return promise;
} }
@Override
public <V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises) {
Promise<V> promise = unresolved();
for (Promise<V> p : promises) {
if (cancelRaceLosers) {
promise.addListener((res) -> p.cancel());
}
AbstractPromise.propagateResult(p, promise);
}
return promise;
}
@Override @Override
public <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) { public <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
Promise<T> promise = unresolved(); Promise<T> promise = unresolved();
@@ -145,10 +163,7 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
@Override @Override
public <T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono) { public <T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono) {
Promise<T> promise = this.unresolved(); return wrap(mono.toFuture());
Disposable disposable = mono.subscribe(promise::complete, promise::completeExceptionally);
promise.onCancel((e) -> disposable.dispose());
return promise;
} }
@Override @Override
@@ -165,17 +180,4 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
return promise; return promise;
} }
@Override
public @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
Promise<Void> promise = unresolved();
p.addListener(ctx -> {
if (ctx.getException() != null) {
promise.completeExceptionally(ctx.getException());
} else {
promise.complete(null);
}
});
return promise;
}
} }

View File

@@ -55,12 +55,14 @@ public interface Promise<T> {
<V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task); <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task);
@NotNull Promise<T> logExceptions(@NotNull String message); @NotNull Promise<Void> erase();
default @NotNull Promise<T> logExceptions() { default @NotNull Promise<T> logExceptions() {
return logExceptions("Exception caught in promise chain"); return logExceptions("Exception caught in promise chain");
} }
@NotNull Promise<T> logExceptions(@NotNull String message);
@NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener); @NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener);
@NotNull Promise<T> addListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler); @NotNull Promise<T> addListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
@@ -73,30 +75,26 @@ public interface Promise<T> {
@NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener); @NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener);
@Deprecated
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
@Deprecated @Deprecated
default @NotNull Promise<T> timeout(long ms) { default @NotNull Promise<T> timeout(long ms) {
return timeout(ms, TimeUnit.MILLISECONDS); return timeout(ms, TimeUnit.MILLISECONDS);
} }
@NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit); @Deprecated
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
default @NotNull Promise<T> maxWaitTime(long ms) { default @NotNull Promise<T> maxWaitTime(long ms) {
return maxWaitTime(ms, TimeUnit.MILLISECONDS); return maxWaitTime(ms, TimeUnit.MILLISECONDS);
} }
void addChild(@NotNull Promise<?> child); @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit);
void propagateResult(@NotNull Promise<T> target);
void cancel(@Nullable String reason);
default void cancel() { default void cancel() {
cancel(null); cancel(null);
} }
void cancel(@Nullable String reason);
void complete(@Nullable T result); void complete(@Nullable T result);
void completeExceptionally(@NotNull Throwable result); void completeExceptionally(@NotNull Throwable result);

View File

@@ -10,7 +10,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
public interface PromiseFactory { public interface PromiseFactory {
@@ -18,44 +17,84 @@ public interface PromiseFactory {
<T> @NotNull Promise<T> unresolved(); <T> @NotNull Promise<T> unresolved();
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2); default <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return combine(false, p1, p2);
}
<K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler); <K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2);
default <K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises) {
return combine(propagateCancel, promises, null);
}
<K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler);
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) { default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return combine(promises, null); return combine(promises, null);
} }
<V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable Consumer<Throwable> exceptionHandler); default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
return combine(false, promises, exceptionHandler);
}
default <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises) {
return combine(propagateCancel, promises, null);
}
<V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler);
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) { default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return combine(promises, null); return combine(promises, null);
} }
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable); default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
return combine(false, promises, exceptionHandler);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
return allSettled(false, promiseIterable);
}
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) { default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
return allSettled(Arrays.asList(promiseArray)); return allSettled(false, promiseArray);
} }
@NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable); default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return allSettled(propagateCancel, Arrays.asList(promiseArray));
}
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
return all(false, promiseIterable);
}
@NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
default @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) { default @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
return all(Arrays.asList(promiseArray)); return all(false, promiseArray);
} }
default @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return all(propagateCancel, Arrays.asList(promiseArray));
}
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises) {
return race(false, promises);
}
<V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises);
<T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future); <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future);
<T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono); <T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono);
<T> @NotNull Promise<T> resolve(T value);
default @NotNull Promise<Void> start() { default @NotNull Promise<Void> start() {
return resolve(null); return resolve(null);
} }
<T> @NotNull Promise<T> resolve(T value);
<T> @NotNull Promise<T> error(@NotNull Throwable error); <T> @NotNull Promise<T> error(@NotNull Throwable error);
@NotNull Promise<Void> erase(@NotNull Promise<?> p);
} }

View File

@@ -17,38 +17,38 @@ import java.util.function.BiConsumer;
@Deprecated @Deprecated
public class Promises { public class Promises {
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, PromiseFactory factory) {
return factory.combine(p1, p2);
}
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) { public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return combine(p1, p2, p1.getFactory()); return combine(p1, p2, p1.getFactory());
} }
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, PromiseFactory factory) {
return factory.combine(promises, exceptionHandler).timeout(timeout); return factory.combine(p1, p2);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory);
} }
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, PromiseFactory factory) {
return combine(promises, timeout, true, factory); return combine(promises, timeout, true, factory);
} }
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) {
return factory.combine(promises, exceptionHandler).timeout(timeout);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, PromiseFactory factory) {
return combine(promises, 1500L, true, factory); return combine(promises, 1500L, true, factory);
} }
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
return factory.combine(promises, strict ? null : (_v) -> {}).timeout(timeout);
}
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, PromiseFactory factory) { public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, PromiseFactory factory) {
return combine(promises, timeout, true, factory); return combine(promises, timeout, true, factory);
} }
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
return factory.combine(promises, strict ? null : (_i, _v) -> {}).timeout(timeout);
}
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, PromiseFactory factory) { public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, PromiseFactory factory) {
return combine(promises, 1500L, true, factory); return combine(promises, 1500L, true, factory);
} }
@@ -57,6 +57,10 @@ public class Promises {
return factory.all(promises); return factory.all(promises);
} }
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, PromiseFactory factory) {
return combine(keys, mapper, timeout, true, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, boolean strict, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, boolean strict, PromiseFactory factory) {
Map<K, Promise<V>> promises = new HashMap<>(); Map<K, Promise<V>> promises = new HashMap<>();
for (K key : keys) { for (K key : keys) {
@@ -67,22 +71,18 @@ public class Promises {
return combine(promises, timeout, strict, factory); return combine(promises, timeout, strict, factory);
} }
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, PromiseFactory factory) {
return combine(keys, mapper, timeout, true, factory);
}
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, PromiseFactory factory) {
return combine(keys, mapper, 1500L, true, factory); return combine(keys, mapper, 1500L, true, factory);
} }
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p, PromiseFactory factory) {
return factory.erase(p);
}
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) { public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
return erase(p, p.getFactory()); return erase(p, p.getFactory());
} }
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p, PromiseFactory factory) {
return p.erase();
}
public static <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future, PromiseFactory factory) { public static <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future, PromiseFactory factory) {
return factory.wrap(future); return factory.wrap(future);
} }

View File

@@ -9,19 +9,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public final class PromiseTests { public final class PromiseTests {
private final Logger logger = LoggerFactory.getLogger(PromiseTests.class); private final Logger logger = LoggerFactory.getLogger(PromiseTests.class);
private final PromiseExecutor<ScheduledFuture<?>> executor = SinglePoolExecutor.create(1); private final PromiseExecutor<Future<?>> executor = SinglePoolExecutor.create(5);
private final PromiseFactory pfac = new SimplePromiseFactory<>(executor, logger); private final PromiseFactory pfac = new SimplePromiseFactory<>(executor, logger);
@Test @Test
void testMono() { public void testMono() {
Exception value = new Exception("Test Error"); Exception value = new Exception("Test Error");
var error = pfac.wrap(Mono.error(value)); var error = pfac.wrap(Mono.error(value));
@@ -34,15 +37,118 @@ public final class PromiseTests {
} }
@Test @Test
void testErrorCancellation() throws InterruptedException { public void testErrorCancellation() throws InterruptedException {
var finish = new AtomicBoolean(); var finished = new AtomicBoolean();
pfac.start() pfac.start()
.thenRunDelayedAsync(() -> finish.set(true), 50, TimeUnit.MILLISECONDS) .thenRunDelayedAsync(() -> finished.set(true), 50, TimeUnit.MILLISECONDS)
.thenRunAsync(() -> {}) .thenRunAsync(() -> {})
.cancel(); .cancel();
Thread.sleep(100L); Thread.sleep(100L);
assert !finish.get(); assert !finished.get();
}
@Test
public void testCombineUtil() throws TimeoutException {
pfac.all(
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS)
)
.join(100L);
pfac.allSettled(
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS)
)
.join(100L);
pfac.combine(
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS)
)
.join(100L);
pfac.combine(
List.of(
pfac.start().thenRunDelayedAsync(() -> {}, 49, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> {}, 51, TimeUnit.MILLISECONDS)
)
)
.join(100L);
pfac.combine(
Map.of(
"a", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS),
"b", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS)
)
)
.join(100L);
}
@Test
public void testCombineUtilPropagation() throws InterruptedException {
var finished1 = new AtomicBoolean();
pfac.all(
true,
pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS)
)
.cancel();
var finished2 = new AtomicBoolean();
pfac.allSettled(
true,
pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS)
)
.cancel();
var finished3 = new AtomicBoolean();
pfac.combine(
true,
pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS)
)
.cancel();
var finished4 = new AtomicBoolean();
pfac.combine(
true,
List.of(
pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS),
pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS)
)
)
.cancel();
var finished5 = new AtomicBoolean();
pfac.combine(
true,
Map.of(
"a", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS),
"b", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS)
)
)
.cancel();
Thread.sleep(100L);
assert !finished1.get();
assert !finished2.get();
assert !finished3.get();
assert !finished4.get();
assert !finished5.get();
}
@Test
public void testRace() throws TimeoutException {
assert pfac.race(
List.of(
pfac.start().thenSupplyDelayedAsync(() -> true, 50, TimeUnit.MILLISECONDS),
pfac.start().thenSupplyDelayedAsync(() -> false, 150, TimeUnit.MILLISECONDS)
)
).join(100L);
} }
} }

View File

@@ -12,7 +12,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
public final class PromiseUtil { public final class PromiseUtil {
@@ -30,42 +29,86 @@ public final class PromiseUtil {
return pfac.unresolved(); return pfac.unresolved();
} }
public static @NotNull <K, V> Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return pfac.combine(propagateCancel, p1, p2);
}
public static @NotNull <K, V> Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) { public static @NotNull <K, V> Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return pfac.combine(p1, p2); return pfac.combine(p1, p2);
} }
public static @NotNull <K, V> Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
return pfac.combine(propagateCancel, promises, exceptionHandler);
}
public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) { public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
return pfac.combine(promises, exceptionHandler); return pfac.combine(promises, exceptionHandler);
} }
public static @NotNull <K, V> Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises) {
return pfac.combine(propagateCancel, promises);
}
public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) { public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return pfac.combine(promises); return pfac.combine(promises);
} }
public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable Consumer<Throwable> exceptionHandler) { public static @NotNull <V> Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
return pfac.combine(propagateCancel, promises, exceptionHandler);
}
public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
return pfac.combine(promises, exceptionHandler); return pfac.combine(promises, exceptionHandler);
} }
public static @NotNull <V> Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises) {
return pfac.combine(propagateCancel, promises);
}
public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) { public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return pfac.combine(promises); return pfac.combine(promises);
} }
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
return pfac.allSettled(propagateCancel, promiseIterable);
}
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) { public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
return pfac.allSettled(promiseIterable); return pfac.allSettled(promiseIterable);
} }
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return pfac.allSettled(propagateCancel, promiseArray);
}
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) { public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
return pfac.allSettled(promiseArray); return pfac.allSettled(promiseArray);
} }
public static @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable) {
return pfac.all(propagateCancel, promiseIterable);
}
public static @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) { public static @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
return pfac.all(promiseIterable); return pfac.all(promiseIterable);
} }
public static @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return pfac.all(propagateCancel, promiseArray);
}
public static @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) { public static @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
return pfac.all(promiseArray); return pfac.all(promiseArray);
} }
public static <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises) {
return pfac.race(promises);
}
public static <V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises) {
return pfac.race(cancelRaceLosers, promises);
}
public static @NotNull <T> Promise<T> wrap(@NotNull CompletableFuture<T> future) { public static @NotNull <T> Promise<T> wrap(@NotNull CompletableFuture<T> future) {
return pfac.wrap(future); return pfac.wrap(future);
} }
@@ -78,16 +121,12 @@ public final class PromiseUtil {
return pfac.resolve(value); return pfac.resolve(value);
} }
public static @NotNull <T> Promise<T> error(@NotNull Throwable error) {
return pfac.error(error);
}
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
return pfac.erase(p);
}
public static @NotNull Promise<Void> start() { public static @NotNull Promise<Void> start() {
return pfac.start(); return pfac.start();
} }
public static @NotNull <T> Promise<T> error(@NotNull Throwable error) {
return pfac.error(error);
}
} }

View File

@@ -9,9 +9,9 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Future;
public final class StaticPromiseFactory extends AbstractPromiseFactory<ScheduledFuture<?>> { public final class StaticPromiseFactory extends AbstractPromiseFactory<Future<?>> {
public final static StaticPromiseFactory INSTANCE = new StaticPromiseFactory(); public final static StaticPromiseFactory INSTANCE = new StaticPromiseFactory();
private final static @NotNull SinglePoolExecutor EXECUTOR = SinglePoolExecutor.create(1); private final static @NotNull SinglePoolExecutor EXECUTOR = SinglePoolExecutor.create(1);
@@ -21,18 +21,18 @@ public final class StaticPromiseFactory extends AbstractPromiseFactory<Scheduled
} }
@Override
public @NotNull <T> Promise<T> unresolved() {
return new SimplePromise<>(this);
}
@Override @Override
public @NotNull Logger getLogger() { public @NotNull Logger getLogger() {
return LOGGER; return LOGGER;
} }
@Override @Override
public @NotNull PromiseExecutor<ScheduledFuture<?>> getExecutor() { public @NotNull <T> Promise<T> unresolved() {
return new SimplePromise<>(this);
}
@Override
public @NotNull PromiseExecutor<Future<?>> getExecutor() {
return EXECUTOR; return EXECUTOR;
} }