From c2e4e8c5220a9a1ce052bc862654aacb658ff0af Mon Sep 17 00:00:00 2001 From: WhatCats Date: Sun, 7 Apr 2024 11:57:53 +0200 Subject: [PATCH] direct listeners concept --- build.gradle | 1 + .../futur/promise/AbstractPromise.java | 129 ++++++++++-------- .../futur/promise/AbstractPromiseFactory.java | 30 ++-- .../futur/promise/AsyncPromiseListener.java | 5 + .../dev/tommyjs/futur/promise/Promise.java | 37 ++++- .../futur/promise/PromiseCompletion.java | 12 -- .../tommyjs/futur/promise/PromiseFactory.java | 52 +++---- .../java/dev/tommyjs/futur/PromiseTests.java | 4 +- .../dev/tommyjs/futur/lazy/PromiseUtil.java | 11 +- 9 files changed, 169 insertions(+), 112 deletions(-) create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java diff --git a/build.gradle b/build.gradle index 21da84c..327d71d 100644 --- a/build.gradle +++ b/build.gradle @@ -33,6 +33,7 @@ subprojects { implementation 'org.jetbrains:annotations:24.1.0' implementation 'org.slf4j:slf4j-api:2.0.12' compileOnly 'io.projectreactor:reactor-core:3.6.4' + compileOnly 'org.redisson:redisson:3.2.0' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testImplementation 'io.projectreactor:reactor-core:3.6.4' diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 91f692b..0cd56e8 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -19,16 +19,16 @@ import java.util.function.Consumer; public abstract class AbstractPromise implements Promise { - private final Collection> listeners; + private final AtomicReference>> listeners; private final AtomicReference> completion; public AbstractPromise() { - this.listeners = new ConcurrentLinkedQueue<>(); + this.listeners = new AtomicReference<>(); this.completion = new AtomicReference<>(); } protected static void propagateResult(Promise from, Promise to) { - from.addListener(to::complete, to::completeExceptionally); + from.addDirectListener(to::complete, to::completeExceptionally); } protected static void propagateCancel(Promise from, Promise to) { @@ -136,7 +136,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( res -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runSync(runnable); @@ -152,7 +152,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( res -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runSync(runnable, delay, unit); @@ -168,7 +168,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { Promise promise = getFactory().unresolved(); - thenApplySync(task).addListener( + thenApplySync(task).addDirectListener( nestedPromise -> { propagateResult(nestedPromise, promise); propagateCancel(promise, nestedPromise); @@ -233,7 +233,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( (res) -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runAsync(runnable); @@ -249,7 +249,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( res -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runAsync(runnable, delay, unit); @@ -265,7 +265,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { Promise promise = getFactory().unresolved(); - thenApplyAsync(task).addListener( + thenApplyAsync(task).addDirectListener( nestedPromise -> { propagateResult(nestedPromise, promise); propagateCancel(promise, nestedPromise); @@ -282,35 +282,14 @@ public abstract class AbstractPromise implements Promise { return thenSupplyAsync(() -> null); } - @Override - public @NotNull Promise logExceptions(@NotNull String message) { - return onError(e -> getLogger().error(message, e)); + public @NotNull Promise addAsyncListener(@NotNull AsyncPromiseListener listener) { + return addAnyListener(listener); } @Override - public @NotNull Promise addListener(@NotNull PromiseListener listener) { - synchronized (completion) { - if (isCompleted()) { - getExecutor().runAsync(() -> { - try { - //noinspection ConstantConditions - listener.handle(getCompletion()); - } catch (Exception e) { - getLogger().error("Exception caught in promise listener", e); - } - }); - } else { - getListeners().add(listener); - } - } - - return this; - } - - @Override - public @NotNull Promise addListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { - return addListener((res) -> { + public @NotNull Promise addAsyncListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { + return addAsyncListener((res) -> { if (res.isError()) { if (errorListener != null) errorListener.accept(res.getException()); } else { @@ -319,14 +298,65 @@ public abstract class AbstractPromise implements Promise { }); } + @Override + public @NotNull Promise addDirectListener(@NotNull PromiseListener listener) { + return addAnyListener(listener); + } + + @Override + public @NotNull Promise addDirectListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { + return addDirectListener((res) -> { + if (res.isError()) { + if (errorListener != null) errorListener.accept(res.getException()); + } else { + if (successListener != null) successListener.accept(res.getResult()); + } + }); + } + + private @NotNull Promise addAnyListener(PromiseListener listener) { + synchronized (completion) { + PromiseCompletion completion = getCompletion(); + if (completion != null) { + callListener(listener, completion); + } else { + listeners.compareAndSet(null, new ConcurrentLinkedQueue<>()); + listeners.get().add(listener); + } + } + + return this; + } + + private void callListener(PromiseListener listener, PromiseCompletion ctx) { + if (listener instanceof AsyncPromiseListener) { + getExecutor().runAsync(() -> callListenerNow(listener, ctx)); + } else { + callListenerNow(listener, ctx); + } + } + + private void callListenerNow(PromiseListener listener, PromiseCompletion ctx) { + try { + listener.handle(ctx); + } catch (Exception e) { + getLogger().error("Exception caught in promise listener", e); + } + } + @Override public @NotNull Promise onSuccess(@NotNull Consumer listener) { - return addListener(listener, null); + return addAsyncListener(listener, null); } @Override public @NotNull Promise onError(@NotNull Consumer listener) { - return addListener(null, listener); + return addAsyncListener(null, listener); + } + + @Override + public @NotNull Promise logExceptions(@NotNull String message) { + return onError(e -> getLogger().error(message, e)); } @Override @@ -353,7 +383,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) { F future = getExecutor().runAsync(() -> completeExceptionally(new TimeoutException("Promise stopped waiting after " + time + " " + unit)), time, unit); - return onError(e -> getExecutor().cancel(future)); + return addListener((_v) -> getExecutor().cancel(future)); } private void handleCompletion(@NotNull PromiseCompletion ctx) { @@ -361,17 +391,13 @@ public abstract class AbstractPromise implements Promise { if (!setCompletion(ctx)) return; completion.notifyAll(); - getExecutor().runAsync(() -> { - for (PromiseListener listener : getListeners()) { - if (!ctx.isActive()) return; - try { - listener.handle(ctx); - } catch (Exception e) { - getLogger().error("Exception caught in promise listener", e); - } + Collection> listeners = this.listeners.get(); + if (listeners != null) { + for (PromiseListener listener : listeners) { + callListener(listener, ctx); } - }); + } } } @@ -380,12 +406,7 @@ public abstract class AbstractPromise implements Promise { } @Override - public void cancel() { - completeExceptionally(new CancellationException()); - } - - @Override - public void cancel(@NotNull String message) { + public void cancel(@Nullable String message) { completeExceptionally(new CancellationException(message)); } @@ -409,8 +430,4 @@ public abstract class AbstractPromise implements Promise { return completion.get(); } - private Collection> getListeners() { - return listeners; - } - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java index 4f31ec9..69179ed 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java @@ -3,10 +3,13 @@ package dev.tommyjs.futur.promise; import dev.tommyjs.futur.executor.PromiseExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.redisson.api.RFuture; import reactor.core.publisher.Mono; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -37,7 +40,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { AbstractPromise.propagateCancel(promise, entry.getValue()); } - entry.getValue().addListener((ctx) -> { + entry.getValue().addDirectListener((ctx) -> { synchronized (map) { if (ctx.getException() != null) { if (exceptionHandler == null) { @@ -95,7 +98,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { AbstractPromise.propagateCancel(promise, p); } - p.addListener((res) -> { + p.addDirectListener((res) -> { synchronized (results) { results[index] = res; if (Arrays.stream(results).allMatch(Objects::nonNull)) @@ -121,7 +124,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { AbstractPromise.propagateCancel(promise, p); } - p.addListener((res) -> { + p.addDirectListener((res) -> { if (res.getException() != null) { promise.completeExceptionally(res.getException()); } else if (completed.incrementAndGet() == promises.size()) { @@ -145,11 +148,25 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } + @Override + public @NotNull Promise wrapMono(@NotNull Mono mono) { + return wrap(mono.toFuture()); + } + @Override public @NotNull Promise wrap(@NotNull CompletableFuture future) { + return wrap(future, future); + } + + @Override + public @NotNull Promise wrapRedisson(@NotNull RFuture future) { + return wrap(future, future); + } + + private @NotNull Promise wrap(@NotNull CompletionStage completion, Future future) { Promise promise = unresolved(); - future.whenComplete((v, e) -> { + completion.whenComplete((v, e) -> { if (e != null) { promise.completeExceptionally(e); } else { @@ -161,11 +178,6 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } - @Override - public @NotNull Promise wrap(@NotNull Mono mono) { - return wrap(mono.toFuture()); - } - @Override public @NotNull Promise resolve(T value) { Promise promise = unresolved(); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java new file mode 100644 index 0000000..7e0343b --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java @@ -0,0 +1,5 @@ +package dev.tommyjs.futur.promise; + +public interface AsyncPromiseListener extends PromiseListener { + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index a8281ae..9efcbf8 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -63,9 +63,26 @@ public interface Promise { @NotNull Promise logExceptions(@NotNull String message); - @NotNull Promise addListener(@NotNull PromiseListener listener); + /** + * @apiNote Direct listeners run on the same thread as the completion. + */ + @NotNull Promise addDirectListener(@NotNull PromiseListener listener); - @NotNull Promise addListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); + @NotNull Promise addDirectListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); + + /** + * @apiNote Async listeners are run in parallel. + */ + @NotNull Promise addAsyncListener(@NotNull AsyncPromiseListener listener); + + /** + * @apiNote Same as addAsyncListener. + */ + default @NotNull Promise addListener(@NotNull AsyncPromiseListener listener) { + return addAsyncListener(listener); + } + + @NotNull Promise addAsyncListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); @NotNull Promise onSuccess(@NotNull Consumer listener); @@ -75,26 +92,32 @@ public interface Promise { @NotNull Promise onCancel(@NotNull Consumer listener); + /** + * @deprecated Use maxWaitTime instead + */ + @Deprecated + @NotNull Promise timeout(long time, @NotNull TimeUnit unit); + + /** + * @deprecated Use maxWaitTime instead + */ @Deprecated default @NotNull Promise timeout(long ms) { return timeout(ms, TimeUnit.MILLISECONDS); } - @Deprecated - @NotNull Promise timeout(long time, @NotNull TimeUnit unit); + @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit); default @NotNull Promise maxWaitTime(long ms) { return maxWaitTime(ms, TimeUnit.MILLISECONDS); } - @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit); + void cancel(@Nullable String reason); default void cancel() { cancel(null); } - void cancel(@Nullable String reason); - void complete(@Nullable T result); void completeExceptionally(@NotNull Throwable result); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java index 4ab10c9..de7e276 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java @@ -9,29 +9,17 @@ public class PromiseCompletion { private @Nullable T result; private @Nullable Throwable exception; - private boolean active; public PromiseCompletion(@Nullable T result) { this.result = result; - this.active = true; } public PromiseCompletion(@NotNull Throwable exception) { this.exception = exception; - this.active = true; } public PromiseCompletion() { this.result = null; - this.active = true; - } - - public void markHandled() { - this.active = false; - } - - public boolean isActive() { - return active; } public boolean isError() { diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index f0257e9..bac92b7 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -2,6 +2,7 @@ package dev.tommyjs.futur.promise; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.redisson.api.RFuture; import org.slf4j.Logger; import reactor.core.publisher.Mono; @@ -17,78 +18,83 @@ public interface PromiseFactory { @NotNull Promise unresolved(); + @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2); + default @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { return combine(false, p1, p2); } - @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2); + @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler); default @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises) { return combine(propagateCancel, promises, null); } - @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler); - - default @NotNull Promise> combine(@NotNull Map> promises) { - return combine(promises, null); - } - default @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { return combine(false, promises, exceptionHandler); } - default @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { - return combine(propagateCancel, promises, null); + default @NotNull Promise> combine(@NotNull Map> promises) { + return combine(promises, null); } @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler); - default @NotNull Promise> combine(@NotNull Iterable> promises) { - return combine(promises, null); + default @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { + return combine(propagateCancel, promises, null); } default @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { return combine(false, promises, exceptionHandler); } - default @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { - return allSettled(false, promiseIterable); + default @NotNull Promise> combine(@NotNull Iterable> promises) { + return combine(promises, null); } @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable); - default @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { - return allSettled(false, promiseArray); + default @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { + return allSettled(false, promiseIterable); } default @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Promise... promiseArray) { return allSettled(propagateCancel, Arrays.asList(promiseArray)); } - default @NotNull Promise all(@NotNull Iterable> promiseIterable) { - return all(false, promiseIterable); + default @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { + return allSettled(false, promiseArray); } @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable); - default @NotNull Promise all(@NotNull Promise... promiseArray) { - return all(false, promiseArray); + default @NotNull Promise all(@NotNull Iterable> promiseIterable) { + return all(false, promiseIterable); } default @NotNull Promise all(boolean propagateCancel, @NotNull Promise... promiseArray) { return all(propagateCancel, Arrays.asList(promiseArray)); } + default @NotNull Promise all(@NotNull Promise... promiseArray) { + return all(false, promiseArray); + } + + /** + * @apiNote Even with cancelRaceLosers, it is not guaranteed that only one promise will complete. + */ + @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises); + default @NotNull Promise race(@NotNull Iterable> promises) { return race(false, promises); } - @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises); + @NotNull Promise wrapMono(@NotNull Mono mono); + + @NotNull Promise wrapRedisson(@NotNull RFuture future); @NotNull Promise wrap(@NotNull CompletableFuture future); - @NotNull Promise wrap(@NotNull Mono mono); - default @NotNull Promise start() { return resolve(null); } @@ -96,5 +102,5 @@ public interface PromiseFactory { @NotNull Promise resolve(T value); @NotNull Promise error(@NotNull Throwable error); - + } diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index f635984..c489766 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -27,11 +27,11 @@ public final class PromiseTests { public void testMono() { Exception value = new Exception("Test Error"); - var error = pfac.wrap(Mono.error(value)); + var error = pfac.wrapMono(Mono.error(value)); assert Objects.requireNonNull(error.getCompletion()).isError(); assert error.getCompletion().getException() == value; - var resolved = pfac.wrap(Mono.just(value)); + var resolved = pfac.wrapMono(Mono.just(value)); assert !Objects.requireNonNull(resolved.getCompletion()).isError(); assert resolved.getCompletion().getResult() == value; } diff --git a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java index fd13040..72a4257 100644 --- a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java +++ b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java @@ -5,6 +5,7 @@ import dev.tommyjs.futur.promise.PromiseCompletion; import dev.tommyjs.futur.promise.PromiseFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.redisson.api.RFuture; import org.slf4j.Logger; import reactor.core.publisher.Mono; @@ -109,12 +110,16 @@ public final class PromiseUtil { return pfac.race(cancelRaceLosers, promises); } + public static @NotNull Promise wrapMono(@NotNull Mono mono) { + return pfac.wrapMono(mono); + } + public static @NotNull Promise wrap(@NotNull CompletableFuture future) { return pfac.wrap(future); } - public static @NotNull Promise wrap(@NotNull Mono mono) { - return pfac.wrap(mono); + public static @NotNull Promise wrapRedisson(@NotNull RFuture future) { + return pfac.wrapRedisson(future); } public static @NotNull Promise resolve(T value) { @@ -128,5 +133,5 @@ public final class PromiseUtil { public static @NotNull Promise error(@NotNull Throwable error) { return pfac.error(error); } - + }