diff --git a/build.gradle b/build.gradle index f811632..21da84c 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,7 @@ nexusPublishing { subprojects { group = 'dev.tommyjs' - version = '2.2.0' + version = '2.3.0' apply plugin: 'java' apply plugin: 'com.github.johnrengelman.shadow' diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java index ff35da6..df0d998 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java @@ -3,11 +3,11 @@ package dev.tommyjs.futur.executor; import org.jetbrains.annotations.NotNull; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -public class DualPoolExecutor implements PromiseExecutor> { +public class DualPoolExecutor implements PromiseExecutor> { private final @NotNull ScheduledExecutorService syncSvc; private final @NotNull ScheduledExecutorService asyncSvc; @@ -22,17 +22,17 @@ public class DualPoolExecutor implements PromiseExecutor> { } @Override - public ScheduledFuture runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + public Future runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { return syncSvc.schedule(task, delay, unit); } @Override - public ScheduledFuture runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + public Future runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { return asyncSvc.schedule(task, delay, unit); } @Override - public void cancel(ScheduledFuture task) { + public void cancel(Future task) { task.cancel(true); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 7cbd0d1..91f692b 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -27,6 +27,27 @@ public abstract class AbstractPromise implements Promise { this.completion = new AtomicReference<>(); } + protected static void propagateResult(Promise from, Promise to) { + from.addListener(to::complete, to::completeExceptionally); + } + + protected static void propagateCancel(Promise from, Promise to) { + from.onCancel(to::completeExceptionally); + } + + private @NotNull Runnable createRunnable(T result, @NotNull Promise promise, @NotNull ExceptionalFunction task) { + return () -> { + if (promise.isCompleted()) return; + + try { + V nextResult = task.apply(result); + promise.complete(nextResult); + } catch (Throwable e) { + promise.completeExceptionally(e); + } + }; + } + public abstract @NotNull AbstractPromiseFactory getFactory(); protected @NotNull PromiseExecutor getExecutor() { @@ -124,7 +145,7 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -140,7 +161,7 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -149,13 +170,13 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); thenApplySync(task).addListener( nestedPromise -> { - nestedPromise.propagateResult(promise); - nestedPromise.addChild(promise); + propagateResult(nestedPromise, promise); + propagateCancel(promise, nestedPromise); }, promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -221,7 +242,7 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -237,7 +258,7 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -246,29 +267,22 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); thenApplyAsync(task).addListener( nestedPromise -> { - nestedPromise.propagateResult(promise); - nestedPromise.addChild(promise); + propagateResult(nestedPromise, promise); + propagateCancel(promise, nestedPromise); }, promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } - private @NotNull Runnable createRunnable(T result, @NotNull Promise promise, @NotNull ExceptionalFunction task) { - return () -> { - if (promise.isCompleted()) return; - - try { - V nextResult = task.apply(result); - promise.complete(nextResult); - } catch (Throwable e) { - promise.completeExceptionally(e); - } - }; + @Override + public @NotNull Promise erase() { + return thenSupplyAsync(() -> null); } + @Override public @NotNull Promise logExceptions(@NotNull String message) { return onError(e -> getLogger().error(message, e)); @@ -365,16 +379,6 @@ public abstract class AbstractPromise implements Promise { return this.completion.compareAndSet(null, completion); } - @Override - public void addChild(@NotNull Promise child) { - child.onCancel((e) -> this.cancel(e.getMessage())); - } - - @Override - public void propagateResult(@NotNull Promise target) { - addListener(target::complete, target::completeExceptionally); - } - @Override public void cancel() { completeExceptionally(new CancellationException()); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java index 607f9da..4f31ec9 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java @@ -3,14 +3,12 @@ package dev.tommyjs.futur.promise; import dev.tommyjs.futur.executor.PromiseExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import reactor.core.Disposable; import reactor.core.publisher.Mono; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -19,9 +17,9 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { public abstract @NotNull PromiseExecutor getExecutor(); @Override - public @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { + public @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2) { List> promises = List.of(p1, p2); - return all(promises) + return all(propagateCancel, promises) .thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>( Objects.requireNonNull(p1.getCompletion()).getResult(), Objects.requireNonNull(p2.getCompletion()).getResult() @@ -29,12 +27,16 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { + public @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { if (promises.isEmpty()) return resolve(Collections.emptyMap()); Map map = new HashMap<>(); Promise> promise = unresolved(); for (Map.Entry> entry : promises.entrySet()) { + if (propagateCancel) { + AbstractPromise.propagateCancel(promise, entry.getValue()); + } + entry.getValue().addListener((ctx) -> { synchronized (map) { if (ctx.getException() != null) { @@ -59,12 +61,13 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable Consumer exceptionHandler) { + public @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { AtomicInteger index = new AtomicInteger(); return this.combine( + propagateCancel, StreamSupport.stream(promises.spliterator(), false) .collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)), - exceptionHandler != null ? (i, e) -> exceptionHandler.accept(e) : null + exceptionHandler ).thenApplyAsync(v -> v.entrySet().stream() .sorted(Map.Entry.comparingByKey()) @@ -74,12 +77,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine(@NotNull Iterable> promises) { - return combine(promises, null); - } - - @Override - public @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { + public @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable) { List> promises = new ArrayList<>(); promiseIterable.iterator().forEachRemaining(promises::add); @@ -91,7 +89,13 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { while (iter.hasNext()) { int index = iter.nextIndex(); - iter.next().addListener((res) -> { + var p = iter.next(); + + if (propagateCancel) { + AbstractPromise.propagateCancel(promise, p); + } + + p.addListener((res) -> { synchronized (results) { results[index] = res; if (Arrays.stream(results).allMatch(Objects::nonNull)) @@ -104,7 +108,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise all(@NotNull Iterable> promiseIterable) { + public @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable) { List> promises = new ArrayList<>(); promiseIterable.iterator().forEachRemaining(promises::add); @@ -113,12 +117,14 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { Promise promise = unresolved(); for (Promise p : promises) { + if (propagateCancel) { + AbstractPromise.propagateCancel(promise, p); + } + p.addListener((res) -> { if (res.getException() != null) { promise.completeExceptionally(res.getException()); - } - - if (completed.incrementAndGet() == promises.size()) { + } else if (completed.incrementAndGet() == promises.size()) { promise.complete(null); } }); @@ -127,6 +133,18 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } + @Override + public @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises) { + Promise promise = unresolved(); + for (Promise p : promises) { + if (cancelRaceLosers) { + promise.addListener((res) -> p.cancel()); + } + AbstractPromise.propagateResult(p, promise); + } + return promise; + } + @Override public @NotNull Promise wrap(@NotNull CompletableFuture future) { Promise promise = unresolved(); @@ -145,10 +163,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { @Override public @NotNull Promise wrap(@NotNull Mono mono) { - Promise promise = this.unresolved(); - Disposable disposable = mono.subscribe(promise::complete, promise::completeExceptionally); - promise.onCancel((e) -> disposable.dispose()); - return promise; + return wrap(mono.toFuture()); } @Override @@ -165,17 +180,4 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } - @Override - public @NotNull Promise erase(@NotNull Promise p) { - Promise promise = unresolved(); - p.addListener(ctx -> { - if (ctx.getException() != null) { - promise.completeExceptionally(ctx.getException()); - } else { - promise.complete(null); - } - }); - return promise; - } - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index f958f6d..a8281ae 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -55,12 +55,14 @@ public interface Promise { @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task); - @NotNull Promise logExceptions(@NotNull String message); + @NotNull Promise erase(); default @NotNull Promise logExceptions() { return logExceptions("Exception caught in promise chain"); } + @NotNull Promise logExceptions(@NotNull String message); + @NotNull Promise addListener(@NotNull PromiseListener listener); @NotNull Promise addListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); @@ -73,30 +75,26 @@ public interface Promise { @NotNull Promise onCancel(@NotNull Consumer listener); - @Deprecated - @NotNull Promise timeout(long time, @NotNull TimeUnit unit); - @Deprecated default @NotNull Promise timeout(long ms) { return timeout(ms, TimeUnit.MILLISECONDS); } - @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit); + @Deprecated + @NotNull Promise timeout(long time, @NotNull TimeUnit unit); default @NotNull Promise maxWaitTime(long ms) { return maxWaitTime(ms, TimeUnit.MILLISECONDS); } - void addChild(@NotNull Promise child); - - void propagateResult(@NotNull Promise target); - - void cancel(@Nullable String reason); + @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit); default void cancel() { cancel(null); } + void cancel(@Nullable String reason); + void complete(@Nullable T result); void completeExceptionally(@NotNull Throwable result); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index 9428abb..f0257e9 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -10,7 +10,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import java.util.function.Consumer; public interface PromiseFactory { @@ -18,44 +17,84 @@ public interface PromiseFactory { @NotNull Promise unresolved(); - @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2); + default @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { + return combine(false, p1, p2); + } - @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler); + @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2); + + default @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises) { + return combine(propagateCancel, promises, null); + } + + @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler); default @NotNull Promise> combine(@NotNull Map> promises) { return combine(promises, null); } - @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable Consumer exceptionHandler); + default @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { + return combine(false, promises, exceptionHandler); + } + + default @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { + return combine(propagateCancel, promises, null); + } + + @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler); default @NotNull Promise> combine(@NotNull Iterable> promises) { return combine(promises, null); } - @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable); + default @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { + return combine(false, promises, exceptionHandler); + } + + default @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { + return allSettled(false, promiseIterable); + } + + @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable); default @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { - return allSettled(Arrays.asList(promiseArray)); + return allSettled(false, promiseArray); } - @NotNull Promise all(@NotNull Iterable> promiseIterable); + default @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Promise... promiseArray) { + return allSettled(propagateCancel, Arrays.asList(promiseArray)); + } + + default @NotNull Promise all(@NotNull Iterable> promiseIterable) { + return all(false, promiseIterable); + } + + @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable); default @NotNull Promise all(@NotNull Promise... promiseArray) { - return all(Arrays.asList(promiseArray)); + return all(false, promiseArray); } + default @NotNull Promise all(boolean propagateCancel, @NotNull Promise... promiseArray) { + return all(propagateCancel, Arrays.asList(promiseArray)); + } + + default @NotNull Promise race(@NotNull Iterable> promises) { + return race(false, promises); + } + + @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises); + @NotNull Promise wrap(@NotNull CompletableFuture future); @NotNull Promise wrap(@NotNull Mono mono); - @NotNull Promise resolve(T value); - default @NotNull Promise start() { return resolve(null); } + @NotNull Promise resolve(T value); + @NotNull Promise error(@NotNull Throwable error); - - @NotNull Promise erase(@NotNull Promise p); - + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java index c598141..ff1f0b8 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java @@ -17,38 +17,38 @@ import java.util.function.BiConsumer; @Deprecated public class Promises { - public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, PromiseFactory factory) { - return factory.combine(p1, p2); - } - public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { return combine(p1, p2, p1.getFactory()); } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler, PromiseFactory factory) { - return factory.combine(promises, exceptionHandler).timeout(timeout); - } - - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict, PromiseFactory factory) { - return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory); + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, PromiseFactory factory) { + return factory.combine(p1, p2); } public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, PromiseFactory factory) { return combine(promises, timeout, true, factory); } + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict, PromiseFactory factory) { + return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory); + } + + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler, PromiseFactory factory) { + return factory.combine(promises, exceptionHandler).timeout(timeout); + } + public static @NotNull Promise> combine(@NotNull Map> promises, PromiseFactory factory) { return combine(promises, 1500L, true, factory); } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict, PromiseFactory factory) { - return factory.combine(promises, strict ? null : (_v) -> {}).timeout(timeout); - } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, PromiseFactory factory) { return combine(promises, timeout, true, factory); } + public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict, PromiseFactory factory) { + return factory.combine(promises, strict ? null : (_i, _v) -> {}).timeout(timeout); + } + public static @NotNull Promise> combine(@NotNull List> promises, PromiseFactory factory) { return combine(promises, 1500L, true, factory); } @@ -57,6 +57,10 @@ public class Promises { return factory.all(promises); } + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, PromiseFactory factory) { + return combine(keys, mapper, timeout, true, factory); + } + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict, PromiseFactory factory) { Map> promises = new HashMap<>(); for (K key : keys) { @@ -67,22 +71,18 @@ public class Promises { return combine(promises, timeout, strict, factory); } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, PromiseFactory factory) { - return combine(keys, mapper, timeout, true, factory); - } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, PromiseFactory factory) { return combine(keys, mapper, 1500L, true, factory); } - public static @NotNull Promise erase(@NotNull Promise p, PromiseFactory factory) { - return factory.erase(p); - } - public static @NotNull Promise erase(@NotNull Promise p) { return erase(p, p.getFactory()); } + public static @NotNull Promise erase(@NotNull Promise p, PromiseFactory factory) { + return p.erase(); + } + public static @NotNull Promise wrap(@NotNull CompletableFuture future, PromiseFactory factory) { return factory.wrap(future); } diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index b403a7c..f635984 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -9,19 +9,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import java.util.List; +import java.util.Map; import java.util.Objects; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; public final class PromiseTests { private final Logger logger = LoggerFactory.getLogger(PromiseTests.class); - private final PromiseExecutor> executor = SinglePoolExecutor.create(1); + private final PromiseExecutor> executor = SinglePoolExecutor.create(5); private final PromiseFactory pfac = new SimplePromiseFactory<>(executor, logger); @Test - void testMono() { + public void testMono() { Exception value = new Exception("Test Error"); var error = pfac.wrap(Mono.error(value)); @@ -34,15 +37,118 @@ public final class PromiseTests { } @Test - void testErrorCancellation() throws InterruptedException { - var finish = new AtomicBoolean(); + public void testErrorCancellation() throws InterruptedException { + var finished = new AtomicBoolean(); pfac.start() - .thenRunDelayedAsync(() -> finish.set(true), 50, TimeUnit.MILLISECONDS) + .thenRunDelayedAsync(() -> finished.set(true), 50, TimeUnit.MILLISECONDS) .thenRunAsync(() -> {}) .cancel(); Thread.sleep(100L); - assert !finish.get(); + assert !finished.get(); + } + + @Test + public void testCombineUtil() throws TimeoutException { + pfac.all( + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + .join(100L); + + pfac.allSettled( + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + .join(100L); + + pfac.combine( + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + .join(100L); + + pfac.combine( + List.of( + pfac.start().thenRunDelayedAsync(() -> {}, 49, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 51, TimeUnit.MILLISECONDS) + ) + ) + .join(100L); + + pfac.combine( + Map.of( + "a", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + "b", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + ) + .join(100L); + } + + @Test + public void testCombineUtilPropagation() throws InterruptedException { + var finished1 = new AtomicBoolean(); + pfac.all( + true, + pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS) + ) + .cancel(); + + var finished2 = new AtomicBoolean(); + pfac.allSettled( + true, + pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS) + ) + .cancel(); + + var finished3 = new AtomicBoolean(); + pfac.combine( + true, + pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS) + ) + .cancel(); + + var finished4 = new AtomicBoolean(); + pfac.combine( + true, + List.of( + pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS) + ) + ) + .cancel(); + + var finished5 = new AtomicBoolean(); + pfac.combine( + true, + Map.of( + "a", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS), + "b", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS) + ) + ) + .cancel(); + + Thread.sleep(100L); + assert !finished1.get(); + assert !finished2.get(); + assert !finished3.get(); + assert !finished4.get(); + assert !finished5.get(); + } + + @Test + public void testRace() throws TimeoutException { + assert pfac.race( + List.of( + pfac.start().thenSupplyDelayedAsync(() -> true, 50, TimeUnit.MILLISECONDS), + pfac.start().thenSupplyDelayedAsync(() -> false, 150, TimeUnit.MILLISECONDS) + ) + ).join(100L); } } diff --git a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java index 3c86ffc..fd13040 100644 --- a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java +++ b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java @@ -12,7 +12,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import java.util.function.Consumer; public final class PromiseUtil { @@ -30,42 +29,86 @@ public final class PromiseUtil { return pfac.unresolved(); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2) { + return pfac.combine(propagateCancel, p1, p2); + } + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { return pfac.combine(p1, p2); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { + return pfac.combine(propagateCancel, promises, exceptionHandler); + } + public static @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { return pfac.combine(promises, exceptionHandler); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises) { + return pfac.combine(propagateCancel, promises); + } + public static @NotNull Promise> combine(@NotNull Map> promises) { return pfac.combine(promises); } - public static @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable Consumer exceptionHandler) { + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { + return pfac.combine(propagateCancel, promises, exceptionHandler); + } + + public static @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { return pfac.combine(promises, exceptionHandler); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { + return pfac.combine(propagateCancel, promises); + } + public static @NotNull Promise> combine(@NotNull Iterable> promises) { return pfac.combine(promises); } + public static @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable) { + return pfac.allSettled(propagateCancel, promiseIterable); + } + public static @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { return pfac.allSettled(promiseIterable); } + public static @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Promise... promiseArray) { + return pfac.allSettled(propagateCancel, promiseArray); + } + public static @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { return pfac.allSettled(promiseArray); } + public static @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable) { + return pfac.all(propagateCancel, promiseIterable); + } + public static @NotNull Promise all(@NotNull Iterable> promiseIterable) { return pfac.all(promiseIterable); } + public static @NotNull Promise all(boolean propagateCancel, @NotNull Promise... promiseArray) { + return pfac.all(propagateCancel, promiseArray); + } + public static @NotNull Promise all(@NotNull Promise... promiseArray) { return pfac.all(promiseArray); } + public static @NotNull Promise race(@NotNull Iterable> promises) { + return pfac.race(promises); + } + + public static @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises) { + return pfac.race(cancelRaceLosers, promises); + } + public static @NotNull Promise wrap(@NotNull CompletableFuture future) { return pfac.wrap(future); } @@ -78,16 +121,12 @@ public final class PromiseUtil { return pfac.resolve(value); } - public static @NotNull Promise error(@NotNull Throwable error) { - return pfac.error(error); - } - - public static @NotNull Promise erase(@NotNull Promise p) { - return pfac.erase(p); - } - public static @NotNull Promise start() { return pfac.start(); } + public static @NotNull Promise error(@NotNull Throwable error) { + return pfac.error(error); + } + } diff --git a/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java b/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java index e61908f..0d9aebf 100644 --- a/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java +++ b/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java @@ -9,9 +9,9 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; -public final class StaticPromiseFactory extends AbstractPromiseFactory> { +public final class StaticPromiseFactory extends AbstractPromiseFactory> { public final static StaticPromiseFactory INSTANCE = new StaticPromiseFactory(); private final static @NotNull SinglePoolExecutor EXECUTOR = SinglePoolExecutor.create(1); @@ -21,18 +21,18 @@ public final class StaticPromiseFactory extends AbstractPromiseFactory Promise unresolved() { - return new SimplePromise<>(this); - } - @Override public @NotNull Logger getLogger() { return LOGGER; } @Override - public @NotNull PromiseExecutor> getExecutor() { + public @NotNull Promise unresolved() { + return new SimplePromise<>(this); + } + + @Override + public @NotNull PromiseExecutor> getExecutor() { return EXECUTOR; }