add generator for futur-lazy

This commit is contained in:
WhatCats
2025-01-06 23:08:59 +01:00
parent 4236adbd9e
commit 6174193145
11 changed files with 595 additions and 188 deletions

View File

@@ -9,7 +9,6 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class PromiseJoiner<V, K, T, R> {
@@ -29,7 +28,6 @@ public abstract class PromiseJoiner<V, K, T, R> {
protected abstract R getResult();
protected void join(@NotNull Iterator<V> promises, boolean link) {
AtomicBoolean waiting = new AtomicBoolean();
AtomicInteger count = new AtomicInteger();
int i = 0;
@@ -50,19 +48,15 @@ public abstract class PromiseJoiner<V, K, T, R> {
Throwable e = onChildComplete(index, key, res);
if (e != null) {
joined.completeExceptionally(e);
} else if (count.decrementAndGet() == 0 && waiting.get()) {
} else if (count.decrementAndGet() == -1) {
joined.complete(getResult());
}
});
}
} while (promises.hasNext());
if (!joined.isCompleted()) {
count.updateAndGet(v -> {
if (v == 0) joined.complete(getResult());
else waiting.set(true);
return v;
});
if (count.decrementAndGet() == -1) {
joined.complete(getResult());
}
}

View File

@@ -48,8 +48,9 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
@NotNull ExceptionalFunction<T, V> completer
) {
return () -> {
if (promise.isCompleted()) return;
runCompleter(promise, () -> promise.complete(completer.apply(result)));
if (!promise.isCompleted()) {
runCompleter(promise, () -> promise.complete(completer.apply(result)));
}
};
}
@@ -479,14 +480,14 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
while (iter.hasNext()) {
PromiseListener<T> listener = iter.next();
if (listener instanceof AsyncPromiseListener) {
callListenerAsync(listener, ctx);
} else {
try {
try {
if (listener instanceof AsyncPromiseListener) {
callListenerAsync(listener, ctx);
} else {
callListenerNow(listener, ctx);
} finally {
iter.forEachRemaining(v -> callListenerAsyncLastResort(v, ctx));
}
} finally {
iter.forEachRemaining(v -> callListenerAsyncLastResort(v, ctx));
}
}
}
@@ -531,16 +532,18 @@ public abstract class AbstractPromise<T, FS, FA> implements CompletablePromise<T
@Override
public @NotNull CompletableFuture<T> toFuture() {
CompletableFuture<T> future = new CompletableFuture<>();
this.addDirectListener(future::complete, future::completeExceptionally);
addDirectListener(future::complete, future::completeExceptionally);
future.whenComplete((_, e) -> {
if (e instanceof CancellationException) {
this.cancel();
}
});
return future;
}
private static class DeferredExecutionException extends ExecutionException {
}
}

View File

@@ -5,4 +5,5 @@ package dev.tommyjs.futur.promise;
* executed asynchronously by the {@link PromiseFactory} that created the completed promise.
*/
public interface AsyncPromiseListener<T> extends PromiseListener<T> {
}

View File

@@ -8,7 +8,6 @@ import java.util.concurrent.atomic.AtomicReference;
public class ConcurrentResultArray<T> {
private static final float RESIZE_THRESHOLD = 0.75F;
private static final float RESIZE_FACTOR = 1.2F;
private final AtomicReference<T[]> ref;
@@ -20,7 +19,7 @@ public class ConcurrentResultArray<T> {
public void set(int index, T element) {
ref.updateAndGet(array -> {
if (array.length * RESIZE_THRESHOLD <= index) {
if (array.length <= index) {
array = Arrays.copyOf(array, (int) (array.length * RESIZE_FACTOR));
}