cleanup combine methods

This commit is contained in:
WhatCats
2025-01-10 21:15:52 +01:00
parent e4e61c4b6c
commit 4d01a8a418
9 changed files with 578 additions and 678 deletions

View File

@@ -10,49 +10,51 @@ import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class PromiseJoiner<V, K, T, R> {
public abstract class PromiseJoiner<T, Key, Value, Result> {
private final CompletablePromise<R> joined;
private final CompletablePromise<Result> joined;
protected PromiseJoiner(@NotNull PromiseFactory factory) {
this.joined = factory.unresolved();
}
protected abstract K getChildKey(V value);
protected abstract Key getChildKey(T value);
protected abstract @NotNull Promise<T> getChildPromise(V value);
protected abstract @NotNull Promise<Value> getChildPromise(T value);
protected abstract void onChildComplete(int index, K key, @NotNull PromiseCompletion<T> completion);
protected abstract void onChildComplete(int index, Key key, @NotNull PromiseCompletion<Value> completion);
protected abstract R getResult();
protected abstract Result getResult();
protected void join(@NotNull Iterator<V> promises, boolean link) {
protected void join(@NotNull Iterator<T> promises) {
AtomicInteger count = new AtomicInteger();
int i = 0;
do {
V value = promises.next();
Promise<T> p = getChildPromise(value);
if (joined.isCompleted()) {
promises.forEachRemaining(v -> getChildPromise(v).cancel());
return;
}
if (link) {
T value = promises.next();
Promise<Value> p = getChildPromise(value);
if (!p.isCompleted()) {
PromiseUtil.cancelOnComplete(joined, p);
}
if (!joined.isCompleted()) {
count.incrementAndGet();
K key = getChildKey(value);
int index = i++;
count.incrementAndGet();
Key key = getChildKey(value);
int index = i++;
p.addAsyncListener(res -> {
onChildComplete(index, key, res);
if (res.isError()) {
assert res.getException() != null;
joined.completeExceptionally(res.getException());
} else if (count.decrementAndGet() == -1) {
joined.complete(getResult());
}
});
}
p.addAsyncListener(res -> {
onChildComplete(index, key, res);
if (res.isError()) {
assert res.getException() != null;
joined.completeExceptionally(res.getException());
} else if (count.decrementAndGet() == -1) {
joined.complete(getResult());
}
});
} while (promises.hasNext());
if (count.decrementAndGet() == -1) {
@@ -60,7 +62,7 @@ public abstract class PromiseJoiner<V, K, T, R> {
}
}
public @NotNull Promise<R> joined() {
public @NotNull Promise<Result> joined() {
return joined;
}