direct listeners concept

This commit is contained in:
WhatCats
2024-04-07 11:57:53 +02:00
parent 8ba023c04a
commit c2e4e8c522
9 changed files with 169 additions and 112 deletions

View File

@@ -33,6 +33,7 @@ subprojects {
implementation 'org.jetbrains:annotations:24.1.0'
implementation 'org.slf4j:slf4j-api:2.0.12'
compileOnly 'io.projectreactor:reactor-core:3.6.4'
compileOnly 'org.redisson:redisson:3.2.0'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testImplementation 'io.projectreactor:reactor-core:3.6.4'

View File

@@ -19,16 +19,16 @@ import java.util.function.Consumer;
public abstract class AbstractPromise<T, F> implements Promise<T> {
private final Collection<PromiseListener<T>> listeners;
private final AtomicReference<Collection<PromiseListener<T>>> listeners;
private final AtomicReference<PromiseCompletion<T>> completion;
public AbstractPromise() {
this.listeners = new ConcurrentLinkedQueue<>();
this.listeners = new AtomicReference<>();
this.completion = new AtomicReference<>();
}
protected static <V> void propagateResult(Promise<V> from, Promise<V> to) {
from.addListener(to::complete, to::completeExceptionally);
from.addDirectListener(to::complete, to::completeExceptionally);
}
protected static void propagateCancel(Promise<?> from, Promise<?> to) {
@@ -136,7 +136,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved();
addListener(
addDirectListener(
res -> {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runSync(runnable);
@@ -152,7 +152,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
Promise<V> promise = getFactory().unresolved();
addListener(
addDirectListener(
res -> {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runSync(runnable, delay, unit);
@@ -168,7 +168,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, @NotNull Promise<V>> task) {
Promise<V> promise = getFactory().unresolved();
thenApplySync(task).addListener(
thenApplySync(task).addDirectListener(
nestedPromise -> {
propagateResult(nestedPromise, promise);
propagateCancel(promise, nestedPromise);
@@ -233,7 +233,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved();
addListener(
addDirectListener(
(res) -> {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runAsync(runnable);
@@ -249,7 +249,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
Promise<V> promise = getFactory().unresolved();
addListener(
addDirectListener(
res -> {
Runnable runnable = createRunnable(res, promise, task);
F future = getExecutor().runAsync(runnable, delay, unit);
@@ -265,7 +265,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
Promise<V> promise = getFactory().unresolved();
thenApplyAsync(task).addListener(
thenApplyAsync(task).addDirectListener(
nestedPromise -> {
propagateResult(nestedPromise, promise);
propagateCancel(promise, nestedPromise);
@@ -282,35 +282,14 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
return thenSupplyAsync(() -> null);
}
@Override
public @NotNull Promise<T> logExceptions(@NotNull String message) {
return onError(e -> getLogger().error(message, e));
public @NotNull Promise<T> addAsyncListener(@NotNull AsyncPromiseListener<T> listener) {
return addAnyListener(listener);
}
@Override
public @NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener) {
synchronized (completion) {
if (isCompleted()) {
getExecutor().runAsync(() -> {
try {
//noinspection ConstantConditions
listener.handle(getCompletion());
} catch (Exception e) {
getLogger().error("Exception caught in promise listener", e);
}
});
} else {
getListeners().add(listener);
}
}
return this;
}
@Override
public @NotNull Promise<T> addListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
return addListener((res) -> {
public @NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
return addAsyncListener((res) -> {
if (res.isError()) {
if (errorListener != null) errorListener.accept(res.getException());
} else {
@@ -319,14 +298,65 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
});
}
@Override
public @NotNull Promise<T> addDirectListener(@NotNull PromiseListener<T> listener) {
return addAnyListener(listener);
}
@Override
public @NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
return addDirectListener((res) -> {
if (res.isError()) {
if (errorListener != null) errorListener.accept(res.getException());
} else {
if (successListener != null) successListener.accept(res.getResult());
}
});
}
private @NotNull Promise<T> addAnyListener(PromiseListener<T> listener) {
synchronized (completion) {
PromiseCompletion<T> completion = getCompletion();
if (completion != null) {
callListener(listener, completion);
} else {
listeners.compareAndSet(null, new ConcurrentLinkedQueue<>());
listeners.get().add(listener);
}
}
return this;
}
private void callListener(PromiseListener<T> listener, PromiseCompletion<T> ctx) {
if (listener instanceof AsyncPromiseListener<T>) {
getExecutor().runAsync(() -> callListenerNow(listener, ctx));
} else {
callListenerNow(listener, ctx);
}
}
private void callListenerNow(PromiseListener<T> listener, PromiseCompletion<T> ctx) {
try {
listener.handle(ctx);
} catch (Exception e) {
getLogger().error("Exception caught in promise listener", e);
}
}
@Override
public @NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener) {
return addListener(listener, null);
return addAsyncListener(listener, null);
}
@Override
public @NotNull Promise<T> onError(@NotNull Consumer<Throwable> listener) {
return addListener(null, listener);
return addAsyncListener(null, listener);
}
@Override
public @NotNull Promise<T> logExceptions(@NotNull String message) {
return onError(e -> getLogger().error(message, e));
}
@Override
@@ -353,7 +383,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
F future = getExecutor().runAsync(() -> completeExceptionally(new TimeoutException("Promise stopped waiting after " + time + " " + unit)), time, unit);
return onError(e -> getExecutor().cancel(future));
return addListener((_v) -> getExecutor().cancel(future));
}
private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
@@ -361,17 +391,13 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
if (!setCompletion(ctx)) return;
completion.notifyAll();
getExecutor().runAsync(() -> {
for (PromiseListener<T> listener : getListeners()) {
if (!ctx.isActive()) return;
try {
listener.handle(ctx);
} catch (Exception e) {
getLogger().error("Exception caught in promise listener", e);
}
Collection<PromiseListener<T>> listeners = this.listeners.get();
if (listeners != null) {
for (PromiseListener<T> listener : listeners) {
callListener(listener, ctx);
}
});
}
}
}
@@ -380,12 +406,7 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
}
@Override
public void cancel() {
completeExceptionally(new CancellationException());
}
@Override
public void cancel(@NotNull String message) {
public void cancel(@Nullable String message) {
completeExceptionally(new CancellationException(message));
}
@@ -409,8 +430,4 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
return completion.get();
}
private Collection<PromiseListener<T>> getListeners() {
return listeners;
}
}

View File

@@ -3,10 +3,13 @@ package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.redisson.api.RFuture;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -37,7 +40,7 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
AbstractPromise.propagateCancel(promise, entry.getValue());
}
entry.getValue().addListener((ctx) -> {
entry.getValue().addDirectListener((ctx) -> {
synchronized (map) {
if (ctx.getException() != null) {
if (exceptionHandler == null) {
@@ -95,7 +98,7 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
AbstractPromise.propagateCancel(promise, p);
}
p.addListener((res) -> {
p.addDirectListener((res) -> {
synchronized (results) {
results[index] = res;
if (Arrays.stream(results).allMatch(Objects::nonNull))
@@ -121,7 +124,7 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
AbstractPromise.propagateCancel(promise, p);
}
p.addListener((res) -> {
p.addDirectListener((res) -> {
if (res.getException() != null) {
promise.completeExceptionally(res.getException());
} else if (completed.incrementAndGet() == promises.size()) {
@@ -145,11 +148,25 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
return promise;
}
@Override
public <T> @NotNull Promise<T> wrapMono(@NotNull Mono<T> mono) {
return wrap(mono.toFuture());
}
@Override
public <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
return wrap(future, future);
}
@Override
public <T> @NotNull Promise<T> wrapRedisson(@NotNull RFuture<T> future) {
return wrap(future, future);
}
private <T> @NotNull Promise<T> wrap(@NotNull CompletionStage<T> completion, Future<T> future) {
Promise<T> promise = unresolved();
future.whenComplete((v, e) -> {
completion.whenComplete((v, e) -> {
if (e != null) {
promise.completeExceptionally(e);
} else {
@@ -161,11 +178,6 @@ public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
return promise;
}
@Override
public <T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono) {
return wrap(mono.toFuture());
}
@Override
public <T> @NotNull Promise<T> resolve(T value) {
Promise<T> promise = unresolved();

View File

@@ -0,0 +1,5 @@
package dev.tommyjs.futur.promise;
public interface AsyncPromiseListener<T> extends PromiseListener<T> {
}

View File

@@ -63,9 +63,26 @@ public interface Promise<T> {
@NotNull Promise<T> logExceptions(@NotNull String message);
@NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener);
/**
* @apiNote Direct listeners run on the same thread as the completion.
*/
@NotNull Promise<T> addDirectListener(@NotNull PromiseListener<T> listener);
@NotNull Promise<T> addListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
@NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
/**
* @apiNote Async listeners are run in parallel.
*/
@NotNull Promise<T> addAsyncListener(@NotNull AsyncPromiseListener<T> listener);
/**
* @apiNote Same as addAsyncListener.
*/
default @NotNull Promise<T> addListener(@NotNull AsyncPromiseListener<T> listener) {
return addAsyncListener(listener);
}
@NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
@NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener);
@@ -75,26 +92,32 @@ public interface Promise<T> {
@NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener);
/**
* @deprecated Use maxWaitTime instead
*/
@Deprecated
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
/**
* @deprecated Use maxWaitTime instead
*/
@Deprecated
default @NotNull Promise<T> timeout(long ms) {
return timeout(ms, TimeUnit.MILLISECONDS);
}
@Deprecated
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
@NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit);
default @NotNull Promise<T> maxWaitTime(long ms) {
return maxWaitTime(ms, TimeUnit.MILLISECONDS);
}
@NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit);
void cancel(@Nullable String reason);
default void cancel() {
cancel(null);
}
void cancel(@Nullable String reason);
void complete(@Nullable T result);
void completeExceptionally(@NotNull Throwable result);

View File

@@ -9,29 +9,17 @@ public class PromiseCompletion<T> {
private @Nullable T result;
private @Nullable Throwable exception;
private boolean active;
public PromiseCompletion(@Nullable T result) {
this.result = result;
this.active = true;
}
public PromiseCompletion(@NotNull Throwable exception) {
this.exception = exception;
this.active = true;
}
public PromiseCompletion() {
this.result = null;
this.active = true;
}
public void markHandled() {
this.active = false;
}
public boolean isActive() {
return active;
}
public boolean isError() {

View File

@@ -2,6 +2,7 @@ package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.redisson.api.RFuture;
import org.slf4j.Logger;
import reactor.core.publisher.Mono;
@@ -17,78 +18,83 @@ public interface PromiseFactory {
<T> @NotNull Promise<T> unresolved();
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2);
default <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return combine(false, p1, p2);
}
<K, V> @NotNull Promise<Map.Entry<K, V>> combine(boolean propagateCancel, @NotNull Promise<K> p1, @NotNull Promise<V> p2);
<K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler);
default <K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises) {
return combine(propagateCancel, promises, null);
}
<K, V> @NotNull Promise<Map<K, V>> combine(boolean propagateCancel, @NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler);
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return combine(promises, null);
}
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
return combine(false, promises, exceptionHandler);
}
default <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises) {
return combine(propagateCancel, promises, null);
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return combine(promises, null);
}
<V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler);
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return combine(promises, null);
default <V> @NotNull Promise<List<V>> combine(boolean propagateCancel, @NotNull Iterable<Promise<V>> promises) {
return combine(propagateCancel, promises, null);
}
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable BiConsumer<Integer, Throwable> exceptionHandler) {
return combine(false, promises, exceptionHandler);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
return allSettled(false, promiseIterable);
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return combine(promises, null);
}
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
return allSettled(false, promiseArray);
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
return allSettled(false, promiseIterable);
}
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return allSettled(propagateCancel, Arrays.asList(promiseArray));
}
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
return all(false, promiseIterable);
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
return allSettled(false, promiseArray);
}
@NotNull Promise<Void> all(boolean propagateCancel, @NotNull Iterable<Promise<?>> promiseIterable);
default @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
return all(false, promiseArray);
default @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
return all(false, promiseIterable);
}
default @NotNull Promise<Void> all(boolean propagateCancel, @NotNull Promise<?>... promiseArray) {
return all(propagateCancel, Arrays.asList(promiseArray));
}
default @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
return all(false, promiseArray);
}
/**
* @apiNote Even with cancelRaceLosers, it is not guaranteed that only one promise will complete.
*/
<V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises);
default <V> @NotNull Promise<V> race(@NotNull Iterable<Promise<V>> promises) {
return race(false, promises);
}
<V> @NotNull Promise<V> race(boolean cancelRaceLosers, @NotNull Iterable<Promise<V>> promises);
<T> @NotNull Promise<T> wrapMono(@NotNull Mono<T> mono);
<T> @NotNull Promise<T> wrapRedisson(@NotNull RFuture<T> future);
<T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future);
<T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono);
default @NotNull Promise<Void> start() {
return resolve(null);
}

View File

@@ -27,11 +27,11 @@ public final class PromiseTests {
public void testMono() {
Exception value = new Exception("Test Error");
var error = pfac.wrap(Mono.error(value));
var error = pfac.wrapMono(Mono.error(value));
assert Objects.requireNonNull(error.getCompletion()).isError();
assert error.getCompletion().getException() == value;
var resolved = pfac.wrap(Mono.just(value));
var resolved = pfac.wrapMono(Mono.just(value));
assert !Objects.requireNonNull(resolved.getCompletion()).isError();
assert resolved.getCompletion().getResult() == value;
}

View File

@@ -5,6 +5,7 @@ import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.redisson.api.RFuture;
import org.slf4j.Logger;
import reactor.core.publisher.Mono;
@@ -109,12 +110,16 @@ public final class PromiseUtil {
return pfac.race(cancelRaceLosers, promises);
}
public static @NotNull <T> Promise<T> wrapMono(@NotNull Mono<T> mono) {
return pfac.wrapMono(mono);
}
public static @NotNull <T> Promise<T> wrap(@NotNull CompletableFuture<T> future) {
return pfac.wrap(future);
}
public static @NotNull <T> Promise<T> wrap(@NotNull Mono<T> mono) {
return pfac.wrap(mono);
public static @NotNull <T> Promise<T> wrapRedisson(@NotNull RFuture<T> future) {
return pfac.wrapRedisson(future);
}
public static @NotNull <T> Promise<T> resolve(T value) {