seperate scheduler and virtual executor promise chain methods

This commit is contained in:
WhatCats
2025-05-28 13:23:23 +02:00
parent a630984cb0
commit fea0575392
21 changed files with 595 additions and 231 deletions

View File

@@ -1,23 +1,25 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.function.ExceptionalConsumer;
import dev.tommyjs.futur.function.ExceptionalFunction;
import dev.tommyjs.futur.function.ExceptionalRunnable;
import dev.tommyjs.futur.function.ExceptionalSupplier;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.executor.PromiseScheduler;
import dev.tommyjs.futur.function.*;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import java.util.concurrent.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
public abstract class AbstractPromise<T, FS, FA> implements Promise<T> {
public abstract class AbstractPromise<T> implements Promise<T> {
public abstract @NotNull AbstractPromiseFactory<FS, FA> getFactory();
public abstract @NotNull AbstractPromiseFactory getFactory();
protected abstract @NotNull Promise<T> addAnyListener(@NotNull PromiseListener<T> listener);
@@ -37,7 +39,7 @@ public abstract class AbstractPromise<T, FS, FA> implements Promise<T> {
try {
return supplier.get();
} catch (Error error) {
// Rethrow error so the Thread can shut down
// Rethrow error so the Thread can shut down or whatever
throw error;
} catch (Throwable e) {
return handler.apply(e);
@@ -49,7 +51,7 @@ public abstract class AbstractPromise<T, FS, FA> implements Promise<T> {
runnable.run();
} catch (Error error) {
handler.accept(error);
// Rethrow error so the Thread can shut down
// Rethrow error so the Thread can shut down or whatever
throw error;
} catch (Throwable e) {
handler.accept(e);
@@ -104,16 +106,20 @@ public abstract class AbstractPromise<T, FS, FA> implements Promise<T> {
protected T joinCompletionChecked() throws ExecutionException {
PromiseCompletion<T> completion = getCompletion();
assert completion != null;
if (completion.isSuccess()) return completion.getResult();
throw new ExecutionException(completion.getException());
if (completion == null) {
throw new IllegalStateException("Promise is not completed yet.");
}
return completion.getChecked();
}
protected T joinCompletionUnchecked() {
PromiseCompletion<T> completion = getCompletion();
assert completion != null;
if (completion.isSuccess()) return completion.getResult();
throw new CompletionException(completion.getException());
if (completion == null) {
throw new IllegalStateException("Promise is not completed yet.");
}
return completion.get();
}
@Override
@@ -205,184 +211,190 @@ public abstract class AbstractPromise<T, FS, FA> implements Promise<T> {
);
}
private <F, V> @NotNull Promise<V> thenApply(@NotNull ExceptionalFunction<T, V> task, @NotNull PromiseExecutor<F> executor) {
CompletablePromise<V> promise = createLinked();
addDirectListener(
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
F future = executor.run(runnable);
promise.addDirectListener(_ -> executor.cancel(future));
}),
promise::completeExceptionally
);
return promise;
}
private <F, V> @NotNull Promise<V> thenApplyDelayed(
@NotNull ExceptionalFunction<T, V> task, long delay,
@NotNull TimeUnit unit, @NotNull PromiseScheduler<F> scheduler
) {
CompletablePromise<V> promise = createLinked();
addDirectListener(
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
F future = scheduler.schedule(runnable, delay, unit);
promise.addDirectListener(_ -> scheduler.cancel(future));
}),
promise::completeExceptionally
);
return promise;
}
private <V> @NotNull Promise<V> thenCompose(
@NotNull ExceptionalFunction<T, Promise<V>> task,
@NotNull PromiseExecutor<?> executor
) {
CompletablePromise<V> promise = createLinked();
thenApply(task, executor).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
promise.complete(null);
} else {
PromiseUtil.propagateCompletion(nestedPromise, promise);
PromiseUtil.propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
return promise;
}
@Override
public @NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task) {
return thenApplySync(_ -> {
task.run();
return null;
});
return thenApply(FunctionAdapter.adapt(task), getFactory().getSyncExecutor());
}
@Override
public @NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedSync(_ -> {
task.run();
return null;
}, delay, unit);
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor().scheduler());
}
@Override
public @NotNull Promise<Void> thenConsumeSync(@NotNull ExceptionalConsumer<T> task) {
return thenApplySync(result -> {
task.accept(result);
return null;
});
return thenApply(FunctionAdapter.adapt(task), getFactory().getSyncExecutor());
}
@Override
public @NotNull Promise<Void> thenConsumeDelayedSync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedSync(result -> {
task.accept(result);
return null;
}, delay, unit);
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor().scheduler());
}
@Override
public <V> @NotNull Promise<V> thenSupplySync(@NotNull ExceptionalSupplier<V> task) {
return thenApplySync(_ -> task.get());
return thenApply(FunctionAdapter.adapt(task), getFactory().getSyncExecutor());
}
@Override
public <V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedSync(_ -> task.get(), delay, unit);
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor().scheduler());
}
@Override
public <V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task) {
CompletablePromise<V> promise = createLinked();
addDirectListener(
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FS future = getFactory().getSyncExecutor().run(runnable);
promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
return promise;
return thenApply(task, getFactory().getSyncExecutor());
}
@Override
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
CompletablePromise<V> promise = createLinked();
addDirectListener(
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FS future = getFactory().getSyncExecutor().run(runnable, delay, unit);
promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
return promise;
return thenApplyDelayed(task, delay, unit, getFactory().getSyncExecutor().scheduler());
}
@Override
public <V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
CompletablePromise<V> promise = createLinked();
thenApplySync(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
promise.complete(null);
} else {
PromiseUtil.propagateCompletion(nestedPromise, promise);
PromiseUtil.propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
return promise;
return thenCompose(task, getFactory().getSyncExecutor());
}
@Override
public @NotNull Promise<Void> thenRunAsync(@NotNull ExceptionalRunnable task) {
return thenApplyAsync(_ -> {
task.run();
return null;
});
return thenApply(FunctionAdapter.adapt(task), getFactory().getAsyncExecutor());
}
@Override
public @NotNull Promise<Void> thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedAsync(_ -> {
task.run();
return null;
}, delay, unit);
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor().scheduler());
}
@Override
public @NotNull Promise<Void> thenConsumeAsync(@NotNull ExceptionalConsumer<T> task) {
return thenApplyAsync(result -> {
task.accept(result);
return null;
});
return thenApply(FunctionAdapter.adapt(task), getFactory().getAsyncExecutor());
}
@Override
public @NotNull Promise<Void> thenConsumeDelayedAsync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedAsync(result -> {
task.accept(result);
return null;
}, delay, unit);
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor().scheduler());
}
@Override
public <V> @NotNull Promise<V> thenSupplyAsync(@NotNull ExceptionalSupplier<V> task) {
return thenApplyAsync(_ -> task.get());
return thenApply(FunctionAdapter.adapt(task), getFactory().getAsyncExecutor());
}
@Override
public <V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedAsync(_ -> task.get(), delay, unit);
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor().scheduler());
}
@Override
public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) {
CompletablePromise<V> promise = createLinked();
addDirectListener(
(res) -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FA future = getFactory().getAsyncExecutor().run(runnable);
promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
return promise;
return thenApply(task, getFactory().getAsyncExecutor());
}
@Override
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
CompletablePromise<V> promise = createLinked();
addDirectListener(
res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task);
FA future = getFactory().getAsyncExecutor().run(runnable, delay, unit);
promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future));
}),
promise::completeExceptionally
);
return promise;
return thenApplyDelayed(task, delay, unit, getFactory().getAsyncExecutor().scheduler());
}
@Override
public <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
CompletablePromise<V> promise = createLinked();
thenApplyAsync(task).addDirectListener(
nestedPromise -> {
if (nestedPromise == null) {
promise.complete(null);
} else {
PromiseUtil.propagateCompletion(nestedPromise, promise);
PromiseUtil.propagateCancel(promise, nestedPromise);
}
},
promise::completeExceptionally
);
return thenCompose(task, getFactory().getAsyncExecutor());
}
return promise;
@Override
public @NotNull Promise<Void> thenRunVirtual(@NotNull ExceptionalRunnable task) {
return thenApply(FunctionAdapter.adapt(task), getFactory().getVirtualExecutor());
}
@Override
public @NotNull Promise<Void> thenRunDelayedVirtual(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor().scheduler());
}
@Override
public @NotNull Promise<Void> thenConsumeVirtual(@NotNull ExceptionalConsumer<T> task) {
return thenApply(FunctionAdapter.adapt(task), getFactory().getVirtualExecutor());
}
@Override
public @NotNull Promise<Void> thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor().scheduler());
}
@Override
public <V> @NotNull Promise<V> thenSupplyVirtual(@NotNull ExceptionalSupplier<V> task) {
return thenApply(FunctionAdapter.adapt(task), getFactory().getVirtualExecutor());
}
@Override
public <V> @NotNull Promise<V> thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor().scheduler());
}
@Override
public <V> @NotNull Promise<V> thenApplyVirtual(@NotNull ExceptionalFunction<T, V> task) {
return thenApply(task, getFactory().getVirtualExecutor());
}
@Override
public <V> @NotNull Promise<V> thenApplyDelayedVirtual(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(task, delay, unit, getFactory().getVirtualExecutor().scheduler());
}
@Override
public <V> @NotNull Promise<V> thenComposeVirtual(@NotNull ExceptionalFunction<T, Promise<V>> task) {
return thenCompose(task, getFactory().getVirtualExecutor());
}
@Override
@@ -491,25 +503,6 @@ public abstract class AbstractPromise<T, FS, FA> implements Promise<T> {
);
}
@Override
public @NotNull CompletableFuture<T> toFuture() {
return useCompletion(
() -> {
CompletableFuture<T> future = new CompletableFuture<>();
addDirectListener(future::complete, future::completeExceptionally);
future.whenComplete((_, e) -> {
if (e instanceof CancellationException) {
cancel();
}
});
return future;
},
CompletableFuture::completedFuture,
CompletableFuture::failedFuture
);
}
private static class DeferredExecutionException extends ExecutionException {
}