diff --git a/build.gradle b/build.gradle index f811632..4252922 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,7 @@ nexusPublishing { subprojects { group = 'dev.tommyjs' - version = '2.2.0' + version = '2.3.0' apply plugin: 'java' apply plugin: 'com.github.johnrengelman.shadow' @@ -33,11 +33,11 @@ subprojects { implementation 'org.jetbrains:annotations:24.1.0' implementation 'org.slf4j:slf4j-api:2.0.12' compileOnly 'io.projectreactor:reactor-core:3.6.4' + compileOnly 'org.redisson:redisson:3.2.0' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testImplementation 'io.projectreactor:reactor-core:3.6.4' testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' - testImplementation 'org.slf4j:slf4j-api:2.0.12' testImplementation 'ch.qos.logback:logback-classic:1.5.3' } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java index ff35da6..df0d998 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java @@ -3,11 +3,11 @@ package dev.tommyjs.futur.executor; import org.jetbrains.annotations.NotNull; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -public class DualPoolExecutor implements PromiseExecutor> { +public class DualPoolExecutor implements PromiseExecutor> { private final @NotNull ScheduledExecutorService syncSvc; private final @NotNull ScheduledExecutorService asyncSvc; @@ -22,17 +22,17 @@ public class DualPoolExecutor implements PromiseExecutor> { } @Override - public ScheduledFuture runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + public Future runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { return syncSvc.schedule(task, delay, unit); } @Override - public ScheduledFuture runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + public Future runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { return asyncSvc.schedule(task, delay, unit); } @Override - public void cancel(ScheduledFuture task) { + public void cancel(Future task) { task.cancel(true); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 7cbd0d1..5fa166f 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -10,21 +10,44 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import java.util.Collection; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; public abstract class AbstractPromise implements Promise { - private final Collection> listeners; + private final AtomicReference>> listeners; private final AtomicReference> completion; + private final CountDownLatch latch; + private final ReentrantLock lock; public AbstractPromise() { - this.listeners = new ConcurrentLinkedQueue<>(); + this.listeners = new AtomicReference<>(); this.completion = new AtomicReference<>(); + this.latch = new CountDownLatch(1); + this.lock = new ReentrantLock(); + } + + protected static void propagateResult(Promise from, Promise to) { + from.addDirectListener(to::complete, to::completeExceptionally); + } + + protected static void propagateCancel(Promise from, Promise to) { + from.onCancel(to::completeExceptionally); + } + + private @NotNull Runnable createRunnable(T result, @NotNull Promise promise, @NotNull ExceptionalFunction task) { + return () -> { + if (promise.isCompleted()) return; + + try { + V nextResult = task.apply(result); + promise.complete(nextResult); + } catch (Throwable e) { + promise.completeExceptionally(e); + } + }; } public abstract @NotNull AbstractPromiseFactory getFactory(); @@ -39,24 +62,14 @@ public abstract class AbstractPromise implements Promise { @Override public T join(long timeoutMillis) throws TimeoutException { - PromiseCompletion completion; - long start = System.currentTimeMillis(); - long remainingTimeout = timeoutMillis; - - synchronized (this.completion) { - completion = this.completion.get(); - while (completion == null && remainingTimeout > 0) { - try { - this.completion.wait(remainingTimeout); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - completion = this.completion.get(); - remainingTimeout = timeoutMillis - (System.currentTimeMillis() - start); - } + try { + //noinspection ResultOfMethodCallIgnored + this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + PromiseCompletion completion = getCompletion(); if (completion == null) throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms"); @@ -115,7 +128,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( res -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runSync(runnable); @@ -124,14 +137,14 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( res -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runSync(runnable, delay, unit); @@ -140,22 +153,26 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @Override - public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { + public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { Promise promise = getFactory().unresolved(); - thenApplySync(task).addListener( + thenApplySync(task).addDirectListener( nestedPromise -> { - nestedPromise.propagateResult(promise); - nestedPromise.addChild(promise); + if (nestedPromise == null) { + promise.complete(null); + } else { + propagateResult(nestedPromise, promise); + propagateCancel(promise, nestedPromise); + } }, promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @@ -212,7 +229,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( (res) -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runAsync(runnable); @@ -221,14 +238,14 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { Promise promise = getFactory().unresolved(); - addListener( + addDirectListener( res -> { Runnable runnable = createRunnable(res, promise, task); F future = getExecutor().runAsync(runnable, delay, unit); @@ -237,66 +254,42 @@ public abstract class AbstractPromise implements Promise { promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { Promise promise = getFactory().unresolved(); - thenApplyAsync(task).addListener( + thenApplyAsync(task).addDirectListener( nestedPromise -> { - nestedPromise.propagateResult(promise); - nestedPromise.addChild(promise); + if (nestedPromise == null) { + promise.complete(null); + } else { + propagateResult(nestedPromise, promise); + propagateCancel(promise, nestedPromise); + } }, promise::completeExceptionally ); - addChild(promise); + propagateCancel(promise, this); return promise; } - private @NotNull Runnable createRunnable(T result, @NotNull Promise promise, @NotNull ExceptionalFunction task) { - return () -> { - if (promise.isCompleted()) return; - - try { - V nextResult = task.apply(result); - promise.complete(nextResult); - } catch (Throwable e) { - promise.completeExceptionally(e); - } - }; + @Override + public @NotNull Promise erase() { + return thenSupplyAsync(() -> null); } @Override - public @NotNull Promise logExceptions(@NotNull String message) { - return onError(e -> getLogger().error(message, e)); + public @NotNull Promise addAsyncListener(@NotNull AsyncPromiseListener listener) { + return addAnyListener(listener); } @Override - public @NotNull Promise addListener(@NotNull PromiseListener listener) { - synchronized (completion) { - if (isCompleted()) { - getExecutor().runAsync(() -> { - try { - //noinspection ConstantConditions - listener.handle(getCompletion()); - } catch (Exception e) { - getLogger().error("Exception caught in promise listener", e); - } - }); - } else { - getListeners().add(listener); - } - } - - return this; - } - - @Override - public @NotNull Promise addListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { - return addListener((res) -> { + public @NotNull Promise addAsyncListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { + return addAsyncListener((res) -> { if (res.isError()) { if (errorListener != null) errorListener.accept(res.getException()); } else { @@ -305,14 +298,68 @@ public abstract class AbstractPromise implements Promise { }); } + @Override + public @NotNull Promise addDirectListener(@NotNull PromiseListener listener) { + return addAnyListener(listener); + } + + @Override + public @NotNull Promise addDirectListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { + return addDirectListener((res) -> { + if (res.isError()) { + if (errorListener != null) errorListener.accept(res.getException()); + } else { + if (successListener != null) successListener.accept(res.getResult()); + } + }); + } + + private @NotNull Promise addAnyListener(PromiseListener listener) { + lock.lock(); + try { + PromiseCompletion completion = getCompletion(); + if (completion != null) { + callListener(listener, completion); + } else { + listeners.compareAndSet(null, new ConcurrentLinkedQueue<>()); + listeners.get().add(listener); + } + } finally { + lock.unlock(); + } + + return this; + } + + private void callListener(PromiseListener listener, PromiseCompletion ctx) { + if (listener instanceof AsyncPromiseListener) { + getExecutor().runAsync(() -> callListenerNow(listener, ctx)); + } else { + callListenerNow(listener, ctx); + } + } + + private void callListenerNow(PromiseListener listener, PromiseCompletion ctx) { + try { + listener.handle(ctx); + } catch (Exception e) { + getLogger().error("Exception caught in promise listener", e); + } + } + @Override public @NotNull Promise onSuccess(@NotNull Consumer listener) { - return addListener(listener, null); + return addAsyncListener(listener, null); } @Override public @NotNull Promise onError(@NotNull Consumer listener) { - return addListener(null, listener); + return addAsyncListener(null, listener); + } + + @Override + public @NotNull Promise logExceptions(@NotNull String message) { + return onError(e -> getLogger().error(message, e)); } @Override @@ -339,25 +386,23 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) { F future = getExecutor().runAsync(() -> completeExceptionally(new TimeoutException("Promise stopped waiting after " + time + " " + unit)), time, unit); - return onError(e -> getExecutor().cancel(future)); + return addListener((_v) -> getExecutor().cancel(future)); } private void handleCompletion(@NotNull PromiseCompletion ctx) { - synchronized (completion) { - if (!setCompletion(ctx)) return; + if (!setCompletion(ctx)) return; - completion.notifyAll(); - getExecutor().runAsync(() -> { - for (PromiseListener listener : getListeners()) { - if (!ctx.isActive()) return; - - try { - listener.handle(ctx); - } catch (Exception e) { - getLogger().error("Exception caught in promise listener", e); - } + lock.lock(); + try { + this.latch.countDown(); + Collection> listeners = this.listeners.get(); + if (listeners != null) { + for (PromiseListener listener : listeners) { + callListener(listener, ctx); } - }); + } + } finally { + lock.unlock(); } } @@ -366,22 +411,7 @@ public abstract class AbstractPromise implements Promise { } @Override - public void addChild(@NotNull Promise child) { - child.onCancel((e) -> this.cancel(e.getMessage())); - } - - @Override - public void propagateResult(@NotNull Promise target) { - addListener(target::complete, target::completeExceptionally); - } - - @Override - public void cancel() { - completeExceptionally(new CancellationException()); - } - - @Override - public void cancel(@NotNull String message) { + public void cancel(@Nullable String message) { completeExceptionally(new CancellationException(message)); } @@ -405,8 +435,16 @@ public abstract class AbstractPromise implements Promise { return completion.get(); } - private Collection> getListeners() { - return listeners; + @Override + public @NotNull CompletableFuture toFuture() { + CompletableFuture future = new CompletableFuture<>(); + this.addDirectListener(future::complete, future::completeExceptionally); + future.whenComplete((res, e) -> { + if (e instanceof CancellationException) { + this.cancel(); + } + }); + return future; } } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java index 607f9da..69179ed 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java @@ -3,14 +3,15 @@ package dev.tommyjs.futur.promise; import dev.tommyjs.futur.executor.PromiseExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import reactor.core.Disposable; +import org.redisson.api.RFuture; import reactor.core.publisher.Mono; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -19,9 +20,9 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { public abstract @NotNull PromiseExecutor getExecutor(); @Override - public @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { + public @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2) { List> promises = List.of(p1, p2); - return all(promises) + return all(propagateCancel, promises) .thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>( Objects.requireNonNull(p1.getCompletion()).getResult(), Objects.requireNonNull(p2.getCompletion()).getResult() @@ -29,13 +30,17 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { + public @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { if (promises.isEmpty()) return resolve(Collections.emptyMap()); Map map = new HashMap<>(); Promise> promise = unresolved(); for (Map.Entry> entry : promises.entrySet()) { - entry.getValue().addListener((ctx) -> { + if (propagateCancel) { + AbstractPromise.propagateCancel(promise, entry.getValue()); + } + + entry.getValue().addDirectListener((ctx) -> { synchronized (map) { if (ctx.getException() != null) { if (exceptionHandler == null) { @@ -59,12 +64,13 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable Consumer exceptionHandler) { + public @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { AtomicInteger index = new AtomicInteger(); return this.combine( + propagateCancel, StreamSupport.stream(promises.spliterator(), false) .collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)), - exceptionHandler != null ? (i, e) -> exceptionHandler.accept(e) : null + exceptionHandler ).thenApplyAsync(v -> v.entrySet().stream() .sorted(Map.Entry.comparingByKey()) @@ -74,12 +80,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine(@NotNull Iterable> promises) { - return combine(promises, null); - } - - @Override - public @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { + public @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable) { List> promises = new ArrayList<>(); promiseIterable.iterator().forEachRemaining(promises::add); @@ -91,7 +92,13 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { while (iter.hasNext()) { int index = iter.nextIndex(); - iter.next().addListener((res) -> { + var p = iter.next(); + + if (propagateCancel) { + AbstractPromise.propagateCancel(promise, p); + } + + p.addDirectListener((res) -> { synchronized (results) { results[index] = res; if (Arrays.stream(results).allMatch(Objects::nonNull)) @@ -104,7 +111,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise all(@NotNull Iterable> promiseIterable) { + public @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable) { List> promises = new ArrayList<>(); promiseIterable.iterator().forEachRemaining(promises::add); @@ -113,12 +120,14 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { Promise promise = unresolved(); for (Promise p : promises) { - p.addListener((res) -> { + if (propagateCancel) { + AbstractPromise.propagateCancel(promise, p); + } + + p.addDirectListener((res) -> { if (res.getException() != null) { promise.completeExceptionally(res.getException()); - } - - if (completed.incrementAndGet() == promises.size()) { + } else if (completed.incrementAndGet() == promises.size()) { promise.complete(null); } }); @@ -127,11 +136,37 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } + @Override + public @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises) { + Promise promise = unresolved(); + for (Promise p : promises) { + if (cancelRaceLosers) { + promise.addListener((res) -> p.cancel()); + } + AbstractPromise.propagateResult(p, promise); + } + return promise; + } + + @Override + public @NotNull Promise wrapMono(@NotNull Mono mono) { + return wrap(mono.toFuture()); + } + @Override public @NotNull Promise wrap(@NotNull CompletableFuture future) { + return wrap(future, future); + } + + @Override + public @NotNull Promise wrapRedisson(@NotNull RFuture future) { + return wrap(future, future); + } + + private @NotNull Promise wrap(@NotNull CompletionStage completion, Future future) { Promise promise = unresolved(); - future.whenComplete((v, e) -> { + completion.whenComplete((v, e) -> { if (e != null) { promise.completeExceptionally(e); } else { @@ -143,14 +178,6 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } - @Override - public @NotNull Promise wrap(@NotNull Mono mono) { - Promise promise = this.unresolved(); - Disposable disposable = mono.subscribe(promise::complete, promise::completeExceptionally); - promise.onCancel((e) -> disposable.dispose()); - return promise; - } - @Override public @NotNull Promise resolve(T value) { Promise promise = unresolved(); @@ -165,17 +192,4 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } - @Override - public @NotNull Promise erase(@NotNull Promise p) { - Promise promise = unresolved(); - p.addListener(ctx -> { - if (ctx.getException() != null) { - promise.completeExceptionally(ctx.getException()); - } else { - promise.complete(null); - } - }); - return promise; - } - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java new file mode 100644 index 0000000..7e0343b --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java @@ -0,0 +1,5 @@ +package dev.tommyjs.futur.promise; + +public interface AsyncPromiseListener extends PromiseListener { + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index f958f6d..456ee84 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -8,6 +8,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -33,7 +34,7 @@ public interface Promise { @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); - @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task); + @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task); @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task); @@ -55,15 +56,34 @@ public interface Promise { @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task); - @NotNull Promise logExceptions(@NotNull String message); + @NotNull Promise erase(); default @NotNull Promise logExceptions() { return logExceptions("Exception caught in promise chain"); } - @NotNull Promise addListener(@NotNull PromiseListener listener); + @NotNull Promise logExceptions(@NotNull String message); - @NotNull Promise addListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); + /** + * @apiNote Direct listeners run on the same thread as the completion. + */ + @NotNull Promise addDirectListener(@NotNull PromiseListener listener); + + @NotNull Promise addDirectListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); + + /** + * @apiNote Async listeners are run in parallel. + */ + @NotNull Promise addAsyncListener(@NotNull AsyncPromiseListener listener); + + /** + * @apiNote Same as addAsyncListener. + */ + default @NotNull Promise addListener(@NotNull AsyncPromiseListener listener) { + return addAsyncListener(listener); + } + + @NotNull Promise addAsyncListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); @NotNull Promise onSuccess(@NotNull Consumer listener); @@ -73,9 +93,15 @@ public interface Promise { @NotNull Promise onCancel(@NotNull Consumer listener); + /** + * @deprecated Use maxWaitTime instead + */ @Deprecated @NotNull Promise timeout(long time, @NotNull TimeUnit unit); + /** + * @deprecated Use maxWaitTime instead + */ @Deprecated default @NotNull Promise timeout(long ms) { return timeout(ms, TimeUnit.MILLISECONDS); @@ -87,10 +113,6 @@ public interface Promise { return maxWaitTime(ms, TimeUnit.MILLISECONDS); } - void addChild(@NotNull Promise child); - - void propagateResult(@NotNull Promise target); - void cancel(@Nullable String reason); default void cancel() { @@ -107,4 +129,6 @@ public interface Promise { boolean isCompleted(); + @NotNull CompletableFuture toFuture(); + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java index 4ab10c9..de7e276 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java @@ -9,29 +9,17 @@ public class PromiseCompletion { private @Nullable T result; private @Nullable Throwable exception; - private boolean active; public PromiseCompletion(@Nullable T result) { this.result = result; - this.active = true; } public PromiseCompletion(@NotNull Throwable exception) { this.exception = exception; - this.active = true; } public PromiseCompletion() { this.result = null; - this.active = true; - } - - public void markHandled() { - this.active = false; - } - - public boolean isActive() { - return active; } public boolean isError() { diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index 9428abb..bac92b7 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -2,6 +2,7 @@ package dev.tommyjs.futur.promise; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.redisson.api.RFuture; import org.slf4j.Logger; import reactor.core.publisher.Mono; @@ -10,7 +11,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import java.util.function.Consumer; public interface PromiseFactory { @@ -18,44 +18,89 @@ public interface PromiseFactory { @NotNull Promise unresolved(); - @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2); + @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2); - @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler); + default @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { + return combine(false, p1, p2); + } + + @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler); + + default @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises) { + return combine(propagateCancel, promises, null); + } + + default @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { + return combine(false, promises, exceptionHandler); + } default @NotNull Promise> combine(@NotNull Map> promises) { return combine(promises, null); } - @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable Consumer exceptionHandler); + @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler); + + default @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { + return combine(propagateCancel, promises, null); + } + + default @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { + return combine(false, promises, exceptionHandler); + } default @NotNull Promise> combine(@NotNull Iterable> promises) { return combine(promises, null); } - @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable); + @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable); + + default @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { + return allSettled(false, promiseIterable); + } + + default @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Promise... promiseArray) { + return allSettled(propagateCancel, Arrays.asList(promiseArray)); + } default @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { - return allSettled(Arrays.asList(promiseArray)); + return allSettled(false, promiseArray); } - @NotNull Promise all(@NotNull Iterable> promiseIterable); + @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable); + + default @NotNull Promise all(@NotNull Iterable> promiseIterable) { + return all(false, promiseIterable); + } + + default @NotNull Promise all(boolean propagateCancel, @NotNull Promise... promiseArray) { + return all(propagateCancel, Arrays.asList(promiseArray)); + } default @NotNull Promise all(@NotNull Promise... promiseArray) { - return all(Arrays.asList(promiseArray)); + return all(false, promiseArray); } + /** + * @apiNote Even with cancelRaceLosers, it is not guaranteed that only one promise will complete. + */ + @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises); + + default @NotNull Promise race(@NotNull Iterable> promises) { + return race(false, promises); + } + + @NotNull Promise wrapMono(@NotNull Mono mono); + + @NotNull Promise wrapRedisson(@NotNull RFuture future); + @NotNull Promise wrap(@NotNull CompletableFuture future); - @NotNull Promise wrap(@NotNull Mono mono); - - @NotNull Promise resolve(T value); - default @NotNull Promise start() { return resolve(null); } + @NotNull Promise resolve(T value); + @NotNull Promise error(@NotNull Throwable error); - @NotNull Promise erase(@NotNull Promise p); - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java index c598141..ff1f0b8 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java @@ -17,38 +17,38 @@ import java.util.function.BiConsumer; @Deprecated public class Promises { - public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, PromiseFactory factory) { - return factory.combine(p1, p2); - } - public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { return combine(p1, p2, p1.getFactory()); } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler, PromiseFactory factory) { - return factory.combine(promises, exceptionHandler).timeout(timeout); - } - - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict, PromiseFactory factory) { - return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory); + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, PromiseFactory factory) { + return factory.combine(p1, p2); } public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, PromiseFactory factory) { return combine(promises, timeout, true, factory); } + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict, PromiseFactory factory) { + return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory); + } + + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler, PromiseFactory factory) { + return factory.combine(promises, exceptionHandler).timeout(timeout); + } + public static @NotNull Promise> combine(@NotNull Map> promises, PromiseFactory factory) { return combine(promises, 1500L, true, factory); } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict, PromiseFactory factory) { - return factory.combine(promises, strict ? null : (_v) -> {}).timeout(timeout); - } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, PromiseFactory factory) { return combine(promises, timeout, true, factory); } + public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict, PromiseFactory factory) { + return factory.combine(promises, strict ? null : (_i, _v) -> {}).timeout(timeout); + } + public static @NotNull Promise> combine(@NotNull List> promises, PromiseFactory factory) { return combine(promises, 1500L, true, factory); } @@ -57,6 +57,10 @@ public class Promises { return factory.all(promises); } + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, PromiseFactory factory) { + return combine(keys, mapper, timeout, true, factory); + } + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict, PromiseFactory factory) { Map> promises = new HashMap<>(); for (K key : keys) { @@ -67,22 +71,18 @@ public class Promises { return combine(promises, timeout, strict, factory); } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, PromiseFactory factory) { - return combine(keys, mapper, timeout, true, factory); - } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, PromiseFactory factory) { return combine(keys, mapper, 1500L, true, factory); } - public static @NotNull Promise erase(@NotNull Promise p, PromiseFactory factory) { - return factory.erase(p); - } - public static @NotNull Promise erase(@NotNull Promise p) { return erase(p, p.getFactory()); } + public static @NotNull Promise erase(@NotNull Promise p, PromiseFactory factory) { + return p.erase(); + } + public static @NotNull Promise wrap(@NotNull CompletableFuture future, PromiseFactory factory) { return factory.wrap(future); } diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index b403a7c..ffe6f01 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -9,40 +9,161 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import java.util.List; +import java.util.Map; import java.util.Objects; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; public final class PromiseTests { private final Logger logger = LoggerFactory.getLogger(PromiseTests.class); - private final PromiseExecutor> executor = SinglePoolExecutor.create(1); + private final PromiseExecutor> executor = SinglePoolExecutor.create(5); private final PromiseFactory pfac = new SimplePromiseFactory<>(executor, logger); @Test - void testMono() { + public void testMono() { Exception value = new Exception("Test Error"); - var error = pfac.wrap(Mono.error(value)); + var error = pfac.wrapMono(Mono.error(value)); assert Objects.requireNonNull(error.getCompletion()).isError(); assert error.getCompletion().getException() == value; - var resolved = pfac.wrap(Mono.just(value)); + var resolved = pfac.wrapMono(Mono.just(value)); assert !Objects.requireNonNull(resolved.getCompletion()).isError(); assert resolved.getCompletion().getResult() == value; } @Test - void testErrorCancellation() throws InterruptedException { - var finish = new AtomicBoolean(); + public void testErrorCancellation() throws InterruptedException { + var finished = new AtomicBoolean(); pfac.start() - .thenRunDelayedAsync(() -> finish.set(true), 50, TimeUnit.MILLISECONDS) + .thenRunDelayedAsync(() -> finished.set(true), 50, TimeUnit.MILLISECONDS) .thenRunAsync(() -> {}) .cancel(); Thread.sleep(100L); - assert !finish.get(); + assert !finished.get(); + } + + @Test + public void testToFuture() throws InterruptedException { + assert pfac.resolve(true).toFuture().getNow(false); + assert pfac.error(new Exception("Test")).toFuture().isCompletedExceptionally(); + + var finished = new AtomicBoolean(); + pfac.start() + .thenRunDelayedAsync(() -> finished.set(true), 50, TimeUnit.MILLISECONDS) + .toFuture() + .cancel(true); + + Thread.sleep(100L); + assert !finished.get(); + } + + @Test + public void testCombineUtil() throws TimeoutException { + pfac.all( + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + .join(100L); + + pfac.allSettled( + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + .join(100L); + + pfac.combine( + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + .join(100L); + + pfac.combine( + List.of( + pfac.start().thenRunDelayedAsync(() -> {}, 49, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> {}, 51, TimeUnit.MILLISECONDS) + ) + ) + .join(100L); + + pfac.combine( + Map.of( + "a", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS), + "b", pfac.start().thenRunDelayedAsync(() -> {}, 50, TimeUnit.MILLISECONDS) + ) + ) + .join(100L); + } + + @Test + public void testCombineUtilPropagation() throws InterruptedException { + var finished1 = new AtomicBoolean(); + pfac.all( + true, + pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished1.set(true), 50, TimeUnit.MILLISECONDS) + ) + .cancel(); + + var finished2 = new AtomicBoolean(); + pfac.allSettled( + true, + pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished2.set(true), 50, TimeUnit.MILLISECONDS) + ) + .cancel(); + + var finished3 = new AtomicBoolean(); + pfac.combine( + true, + pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished3.set(true), 50, TimeUnit.MILLISECONDS) + ) + .cancel(); + + var finished4 = new AtomicBoolean(); + pfac.combine( + true, + List.of( + pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS), + pfac.start().thenRunDelayedAsync(() -> finished4.set(true), 50, TimeUnit.MILLISECONDS) + ) + ) + .cancel(); + + var finished5 = new AtomicBoolean(); + pfac.combine( + true, + Map.of( + "a", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS), + "b", pfac.start().thenRunDelayedAsync(() -> finished5.set(true), 50, TimeUnit.MILLISECONDS) + ) + ) + .cancel(); + + Thread.sleep(100L); + assert !finished1.get(); + assert !finished2.get(); + assert !finished3.get(); + assert !finished4.get(); + assert !finished5.get(); + } + + @Test + public void testRace() throws TimeoutException { + assert pfac.race( + List.of( + pfac.start().thenSupplyDelayedAsync(() -> true, 150, TimeUnit.MILLISECONDS), + pfac.start().thenSupplyDelayedAsync(() -> false, 200, TimeUnit.MILLISECONDS) + ) + ).join(300L); } } diff --git a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java index 3c86ffc..72a4257 100644 --- a/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java +++ b/futur-static/src/main/java/dev/tommyjs/futur/lazy/PromiseUtil.java @@ -5,6 +5,7 @@ import dev.tommyjs.futur.promise.PromiseCompletion; import dev.tommyjs.futur.promise.PromiseFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.redisson.api.RFuture; import org.slf4j.Logger; import reactor.core.publisher.Mono; @@ -12,7 +13,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import java.util.function.Consumer; public final class PromiseUtil { @@ -30,64 +30,108 @@ public final class PromiseUtil { return pfac.unresolved(); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Promise p1, @NotNull Promise p2) { + return pfac.combine(propagateCancel, p1, p2); + } + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { return pfac.combine(p1, p2); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { + return pfac.combine(propagateCancel, promises, exceptionHandler); + } + public static @NotNull Promise> combine(@NotNull Map> promises, @Nullable BiConsumer exceptionHandler) { return pfac.combine(promises, exceptionHandler); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Map> promises) { + return pfac.combine(propagateCancel, promises); + } + public static @NotNull Promise> combine(@NotNull Map> promises) { return pfac.combine(promises); } - public static @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable Consumer exceptionHandler) { + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { + return pfac.combine(propagateCancel, promises, exceptionHandler); + } + + public static @NotNull Promise> combine(@NotNull Iterable> promises, @Nullable BiConsumer exceptionHandler) { return pfac.combine(promises, exceptionHandler); } + public static @NotNull Promise> combine(boolean propagateCancel, @NotNull Iterable> promises) { + return pfac.combine(propagateCancel, promises); + } + public static @NotNull Promise> combine(@NotNull Iterable> promises) { return pfac.combine(promises); } + public static @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Iterable> promiseIterable) { + return pfac.allSettled(propagateCancel, promiseIterable); + } + public static @NotNull Promise>> allSettled(@NotNull Iterable> promiseIterable) { return pfac.allSettled(promiseIterable); } + public static @NotNull Promise>> allSettled(boolean propagateCancel, @NotNull Promise... promiseArray) { + return pfac.allSettled(propagateCancel, promiseArray); + } + public static @NotNull Promise>> allSettled(@NotNull Promise... promiseArray) { return pfac.allSettled(promiseArray); } + public static @NotNull Promise all(boolean propagateCancel, @NotNull Iterable> promiseIterable) { + return pfac.all(propagateCancel, promiseIterable); + } + public static @NotNull Promise all(@NotNull Iterable> promiseIterable) { return pfac.all(promiseIterable); } + public static @NotNull Promise all(boolean propagateCancel, @NotNull Promise... promiseArray) { + return pfac.all(propagateCancel, promiseArray); + } + public static @NotNull Promise all(@NotNull Promise... promiseArray) { return pfac.all(promiseArray); } + public static @NotNull Promise race(@NotNull Iterable> promises) { + return pfac.race(promises); + } + + public static @NotNull Promise race(boolean cancelRaceLosers, @NotNull Iterable> promises) { + return pfac.race(cancelRaceLosers, promises); + } + + public static @NotNull Promise wrapMono(@NotNull Mono mono) { + return pfac.wrapMono(mono); + } + public static @NotNull Promise wrap(@NotNull CompletableFuture future) { return pfac.wrap(future); } - public static @NotNull Promise wrap(@NotNull Mono mono) { - return pfac.wrap(mono); + public static @NotNull Promise wrapRedisson(@NotNull RFuture future) { + return pfac.wrapRedisson(future); } public static @NotNull Promise resolve(T value) { return pfac.resolve(value); } - public static @NotNull Promise error(@NotNull Throwable error) { - return pfac.error(error); - } - - public static @NotNull Promise erase(@NotNull Promise p) { - return pfac.erase(p); - } - public static @NotNull Promise start() { return pfac.start(); } + public static @NotNull Promise error(@NotNull Throwable error) { + return pfac.error(error); + } + } diff --git a/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java b/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java index e61908f..0d9aebf 100644 --- a/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java +++ b/futur-static/src/main/java/dev/tommyjs/futur/lazy/StaticPromiseFactory.java @@ -9,9 +9,9 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; -public final class StaticPromiseFactory extends AbstractPromiseFactory> { +public final class StaticPromiseFactory extends AbstractPromiseFactory> { public final static StaticPromiseFactory INSTANCE = new StaticPromiseFactory(); private final static @NotNull SinglePoolExecutor EXECUTOR = SinglePoolExecutor.create(1); @@ -21,18 +21,18 @@ public final class StaticPromiseFactory extends AbstractPromiseFactory Promise unresolved() { - return new SimplePromise<>(this); - } - @Override public @NotNull Logger getLogger() { return LOGGER; } @Override - public @NotNull PromiseExecutor> getExecutor() { + public @NotNull Promise unresolved() { + return new SimplePromise<>(this); + } + + @Override + public @NotNull PromiseExecutor> getExecutor() { return EXECUTOR; }