From dc5171ad31a50b8d95141d6f2d47acd0c05f7869 Mon Sep 17 00:00:00 2001 From: WhatCats Date: Thu, 11 Apr 2024 23:57:44 +0200 Subject: [PATCH] allow joining without a timeout --- build.gradle | 2 +- .../futur/promise/AbstractPromise.java | 53 ++++++++++++------- .../dev/tommyjs/futur/promise/Promise.java | 16 +++++- 3 files changed, 49 insertions(+), 22 deletions(-) diff --git a/build.gradle b/build.gradle index 4252922..dee919a 100644 --- a/build.gradle +++ b/build.gradle @@ -14,7 +14,7 @@ nexusPublishing { subprojects { group = 'dev.tommyjs' - version = '2.3.0' + version = '2.3.1' apply plugin: 'java' apply plugin: 'com.github.johnrengelman.shadow' diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 5fa166f..d487993 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -10,20 +10,22 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import java.util.Collection; +import java.util.LinkedList; +import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; public abstract class AbstractPromise implements Promise { - private final AtomicReference>> listeners; + private Collection> listeners; private final AtomicReference> completion; private final CountDownLatch latch; - private final ReentrantLock lock; + private final Lock lock; public AbstractPromise() { - this.listeners = new AtomicReference<>(); this.completion = new AtomicReference<>(); this.latch = new CountDownLatch(1); this.lock = new ReentrantLock(); @@ -61,21 +63,31 @@ public abstract class AbstractPromise implements Promise { } @Override - public T join(long timeoutMillis) throws TimeoutException { + public T await(long timeoutMillis) throws TimeoutException { try { - //noinspection ResultOfMethodCallIgnored - this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + boolean success = this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + if (!success) { + throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms"); + } } catch (InterruptedException e) { throw new RuntimeException(e); } - PromiseCompletion completion = getCompletion(); - if (completion == null) - throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms"); - - return joinCompletion(completion); + return joinCompletion(Objects.requireNonNull(getCompletion())); } + @Override + public T await() { + try { + this.latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return joinCompletion(Objects.requireNonNull(getCompletion())); + } + + private T joinCompletion(PromiseCompletion completion) { if (completion.isError()) throw new RuntimeException(completion.getException()); @@ -315,19 +327,21 @@ public abstract class AbstractPromise implements Promise { } private @NotNull Promise addAnyListener(PromiseListener listener) { + PromiseCompletion completion; + lock.lock(); try { - PromiseCompletion completion = getCompletion(); - if (completion != null) { - callListener(listener, completion); - } else { - listeners.compareAndSet(null, new ConcurrentLinkedQueue<>()); - listeners.get().add(listener); + completion = getCompletion(); + if (completion == null) { + if (listeners == null) listeners = new LinkedList<>(); + listeners.add(listener); + return this; } } finally { lock.unlock(); } + callListener(listener, completion); return this; } @@ -390,12 +404,11 @@ public abstract class AbstractPromise implements Promise { } private void handleCompletion(@NotNull PromiseCompletion ctx) { - if (!setCompletion(ctx)) return; - lock.lock(); try { + if (!setCompletion(ctx)) return; + this.latch.countDown(); - Collection> listeners = this.listeners.get(); if (listeners != null) { for (PromiseListener listener : listeners) { callListener(listener, ctx); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index 456ee84..b6f9e8f 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -4,6 +4,7 @@ import dev.tommyjs.futur.function.ExceptionalConsumer; import dev.tommyjs.futur.function.ExceptionalFunction; import dev.tommyjs.futur.function.ExceptionalRunnable; import dev.tommyjs.futur.function.ExceptionalSupplier; +import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -123,7 +124,20 @@ public interface Promise { void completeExceptionally(@NotNull Throwable result); - T join(long timeout) throws TimeoutException; + @Blocking + T await(); + + @Blocking + T await(long timeout) throws TimeoutException; + + /** + * @deprecated Use await instead. + */ + @Blocking + @Deprecated + default T join(long timeout) throws TimeoutException { + return await(timeout); + }; @Nullable PromiseCompletion getCompletion();