diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..1f19ed9 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,26 @@ +name: Build and publish + +on: + push: + branches: + - main + +jobs: + publish: + name: Publish build + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Java + uses: actions/setup-java@v4 + with: + distribution: corretto + java-version: 23 + + - name: Make Gradle executable + run: chmod +x ./gradlew + + - name: Build and publish project + run: ./gradlew publish -PtommyjsUsername=${{ secrets.NEXUS_USERNAME }} -PtommyjsPassword=${{ secrets.NEXUS_PASSWORD }} diff --git a/README.md b/README.md index 8e94b11..efc8187 100644 --- a/README.md +++ b/README.md @@ -14,8 +14,8 @@ repositories { } dependencies { - compile 'dev.tommyjs:futur-api:2.4.0' - compile 'dev.tommyjs:futur-lazy:2.4.0' + compile 'dev.tommyjs:futur-api:2.5.0' + compile 'dev.tommyjs:futur-lazy:2.5.0' } ``` ### Gradle DSL @@ -25,8 +25,8 @@ repositories { } dependencies { - implementation("dev.tommyjs:futur-api:2.4.0") - implementation("dev.tommyjs:futur-lazy:2.4.0") + implementation("dev.tommyjs:futur-api:2.5.0") + implementation("dev.tommyjs:futur-lazy:2.5.0") } ``` ### Maven @@ -42,12 +42,12 @@ dependencies { dev.tommyjs futur-api - 2.4.0 + 2.5.0 dev.tommyjs futur-lazy - 2.4.0 + 2.5.0 ``` \ No newline at end of file diff --git a/build.gradle b/build.gradle index ffd6e04..1eb3b4d 100644 --- a/build.gradle +++ b/build.gradle @@ -1,23 +1,38 @@ plugins { id 'java-library' id 'com.github.johnrengelman.shadow' version '8.1.1' - id 'io.github.gradle-nexus.publish-plugin' version '2.0.0' -} - -nexusPublishing { - repositories { - tommyjs { - nexusUrl = uri("https://repo.tommyjs.dev/repository/maven-releases") - } - } + id 'maven-publish' } subprojects { group = 'dev.tommyjs' - version = '2.4.1' + version = '2.5.0' apply plugin: 'java-library' apply plugin: 'com.github.johnrengelman.shadow' + apply plugin : 'maven-publish' + + publishing { + publications { + mavenJava(MavenPublication) { + from(components["java"]) + pom { + name = project.name + } + } + } + + repositories { + maven { + name = 'tommyjs' + url = uri("https://repo.tommyjs.dev/repository/maven-releases/") + credentials { + username = findProperty("tommyjsUsername") as String + password = findProperty("tommyjsPassword") as String + } + } + } + } tasks { build { @@ -48,8 +63,8 @@ subprojects { } java { - sourceCompatibility = JavaVersion.VERSION_22 - targetCompatibility = JavaVersion.VERSION_22 + sourceCompatibility = JavaVersion.VERSION_23 + targetCompatibility = JavaVersion.VERSION_23 withSourcesJar() } } \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorImpl.java new file mode 100644 index 0000000..0e0da40 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorImpl.java @@ -0,0 +1,32 @@ +package dev.tommyjs.futur.executor; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.Executor; + +class ExecutorImpl implements PromiseExecutor { + + private final Executor executor; + + public ExecutorImpl(@NotNull Executor executor) { + this.executor = executor; + } + + @Override + public @NotNull Void run(@NotNull Runnable task) { + executor.execute(task); + return null; + } + + @Override + public boolean cancel(@NotNull Void task) { + return false; + } + + @Override + public @Nullable PromiseScheduler scheduler() { + return null; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java index acefe38..975bace 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/ExecutorServiceImpl.java @@ -1,32 +1,32 @@ package dev.tommyjs.futur.executor; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; class ExecutorServiceImpl implements PromiseExecutor> { - private final ScheduledExecutorService service; + private final ExecutorService executor; - public ExecutorServiceImpl(@NotNull ScheduledExecutorService service) { - this.service = service; + public ExecutorServiceImpl(@NotNull ExecutorService executor) { + this.executor = executor; } @Override - public Future run(@NotNull Runnable task) { - return service.submit(task); + public @NotNull Future run(@NotNull Runnable task) { + return executor.submit(task); } @Override - public Future run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { - return service.schedule(task, delay, unit); - } - - @Override - public boolean cancel(Future task) { + public boolean cancel(@NotNull Future task) { return task.cancel(true); } + @Override + public @Nullable PromiseScheduler scheduler() { + return null; + } + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java index 1143ec2..86bee7b 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java @@ -1,10 +1,11 @@ package dev.tommyjs.futur.executor; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; /** * An executor that can run tasks and schedule tasks to run in the future. @@ -42,11 +43,21 @@ public interface PromiseExecutor { /** * Creates a new {@link PromiseExecutor} that runs tasks on the given executor service. * - * @param service the executor service + * @param executor the executor service * @return the new executor */ - static PromiseExecutor of(@NotNull ScheduledExecutorService service) { - return new ExecutorServiceImpl(service); + static PromiseExecutor of(@NotNull ScheduledExecutorService executor) { + return new ScheduledExecutorImpl(executor); + } + + /** + * Creates a new {@link PromiseExecutor} that runs tasks on the given executor service. + * + * @param executor the executor service + * @return the new executor + */ + static PromiseExecutor of(@NotNull ExecutorService executor) { + return new ExecutorServiceImpl(executor); } /** @@ -56,26 +67,17 @@ public interface PromiseExecutor { * @return the task * @throws Exception if scheduling the task failed */ - T run(@NotNull Runnable task) throws Exception; - - /** - * Runs the given task after the given delay. - * - * @param task the task - * @param delay the delay - * @param unit the time unit - * @return the task - * @throws Exception if scheduling the task failed - */ - T run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) throws Exception; + @NotNull T run(@NotNull Runnable task) throws Exception; /** * Cancels the given task if possible. This may interrupt the task mid-execution. * * @param task the task - * @return {@code true} if the task was cancelled. {@code false} if the task was already completed - * or could not be cancelled. + * @return {@code true} if the task was cancelled, {@code false} if the task was already completed + * or could not be cancelled */ - boolean cancel(T task); + boolean cancel(@NotNull T task); + + @Nullable PromiseScheduler scheduler(); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseScheduler.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseScheduler.java new file mode 100644 index 0000000..d9fdafc --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseScheduler.java @@ -0,0 +1,36 @@ +package dev.tommyjs.futur.executor; + +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.TimeUnit; + +/** + * A scheduler for running tasks after a delay. + */ +public interface PromiseScheduler { + + static @NotNull PromiseScheduler getDefault() { + return PromiseSchedulerDefault.INSTANCE; + } + + /** + * Runs the given task after the given delay. + * + * @param task the task + * @param delay the delay + * @param unit the time unit + * @return the task + * @throws Exception if scheduling the task failed + */ + @NotNull T schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) throws Exception; + + /** + * Cancels the given task if possible. This may interrupt the task mid-execution. + * + * @param task the task + * @return {@code true} if the task was cancelled, {@code false} if the task was already completed + * or could not be cancelled + */ + boolean cancel(@NotNull T task); + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseSchedulerDefault.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseSchedulerDefault.java new file mode 100644 index 0000000..bc6c367 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseSchedulerDefault.java @@ -0,0 +1,28 @@ +package dev.tommyjs.futur.executor; + +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.*; + +class PromiseSchedulerDefault implements PromiseScheduler> { + + static final PromiseSchedulerDefault INSTANCE = new PromiseSchedulerDefault(); + + private final ScheduledExecutorService executor; + + PromiseSchedulerDefault() { + ThreadFactory factory = Thread.ofPlatform().name("promise-scheduler").daemon(true).factory(); + this.executor = Executors.newSingleThreadScheduledExecutor(factory); + } + + @Override + public @NotNull ScheduledFuture schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + return executor.schedule(task, delay, unit); + } + + @Override + public boolean cancel(@NotNull ScheduledFuture task) { + return task.cancel(true); + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/ScheduledExecutorImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/ScheduledExecutorImpl.java new file mode 100644 index 0000000..0f90f1e --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/ScheduledExecutorImpl.java @@ -0,0 +1,37 @@ +package dev.tommyjs.futur.executor; + +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +class ScheduledExecutorImpl implements PromiseExecutor>, PromiseScheduler> { + + private final ScheduledExecutorService executor; + + public ScheduledExecutorImpl(@NotNull ScheduledExecutorService executor) { + this.executor = executor; + } + + @Override + public @NotNull Future run(@NotNull Runnable task) { + return executor.submit(task); + } + + @Override + public @NotNull Future schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + return executor.schedule(task, delay, unit); + } + + @Override + public boolean cancel(@NotNull Future task) { + return task.cancel(true); + } + + @Override + public @NotNull PromiseScheduler> scheduler() { + return this; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java index 397efc3..044b74b 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/VirtualThreadImpl.java @@ -4,15 +4,15 @@ import org.jetbrains.annotations.NotNull; import java.util.concurrent.TimeUnit; -class VirtualThreadImpl implements PromiseExecutor { +class VirtualThreadImpl implements PromiseExecutor, PromiseScheduler { @Override - public Thread run(@NotNull Runnable task) { + public @NotNull Thread run(@NotNull Runnable task) { return Thread.ofVirtual().start(task); } @Override - public Thread run(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + public @NotNull Thread schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { return Thread.ofVirtual().start(() -> { try { Thread.sleep(unit.toMillis(delay)); @@ -24,7 +24,7 @@ class VirtualThreadImpl implements PromiseExecutor { } @Override - public boolean cancel(Thread task) { + public boolean cancel(@NotNull Thread task) { if (task.isAlive()) { task.interrupt(); return true; @@ -33,4 +33,9 @@ class VirtualThreadImpl implements PromiseExecutor { } } + @Override + public @NotNull PromiseScheduler scheduler() { + return this; + } + } \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/FunctionAdapter.java b/futur-api/src/main/java/dev/tommyjs/futur/function/FunctionAdapter.java new file mode 100644 index 0000000..fcbc009 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/FunctionAdapter.java @@ -0,0 +1,25 @@ +package dev.tommyjs.futur.function; + +import org.jetbrains.annotations.NotNull; + +public final class FunctionAdapter { + + public static @NotNull ExceptionalFunction adapt(@NotNull ExceptionalConsumer consumer) { + return value -> { + consumer.accept(value); + return null; + }; + } + + public static @NotNull ExceptionalFunction adapt(@NotNull ExceptionalRunnable runnable) { + return _ -> { + runnable.run(); + return null; + }; + } + + public static @NotNull ExceptionalFunction adapt(@NotNull ExceptionalSupplier supplier) { + return _ -> supplier.get(); + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java index 82f9539..2faeaf7 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/CompletionJoiner.java @@ -13,11 +13,7 @@ public class CompletionJoiner extends PromiseJoiner, Void, Void, List private final ConcurrentResultArray> results; - public CompletionJoiner( - @NotNull PromiseFactory factory, - @NotNull Iterator> promises, - int expectedSize - ) { + public CompletionJoiner(@NotNull PromiseFactory factory, @NotNull Iterator> promises, int expectedSize) { super(factory); results = new ConcurrentResultArray<>(expectedSize); join(promises); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java index ee64b0e..a2a5ad9 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/MappedResultJoiner.java @@ -12,11 +12,8 @@ public class MappedResultJoiner extends PromiseJoiner> results; - public MappedResultJoiner( - @NotNull PromiseFactory factory, - @NotNull Iterator>> promises, - int expectedSize - ) { + public MappedResultJoiner(@NotNull PromiseFactory factory, @NotNull Iterator>> promises, + int expectedSize) { super(factory); this.results = new ConcurrentResultArray<>(expectedSize); join(promises); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java b/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java index b4e1a2d..3f53887 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/joiner/ResultJoiner.java @@ -13,11 +13,7 @@ public class ResultJoiner extends PromiseJoiner, Void, T, List> private final ConcurrentResultArray results; - public ResultJoiner( - @NotNull PromiseFactory factory, - @NotNull Iterator> promises, - int expectedSize - ) { + public ResultJoiner(@NotNull PromiseFactory factory, @NotNull Iterator> promises, int expectedSize) { super(factory); this.results = new ConcurrentResultArray<>(expectedSize); join(promises); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 1cdfb28..c05f705 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -1,23 +1,25 @@ package dev.tommyjs.futur.promise; -import dev.tommyjs.futur.function.ExceptionalConsumer; -import dev.tommyjs.futur.function.ExceptionalFunction; -import dev.tommyjs.futur.function.ExceptionalRunnable; -import dev.tommyjs.futur.function.ExceptionalSupplier; +import dev.tommyjs.futur.executor.PromiseExecutor; +import dev.tommyjs.futur.executor.PromiseScheduler; +import dev.tommyjs.futur.function.*; import dev.tommyjs.futur.util.PromiseUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -public abstract class AbstractPromise implements Promise { +public abstract class AbstractPromise implements Promise { - public abstract @NotNull AbstractPromiseFactory getFactory(); + public abstract @NotNull AbstractPromiseFactory getFactory(); protected abstract @NotNull Promise addAnyListener(@NotNull PromiseListener listener); @@ -37,7 +39,7 @@ public abstract class AbstractPromise implements Promise { try { return supplier.get(); } catch (Error error) { - // Rethrow error so the Thread can shut down + // rethrow unrecoverable errors throw error; } catch (Throwable e) { return handler.apply(e); @@ -49,7 +51,7 @@ public abstract class AbstractPromise implements Promise { runnable.run(); } catch (Error error) { handler.accept(error); - // Rethrow error so the Thread can shut down + // rethrow unrecoverable errors throw error; } catch (Throwable e) { handler.accept(e); @@ -62,9 +64,13 @@ public abstract class AbstractPromise implements Promise { protected V useCompletion(Supplier unresolved, Function completed, Function failed) { PromiseCompletion completion = getCompletion(); - if (completion == null) return unresolved.get(); - else if (completion.isSuccess()) return completed.apply(completion.getResult()); - else return failed.apply(completion.getException()); + if (completion == null) { + return unresolved.get(); + } else if (completion.isSuccess()) { + return completed.apply(completion.getResult()); + } else { + return failed.apply(completion.getException()); + } } protected @NotNull Runnable createCompleter(T result, @NotNull CompletablePromise promise, @@ -104,21 +110,27 @@ public abstract class AbstractPromise implements Promise { protected T joinCompletionChecked() throws ExecutionException { PromiseCompletion completion = getCompletion(); - assert completion != null; - if (completion.isSuccess()) return completion.getResult(); - throw new ExecutionException(completion.getException()); + if (completion == null) { + throw new IllegalStateException("Promise is not completed yet."); + } + + return completion.getChecked(); } protected T joinCompletionUnchecked() { PromiseCompletion completion = getCompletion(); - assert completion != null; - if (completion.isSuccess()) return completion.getResult(); - throw new CompletionException(completion.getException()); + if (completion == null) { + throw new IllegalStateException("Promise is not completed yet."); + } + + return completion.get(); } @Override public @NotNull Promise fork() { - if (isCompleted()) return this; + if (isCompleted()) { + return this; + } CompletablePromise fork = getFactory().unresolved(); PromiseUtil.propagateCompletion(this, fork); @@ -148,241 +160,243 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApply(@NotNull ExceptionalFunction task) { - return useCompletion( - () -> { - CompletablePromise promise = createLinked(); - addDirectListener( - res -> createCompleter(res, promise, task).run(), - promise::completeExceptionally - ); + return useCompletion(() -> { + CompletablePromise promise = createLinked(); + addDirectListener(res -> createCompleter(res, promise, task).run(), promise::completeExceptionally); - return promise; - }, - result -> supplySafe( - () -> getFactory().resolve(task.apply(result)), - getFactory()::error - ), - getFactory()::error - ); + return promise; + }, result -> supplySafe(() -> getFactory().resolve(task.apply(result)), getFactory()::error), getFactory()::error); } @Override public @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task) { - return useCompletion( - () -> { - CompletablePromise promise = createLinked(); - thenApply(task).addDirectListener( - result -> { - if (result == null) { - promise.complete(null); - } else { - PromiseUtil.propagateCompletion(result, promise); - PromiseUtil.propagateCancel(promise, result); - } - }, - promise::completeExceptionally - ); + return useCompletion(() -> { + CompletablePromise promise = createLinked(); + thenApply(task).addDirectListener(result -> { + if (result == null) { + promise.complete(null); + } else { + PromiseUtil.propagateCompletion(result, promise); + PromiseUtil.propagateCancel(promise, result); + } + }, promise::completeExceptionally); + return promise; + }, result -> supplySafe(() -> { + Promise nested = task.apply(result); + if (nested == null) { + return getFactory().resolve(null); + } else if (nested.isCompleted()) { + return nested; + } else { + CompletablePromise promise = createLinked(); + PromiseUtil.propagateCompletion(nested, promise); + PromiseUtil.propagateCancel(promise, nested); return promise; - }, - result -> supplySafe( - () -> { - Promise nested = task.apply(result); - if (nested == null) { - return getFactory().resolve(null); - } else if (nested.isCompleted()) { - return nested; - } else { - CompletablePromise promise = createLinked(); - PromiseUtil.propagateCompletion(nested, promise); - PromiseUtil.propagateCancel(promise, nested); - return promise; - } - }, - getFactory()::error - ), - getFactory()::error - ); + } + }, getFactory()::error), getFactory()::error); + } + + private @NotNull Promise thenApply(@NotNull ExceptionalFunction task, + @NotNull PromiseExecutor executor) { + CompletablePromise promise = createLinked(); + addDirectListener(res -> runCompleter(promise, () -> { + Runnable completer = createCompleter(res, promise, task); + execute(promise, completer, executor); + }), promise::completeExceptionally); + + return promise; + } + + private @NotNull Promise thenApplyDelayed(@NotNull ExceptionalFunction task, long delay, + @NotNull TimeUnit unit, @NotNull PromiseExecutor executor) { + CompletablePromise promise = createLinked(); + addDirectListener(res -> runCompleter(promise, () -> { + Runnable completer = createCompleter(res, promise, task); + PromiseScheduler scheduler = executor.scheduler(); + if (scheduler == null) { + schedule(promise, () -> runCompleter(promise, () -> execute(promise, completer, executor)), delay, unit, + PromiseScheduler.getDefault()); + } else { + schedule(promise, completer, delay, unit, scheduler); + } + }), promise::completeExceptionally); + + return promise; + } + + private void execute(@NotNull Promise promise, @NotNull Runnable task, @NotNull PromiseExecutor executor) + throws Exception { + F future = executor.run(task); + promise.addDirectListener(_ -> executor.cancel(future)); + } + + private void schedule(@NotNull Promise promise, @NotNull Runnable task, long delay, @NotNull TimeUnit unit, + @NotNull PromiseScheduler scheduler) throws Exception { + F future = scheduler.schedule(task, delay, unit); + promise.addDirectListener(_ -> scheduler.cancel(future)); + } + + private @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task, + @NotNull PromiseExecutor executor) { + CompletablePromise promise = createLinked(); + thenApply(task, executor).addDirectListener(nestedPromise -> { + if (nestedPromise == null) { + promise.complete(null); + } else { + PromiseUtil.propagateCompletion(nestedPromise, promise); + PromiseUtil.propagateCancel(promise, nestedPromise); + } + }, promise::completeExceptionally); + + return promise; } @Override public @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task) { - return thenApplySync(_ -> { - task.run(); - return null; - }); + return thenApply(FunctionAdapter.adapt(task), getFactory().getSyncExecutor()); } @Override - public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(_ -> { - task.run(); - return null; - }, delay, unit); + public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor()); } @Override public @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task) { - return thenApplySync(result -> { - task.accept(result); - return null; - }); + return thenApply(FunctionAdapter.adapt(task), getFactory().getSyncExecutor()); } @Override - public @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(result -> { - task.accept(result); - return null; - }, delay, unit); + public @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor()); } @Override public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) { - return thenApplySync(_ -> task.get()); + return thenApply(FunctionAdapter.adapt(task), getFactory().getSyncExecutor()); } @Override - public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(_ -> task.get(), delay, unit); + public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor()); } @Override public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { - CompletablePromise promise = createLinked(); - addDirectListener( - res -> runCompleter(promise, () -> { - Runnable runnable = createCompleter(res, promise, task); - FS future = getFactory().getSyncExecutor().run(runnable); - promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future)); - }), - promise::completeExceptionally - ); - - return promise; + return thenApply(task, getFactory().getSyncExecutor()); } @Override - public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - CompletablePromise promise = createLinked(); - addDirectListener( - res -> runCompleter(promise, () -> { - Runnable runnable = createCompleter(res, promise, task); - FS future = getFactory().getSyncExecutor().run(runnable, delay, unit); - promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future)); - }), - promise::completeExceptionally - ); - - return promise; + public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(task, delay, unit, getFactory().getSyncExecutor()); } @Override public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { - CompletablePromise promise = createLinked(); - thenApplySync(task).addDirectListener( - nestedPromise -> { - if (nestedPromise == null) { - promise.complete(null); - } else { - PromiseUtil.propagateCompletion(nestedPromise, promise); - PromiseUtil.propagateCancel(promise, nestedPromise); - } - }, - promise::completeExceptionally - ); - - return promise; + return thenCompose(task, getFactory().getSyncExecutor()); } @Override public @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task) { - return thenApplyAsync(_ -> { - task.run(); - return null; - }); + return thenApply(FunctionAdapter.adapt(task), getFactory().getAsyncExecutor()); } @Override - public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(_ -> { - task.run(); - return null; - }, delay, unit); + public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor()); } @Override public @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task) { - return thenApplyAsync(result -> { - task.accept(result); - return null; - }); + return thenApply(FunctionAdapter.adapt(task), getFactory().getAsyncExecutor()); } @Override - public @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(result -> { - task.accept(result); - return null; - }, delay, unit); + public @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor()); } @Override public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) { - return thenApplyAsync(_ -> task.get()); + return thenApply(FunctionAdapter.adapt(task), getFactory().getAsyncExecutor()); } @Override - public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(_ -> task.get(), delay, unit); + public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor()); } @Override public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { - CompletablePromise promise = createLinked(); - addDirectListener( - (res) -> runCompleter(promise, () -> { - Runnable runnable = createCompleter(res, promise, task); - FA future = getFactory().getAsyncExecutor().run(runnable); - promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future)); - }), - promise::completeExceptionally - ); - - return promise; + return thenApply(task, getFactory().getAsyncExecutor()); } @Override - public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - CompletablePromise promise = createLinked(); - addDirectListener( - res -> runCompleter(promise, () -> { - Runnable runnable = createCompleter(res, promise, task); - FA future = getFactory().getAsyncExecutor().run(runnable, delay, unit); - promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future)); - }), - promise::completeExceptionally - ); - - return promise; + public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(task, delay, unit, getFactory().getAsyncExecutor()); } @Override public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { - CompletablePromise promise = createLinked(); - thenApplyAsync(task).addDirectListener( - nestedPromise -> { - if (nestedPromise == null) { - promise.complete(null); - } else { - PromiseUtil.propagateCompletion(nestedPromise, promise); - PromiseUtil.propagateCancel(promise, nestedPromise); - } - }, - promise::completeExceptionally - ); + return thenCompose(task, getFactory().getAsyncExecutor()); + } - return promise; + @Override + public @NotNull Promise thenRunVirtual(@NotNull ExceptionalRunnable task) { + return thenApply(FunctionAdapter.adapt(task), getFactory().getVirtualExecutor()); + } + + @Override + public @NotNull Promise thenRunDelayedVirtual(@NotNull ExceptionalRunnable task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor()); + } + + @Override + public @NotNull Promise thenConsumeVirtual(@NotNull ExceptionalConsumer task) { + return thenApply(FunctionAdapter.adapt(task), getFactory().getVirtualExecutor()); + } + + @Override + public @NotNull Promise thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor()); + } + + @Override + public @NotNull Promise thenSupplyVirtual(@NotNull ExceptionalSupplier task) { + return thenApply(FunctionAdapter.adapt(task), getFactory().getVirtualExecutor()); + } + + @Override + public @NotNull Promise thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor()); + } + + @Override + public @NotNull Promise thenApplyVirtual(@NotNull ExceptionalFunction task) { + return thenApply(task, getFactory().getVirtualExecutor()); + } + + @Override + public @NotNull Promise thenApplyDelayedVirtual(@NotNull ExceptionalFunction task, long delay, + @NotNull TimeUnit unit) { + return thenApplyDelayed(task, delay, unit, getFactory().getVirtualExecutor()); + } + + @Override + public @NotNull Promise thenComposeVirtual(@NotNull ExceptionalFunction> task) { + return thenCompose(task, getFactory().getVirtualExecutor()); } @Override @@ -404,12 +418,17 @@ public abstract class AbstractPromise implements Promise { } @Override - public @NotNull Promise addAsyncListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { + public @NotNull Promise addAsyncListener(@Nullable Consumer successListener, + @Nullable Consumer errorListener) { return addAsyncListener(res -> { if (res.isSuccess()) { - if (successListener != null) successListener.accept(res.getResult()); + if (successListener != null) { + successListener.accept(res.getResult()); + } } else { - if (errorListener != null) errorListener.accept(res.getException()); + if (errorListener != null) { + errorListener.accept(res.getException()); + } } }); } @@ -420,12 +439,17 @@ public abstract class AbstractPromise implements Promise { } @Override - public @NotNull Promise addDirectListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { + public @NotNull Promise addDirectListener(@Nullable Consumer successListener, + @Nullable Consumer errorListener) { return addDirectListener(res -> { if (res.isSuccess()) { - if (successListener != null) successListener.accept(res.getResult()); + if (successListener != null) { + successListener.accept(res.getResult()); + } } else { - if (errorListener != null) errorListener.accept(res.getException()); + if (errorListener != null) { + errorListener.accept(res.getException()); + } } }); } @@ -445,7 +469,7 @@ public abstract class AbstractPromise implements Promise { Exception wrapper = new DeferredExecutionException(); return onError(e -> { if (e instanceof CancellationException && e.getMessage() == null && e.getCause() == null) { - // Ignore cancellation exceptions without a message or cause + // ignore cancellation exceptions without a message or cause return; } @@ -480,34 +504,11 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise orDefault(@NotNull ExceptionalFunction function) { - return useCompletion( - () -> { - CompletablePromise promise = createLinked(); - addDirectListener(promise::complete, e -> runCompleter(promise, () -> promise.complete(function.apply(e)))); - return promise; - }, - getFactory()::resolve, - getFactory()::error - ); - } - - @Override - public @NotNull CompletableFuture toFuture() { - return useCompletion( - () -> { - CompletableFuture future = new CompletableFuture<>(); - addDirectListener(future::complete, future::completeExceptionally); - future.whenComplete((_, e) -> { - if (e instanceof CancellationException) { - cancel(); - } - }); - - return future; - }, - CompletableFuture::completedFuture, - CompletableFuture::failedFuture - ); + return useCompletion(() -> { + CompletablePromise promise = createLinked(); + addDirectListener(promise::complete, e -> runCompleter(promise, () -> promise.complete(function.apply(e)))); + return promise; + }, getFactory()::resolve, getFactory()::error); } private static class DeferredExecutionException extends ExecutionException { diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java index 7ce84d1..c38af99 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromiseFactory.java @@ -14,22 +14,28 @@ import java.util.*; import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; -public abstract class AbstractPromiseFactory implements PromiseFactory { +public abstract class AbstractPromiseFactory implements PromiseFactory { + + private static final PromiseExecutor VIRTUAL = PromiseExecutor.virtualThreaded(); public abstract @NotNull Logger getLogger(); - public abstract @NotNull PromiseExecutor getSyncExecutor(); + public abstract @NotNull PromiseExecutor getSyncExecutor(); - public abstract @NotNull PromiseExecutor getAsyncExecutor(); + public abstract @NotNull PromiseExecutor getAsyncExecutor(); + + public @NotNull PromiseExecutor getVirtualExecutor() { + return VIRTUAL; + } @Override public @NotNull Promise wrap(@NotNull CompletionStage completion, @Nullable Future future) { CompletablePromise promise = unresolved(); completion.whenComplete((v, e) -> { - if (e != null) { - promise.completeExceptionally(e); - } else { + if (e == null) { promise.complete(v); + } else { + promise.completeExceptionally(e); } }); @@ -41,9 +47,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combine( - @NotNull Promise p1, @NotNull Promise p2 - ) { + public @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { return all(p1, p2).thenApply(_ -> new AbstractMap.SimpleImmutableEntry<>( Objects.requireNonNull(p1.getCompletion()).getResult(), Objects.requireNonNull(p2.getCompletion()).getResult() @@ -51,43 +55,45 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { } @Override - public @NotNull Promise> combineMapped( - @NotNull Iterator>> promises, - int expectedSize - ) { - if (!promises.hasNext()) return resolve(Collections.emptyMap()); + public @NotNull Promise> combineMapped(@NotNull Iterator>> promises, + int expectedSize) { + if (!promises.hasNext()) { + return resolve(Collections.emptyMap()); + } + return new MappedResultJoiner<>(this, promises, expectedSize).joined(); } @Override - public @NotNull Promise> combine( - @NotNull Iterator> promises, - int expectedSize - ) { - if (!promises.hasNext()) return resolve(Collections.emptyList()); + public @NotNull Promise> combine(@NotNull Iterator> promises, int expectedSize) { + if (!promises.hasNext()) { + return resolve(Collections.emptyList()); + } + return new ResultJoiner<>(this, promises, expectedSize).joined(); } @Override - public @NotNull Promise>> allSettled( - @NotNull Iterator> promises, - int expectedSize - ) { - if (!promises.hasNext()) return resolve(Collections.emptyList()); + public @NotNull Promise>> allSettled(@NotNull Iterator> promises, + int expectedSize) { + if (!promises.hasNext()) { + return resolve(Collections.emptyList()); + } + return new CompletionJoiner(this, promises, expectedSize).joined(); } @Override public @NotNull Promise all(@NotNull Iterator> promises) { - if (!promises.hasNext()) return resolve(null); + if (!promises.hasNext()) { + return resolve(null); + } + return new VoidJoiner(this, promises).joined(); } @Override - public @NotNull Promise race( - @NotNull Iterator> promises, - boolean ignoreErrors - ) { + public @NotNull Promise race(@NotNull Iterator> promises, boolean ignoreErrors) { CompletablePromise promise = unresolved(); while (promises.hasNext()) { if (promise.isCompleted()) { diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java index 45c8c48..c607534 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java @@ -1,5 +1,6 @@ package dev.tommyjs.futur.promise; +import dev.tommyjs.futur.executor.PromiseScheduler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -7,13 +8,12 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.locks.AbstractQueuedSynchronizer; @SuppressWarnings({"FieldMayBeFinal"}) -public abstract class BasePromise extends AbstractPromise implements CompletablePromise { +public abstract class BasePromise extends AbstractPromise implements CompletablePromise { private static final VarHandle COMPLETION_HANDLE; private static final VarHandle LISTENERS_HANDLE; @@ -43,15 +43,19 @@ public abstract class BasePromise extends AbstractPromise } protected void handleCompletion(@NotNull PromiseCompletion cmp) { - if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) return; + if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) { + return; + } + sync.releaseShared(1); callListeners(cmp); } - protected Promise completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit) { + protected Promise completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit, + PromiseScheduler scheduler) { runCompleter(this, () -> { - FA future = getFactory().getAsyncExecutor().run(() -> completeExceptionally(e), delay, unit); - addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future)); + F future = scheduler.schedule(() -> completeExceptionally(e), delay, unit); + addDirectListener(_ -> scheduler.cancel(future)); }); return this; @@ -59,7 +63,7 @@ public abstract class BasePromise extends AbstractPromise @SuppressWarnings("unchecked") protected void callListeners(@NotNull PromiseCompletion cmp) { - Iterator> iter = ((Iterable>) LISTENERS_HANDLE.getAndSet(this, null)).iterator(); + var iter = ((Iterable>) LISTENERS_HANDLE.getAndSet(this, null)).iterator(); try { while (iter.hasNext()) { callListener(iter.next(), cmp); @@ -75,11 +79,14 @@ public abstract class BasePromise extends AbstractPromise for (boolean haveNext = false; ; ) { if (!haveNext) { next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev; - if (next != null) next.add(listener); + if (next != null) { + next.add(listener); + } } - if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next)) + if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next)) { break; + } haveNext = (prev == (prev = listeners)); } @@ -125,16 +132,23 @@ public abstract class BasePromise extends AbstractPromise return joinCompletionUnchecked(); } + @Override + public T getNow() { + return joinCompletionUnchecked(); + } + @Override public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { - Exception e = new CancellationException("Promise timed out after " + time + " " + unit.toString().toLowerCase()); - return completeExceptionallyDelayed(e, time, unit); + Exception e = new CancellationException( + "Promise timed out after " + time + " " + unit.toString().toLowerCase()); + return completeExceptionallyDelayed(e, time, unit, PromiseScheduler.getDefault()); } @Override public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) { - Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit.toString().toLowerCase()); - return completeExceptionallyDelayed(e, time, unit); + Exception e = new TimeoutException( + "Promise stopped waiting after " + time + " " + unit.toString().toLowerCase()); + return completeExceptionallyDelayed(e, time, unit, PromiseScheduler.getDefault()); } @Override @@ -162,6 +176,32 @@ public abstract class BasePromise extends AbstractPromise return completion; } + @Override + public @NotNull CompletableFuture toFuture() { + return useCompletion(() -> { + CompletableFuture future = new CompletableFuture<>(); + addDirectListener(future::complete, future::completeExceptionally); + future.whenComplete((result, error) -> { + if (error == null) { + complete(result); + } else { + completeExceptionally(error); + } + }); + + return future; + }, CompletableFuture::completedFuture, CompletableFuture::failedFuture); + } + + @Override + public @NotNull CompletionStage toCompletionStage() { + return useCompletion(() -> { + CompletableFuture future = new CompletableFuture<>(); + addDirectListener(future::complete, future::completeExceptionally); + return future; + }, CompletableFuture::completedStage, CompletableFuture::failedStage); + } + private static final class Sync extends AbstractQueuedSynchronizer { private Sync() { diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletedPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletedPromise.java index d13fa2c..74602ce 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletedPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletedPromise.java @@ -2,11 +2,9 @@ package dev.tommyjs.futur.promise; import org.jetbrains.annotations.NotNull; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; -public abstract class CompletedPromise extends AbstractPromise { +public abstract class CompletedPromise extends AbstractPromise { private static final PromiseCompletion EMPTY = new PromiseCompletion<>(); @@ -59,6 +57,11 @@ public abstract class CompletedPromise extends AbstractPromise getCompletion() { return completion; @@ -69,4 +72,19 @@ public abstract class CompletedPromise extends AbstractPromise toFuture() { + if (completion.isSuccess()) { + return CompletableFuture.completedFuture(completion.result()); + } + + assert completion.exception() != null; + return CompletableFuture.failedFuture(completion.exception()); + } + + @Override + public @NotNull CompletionStage toCompletionStage() { + return toFuture(); + } + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index f4628ce..181c7ae 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -135,7 +135,8 @@ public interface Promise { * @param unit the time unit of the delay * @return a new promise that completes after the task is executed */ - @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit); + @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, + @NotNull TimeUnit unit); /** * Chains a task to be executed after this promise completes. The task will be executed immediately @@ -159,7 +160,8 @@ public interface Promise { * @param unit the time unit of the delay * @return a new promise that completes, after the task is executed, with the task result */ - @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit); + @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, + @NotNull TimeUnit unit); /** * Chains a task to be executed after this promise completes. The task will be executed by the sync @@ -185,7 +187,8 @@ public interface Promise { * @param unit the time unit of the delay * @return a new promise that completes, after the task is executed, with the task result */ - @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); + @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, + @NotNull TimeUnit unit); /** * Chains a task to be executed after this promise completes. The task will be executed by the sync @@ -245,7 +248,8 @@ public interface Promise { * @param unit the time unit of the delay * @return a new promise that completes after the task is executed */ - @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit); + @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, + @NotNull TimeUnit unit); /** * Chains a task to be executed after this promise completes. The task will be executed by the @@ -269,7 +273,8 @@ public interface Promise { * @param unit the time unit of the delay * @return a new promise that completes, after the task is executed, with the task result */ - @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit); + @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, + @NotNull TimeUnit unit); /** * Chains a task to be executed after this promise completes. The task will be executed by the async @@ -295,7 +300,8 @@ public interface Promise { * @param unit the time unit of the delay * @return a new promise that completes, after the task is executed, with the task result */ - @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); + @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, + @NotNull TimeUnit unit); /** * Chains a task to be executed after this promise completes. The task will be executed by the async @@ -309,6 +315,119 @@ public interface Promise { */ @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task); + /** + * Chains a task to be executed after this promise completes. + * The task will be executed in a virtual thread, immediately after this promise completes. + * Cancelling the returned promise will cancel this promise, and consequently any previous promises + * in the chain. + * + * @param task the task to execute + * @return a new promise that completes after the task is executed + */ + @NotNull Promise thenRunVirtual(@NotNull ExceptionalRunnable task); + + /** + * Chains a task to be executed after this promise completes. + * The task will be executed in a virtual thread after the specified delay after this + * promise completes. Cancelling the returned promise will cancel this promise, and consequently + * any previous promises in the chain. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes after the task is executed + */ + @NotNull Promise thenRunDelayedVirtual(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit); + + /** + * Chains a task to be executed after this promise completes. The task will be executed + * in a virtual thread immediately after this promise completes, and will be passed + * the result of this promise. Cancelling the returned promise will cancel this + * promise, and consequently any previous promises in the chain. + * + * @param task the task to execute + * @return a new promise that completes after the task is executed + */ + @NotNull Promise thenConsumeVirtual(@NotNull ExceptionalConsumer task); + + /** + * Chains a task to be executed after this promise completes. The task will be executed + * in a virtual thread after the specified delay after this promise completes, + * and will be passed the result of this promise. Cancelling the returned promise + * will cancel this promise, and consequently any previous promises in the chain. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes after the task is executed + */ + @NotNull Promise thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer task, long delay, + @NotNull TimeUnit unit); + + /** + * Chains a task to be executed after this promise completes. The task will be executed + * in a virtual thread immediately after this promise completes, and will supply a value + * to the next promise in the chain. Cancelling the returned promise will + * cancel this promise, and consequently any previous promises in the chain. + * + * @param task the task to execute + * @return a new promise that completes, after the task is executed, with the task result + */ + @NotNull Promise thenSupplyVirtual(@NotNull ExceptionalSupplier task); + + /** + * Chains a task to be executed after this promise completes. The task will be executed + * in a virtual thread after the specified delay after this promise completes, + * and will supply a value to the next promise in the chain. Cancelling the returned promise + * will cancel this promise, and consequently any previous promises in the chain. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes, after the task is executed, with the task result + */ + @NotNull Promise thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier task, long delay, + @NotNull TimeUnit unit); + + /** + * Chains a task to be executed after this promise completes. The task will be executed + * in a virtual thread immediately after this promise completes, and will apply the specified + * function to the result of this promise in order to supply a value to the next promise + * in the chain. Cancelling the returned promise will cancel this promise, and consequently + * any previous promises in the chain. + * + * @param task the task to execute + * @return a new promise that completes, after the task is executed, with the task result + */ + @NotNull Promise thenApplyVirtual(@NotNull ExceptionalFunction task); + + /** + * Chains a task to be executed after this promise completes. The task will be executed + * in a virtual thread after the specified delay after this promise completes, and will apply + * the specified function to the result of this promise in order to supply a value to the next + * promise in the chain. Cancelling the returned promise will cancel this promise, + * and consequently any previous promises in the chain. + * + * @param task the task to execute + * @param delay the amount of time to wait before executing the task + * @param unit the time unit of the delay + * @return a new promise that completes, after the task is executed, with the task result + */ + @NotNull Promise thenApplyDelayedVirtual(@NotNull ExceptionalFunction task, long delay, + @NotNull TimeUnit unit); + + /** + * Chains a task to be executed after this promise completes. The task will be executed + * in a virtual thread immediately after this promise completes, and will compose the next + * promise in the chain from the result of this promise. Cancelling the returned + * promise will cancel this promise, and consequently any previous promises in the chain. + * + * @param task the task to execute + * @return a new promise that completes, once this promise and the promise returned by the task are + * complete, with the result of the task promise + */ + @NotNull Promise thenComposeVirtual(@NotNull ExceptionalFunction> task); + /** * Adds a listener to this promise that will populate the specified reference with the result of this * promise upon successful completion. The reference will not be populated if this promise completes @@ -365,7 +484,8 @@ public interface Promise { * @param errorHandler the function to call on error * @return continuation of the promise chain */ - @NotNull Promise addDirectListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); + @NotNull Promise addDirectListener(@Nullable Consumer successHandler, + @Nullable Consumer errorHandler); /** * Adds a listener to this promise that will be executed immediately when this promise completes, @@ -394,7 +514,8 @@ public interface Promise { * @param successHandler the function to call on success * @param errorHandler the function to call on error */ - @NotNull Promise addAsyncListener(@Nullable Consumer successHandler, @Nullable Consumer errorHandler); + @NotNull Promise addAsyncListener(@Nullable Consumer successHandler, + @Nullable Consumer errorHandler); /** * Adds a listener to this promise that will be called if the promise is completed successfully. @@ -592,6 +713,18 @@ public interface Promise { */ @Nullable PromiseCompletion getCompletion(); + /** + * This method does not block and will return the result immediately if available. + * Get result and throws a {@link CompletionException} if the promise completed exceptionally. + * If the promise has not completed yet, it will throw an {@link IllegalStateException}. + * + * @return the result of the promise + * @throws IllegalStateException if the promise has not completed yet + * @throws CancellationException if the promise was cancelled + * @throws CompletionException if the promise completed exceptionally + */ + T getNow(); + /** * Returns whether this promise has completed. * @@ -601,10 +734,18 @@ public interface Promise { /** * Converts this promise to a {@link CompletableFuture}. The returned future will complete with the - * result of this promise when it completes. + * result of this promise and the promise will complete with the result of the future. * - * @return a future that will complete with the result of this promise + * @return a future linked to this promise */ @NotNull CompletableFuture toFuture(); + /** + * Converts this promise to a {@link CompletionStage}. + * The returned stage will complete with the result of this promise. + * + * @return a completion stage linked to this promise result + */ + @NotNull CompletionStage toCompletionStage(); + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java index 7ca6e9f..f16938b 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseCompletion.java @@ -4,14 +4,13 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; /** * Represents the result of a {@link Promise}, containing either an optional result or an exception. */ -public class PromiseCompletion { - - private @Nullable T result; - private @Nullable Throwable exception; +public record PromiseCompletion(@Nullable T result, @Nullable Throwable exception) { /** * Creates a new successful completion. @@ -19,7 +18,7 @@ public class PromiseCompletion { * @param result the result */ public PromiseCompletion(@Nullable T result) { - this.result = result; + this(result, null); } /** @@ -28,14 +27,14 @@ public class PromiseCompletion { * @param exception the exception */ public PromiseCompletion(@NotNull Throwable exception) { - this.exception = exception; + this(null, exception); } /** * Creates a new successful completion with a result of {@code null}. */ public PromiseCompletion() { - this((T) null); + this(null, null); } /** @@ -61,15 +60,10 @@ public class PromiseCompletion { * * @return {@code true} if the completion was cancelled, {@code false} otherwise */ - public boolean wasCancelled() { + public boolean isCancelled() { return exception instanceof CancellationException; } - @Deprecated - public boolean wasCanceled() { - return wasCancelled(); - } - /** * Gets the result of the completion. * @@ -88,4 +82,32 @@ public class PromiseCompletion { return exception; } + /** + * Gets the result or throws a {@link CompletionException} if the completion was exceptional. + * + * @return the result of the completion + * @throws CompletionException if the completion was exceptional + */ + public T get() { + if (isSuccess()) { + return getResult(); + } else { + throw new CompletionException(getException()); + } + } + + /** + * Gets the result or throws an {@link ExecutionException} if the completion was exceptional. + * + * @return the result of the completion + * @throws ExecutionException if the completion was exceptional + */ + public T getChecked() throws ExecutionException { + if (isSuccess()) { + return getResult(); + } else { + throw new ExecutionException(getException()); + } + } + } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index 94ba74b..6780e16 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -31,7 +31,7 @@ public interface PromiseFactory { */ static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor syncExecutor, @NotNull PromiseExecutor asyncExecutor) { - return new PromiseFactoryImpl<>(logger, syncExecutor, asyncExecutor); + return new PromiseFactoryImpl(logger, syncExecutor, asyncExecutor); } /** @@ -42,7 +42,7 @@ public interface PromiseFactory { * @return the new promise factory */ static @NotNull PromiseFactory of(@NotNull Logger logger, @NotNull PromiseExecutor executor) { - return new PromiseFactoryImpl<>(logger, executor, executor); + return new PromiseFactoryImpl(logger, executor, executor); } /** @@ -93,7 +93,7 @@ public interface PromiseFactory { * and the {@link Future} will be cancelled upon cancellation of the promise. * * @param completion the completion stage to wrap - * @param future the future to wrap + * @param future the future to wrap * @return the new promise */ @NotNull Promise wrap(@NotNull CompletionStage completion, @Nullable Future future); @@ -108,15 +108,15 @@ public interface PromiseFactory { */ default @NotNull Promise wrap(@NotNull CompletableFuture future) { return wrap(future, future); - }; + } /** * Combines two promises into a single promise that resolves when both promises are completed. * If either input promise completes exceptionally, the other promise will be cancelled * and the output promise will complete exceptionally. * - * @param p1 the first promise - * @param p2 the second promise + * @param p1 the first promise + * @param p2 the second promise * @return the combined promise */ @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2); @@ -128,7 +128,7 @@ public interface PromiseFactory { * If any promise completes exceptionally, the other promises will be cancelled * and the combined promise will complete exceptionally. * - * @param promises the input promises + * @param promises the input promises * @param expectedSize the expected size of the iterator (used for optimization) * @return the combined promise */ @@ -206,7 +206,7 @@ public interface PromiseFactory { * If any promise completes exceptionally, the other promises will be cancelled * and the combined promise will complete exceptionally. * - * @param keys the keys to map to promises + * @param keys the keys to map to promises * @param mapper the function to map keys to promises * @return the combined promise */ @@ -221,7 +221,7 @@ public interface PromiseFactory { * If any promise completes exceptionally, the other promises will be cancelled * and the combined promise will complete exceptionally. * - * @param keys the keys to map to promises + * @param keys the keys to map to promises * @param mapper the function to map keys to promises * @return the combined promise */ @@ -244,7 +244,7 @@ public interface PromiseFactory { * If any promise completes exceptionally, all other promises will be cancelled * and the combined promise will complete exceptionally. * - * @param promises the input promises + * @param promises the input promises * @param expectedSize the expected size of the iterator (used for optimization) * @return the combined promise */ @@ -306,12 +306,11 @@ public interface PromiseFactory { * Combines multiple promises into a single promise that completes when all promises * are completed, with a list of completions in the original order. * - * @param promises the input promises + * @param promises the input promises * @param expectedSize the expected size of the iterator (used for optimization) * @return the combined promise */ - @NotNull Promise>> allSettled(@NotNull Iterator> promises, - int expectedSize); + @NotNull Promise>> allSettled(@NotNull Iterator> promises, int expectedSize); /** * Combines multiple promises into a single promise that completes when all promises @@ -411,7 +410,7 @@ public interface PromiseFactory { * Additionally, if {@code cancelLosers} is {@code true}, the other promises will be cancelled * once the combined promise is completed. * - * @param promises the input promises + * @param promises the input promises * @param ignoreErrors whether to ignore promises that complete exceptionally * @return the combined promise */ @@ -425,7 +424,7 @@ public interface PromiseFactory { * successful completion or complete with {@code null} if all promises complete exceptionally. * Additionally, The other promises will be cancelled once the combined promise is completed. * - * @param promises the input promises + * @param promises the input promises * @param ignoreErrors whether to ignore promises that complete exceptionally * @return the combined promise */ @@ -452,7 +451,7 @@ public interface PromiseFactory { * successful completion or complete with {@code null} if all promises complete exceptionally. * Additionally, The other promises will be cancelled once the combined promise is completed. * - * @param promises the input promises + * @param promises the input promises * @param ignoreErrors whether to ignore promises that complete exceptionally * @return the combined promise */ @@ -479,7 +478,7 @@ public interface PromiseFactory { * successful completion or complete with {@code null} if all promises complete exceptionally. * Additionally, The other promises will be cancelled once the combined promise is completed. * - * @param promises the input promises + * @param promises the input promises * @param ignoreErrors whether to ignore promises that complete exceptionally * @return the combined promise */ diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java index 7604aeb..e0e7b39 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java @@ -5,17 +5,14 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; -public class PromiseFactoryImpl extends AbstractPromiseFactory { +public class PromiseFactoryImpl extends AbstractPromiseFactory { private final @NotNull Logger logger; - private final @NotNull PromiseExecutor syncExecutor; - private final @NotNull PromiseExecutor asyncExecutor; + private final @NotNull PromiseExecutor syncExecutor; + private final @NotNull PromiseExecutor asyncExecutor; - public PromiseFactoryImpl( - @NotNull Logger logger, - @NotNull PromiseExecutor syncExecutor, - @NotNull PromiseExecutor asyncExecutor - ) { + public PromiseFactoryImpl(@NotNull Logger logger, @NotNull PromiseExecutor syncExecutor, + @NotNull PromiseExecutor asyncExecutor) { this.logger = logger; this.syncExecutor = syncExecutor; this.asyncExecutor = asyncExecutor; @@ -47,40 +44,43 @@ public class PromiseFactoryImpl extends AbstractPromiseFactory { } @Override - public @NotNull PromiseExecutor getSyncExecutor() { + public @NotNull PromiseExecutor getSyncExecutor() { return syncExecutor; } @Override - public @NotNull PromiseExecutor getAsyncExecutor() { + public @NotNull PromiseExecutor getAsyncExecutor() { return asyncExecutor; } - private class PromiseImpl extends BasePromise { + private class PromiseImpl extends BasePromise { + + PromiseImpl() { + } @Override - public @NotNull AbstractPromiseFactory getFactory() { + public @NotNull AbstractPromiseFactory getFactory() { return PromiseFactoryImpl.this; } } - private class CompletedPromiseImpl extends CompletedPromise { + private class CompletedPromiseImpl extends CompletedPromise { - public CompletedPromiseImpl(@Nullable T result) { + CompletedPromiseImpl(@Nullable T result) { super(new PromiseCompletion<>(result)); } - public CompletedPromiseImpl(@NotNull Throwable exception) { + CompletedPromiseImpl(@NotNull Throwable exception) { super(new PromiseCompletion<>(exception)); } - public CompletedPromiseImpl() { + CompletedPromiseImpl() { super(); } @Override - public @NotNull AbstractPromiseFactory getFactory() { + public @NotNull AbstractPromiseFactory getFactory() { return PromiseFactoryImpl.this; } diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index 5f3a8ca..8cbabbd 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -245,14 +245,14 @@ public final class PromiseTests { @Test public void testImmediate1() { var promise = promises.start().thenSupply(() -> 10); - assert promise.isCompleted() && promise instanceof CompletedPromise; + assert promise.isCompleted() && promise instanceof CompletedPromise; } @Test public void testImmediate2() { var resolved = promises.resolve(10); var promise = promises.start().thenCompose(_ -> resolved); - assert promise.isCompleted() && promise instanceof CompletedPromise; + assert promise.isCompleted() && promise instanceof CompletedPromise; } } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index cea7a79..e3f5343 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,5 @@ -distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.12-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME