From 8ba023c04aa70b9e94462e1a40330ba1721f3db2 Mon Sep 17 00:00:00 2001 From: WhatCats Date: Sat, 6 Apr 2024 13:55:50 +0200 Subject: [PATCH 1/5] promise util with propagate cancellation options --- build.gradle | 2 +- .../futur/executor/DualPoolExecutor.java | 10 +- .../futur/promise/AbstractPromise.java | 66 +++++----- .../futur/promise/AbstractPromiseFactory.java | 72 ++++++----- .../dev/tommyjs/futur/promise/Promise.java | 18 ++- .../tommyjs/futur/promise/PromiseFactory.java | 65 ++++++++-- .../dev/tommyjs/futur/promise/Promises.java | 44 +++---- .../java/dev/tommyjs/futur/PromiseTests.java | 120 +++++++++++++++++- .../dev/tommyjs/futur/lazy/PromiseUtil.java | 59 +++++++-- .../futur/lazy/StaticPromiseFactory.java | 16 +-- 10 files changed, 330 insertions(+), 142 deletions(-) diff --git a/build.gradle b/build.gradle index f811632..21da84c 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,7 @@ nexusPublishing { subprojects { group = 'dev.tommyjs' - version = '2.2.0' + version = '2.3.0' apply plugin: 'java' apply plugin: 'com.github.johnrengelman.shadow' diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java index ff35da6..df0d998 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java @@ -3,11 +3,11 @@ package dev.tommyjs.futur.executor; import org.jetbrains.annotations.NotNull; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -public class DualPoolExecutor implements PromiseExecutor> { +public class DualPoolExecutor implements PromiseExecutor> { private final @NotNull ScheduledExecutorService syncSvc; private final @NotNull ScheduledExecutorService asyncSvc; @@ -22,17 +22,17 @@ public class DualPoolExecutor implements PromiseExecutor> { } @Override - public ScheduledFuture runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + public Future runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { return syncSvc.schedule(task, delay, unit); } @Override - public ScheduledFuture runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + public Future runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { return asyncSvc.schedule(task, delay, unit); } @Override - public void cancel(ScheduledFuture task) { + public void cancel(Future task) { task.cancel(true); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 7cbd0d1..91f692b 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -27,6 +27,27 @@ public abstract class AbstractPromise implements Promise { this.completion = new AtomicReference<>(); } + protected static void propagateResult(Promise from, Promise to) { + from.addListener(to::complete, to::completeExceptionally); + } + + protected static void propagateCancel(Promise from, Promise to) { + from.onCancel(to::completeExceptionally); + } + + private @NotNull Runnable createRunnable(T result, @NotNull Promise promise, @NotNull ExceptionalFunction task) { + return () -> { + if (promise.isCompleted()) return; + + try { + V nextResult = task.apply(result); + promise.complete(nextResult); + } catch (Throwable e) { + promise.completeExceptionally(e); + } + }; + } + public abstract @NotNull AbstractPromiseFactory getFactory(); protected @NotNull PromiseExecutor getExecutor() { @@ -124,7 +145,7 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -140,7 +161,7 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -149,13 +170,13 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); thenApplySync(task).addListener( nestedPromise -> { - nestedPromise.propagateResult(promise); - nestedPromise.addChild(promise); + propagateResult(nestedPromise, promise); + propagateCancel(promise, nestedPromise); }, promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -221,7 +242,7 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -237,7 +258,7 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -246,29 +267,22 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); thenApplyAsync(task).addListener( nestedPromise -> { - nestedPromise.propagateResult(promise); - nestedPromise.addChild(promise); + propagateResult(nestedPromise, promise); + propagateCancel(promise, nestedPromise); }, promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } - private @NotNull Runnable createRunnable(T result, @NotNull Promise promise, @NotNull ExceptionalFunction task) { - return () -> { - if (promise.isCompleted()) return; - - try { - V nextResult = task.apply(result); - promise.complete(nextResult); - } catch (Throwable e) { - promise.completeExceptionally(e); - } - }; + @Override + public @NotNull Promise erase() { + return thenSupplyAsync(() -> null); } + @Override public @NotNull Promise logExceptions(@NotNull String message) { return onError(e -> getLogger().error(message, e)); @@ -365,16 +379,6 @@ public abstract class AbstractPromise implements Promise { return this.completion.compareAndSet(null, completion); } - @Override - public void addChild(@NotNull Promise child) { - child.onCancel((e) -> this.cancel(e.getMessage())); - } - - @Override - public void propagateResult(@NotNull Promise target) { - addListener(target::complete, target::completeExceptionally); - } - @Override public void cancel() { completeExceptionally(new CancellationException()); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java index 607f9da..4f31ec9 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java @@ -3,14 +3,12 @@ package dev.tommyjs.futur.promise; import dev.tommyjs.futur.executor.PromiseExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import reactor.core.Disposable; import reactor.core.publisher.Mono; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -19,9 +17,9 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { public abstract @NotNull PromiseExecutor getExecutor(); @Override - public @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { + public @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2) { List> promises = List.of(p1, p2); - return all(promises) + return all(propagateCancel, promises) .thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>( Objects.requireNonNull(p1.getCompletion()).getResult(), Objects.requireNonNull(p2.getCompletion()).getResult() @@ -29,12 +27,16 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { + public @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { if (promises.isEmpty()) return resolve(Collections.emptyMap()); Map map = new HashMap<>(); Promise> promise = unresolved(); for (Map.Entry> entry : promises.entrySet()) { + if (propagateCancel) { + AbstractPromise.propagateCancel(promise, entry.getValue()); + } + entry.getValue().addListener((ctx) -> { synchronized (map) { if (ctx.getException() != null) { @@ -59,12 +61,13 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable Consumer exceptionHandler) { + public @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { AtomicInteger index = new AtomicInteger(); return this.combine( + propagateCancel, StreamSupport.stream(promises.spliterator(), false) .collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)), - exceptionHandler != null ? (i, e) -> exceptionHandler.accept(e) : null + exceptionHandler ).thenApplyAsync(v -> v.entrySet().stream() .sorted(Map.Entry.comparingByKey()) @@ -74,12 +77,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine(@NotNull Iterable> promises) { - return combine(promises, null); - } - - @Override - public @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { + public @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable) { List> promises = new ArrayList<>(); promiseIterable.iterator().forEachRemaining(promises::add); @@ -91,7 +89,13 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { while (iter.hasNext()) { int index = iter.nextIndex(); - iter.next().addListener((res) -> { + var p = iter.next(); + + if (propagateCancel) { + AbstractPromise.propagateCancel(promise, p); + } + + p.addListener((res) -> { synchronized (results) { results[index] = res; if (Arrays.stream(results).allMatch(Objects::nonNull)) @@ -104,7 +108,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise all(@NotNull Iterable> promiseIterable) { + public @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable) { List> promises = new ArrayList<>(); promiseIterable.iterator().forEachRemaining(promises::add); @@ -113,12 +117,14 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { Promise promise = unresolved(); for (Promise p : promises) { + if (propagateCancel) { + AbstractPromise.propagateCancel(promise, p); + } + p.addListener((res) -> { if (res.getException() != null) { promise.completeExceptionally(res.getException()); - } - - if (completed.incrementAndGet() == promises.size()) { + } else if (completed.incrementAndGet() == promises.size()) { promise.complete(null); } }); @@ -127,6 +133,18 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } + @Override + public @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises) { + Promise promise = unresolved(); + for (Promise p : promises) { + if (cancelRaceLosers) { + promise.addListener((res) -> p.cancel()); + } + AbstractPromise.propagateResult(p, promise); + } + return promise; + } + @Override public @NotNull Promise wrap(@NotNull CompletableFuture future) { Promise promise = unresolved(); @@ -145,10 +163,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { @Override public @NotNull Promise wrap(@NotNull Mono mono) { - Promise promise = this.unresolved(); - Disposable disposable = mono.subscribe(promise::complete, promise::completeExceptionally); - promise.onCancel((e) -> disposable.dispose()); - return promise; + return wrap(mono.toFuture()); } @Override @@ -165,17 +180,4 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } - @Override - public @NotNull Promise erase(@NotNull Promise p) { - Promise promise = unresolved(); - p.addListener(ctx -> { - if (ctx.getException() != null) { - promise.completeExceptionally(ctx.getException()); - } else { - promise.complete(null); - } - }); - return promise; - } - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index f958f6d..a8281ae 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -55,12 +55,14 @@ public interface Promise { @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task); - @NotNull Promise logExceptions(@NotNull String message); + @NotNull Promise erase(); default @NotNull Promise logExceptions() { return logExceptions("Exception caught in promise chain"); } + @NotNull Promise logExceptions(@NotNull String message); + @NotNull Promise addListener(@NotNull PromiseListener listener); @NotNull Promise addListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); @@ -73,30 +75,26 @@ public interface Promise { @NotNull Promise onCancel(@NotNull Consumer listener); - @Deprecated - @NotNull Promise timeout(long time, @NotNull TimeUnit unit); - @Deprecated default @NotNull Promise timeout(long ms) { return timeout(ms, TimeUnit.MILLISECONDS); } - @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit); + @Deprecated + @NotNull Promise timeout(long time, @NotNull TimeUnit unit); default @NotNull Promise maxWaitTime(long ms) { return maxWaitTime(ms, TimeUnit.MILLISECONDS); } - void addChild(@NotNull Promise child); - - void propagateResult(@NotNull Promise target); - - void cancel(@Nullable String reason); + @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit); default void cancel() { cancel(null); } + void cancel(@Nullable String reason); + void complete(@Nullable T result); void completeExceptionally(@NotNull Throwable result); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index 9428abb..f0257e9 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -10,7 +10,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import java.util.function.Consumer; public interface PromiseFactory { @@ -18,44 +17,84 @@ public interface PromiseFactory { @NotNull Promise unresolved(); - @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2); + default @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { + return combine(false, p1, p2); + } - @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler); + @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2); + + default @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises) { + return combine(propagateCancel, promises, null); + } + + @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler); default @NotNull Promise> combine(@NotNull Map> promises) { return combine(promises, null); } - @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable Consumer exceptionHandler); + default @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { + return combine(false, promises, exceptionHandler); + } + + default @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { + return combine(propagateCancel, promises, null); + } + + @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler); default @NotNull Promise> combine(@NotNull Iterable> promises) { return combine(promises, null); } - @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable); + default @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { + return combine(false, promises, exceptionHandler); + } + + default @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { + return allSettled(false, promiseIterable); + } + + @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable); default @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { - return allSettled(Arrays.asList(promiseArray)); + return allSettled(false, promiseArray); } - @NotNull Promise all(@NotNull Iterable> promiseIterable); + default @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Promise... promiseArray) { + return allSettled(propagateCancel, Arrays.asList(promiseArray)); + } + + default @NotNull Promise all(@NotNull Iterable> promiseIterable) { + return all(false, promiseIterable); + } + + @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable); default @NotNull Promise all(@NotNull Promise... promiseArray) { - return all(Arrays.asList(promiseArray)); + return all(false, promiseArray); } + default @NotNull Promise all(boolean propagateCancel, @NotNull Promise... promiseArray) { + return all(propagateCancel, Arrays.asList(promiseArray)); + } + + default @NotNull Promise race(@NotNull Iterable> promises) { + return race(false, promises); + } + + @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises); + @NotNull Promise wrap(@NotNull CompletableFuture future); @NotNull Promise wrap(@NotNull Mono mono); - @NotNull Promise resolve(T value); - default @NotNull Promise start() { return resolve(null); } + @NotNull Promise resolve(T value); + @NotNull Promise error(@NotNull Throwable error); - - @NotNull Promise erase(@NotNull Promise p); - + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java index c598141..ff1f0b8 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java @@ -17,38 +17,38 @@ import java.util.function.BiConsumer; @Deprecated public class Promises { - public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, PromiseFactory factory) { - return factory.combine(p1, p2); - } - public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { return combine(p1, p2, p1.getFactory()); } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler, PromiseFactory factory) { - return factory.combine(promises, exceptionHandler).timeout(timeout); - } - - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict, PromiseFactory factory) { - return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory); + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, PromiseFactory factory) { + return factory.combine(p1, p2); } public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, PromiseFactory factory) { return combine(promises, timeout, true, factory); } + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict, PromiseFactory factory) { + return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory); + } + + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler, PromiseFactory factory) { + return factory.combine(promises, exceptionHandler).timeout(timeout); + } + public static @NotNull Promise> combine(@NotNull Map> promises, PromiseFactory factory) { return combine(promises, 1500L, true, factory); } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict, PromiseFactory factory) { - return factory.combine(promises, strict ? null : (_v) -> {}).timeout(timeout); - } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, PromiseFactory factory) { return combine(promises, timeout, true, factory); } + public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict, PromiseFactory factory) { + return factory.combine(promises, strict ? null : (_i, _v) -> {}).timeout(timeout); + } + public static @NotNull Promise> combine(@NotNull List> promises, PromiseFactory factory) { return combine(promises, 1500L, true, factory); } @@ -57,6 +57,10 @@ public class Promises { return factory.all(promises); } + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, PromiseFactory factory) { + return combine(keys, mapper, timeout, true, factory); + } + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict, PromiseFactory factory) { Map> promises = new HashMap<>(); for (K key : keys) { @@ -67,22 +71,18 @@ public class Promises { return combine(promises, timeout, strict, factory); } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, PromiseFactory factory) { - return combine(keys, mapper, timeout, true, factory); - } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, PromiseFactory factory) { return combine(keys, mapper, 1500L, true, factory); } - public static @NotNull Promise erase(@NotNull Promise p, PromiseFactory factory) { - return factory.erase(p); - } - public static @NotNull Promise erase(@NotNull Promise p) { return erase(p, p.getFactory()); } + public static @NotNull Promise erase(@NotNull Promise p, PromiseFactory factory) { + return p.erase(); + } + public static @NotNull Promise wrap(@NotNull CompletableFuture future, PromiseFactory factory) { return factory.wrap(future); } diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index b403a7c..f635984 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -9,19 +9,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import java.util.List; +import java.util.Map; import java.util.Objects; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; public final class PromiseTests { private final Logger logger = LoggerFactory.getLogger(PromiseTests.class); - private final PromiseExecutor> executor = SinglePoolExecutor.create(1); + private final PromiseExecutor> executor = SinglePoolExecutor.create(5); private final PromiseFactory pfac = new SimplePromiseFactory<>(executor, logger); @Test - void testMono() { + public void testMono() { Exception value = new Exception("Test Error"); var error = pfac.wrap(Mono.error(value)); @@ -34,15 +37,118 @@ public final class PromiseTests { } @Test - void testErrorCancellation() throws InterruptedException { - var finish = new AtomicBoolean(); + public void testErrorCancellation() throws InterruptedException { + var finished = new AtomicBoolean(); pfac.start() - .thenRunDelayedAsync(() -> finish.set(true), 50, TimeUnit.MILLISECONDS) + .thenRunDelayedAsync(() -> finished.set(true), 50, TimeUnit.MILLISECONDS) .thenRunAsync(() -> {}) .cancel(); Thread.sleep(100L); - assert !finish.get(); + assert !finished.get(); + } + + @Test + public void testCombineUtil() throws TimeoutException { + pfac.all( + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + .join(100L); + + pfac.allSettled( + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + .join(100L); + + pfac.combine( + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + .join(100L); + + pfac.combine( + List.of( + pfac.start().thenRunDelayedAsync(() -> {}, 49, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 51, TimeUnit.MILLISECONDS) + ) + ) + .join(100L); + + pfac.combine( + Map.of( + "a", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + "b", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + ) + .join(100L); + } + + @Test + public void testCombineUtilPropagation() throws InterruptedException { + var finished1 = new AtomicBoolean(); + pfac.all( + true, + pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS) + ) + .cancel(); + + var finished2 = new AtomicBoolean(); + pfac.allSettled( + true, + pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS) + ) + .cancel(); + + var finished3 = new AtomicBoolean(); + pfac.combine( + true, + pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS) + ) + .cancel(); + + var finished4 = new AtomicBoolean(); + pfac.combine( + true, + List.of( + pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS) + ) + ) + .cancel(); + + var finished5 = new AtomicBoolean(); + pfac.combine( + true, + Map.of( + "a", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS), + "b", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS) + ) + ) + .cancel(); + + Thread.sleep(100L); + assert !finished1.get(); + assert !finished2.get(); + assert !finished3.get(); + assert !finished4.get(); + assert !finished5.get(); + } + + @Test + public void testRace() throws TimeoutException { + assert pfac.race( + List.of( + pfac.start().thenSupplyDelayedAsync(() -> true, 50, TimeUnit.MILLISECONDS), + pfac.start().thenSupplyDelayedAsync(() -> false, 150, TimeUnit.MILLISECONDS) + ) + ).join(100L); } } diff --git a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java index 3c86ffc..fd13040 100644 --- a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java +++ b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java @@ -12,7 +12,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import java.util.function.Consumer; public final class PromiseUtil { @@ -30,42 +29,86 @@ public final class PromiseUtil { return pfac.unresolved(); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2) { + return pfac.combine(propagateCancel, p1, p2); + } + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { return pfac.combine(p1, p2); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { + return pfac.combine(propagateCancel, promises, exceptionHandler); + } + public static @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { return pfac.combine(promises, exceptionHandler); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises) { + return pfac.combine(propagateCancel, promises); + } + public static @NotNull Promise> combine(@NotNull Map> promises) { return pfac.combine(promises); } - public static @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable Consumer exceptionHandler) { + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { + return pfac.combine(propagateCancel, promises, exceptionHandler); + } + + public static @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { return pfac.combine(promises, exceptionHandler); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { + return pfac.combine(propagateCancel, promises); + } + public static @NotNull Promise> combine(@NotNull Iterable> promises) { return pfac.combine(promises); } + public static @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable) { + return pfac.allSettled(propagateCancel, promiseIterable); + } + public static @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { return pfac.allSettled(promiseIterable); } + public static @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Promise... promiseArray) { + return pfac.allSettled(propagateCancel, promiseArray); + } + public static @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { return pfac.allSettled(promiseArray); } + public static @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable) { + return pfac.all(propagateCancel, promiseIterable); + } + public static @NotNull Promise all(@NotNull Iterable> promiseIterable) { return pfac.all(promiseIterable); } + public static @NotNull Promise all(boolean propagateCancel, @NotNull Promise... promiseArray) { + return pfac.all(propagateCancel, promiseArray); + } + public static @NotNull Promise all(@NotNull Promise... promiseArray) { return pfac.all(promiseArray); } + public static @NotNull Promise race(@NotNull Iterable> promises) { + return pfac.race(promises); + } + + public static @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises) { + return pfac.race(cancelRaceLosers, promises); + } + public static @NotNull Promise wrap(@NotNull CompletableFuture future) { return pfac.wrap(future); } @@ -78,16 +121,12 @@ public final class PromiseUtil { return pfac.resolve(value); } - public static @NotNull Promise error(@NotNull Throwable error) { - return pfac.error(error); - } - - public static @NotNull Promise erase(@NotNull Promise p) { - return pfac.erase(p); - } - public static @NotNull Promise start() { return pfac.start(); } + public static @NotNull Promise error(@NotNull Throwable error) { + return pfac.error(error); + } + } diff --git a/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java b/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java index e61908f..0d9aebf 100644 --- a/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java +++ b/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java @@ -9,9 +9,9 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; -public final class StaticPromiseFactory extends AbstractPromiseFactory> { +public final class StaticPromiseFactory extends AbstractPromiseFactory> { public final static StaticPromiseFactory INSTANCE = new StaticPromiseFactory(); private final static @NotNull SinglePoolExecutor EXECUTOR = SinglePoolExecutor.create(1); @@ -21,18 +21,18 @@ public final class StaticPromiseFactory extends AbstractPromiseFactory Promise unresolved() { - return new SimplePromise<>(this); - } - @Override public @NotNull Logger getLogger() { return LOGGER; } @Override - public @NotNull PromiseExecutor> getExecutor() { + public @NotNull Promise unresolved() { + return new SimplePromise<>(this); + } + + @Override + public @NotNull PromiseExecutor> getExecutor() { return EXECUTOR; } From c2e4e8c5220a9a1ce052bc862654aacb658ff0af Mon Sep 17 00:00:00 2001 From: WhatCats Date: Sun, 7 Apr 2024 11:57:53 +0200 Subject: [PATCH 2/5] direct listeners concept --- build.gradle | 1 + .../futur/promise/AbstractPromise.java | 129 ++++++++++-------- .../futur/promise/AbstractPromiseFactory.java | 30 ++-- .../futur/promise/AsyncPromiseListener.java | 5 + .../dev/tommyjs/futur/promise/Promise.java | 37 ++++- .../futur/promise/PromiseCompletion.java | 12 -- .../tommyjs/futur/promise/PromiseFactory.java | 52 +++---- .../java/dev/tommyjs/futur/PromiseTests.java | 4 +- .../dev/tommyjs/futur/lazy/PromiseUtil.java | 11 +- 9 files changed, 169 insertions(+), 112 deletions(-) create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java diff --git a/build.gradle b/build.gradle index 21da84c..327d71d 100644 --- a/build.gradle +++ b/build.gradle @@ -33,6 +33,7 @@ subprojects { implementation 'org.jetbrains:annotations:24.1.0' implementation 'org.slf4j:slf4j-api:2.0.12' compileOnly 'io.projectreactor:reactor-core:3.6.4' + compileOnly 'org.redisson:redisson:3.2.0' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testImplementation 'io.projectreactor:reactor-core:3.6.4' diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 91f692b..0cd56e8 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -19,16 +19,16 @@ import java.util.function.Consumer; public abstract class AbstractPromise implements Promise { - private final Collection> listeners; + private final AtomicReference>> listeners; private final AtomicReference> completion; public AbstractPromise() { - this.listeners = new ConcurrentLinkedQueue<>(); + this.listeners = new AtomicReference<>(); this.completion = new AtomicReference<>(); } protected static void propagateResult(Promise from, Promise to) { - from.addListener(to::complete, to::completeExceptionally); + from.addDirectListener(to::complete, to::completeExceptionally); } protected static void propagateCancel(Promise from, Promise to) { @@ -136,7 +136,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( res -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runSync(runnable); @@ -152,7 +152,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( res -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runSync(runnable, delay, unit); @@ -168,7 +168,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { Promise promise = getFactory().unresolved(); - thenApplySync(task).addListener( + thenApplySync(task).addDirectListener( nestedPromise -> { propagateResult(nestedPromise, promise); propagateCancel(promise, nestedPromise); @@ -233,7 +233,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( (res) -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runAsync(runnable); @@ -249,7 +249,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( res -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runAsync(runnable, delay, unit); @@ -265,7 +265,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { Promise promise = getFactory().unresolved(); - thenApplyAsync(task).addListener( + thenApplyAsync(task).addDirectListener( nestedPromise -> { propagateResult(nestedPromise, promise); propagateCancel(promise, nestedPromise); @@ -282,35 +282,14 @@ public abstract class AbstractPromise implements Promise { return thenSupplyAsync(() -> null); } - @Override - public @NotNull Promise logExceptions(@NotNull String message) { - return onError(e -> getLogger().error(message, e)); + public @NotNull Promise addAsyncListener(@NotNull AsyncPromiseListener listener) { + return addAnyListener(listener); } @Override - public @NotNull Promise addListener(@NotNull PromiseListener listener) { - synchronized (completion) { - if (isCompleted()) { - getExecutor().runAsync(() -> { - try { - //noinspection ConstantConditions - listener.handle(getCompletion()); - } catch (Exception e) { - getLogger().error("Exception caught in promise listener", e); - } - }); - } else { - getListeners().add(listener); - } - } - - return this; - } - - @Override - public @NotNull Promise addListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { - return addListener((res) -> { + public @NotNull Promise addAsyncListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { + return addAsyncListener((res) -> { if (res.isError()) { if (errorListener != null) errorListener.accept(res.getException()); } else { @@ -319,14 +298,65 @@ public abstract class AbstractPromise implements Promise { }); } + @Override + public @NotNull Promise addDirectListener(@NotNull PromiseListener listener) { + return addAnyListener(listener); + } + + @Override + public @NotNull Promise addDirectListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { + return addDirectListener((res) -> { + if (res.isError()) { + if (errorListener != null) errorListener.accept(res.getException()); + } else { + if (successListener != null) successListener.accept(res.getResult()); + } + }); + } + + private @NotNull Promise addAnyListener(PromiseListener listener) { + synchronized (completion) { + PromiseCompletion completion = getCompletion(); + if (completion != null) { + callListener(listener, completion); + } else { + listeners.compareAndSet(null, new ConcurrentLinkedQueue<>()); + listeners.get().add(listener); + } + } + + return this; + } + + private void callListener(PromiseListener listener, PromiseCompletion ctx) { + if (listener instanceof AsyncPromiseListener) { + getExecutor().runAsync(() -> callListenerNow(listener, ctx)); + } else { + callListenerNow(listener, ctx); + } + } + + private void callListenerNow(PromiseListener listener, PromiseCompletion ctx) { + try { + listener.handle(ctx); + } catch (Exception e) { + getLogger().error("Exception caught in promise listener", e); + } + } + @Override public @NotNull Promise onSuccess(@NotNull Consumer listener) { - return addListener(listener, null); + return addAsyncListener(listener, null); } @Override public @NotNull Promise onError(@NotNull Consumer listener) { - return addListener(null, listener); + return addAsyncListener(null, listener); + } + + @Override + public @NotNull Promise logExceptions(@NotNull String message) { + return onError(e -> getLogger().error(message, e)); } @Override @@ -353,7 +383,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) { F future = getExecutor().runAsync(() -> completeExceptionally(new TimeoutException("Promise stopped waiting after " + time + " " + unit)), time, unit); - return onError(e -> getExecutor().cancel(future)); + return addListener((_v) -> getExecutor().cancel(future)); } private void handleCompletion(@NotNull PromiseCompletion ctx) { @@ -361,17 +391,13 @@ public abstract class AbstractPromise implements Promise { if (!setCompletion(ctx)) return; completion.notifyAll(); - getExecutor().runAsync(() -> { - for (PromiseListener listener : getListeners()) { - if (!ctx.isActive()) return; - try { - listener.handle(ctx); - } catch (Exception e) { - getLogger().error("Exception caught in promise listener", e); - } + Collection> listeners = this.listeners.get(); + if (listeners != null) { + for (PromiseListener listener : listeners) { + callListener(listener, ctx); } - }); + } } } @@ -380,12 +406,7 @@ public abstract class AbstractPromise implements Promise { } @Override - public void cancel() { - completeExceptionally(new CancellationException()); - } - - @Override - public void cancel(@NotNull String message) { + public void cancel(@Nullable String message) { completeExceptionally(new CancellationException(message)); } @@ -409,8 +430,4 @@ public abstract class AbstractPromise implements Promise { return completion.get(); } - private Collection> getListeners() { - return listeners; - } - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java index 4f31ec9..69179ed 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java @@ -3,10 +3,13 @@ package dev.tommyjs.futur.promise; import dev.tommyjs.futur.executor.PromiseExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.redisson.api.RFuture; import reactor.core.publisher.Mono; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -37,7 +40,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { AbstractPromise.propagateCancel(promise, entry.getValue()); } - entry.getValue().addListener((ctx) -> { + entry.getValue().addDirectListener((ctx) -> { synchronized (map) { if (ctx.getException() != null) { if (exceptionHandler == null) { @@ -95,7 +98,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { AbstractPromise.propagateCancel(promise, p); } - p.addListener((res) -> { + p.addDirectListener((res) -> { synchronized (results) { results[index] = res; if (Arrays.stream(results).allMatch(Objects::nonNull)) @@ -121,7 +124,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { AbstractPromise.propagateCancel(promise, p); } - p.addListener((res) -> { + p.addDirectListener((res) -> { if (res.getException() != null) { promise.completeExceptionally(res.getException()); } else if (completed.incrementAndGet() == promises.size()) { @@ -145,11 +148,25 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } + @Override + public @NotNull Promise wrapMono(@NotNull Mono mono) { + return wrap(mono.toFuture()); + } + @Override public @NotNull Promise wrap(@NotNull CompletableFuture future) { + return wrap(future, future); + } + + @Override + public @NotNull Promise wrapRedisson(@NotNull RFuture future) { + return wrap(future, future); + } + + private @NotNull Promise wrap(@NotNull CompletionStage completion, Future future) { Promise promise = unresolved(); - future.whenComplete((v, e) -> { + completion.whenComplete((v, e) -> { if (e != null) { promise.completeExceptionally(e); } else { @@ -161,11 +178,6 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } - @Override - public @NotNull Promise wrap(@NotNull Mono mono) { - return wrap(mono.toFuture()); - } - @Override public @NotNull Promise resolve(T value) { Promise promise = unresolved(); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java new file mode 100644 index 0000000..7e0343b --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java @@ -0,0 +1,5 @@ +package dev.tommyjs.futur.promise; + +public interface AsyncPromiseListener extends PromiseListener { + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index a8281ae..9efcbf8 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -63,9 +63,26 @@ public interface Promise { @NotNull Promise logExceptions(@NotNull String message); - @NotNull Promise addListener(@NotNull PromiseListener listener); + /** + * @apiNote Direct listeners run on the same thread as the completion. + */ + @NotNull Promise addDirectListener(@NotNull PromiseListener listener); - @NotNull Promise addListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); + @NotNull Promise addDirectListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); + + /** + * @apiNote Async listeners are run in parallel. + */ + @NotNull Promise addAsyncListener(@NotNull AsyncPromiseListener listener); + + /** + * @apiNote Same as addAsyncListener. + */ + default @NotNull Promise addListener(@NotNull AsyncPromiseListener listener) { + return addAsyncListener(listener); + } + + @NotNull Promise addAsyncListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); @NotNull Promise onSuccess(@NotNull Consumer listener); @@ -75,26 +92,32 @@ public interface Promise { @NotNull Promise onCancel(@NotNull Consumer listener); + /** + * @deprecated Use maxWaitTime instead + */ + @Deprecated + @NotNull Promise timeout(long time, @NotNull TimeUnit unit); + + /** + * @deprecated Use maxWaitTime instead + */ @Deprecated default @NotNull Promise timeout(long ms) { return timeout(ms, TimeUnit.MILLISECONDS); } - @Deprecated - @NotNull Promise timeout(long time, @NotNull TimeUnit unit); + @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit); default @NotNull Promise maxWaitTime(long ms) { return maxWaitTime(ms, TimeUnit.MILLISECONDS); } - @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit); + void cancel(@Nullable String reason); default void cancel() { cancel(null); } - void cancel(@Nullable String reason); - void complete(@Nullable T result); void completeExceptionally(@NotNull Throwable result); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java index 4ab10c9..de7e276 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java @@ -9,29 +9,17 @@ public class PromiseCompletion { private @Nullable T result; private @Nullable Throwable exception; - private boolean active; public PromiseCompletion(@Nullable T result) { this.result = result; - this.active = true; } public PromiseCompletion(@NotNull Throwable exception) { this.exception = exception; - this.active = true; } public PromiseCompletion() { this.result = null; - this.active = true; - } - - public void markHandled() { - this.active = false; - } - - public boolean isActive() { - return active; } public boolean isError() { diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index f0257e9..bac92b7 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -2,6 +2,7 @@ package dev.tommyjs.futur.promise; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.redisson.api.RFuture; import org.slf4j.Logger; import reactor.core.publisher.Mono; @@ -17,78 +18,83 @@ public interface PromiseFactory { @NotNull Promise unresolved(); + @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2); + default @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { return combine(false, p1, p2); } - @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2); + @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler); default @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises) { return combine(propagateCancel, promises, null); } - @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler); - - default @NotNull Promise> combine(@NotNull Map> promises) { - return combine(promises, null); - } - default @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { return combine(false, promises, exceptionHandler); } - default @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { - return combine(propagateCancel, promises, null); + default @NotNull Promise> combine(@NotNull Map> promises) { + return combine(promises, null); } @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler); - default @NotNull Promise> combine(@NotNull Iterable> promises) { - return combine(promises, null); + default @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { + return combine(propagateCancel, promises, null); } default @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { return combine(false, promises, exceptionHandler); } - default @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { - return allSettled(false, promiseIterable); + default @NotNull Promise> combine(@NotNull Iterable> promises) { + return combine(promises, null); } @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable); - default @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { - return allSettled(false, promiseArray); + default @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { + return allSettled(false, promiseIterable); } default @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Promise... promiseArray) { return allSettled(propagateCancel, Arrays.asList(promiseArray)); } - default @NotNull Promise all(@NotNull Iterable> promiseIterable) { - return all(false, promiseIterable); + default @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { + return allSettled(false, promiseArray); } @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable); - default @NotNull Promise all(@NotNull Promise... promiseArray) { - return all(false, promiseArray); + default @NotNull Promise all(@NotNull Iterable> promiseIterable) { + return all(false, promiseIterable); } default @NotNull Promise all(boolean propagateCancel, @NotNull Promise... promiseArray) { return all(propagateCancel, Arrays.asList(promiseArray)); } + default @NotNull Promise all(@NotNull Promise... promiseArray) { + return all(false, promiseArray); + } + + /** + * @apiNote Even with cancelRaceLosers, it is not guaranteed that only one promise will complete. + */ + @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises); + default @NotNull Promise race(@NotNull Iterable> promises) { return race(false, promises); } - @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises); + @NotNull Promise wrapMono(@NotNull Mono mono); + + @NotNull Promise wrapRedisson(@NotNull RFuture future); @NotNull Promise wrap(@NotNull CompletableFuture future); - @NotNull Promise wrap(@NotNull Mono mono); - default @NotNull Promise start() { return resolve(null); } @@ -96,5 +102,5 @@ public interface PromiseFactory { @NotNull Promise resolve(T value); @NotNull Promise error(@NotNull Throwable error); - + } diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index f635984..c489766 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -27,11 +27,11 @@ public final class PromiseTests { public void testMono() { Exception value = new Exception("Test Error"); - var error = pfac.wrap(Mono.error(value)); + var error = pfac.wrapMono(Mono.error(value)); assert Objects.requireNonNull(error.getCompletion()).isError(); assert error.getCompletion().getException() == value; - var resolved = pfac.wrap(Mono.just(value)); + var resolved = pfac.wrapMono(Mono.just(value)); assert !Objects.requireNonNull(resolved.getCompletion()).isError(); assert resolved.getCompletion().getResult() == value; } diff --git a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java index fd13040..72a4257 100644 --- a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java +++ b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java @@ -5,6 +5,7 @@ import dev.tommyjs.futur.promise.PromiseCompletion; import dev.tommyjs.futur.promise.PromiseFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.redisson.api.RFuture; import org.slf4j.Logger; import reactor.core.publisher.Mono; @@ -109,12 +110,16 @@ public final class PromiseUtil { return pfac.race(cancelRaceLosers, promises); } + public static @NotNull Promise wrapMono(@NotNull Mono mono) { + return pfac.wrapMono(mono); + } + public static @NotNull Promise wrap(@NotNull CompletableFuture future) { return pfac.wrap(future); } - public static @NotNull Promise wrap(@NotNull Mono mono) { - return pfac.wrap(mono); + public static @NotNull Promise wrapRedisson(@NotNull RFuture future) { + return pfac.wrapRedisson(future); } public static @NotNull Promise resolve(T value) { @@ -128,5 +133,5 @@ public final class PromiseUtil { public static @NotNull Promise error(@NotNull Throwable error) { return pfac.error(error); } - + } From 54d7b02675a4b235083b8099006547745a53e8b8 Mon Sep 17 00:00:00 2001 From: WhatCats Date: Sun, 7 Apr 2024 14:30:42 +0200 Subject: [PATCH 3/5] handle null in compose methods --- .../tommyjs/futur/promise/AbstractPromise.java | 18 +++++++++++++----- .../dev/tommyjs/futur/promise/Promise.java | 2 +- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 0cd56e8..1e9470d 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -166,12 +166,16 @@ public abstract class AbstractPromise implements Promise { } @Override - public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { + public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { Promise promise = getFactory().unresolved(); thenApplySync(task).addDirectListener( nestedPromise -> { - propagateResult(nestedPromise, promise); - propagateCancel(promise, nestedPromise); + if (nestedPromise == null) { + promise.complete(null); + } else { + propagateResult(nestedPromise, promise); + propagateCancel(promise, nestedPromise); + } }, promise::completeExceptionally ); @@ -267,8 +271,12 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); thenApplyAsync(task).addDirectListener( nestedPromise -> { - propagateResult(nestedPromise, promise); - propagateCancel(promise, nestedPromise); + if (nestedPromise == null) { + promise.complete(null); + } else { + propagateResult(nestedPromise, promise); + propagateCancel(promise, nestedPromise); + } }, promise::completeExceptionally ); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index 9efcbf8..b9bc09d 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -33,7 +33,7 @@ public interface Promise { @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); - @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task); + @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task); @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task); From e8512df5042c29ecfaa36555858eb8369429fd5c Mon Sep 17 00:00:00 2001 From: WhatCats Date: Mon, 8 Apr 2024 10:05:05 +0200 Subject: [PATCH 4/5] add toFuture method --- .../tommyjs/futur/promise/AbstractPromise.java | 17 +++++++++++++---- .../java/dev/tommyjs/futur/promise/Promise.java | 3 +++ .../java/dev/tommyjs/futur/PromiseTests.java | 15 +++++++++++++++ 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 1e9470d..05b611e 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -10,10 +10,7 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import java.util.Collection; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -438,4 +435,16 @@ public abstract class AbstractPromise implements Promise { return completion.get(); } + @Override + public @NotNull CompletableFuture toFuture() { + CompletableFuture future = new CompletableFuture<>(); + this.addDirectListener(future::complete, future::completeExceptionally); + future.whenComplete((res, e) -> { + if (e instanceof CancellationException) { + this.cancel(); + } + }); + return future; + } + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index b9bc09d..456ee84 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -8,6 +8,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -128,4 +129,6 @@ public interface Promise { boolean isCompleted(); + @NotNull CompletableFuture toFuture(); + } diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index c489766..790cf7a 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -48,6 +48,21 @@ public final class PromiseTests { assert !finished.get(); } + @Test + public void testToFuture() throws InterruptedException { + assert pfac.resolve(true).toFuture().getNow(false); + assert pfac.error(new Exception("Test")).toFuture().isCompletedExceptionally(); + + var finished = new AtomicBoolean(); + pfac.start() + .thenRunDelayedAsync(() -> finished.set(true), 50, TimeUnit.MILLISECONDS) + .toFuture() + .cancel(true); + + Thread.sleep(100L); + assert !finished.get(); + } + @Test public void testCombineUtil() throws TimeoutException { pfac.all( From 29c614f5d73aa8d5cc0c11b7c269394d6ed6779e Mon Sep 17 00:00:00 2001 From: WhatCats Date: Tue, 9 Apr 2024 19:17:29 +0200 Subject: [PATCH 5/5] optimization for virtual threads --- build.gradle | 1 - .../futur/promise/AbstractPromise.java | 42 +++++++++---------- .../java/dev/tommyjs/futur/PromiseTests.java | 6 +-- 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/build.gradle b/build.gradle index 327d71d..4252922 100644 --- a/build.gradle +++ b/build.gradle @@ -38,7 +38,6 @@ subprojects { testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testImplementation 'io.projectreactor:reactor-core:3.6.4' testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' - testImplementation 'org.slf4j:slf4j-api:2.0.12' testImplementation 'ch.qos.logback:logback-classic:1.5.3' } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 05b611e..5fa166f 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -12,16 +12,21 @@ import org.slf4j.Logger; import java.util.Collection; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; public abstract class AbstractPromise implements Promise { private final AtomicReference>> listeners; private final AtomicReference> completion; + private final CountDownLatch latch; + private final ReentrantLock lock; public AbstractPromise() { this.listeners = new AtomicReference<>(); this.completion = new AtomicReference<>(); + this.latch = new CountDownLatch(1); + this.lock = new ReentrantLock(); } protected static void propagateResult(Promise from, Promise to) { @@ -57,24 +62,14 @@ public abstract class AbstractPromise implements Promise { @Override public T join(long timeoutMillis) throws TimeoutException { - PromiseCompletion completion; - long start = System.currentTimeMillis(); - long remainingTimeout = timeoutMillis; - - synchronized (this.completion) { - completion = this.completion.get(); - while (completion == null && remainingTimeout > 0) { - try { - this.completion.wait(remainingTimeout); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - completion = this.completion.get(); - remainingTimeout = timeoutMillis - (System.currentTimeMillis() - start); - } + try { + //noinspection ResultOfMethodCallIgnored + this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + PromiseCompletion completion = getCompletion(); if (completion == null) throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms"); @@ -320,7 +315,8 @@ public abstract class AbstractPromise implements Promise { } private @NotNull Promise addAnyListener(PromiseListener listener) { - synchronized (completion) { + lock.lock(); + try { PromiseCompletion completion = getCompletion(); if (completion != null) { callListener(listener, completion); @@ -328,6 +324,8 @@ public abstract class AbstractPromise implements Promise { listeners.compareAndSet(null, new ConcurrentLinkedQueue<>()); listeners.get().add(listener); } + } finally { + lock.unlock(); } return this; @@ -392,17 +390,19 @@ public abstract class AbstractPromise implements Promise { } private void handleCompletion(@NotNull PromiseCompletion ctx) { - synchronized (completion) { - if (!setCompletion(ctx)) return; - - completion.notifyAll(); + if (!setCompletion(ctx)) return; + lock.lock(); + try { + this.latch.countDown(); Collection> listeners = this.listeners.get(); if (listeners != null) { for (PromiseListener listener : listeners) { callListener(listener, ctx); } } + } finally { + lock.unlock(); } } diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index 790cf7a..ffe6f01 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -160,10 +160,10 @@ public final class PromiseTests { public void testRace() throws TimeoutException { assert pfac.race( List.of( - pfac.start().thenSupplyDelayedAsync(() -> true, 50, TimeUnit.MILLISECONDS), - pfac.start().thenSupplyDelayedAsync(() -> false, 150, TimeUnit.MILLISECONDS) + pfac.start().thenSupplyDelayedAsync(() -> true, 150, TimeUnit.MILLISECONDS), + pfac.start().thenSupplyDelayedAsync(() -> false, 200, TimeUnit.MILLISECONDS) ) - ).join(100L); + ).join(300L); } }