finalize changes for 2.5.0 release

This commit is contained in:
2025-05-28 14:34:33 +01:00
parent 9137eed426
commit 8cba210a77
17 changed files with 305 additions and 271 deletions

View File

@@ -8,7 +8,6 @@ import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
@@ -44,12 +43,16 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
}
protected void handleCompletion(@NotNull PromiseCompletion<T> cmp) {
if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) return;
if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) {
return;
}
sync.releaseShared(1);
callListeners(cmp);
}
protected <F> Promise<T> completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit, PromiseScheduler<F> scheduler) {
protected <F> Promise<T> completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit,
PromiseScheduler<F> scheduler) {
runCompleter(this, () -> {
F future = scheduler.schedule(() -> completeExceptionally(e), delay, unit);
addDirectListener(_ -> scheduler.cancel(future));
@@ -60,7 +63,7 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
@SuppressWarnings("unchecked")
protected void callListeners(@NotNull PromiseCompletion<T> cmp) {
Iterator<PromiseListener<T>> iter = ((Iterable<PromiseListener<T>>) LISTENERS_HANDLE.getAndSet(this, null)).iterator();
var iter = ((Iterable<PromiseListener<T>>) LISTENERS_HANDLE.getAndSet(this, null)).iterator();
try {
while (iter.hasNext()) {
callListener(iter.next(), cmp);
@@ -76,11 +79,14 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
for (boolean haveNext = false; ; ) {
if (!haveNext) {
next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev;
if (next != null) next.add(listener);
if (next != null) {
next.add(listener);
}
}
if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next))
if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next)) {
break;
}
haveNext = (prev == (prev = listeners));
}
@@ -133,13 +139,15 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
@Override
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) {
Exception e = new CancellationException("Promise timed out after " + time + " " + unit.toString().toLowerCase());
Exception e = new CancellationException(
"Promise timed out after " + time + " " + unit.toString().toLowerCase());
return completeExceptionallyDelayed(e, time, unit, PromiseScheduler.getDefault());
}
@Override
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit.toString().toLowerCase());
Exception e = new TimeoutException(
"Promise stopped waiting after " + time + " " + unit.toString().toLowerCase());
return completeExceptionallyDelayed(e, time, unit, PromiseScheduler.getDefault());
}
@@ -170,36 +178,28 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
@Override
public @NotNull CompletableFuture<T> toFuture() {
return useCompletion(
() -> {
CompletableFuture<T> future = new CompletableFuture<>();
addDirectListener(future::complete, future::completeExceptionally);
future.whenComplete((result, error) -> {
if (error == null) {
complete(result);
} else {
completeExceptionally(error);
}
});
return useCompletion(() -> {
CompletableFuture<T> future = new CompletableFuture<>();
addDirectListener(future::complete, future::completeExceptionally);
future.whenComplete((result, error) -> {
if (error == null) {
complete(result);
} else {
completeExceptionally(error);
}
});
return future;
},
CompletableFuture::completedFuture,
CompletableFuture::failedFuture
);
return future;
}, CompletableFuture::completedFuture, CompletableFuture::failedFuture);
}
@Override
public @NotNull CompletionStage<T> toCompletionStage() {
return useCompletion(
() -> {
CompletableFuture<T> future = new CompletableFuture<>();
addDirectListener(future::complete, future::completeExceptionally);
return future;
},
CompletableFuture::completedStage,
CompletableFuture::failedStage
);
return useCompletion(() -> {
CompletableFuture<T> future = new CompletableFuture<>();
addDirectListener(future::complete, future::completeExceptionally);
return future;
}, CompletableFuture::completedStage, CompletableFuture::failedStage);
}
private static final class Sync extends AbstractQueuedSynchronizer {