mirror of
https://github.com/tommyskeff/futur4j.git
synced 2026-01-17 23:16:01 +00:00
better promise joining and combining
This commit is contained in:
@@ -10,4 +10,12 @@ public interface PromiseExecutor {
|
||||
|
||||
void runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
|
||||
|
||||
default void runSync(@NotNull Runnable task) {
|
||||
runSync(task, 0L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
default void runAsync(@NotNull Runnable task) {
|
||||
runAsync(task, 0L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ import java.util.Collection;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public abstract class AbstractPromise<T> implements Promise<T> {
|
||||
@@ -30,28 +29,42 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
||||
|
||||
protected abstract Logger getLogger();
|
||||
|
||||
@Deprecated
|
||||
@Override
|
||||
public T join(long interval, long timeout) throws TimeoutException {
|
||||
long start = System.currentTimeMillis();
|
||||
while (!isCompleted()) {
|
||||
if (System.currentTimeMillis() > start + timeout)
|
||||
throw new TimeoutException("Promise timed out after " + timeout + "ms");
|
||||
public T join(long interval, long timeoutMillis) throws TimeoutException {
|
||||
return join(timeoutMillis);
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(interval);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
@Override
|
||||
public T join(long timeoutMillis) throws TimeoutException {
|
||||
PromiseCompletion<T> completion = this.completion.get();
|
||||
if (completion != null) return joinCompletion(completion);
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
long remainingTimeout = timeoutMillis;
|
||||
|
||||
synchronized (this.completion) {
|
||||
while (completion == null && remainingTimeout > 0){
|
||||
try {
|
||||
this.completion.wait(remainingTimeout);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
completion = this.completion.get();
|
||||
remainingTimeout = timeoutMillis - (System.currentTimeMillis() - start);
|
||||
}
|
||||
}
|
||||
|
||||
PromiseCompletion<T> completion = getCompletion();
|
||||
if (completion == null) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
if (completion == null)
|
||||
throw new TimeoutException("Promise timed out after " + timeoutMillis + "ms");
|
||||
|
||||
if (completion.isError()) {
|
||||
return joinCompletion(completion);
|
||||
}
|
||||
|
||||
private T joinCompletion(PromiseCompletion<T> completion) {
|
||||
if (completion.isError())
|
||||
throw new RuntimeException(completion.getException());
|
||||
}
|
||||
|
||||
return completion.getResult();
|
||||
}
|
||||
@@ -275,26 +288,33 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
||||
|
||||
@Override
|
||||
public @NotNull Promise<T> logExceptions() {
|
||||
return logExceptions("Exception caught in promise chain");
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NotNull Promise<T> logExceptions(@NotNull String message) {
|
||||
return addListener(ctx -> {
|
||||
if (ctx.isError()) {
|
||||
getLogger().error("Exception caught in promise chain", ctx.getException());
|
||||
getLogger().error(message, ctx.getException());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener) {
|
||||
if (isCompleted()) {
|
||||
getExecutor().runAsync(() -> {
|
||||
try {
|
||||
//noinspection ConstantConditions
|
||||
listener.handle(getCompletion());
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Exception caught in promise listener", e);
|
||||
}
|
||||
}, 0L, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
getListeners().add(listener);
|
||||
synchronized (completion) {
|
||||
if (isCompleted()) {
|
||||
getExecutor().runAsync(() -> {
|
||||
try {
|
||||
//noinspection ConstantConditions
|
||||
listener.handle(getCompletion());
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Exception caught in promise listener", e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
getListeners().add(listener);
|
||||
}
|
||||
}
|
||||
|
||||
return this;
|
||||
@@ -316,35 +336,27 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
||||
return timeout(ms, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
protected void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
|
||||
AtomicBoolean success = new AtomicBoolean();
|
||||
completion.getAndUpdate(c -> {
|
||||
if (c == null) {
|
||||
success.set(true);
|
||||
return ctx;
|
||||
} else {
|
||||
success.set(false);
|
||||
return c;
|
||||
}
|
||||
});
|
||||
private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
|
||||
synchronized (completion) {
|
||||
if (!setCompletion(ctx)) return;
|
||||
|
||||
if (success.get()) {
|
||||
handleCompletion0(ctx);
|
||||
completion.notifyAll();
|
||||
getExecutor().runAsync(() -> {
|
||||
for (PromiseListener<T> listener : getListeners()) {
|
||||
if (!ctx.isActive()) return;
|
||||
|
||||
try {
|
||||
listener.handle(ctx);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Exception caught in promise listener", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected void handleCompletion0(@NotNull PromiseCompletion<T> ctx) {
|
||||
getExecutor().runAsync(() -> {
|
||||
for (PromiseListener<T> listener : getListeners()) {
|
||||
if (!ctx.isActive()) return;
|
||||
|
||||
try {
|
||||
listener.handle(ctx);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Exception caught in promise listener", e);
|
||||
}
|
||||
}
|
||||
}, 0L, TimeUnit.MILLISECONDS);
|
||||
private boolean setCompletion(PromiseCompletion<T> completion) {
|
||||
return this.completion.compareAndSet(null, completion);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -367,7 +379,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
||||
return completion.get();
|
||||
}
|
||||
|
||||
protected Collection<PromiseListener<T>> getListeners() {
|
||||
private Collection<PromiseListener<T>> getListeners() {
|
||||
return listeners;
|
||||
}
|
||||
|
||||
|
||||
@@ -31,8 +31,11 @@ public interface Promise<T> {
|
||||
|
||||
PromiseFactory getFactory();
|
||||
|
||||
@Deprecated
|
||||
T join(long interval, long timeout) throws TimeoutException;
|
||||
|
||||
T join(long timeout) throws TimeoutException;
|
||||
|
||||
@NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task);
|
||||
|
||||
@NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit);
|
||||
@@ -73,6 +76,8 @@ public interface Promise<T> {
|
||||
|
||||
@NotNull Promise<T> logExceptions();
|
||||
|
||||
@NotNull Promise<T> logExceptions(@NotNull String message);
|
||||
|
||||
@NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener);
|
||||
|
||||
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
|
||||
|
||||
@@ -42,15 +42,13 @@ public class Promises {
|
||||
}
|
||||
|
||||
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) {
|
||||
Map<K, V> map = new HashMap<>();
|
||||
ReentrantLock lock = new ReentrantLock();
|
||||
if (promises.isEmpty()) return factory.resolve(Collections.emptyMap());
|
||||
|
||||
Map<K, V> map = new HashMap<>();
|
||||
Promise<Map<K, V>> promise = factory.unresolved();
|
||||
for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) {
|
||||
entry.getValue().addListener((ctx) -> {
|
||||
lock.lock();
|
||||
|
||||
try {
|
||||
synchronized (map) {
|
||||
if (ctx.isError()) {
|
||||
if (exceptionHandler == null) {
|
||||
//noinspection ConstantConditions
|
||||
@@ -63,8 +61,6 @@ public class Promises {
|
||||
map.put(entry.getKey(), ctx.getResult());
|
||||
}
|
||||
if (map.size() == promises.size()) promise.complete(map);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -105,6 +101,8 @@ public class Promises {
|
||||
}
|
||||
|
||||
public static @NotNull Promise<Void> all(@NotNull List<Promise<?>> promises, PromiseFactory factory) {
|
||||
if (promises.isEmpty()) return factory.resolve(null);
|
||||
|
||||
Promise<Void> promise = factory.unresolved();
|
||||
for (Promise<?> p : promises) {
|
||||
p.addListener((ctx) -> {
|
||||
|
||||
Reference in New Issue
Block a user