respect executors that can't schedule

This commit is contained in:
WhatCats
2025-05-28 15:18:51 +02:00
parent fea0575392
commit 9137eed426
4 changed files with 47 additions and 24 deletions

View File

@@ -1,6 +1,7 @@
package dev.tommyjs.futur.executor; package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@@ -24,8 +25,8 @@ class ExecutorImpl implements PromiseExecutor<Void> {
} }
@Override @Override
public @NotNull PromiseScheduler<?> scheduler() { public @Nullable PromiseScheduler<?> scheduler() {
return PromiseScheduler.getDefault(); return null;
} }
} }

View File

@@ -1,6 +1,7 @@
package dev.tommyjs.futur.executor; package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@@ -24,8 +25,8 @@ class ExecutorServiceImpl implements PromiseExecutor<Future<?>> {
} }
@Override @Override
public @NotNull PromiseScheduler<?> scheduler() { public @Nullable PromiseScheduler<?> scheduler() {
return PromiseScheduler.getDefault(); return null;
} }
} }

View File

@@ -1,6 +1,7 @@
package dev.tommyjs.futur.executor; package dev.tommyjs.futur.executor;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@@ -77,6 +78,6 @@ public interface PromiseExecutor<T> {
*/ */
boolean cancel(@NotNull T task); boolean cancel(@NotNull T task);
@NotNull PromiseScheduler<?> scheduler(); @Nullable PromiseScheduler<?> scheduler();
} }

View File

@@ -215,9 +215,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
CompletablePromise<V> promise = createLinked(); CompletablePromise<V> promise = createLinked();
addDirectListener( addDirectListener(
res -> runCompleter(promise, () -> { res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task); Runnable completer = createCompleter(res, promise, task);
F future = executor.run(runnable); execute(promise, completer, executor);
promise.addDirectListener(_ -> executor.cancel(future));
}), }),
promise::completeExceptionally promise::completeExceptionally
); );
@@ -227,14 +226,22 @@ public abstract class AbstractPromise<T> implements Promise<T> {
private <F, V> @NotNull Promise<V> thenApplyDelayed( private <F, V> @NotNull Promise<V> thenApplyDelayed(
@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull ExceptionalFunction<T, V> task, long delay,
@NotNull TimeUnit unit, @NotNull PromiseScheduler<F> scheduler @NotNull TimeUnit unit, @NotNull PromiseExecutor<F> executor
) { ) {
CompletablePromise<V> promise = createLinked(); CompletablePromise<V> promise = createLinked();
addDirectListener( addDirectListener(
res -> runCompleter(promise, () -> { res -> runCompleter(promise, () -> {
Runnable runnable = createCompleter(res, promise, task); Runnable completer = createCompleter(res, promise, task);
F future = scheduler.schedule(runnable, delay, unit); PromiseScheduler<?> scheduler = executor.scheduler();
promise.addDirectListener(_ -> scheduler.cancel(future)); if (scheduler == null) {
schedule(
promise,
() -> runCompleter(promise, () -> execute(promise, completer, executor)),
delay, unit, PromiseScheduler.getDefault()
);
} else {
schedule(promise, completer, delay, unit, scheduler);
}
}), }),
promise::completeExceptionally promise::completeExceptionally
); );
@@ -242,6 +249,19 @@ public abstract class AbstractPromise<T> implements Promise<T> {
return promise; return promise;
} }
private <F> void execute(@NotNull Promise<?> promise, @NotNull Runnable task, @NotNull PromiseExecutor<F >executor) throws Exception {
F future = executor.run(task);
promise.addDirectListener(_ -> executor.cancel(future));
}
private <F> void schedule(
@NotNull Promise<?> promise, @NotNull Runnable task,
long delay, @NotNull TimeUnit unit, @NotNull PromiseScheduler<F> scheduler
) throws Exception {
F future = scheduler.schedule(task, delay, unit);
promise.addDirectListener(_ -> scheduler.cancel(future));
}
private <V> @NotNull Promise<V> thenCompose( private <V> @NotNull Promise<V> thenCompose(
@NotNull ExceptionalFunction<T, Promise<V>> task, @NotNull ExceptionalFunction<T, Promise<V>> task,
@NotNull PromiseExecutor<?> executor @NotNull PromiseExecutor<?> executor
@@ -269,7 +289,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public @NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { public @NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor().scheduler()); return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
} }
@Override @Override
@@ -279,7 +299,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public @NotNull Promise<Void> thenConsumeDelayedSync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) { public @NotNull Promise<Void> thenConsumeDelayedSync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor().scheduler()); return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
} }
@Override @Override
@@ -289,7 +309,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public <V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) { public <V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor().scheduler()); return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
} }
@Override @Override
@@ -299,7 +319,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) { public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(task, delay, unit, getFactory().getSyncExecutor().scheduler()); return thenApplyDelayed(task, delay, unit, getFactory().getSyncExecutor());
} }
@Override @Override
@@ -314,7 +334,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public @NotNull Promise<Void> thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { public @NotNull Promise<Void> thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor().scheduler()); return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
} }
@Override @Override
@@ -324,7 +344,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public @NotNull Promise<Void> thenConsumeDelayedAsync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) { public @NotNull Promise<Void> thenConsumeDelayedAsync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor().scheduler()); return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
} }
@Override @Override
@@ -334,7 +354,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public <V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) { public <V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor().scheduler()); return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
} }
@Override @Override
@@ -344,7 +364,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) { public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(task, delay, unit, getFactory().getAsyncExecutor().scheduler()); return thenApplyDelayed(task, delay, unit, getFactory().getAsyncExecutor());
} }
@Override @Override
@@ -359,7 +379,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public @NotNull Promise<Void> thenRunDelayedVirtual(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { public @NotNull Promise<Void> thenRunDelayedVirtual(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor().scheduler()); return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
} }
@Override @Override
@@ -369,7 +389,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public @NotNull Promise<Void> thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) { public @NotNull Promise<Void> thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor().scheduler()); return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
} }
@Override @Override
@@ -379,7 +399,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public <V> @NotNull Promise<V> thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) { public <V> @NotNull Promise<V> thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor().scheduler()); return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
} }
@Override @Override
@@ -389,7 +409,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public <V> @NotNull Promise<V> thenApplyDelayedVirtual(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) { public <V> @NotNull Promise<V> thenApplyDelayedVirtual(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayed(task, delay, unit, getFactory().getVirtualExecutor().scheduler()); return thenApplyDelayed(task, delay, unit, getFactory().getVirtualExecutor());
} }
@Override @Override