mirror of
https://github.com/tommyskeff/futur4j.git
synced 2026-01-18 07:16:45 +00:00
allow joining without a timeout
This commit is contained in:
@@ -10,20 +10,22 @@ import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public abstract class AbstractPromise<T, F> implements Promise<T> {
|
||||
|
||||
private final AtomicReference<Collection<PromiseListener<T>>> listeners;
|
||||
private Collection<PromiseListener<T>> listeners;
|
||||
private final AtomicReference<PromiseCompletion<T>> completion;
|
||||
private final CountDownLatch latch;
|
||||
private final ReentrantLock lock;
|
||||
private final Lock lock;
|
||||
|
||||
public AbstractPromise() {
|
||||
this.listeners = new AtomicReference<>();
|
||||
this.completion = new AtomicReference<>();
|
||||
this.latch = new CountDownLatch(1);
|
||||
this.lock = new ReentrantLock();
|
||||
@@ -61,21 +63,31 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public T join(long timeoutMillis) throws TimeoutException {
|
||||
public T await(long timeoutMillis) throws TimeoutException {
|
||||
try {
|
||||
//noinspection ResultOfMethodCallIgnored
|
||||
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
boolean success = this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
if (!success) {
|
||||
throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
PromiseCompletion<T> completion = getCompletion();
|
||||
if (completion == null)
|
||||
throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms");
|
||||
|
||||
return joinCompletion(completion);
|
||||
return joinCompletion(Objects.requireNonNull(getCompletion()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public T await() {
|
||||
try {
|
||||
this.latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return joinCompletion(Objects.requireNonNull(getCompletion()));
|
||||
}
|
||||
|
||||
|
||||
private T joinCompletion(PromiseCompletion<T> completion) {
|
||||
if (completion.isError())
|
||||
throw new RuntimeException(completion.getException());
|
||||
@@ -315,19 +327,21 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
||||
}
|
||||
|
||||
private @NotNull Promise<T> addAnyListener(PromiseListener<T> listener) {
|
||||
PromiseCompletion<T> completion;
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
PromiseCompletion<T> completion = getCompletion();
|
||||
if (completion != null) {
|
||||
callListener(listener, completion);
|
||||
} else {
|
||||
listeners.compareAndSet(null, new ConcurrentLinkedQueue<>());
|
||||
listeners.get().add(listener);
|
||||
completion = getCompletion();
|
||||
if (completion == null) {
|
||||
if (listeners == null) listeners = new LinkedList<>();
|
||||
listeners.add(listener);
|
||||
return this;
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
callListener(listener, completion);
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -390,12 +404,11 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
|
||||
}
|
||||
|
||||
private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
|
||||
if (!setCompletion(ctx)) return;
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
if (!setCompletion(ctx)) return;
|
||||
|
||||
this.latch.countDown();
|
||||
Collection<PromiseListener<T>> listeners = this.listeners.get();
|
||||
if (listeners != null) {
|
||||
for (PromiseListener<T> listener : listeners) {
|
||||
callListener(listener, ctx);
|
||||
|
||||
Reference in New Issue
Block a user