mirror of
https://github.com/tommyskeff/futur4j.git
synced 2026-01-18 07:16:45 +00:00
finalize changes for 2.5.0 release
This commit is contained in:
26
.github/workflows/publish.yml
vendored
Normal file
26
.github/workflows/publish.yml
vendored
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
name: Build and publish
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- main
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
publish:
|
||||||
|
name: Publish build
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout code
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Setup Java
|
||||||
|
uses: actions/setup-java@v4
|
||||||
|
with:
|
||||||
|
distribution: corretto
|
||||||
|
java-version: 23
|
||||||
|
|
||||||
|
- name: Make Gradle executable
|
||||||
|
run: chmod +x ./gradlew
|
||||||
|
|
||||||
|
- name: Build and publish project
|
||||||
|
run: ./gradlew publish -PtommyjsUsername=${{ secrets.NEXUS_USERNAME }} -PtommyjsPassword=${{ secrets.NEXUS_PASSWORD }}
|
||||||
33
build.gradle
33
build.gradle
@@ -1,15 +1,7 @@
|
|||||||
plugins {
|
plugins {
|
||||||
id 'java-library'
|
id 'java-library'
|
||||||
id 'com.github.johnrengelman.shadow' version '8.1.1'
|
id 'com.github.johnrengelman.shadow' version '8.1.1'
|
||||||
id 'io.github.gradle-nexus.publish-plugin' version '2.0.0'
|
id 'maven-publish'
|
||||||
}
|
|
||||||
|
|
||||||
nexusPublishing {
|
|
||||||
repositories {
|
|
||||||
tommyjs {
|
|
||||||
nexusUrl = uri("https://repo.tommyjs.dev/repository/maven-releases")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
subprojects {
|
subprojects {
|
||||||
@@ -18,6 +10,29 @@ subprojects {
|
|||||||
|
|
||||||
apply plugin: 'java-library'
|
apply plugin: 'java-library'
|
||||||
apply plugin: 'com.github.johnrengelman.shadow'
|
apply plugin: 'com.github.johnrengelman.shadow'
|
||||||
|
apply plugin : 'maven-publish'
|
||||||
|
|
||||||
|
publishing {
|
||||||
|
publications {
|
||||||
|
mavenJava(MavenPublication) {
|
||||||
|
from(components["java"])
|
||||||
|
pom {
|
||||||
|
name = project.name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
repositories {
|
||||||
|
maven {
|
||||||
|
name = 'tommyjs'
|
||||||
|
url = uri("https://repo.tommyjs.dev/repository/maven-releases/")
|
||||||
|
credentials {
|
||||||
|
username = findProperty("tommyjsUsername") as String
|
||||||
|
password = findProperty("tommyjsPassword") as String
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tasks {
|
tasks {
|
||||||
build {
|
build {
|
||||||
|
|||||||
@@ -73,8 +73,8 @@ public interface PromiseExecutor<T> {
|
|||||||
* Cancels the given task if possible. This may interrupt the task mid-execution.
|
* Cancels the given task if possible. This may interrupt the task mid-execution.
|
||||||
*
|
*
|
||||||
* @param task the task
|
* @param task the task
|
||||||
* @return {@code true} if the task was cancelled. {@code false} if the task was already completed
|
* @return {@code true} if the task was cancelled, {@code false} if the task was already completed
|
||||||
* or could not be cancelled.
|
* or could not be cancelled
|
||||||
*/
|
*/
|
||||||
boolean cancel(@NotNull T task);
|
boolean cancel(@NotNull T task);
|
||||||
|
|
||||||
|
|||||||
@@ -28,8 +28,8 @@ public interface PromiseScheduler<T> {
|
|||||||
* Cancels the given task if possible. This may interrupt the task mid-execution.
|
* Cancels the given task if possible. This may interrupt the task mid-execution.
|
||||||
*
|
*
|
||||||
* @param task the task
|
* @param task the task
|
||||||
* @return {@code true} if the task was cancelled. {@code false} if the task was already completed
|
* @return {@code true} if the task was cancelled, {@code false} if the task was already completed
|
||||||
* or could not be cancelled.
|
* or could not be cancelled
|
||||||
*/
|
*/
|
||||||
boolean cancel(@NotNull T task);
|
boolean cancel(@NotNull T task);
|
||||||
|
|
||||||
|
|||||||
@@ -2,17 +2,18 @@ package dev.tommyjs.futur.executor;
|
|||||||
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.ScheduledFuture;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
class PromiseSchedulerDefault implements PromiseScheduler<ScheduledFuture<?>> {
|
class PromiseSchedulerDefault implements PromiseScheduler<ScheduledFuture<?>> {
|
||||||
|
|
||||||
static final PromiseSchedulerDefault INSTANCE = new PromiseSchedulerDefault();
|
static final PromiseSchedulerDefault INSTANCE = new PromiseSchedulerDefault();
|
||||||
|
|
||||||
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
|
private final ScheduledExecutorService executor;
|
||||||
Thread.ofPlatform().name("promise-scheduler").daemon(true).factory());
|
|
||||||
|
PromiseSchedulerDefault() {
|
||||||
|
ThreadFactory factory = Thread.ofPlatform().name("promise-scheduler").daemon(true).factory();
|
||||||
|
this.executor = Executors.newSingleThreadScheduledExecutor(factory);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull ScheduledFuture<?> schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
|
public @NotNull ScheduledFuture<?> schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
|
||||||
|
|||||||
@@ -5,21 +5,21 @@ import org.jetbrains.annotations.NotNull;
|
|||||||
public final class FunctionAdapter {
|
public final class FunctionAdapter {
|
||||||
|
|
||||||
public static <T, V> @NotNull ExceptionalFunction<T, V> adapt(@NotNull ExceptionalConsumer<T> consumer) {
|
public static <T, V> @NotNull ExceptionalFunction<T, V> adapt(@NotNull ExceptionalConsumer<T> consumer) {
|
||||||
return (value) -> {
|
return value -> {
|
||||||
consumer.accept(value);
|
consumer.accept(value);
|
||||||
return null;
|
return null;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <K, V> @NotNull ExceptionalFunction<K, V> adapt(@NotNull ExceptionalRunnable runnable) {
|
public static <K, V> @NotNull ExceptionalFunction<K, V> adapt(@NotNull ExceptionalRunnable runnable) {
|
||||||
return (_) -> {
|
return _ -> {
|
||||||
runnable.run();
|
runnable.run();
|
||||||
return null;
|
return null;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <K, T> @NotNull ExceptionalFunction<K, T> adapt(@NotNull ExceptionalSupplier<T> supplier) {
|
public static <K, T> @NotNull ExceptionalFunction<K, T> adapt(@NotNull ExceptionalSupplier<T> supplier) {
|
||||||
return (_) -> supplier.get();
|
return _ -> supplier.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,11 +13,7 @@ public class CompletionJoiner extends PromiseJoiner<Promise<?>, Void, Void, List
|
|||||||
|
|
||||||
private final ConcurrentResultArray<PromiseCompletion<?>> results;
|
private final ConcurrentResultArray<PromiseCompletion<?>> results;
|
||||||
|
|
||||||
public CompletionJoiner(
|
public CompletionJoiner(@NotNull PromiseFactory factory, @NotNull Iterator<Promise<?>> promises, int expectedSize) {
|
||||||
@NotNull PromiseFactory factory,
|
|
||||||
@NotNull Iterator<Promise<?>> promises,
|
|
||||||
int expectedSize
|
|
||||||
) {
|
|
||||||
super(factory);
|
super(factory);
|
||||||
results = new ConcurrentResultArray<>(expectedSize);
|
results = new ConcurrentResultArray<>(expectedSize);
|
||||||
join(promises);
|
join(promises);
|
||||||
|
|||||||
@@ -12,11 +12,8 @@ public class MappedResultJoiner<K, V> extends PromiseJoiner<Map.Entry<K, Promise
|
|||||||
|
|
||||||
private final @NotNull ConcurrentResultArray<Map.Entry<K, V>> results;
|
private final @NotNull ConcurrentResultArray<Map.Entry<K, V>> results;
|
||||||
|
|
||||||
public MappedResultJoiner(
|
public MappedResultJoiner(@NotNull PromiseFactory factory, @NotNull Iterator<Map.Entry<K, Promise<V>>> promises,
|
||||||
@NotNull PromiseFactory factory,
|
int expectedSize) {
|
||||||
@NotNull Iterator<Map.Entry<K, Promise<V>>> promises,
|
|
||||||
int expectedSize
|
|
||||||
) {
|
|
||||||
super(factory);
|
super(factory);
|
||||||
this.results = new ConcurrentResultArray<>(expectedSize);
|
this.results = new ConcurrentResultArray<>(expectedSize);
|
||||||
join(promises);
|
join(promises);
|
||||||
|
|||||||
@@ -13,11 +13,7 @@ public class ResultJoiner<T> extends PromiseJoiner<Promise<T>, Void, T, List<T>>
|
|||||||
|
|
||||||
private final ConcurrentResultArray<T> results;
|
private final ConcurrentResultArray<T> results;
|
||||||
|
|
||||||
public ResultJoiner(
|
public ResultJoiner(@NotNull PromiseFactory factory, @NotNull Iterator<Promise<T>> promises, int expectedSize) {
|
||||||
@NotNull PromiseFactory factory,
|
|
||||||
@NotNull Iterator<Promise<T>> promises,
|
|
||||||
int expectedSize
|
|
||||||
) {
|
|
||||||
super(factory);
|
super(factory);
|
||||||
this.results = new ConcurrentResultArray<>(expectedSize);
|
this.results = new ConcurrentResultArray<>(expectedSize);
|
||||||
join(promises);
|
join(promises);
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
try {
|
try {
|
||||||
return supplier.get();
|
return supplier.get();
|
||||||
} catch (Error error) {
|
} catch (Error error) {
|
||||||
// Rethrow error so the Thread can shut down or whatever
|
// rethrow unrecoverable errors
|
||||||
throw error;
|
throw error;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
return handler.apply(e);
|
return handler.apply(e);
|
||||||
@@ -51,7 +51,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
runnable.run();
|
runnable.run();
|
||||||
} catch (Error error) {
|
} catch (Error error) {
|
||||||
handler.accept(error);
|
handler.accept(error);
|
||||||
// Rethrow error so the Thread can shut down or whatever
|
// rethrow unrecoverable errors
|
||||||
throw error;
|
throw error;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
handler.accept(e);
|
handler.accept(e);
|
||||||
@@ -64,9 +64,13 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
|
|
||||||
protected <V> V useCompletion(Supplier<V> unresolved, Function<T, V> completed, Function<Throwable, V> failed) {
|
protected <V> V useCompletion(Supplier<V> unresolved, Function<T, V> completed, Function<Throwable, V> failed) {
|
||||||
PromiseCompletion<T> completion = getCompletion();
|
PromiseCompletion<T> completion = getCompletion();
|
||||||
if (completion == null) return unresolved.get();
|
if (completion == null) {
|
||||||
else if (completion.isSuccess()) return completed.apply(completion.getResult());
|
return unresolved.get();
|
||||||
else return failed.apply(completion.getException());
|
} else if (completion.isSuccess()) {
|
||||||
|
return completed.apply(completion.getResult());
|
||||||
|
} else {
|
||||||
|
return failed.apply(completion.getException());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected <V> @NotNull Runnable createCompleter(T result, @NotNull CompletablePromise<V> promise,
|
protected <V> @NotNull Runnable createCompleter(T result, @NotNull CompletablePromise<V> promise,
|
||||||
@@ -124,7 +128,9 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> fork() {
|
public @NotNull Promise<T> fork() {
|
||||||
if (isCompleted()) return this;
|
if (isCompleted()) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
CompletablePromise<T> fork = getFactory().unresolved();
|
CompletablePromise<T> fork = getFactory().unresolved();
|
||||||
PromiseUtil.propagateCompletion(this, fork);
|
PromiseUtil.propagateCompletion(this, fork);
|
||||||
@@ -154,45 +160,29 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenApply(@NotNull ExceptionalFunction<T, V> task) {
|
public <V> @NotNull Promise<V> thenApply(@NotNull ExceptionalFunction<T, V> task) {
|
||||||
return useCompletion(
|
return useCompletion(() -> {
|
||||||
() -> {
|
|
||||||
CompletablePromise<V> promise = createLinked();
|
CompletablePromise<V> promise = createLinked();
|
||||||
addDirectListener(
|
addDirectListener(res -> createCompleter(res, promise, task).run(), promise::completeExceptionally);
|
||||||
res -> createCompleter(res, promise, task).run(),
|
|
||||||
promise::completeExceptionally
|
|
||||||
);
|
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
},
|
}, result -> supplySafe(() -> getFactory().resolve(task.apply(result)), getFactory()::error), getFactory()::error);
|
||||||
result -> supplySafe(
|
|
||||||
() -> getFactory().resolve(task.apply(result)),
|
|
||||||
getFactory()::error
|
|
||||||
),
|
|
||||||
getFactory()::error
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenCompose(@NotNull ExceptionalFunction<T, Promise<V>> task) {
|
public <V> @NotNull Promise<V> thenCompose(@NotNull ExceptionalFunction<T, Promise<V>> task) {
|
||||||
return useCompletion(
|
return useCompletion(() -> {
|
||||||
() -> {
|
|
||||||
CompletablePromise<V> promise = createLinked();
|
CompletablePromise<V> promise = createLinked();
|
||||||
thenApply(task).addDirectListener(
|
thenApply(task).addDirectListener(result -> {
|
||||||
result -> {
|
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
promise.complete(null);
|
promise.complete(null);
|
||||||
} else {
|
} else {
|
||||||
PromiseUtil.propagateCompletion(result, promise);
|
PromiseUtil.propagateCompletion(result, promise);
|
||||||
PromiseUtil.propagateCancel(promise, result);
|
PromiseUtil.propagateCancel(promise, result);
|
||||||
}
|
}
|
||||||
},
|
}, promise::completeExceptionally);
|
||||||
promise::completeExceptionally
|
|
||||||
);
|
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
},
|
}, result -> supplySafe(() -> {
|
||||||
result -> supplySafe(
|
|
||||||
() -> {
|
|
||||||
Promise<V> nested = task.apply(result);
|
Promise<V> nested = task.apply(result);
|
||||||
if (nested == null) {
|
if (nested == null) {
|
||||||
return getFactory().resolve(null);
|
return getFactory().resolve(null);
|
||||||
@@ -204,80 +194,60 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
PromiseUtil.propagateCancel(promise, nested);
|
PromiseUtil.propagateCancel(promise, nested);
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
},
|
}, getFactory()::error), getFactory()::error);
|
||||||
getFactory()::error
|
|
||||||
),
|
|
||||||
getFactory()::error
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private <F, V> @NotNull Promise<V> thenApply(@NotNull ExceptionalFunction<T, V> task, @NotNull PromiseExecutor<F> executor) {
|
private <F, V> @NotNull Promise<V> thenApply(@NotNull ExceptionalFunction<T, V> task,
|
||||||
|
@NotNull PromiseExecutor<F> executor) {
|
||||||
CompletablePromise<V> promise = createLinked();
|
CompletablePromise<V> promise = createLinked();
|
||||||
addDirectListener(
|
addDirectListener(res -> runCompleter(promise, () -> {
|
||||||
res -> runCompleter(promise, () -> {
|
|
||||||
Runnable completer = createCompleter(res, promise, task);
|
Runnable completer = createCompleter(res, promise, task);
|
||||||
execute(promise, completer, executor);
|
execute(promise, completer, executor);
|
||||||
}),
|
}), promise::completeExceptionally);
|
||||||
promise::completeExceptionally
|
|
||||||
);
|
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
private <F, V> @NotNull Promise<V> thenApplyDelayed(
|
private <F, V> @NotNull Promise<V> thenApplyDelayed(@NotNull ExceptionalFunction<T, V> task, long delay,
|
||||||
@NotNull ExceptionalFunction<T, V> task, long delay,
|
@NotNull TimeUnit unit, @NotNull PromiseExecutor<F> executor) {
|
||||||
@NotNull TimeUnit unit, @NotNull PromiseExecutor<F> executor
|
|
||||||
) {
|
|
||||||
CompletablePromise<V> promise = createLinked();
|
CompletablePromise<V> promise = createLinked();
|
||||||
addDirectListener(
|
addDirectListener(res -> runCompleter(promise, () -> {
|
||||||
res -> runCompleter(promise, () -> {
|
|
||||||
Runnable completer = createCompleter(res, promise, task);
|
Runnable completer = createCompleter(res, promise, task);
|
||||||
PromiseScheduler<?> scheduler = executor.scheduler();
|
PromiseScheduler<?> scheduler = executor.scheduler();
|
||||||
if (scheduler == null) {
|
if (scheduler == null) {
|
||||||
schedule(
|
schedule(promise, () -> runCompleter(promise, () -> execute(promise, completer, executor)), delay, unit,
|
||||||
promise,
|
PromiseScheduler.getDefault());
|
||||||
() -> runCompleter(promise, () -> execute(promise, completer, executor)),
|
|
||||||
delay, unit, PromiseScheduler.getDefault()
|
|
||||||
);
|
|
||||||
} else {
|
} else {
|
||||||
schedule(promise, completer, delay, unit, scheduler);
|
schedule(promise, completer, delay, unit, scheduler);
|
||||||
}
|
}
|
||||||
}),
|
}), promise::completeExceptionally);
|
||||||
promise::completeExceptionally
|
|
||||||
);
|
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
private <F> void execute(@NotNull Promise<?> promise, @NotNull Runnable task, @NotNull PromiseExecutor<F >executor) throws Exception {
|
private <F> void execute(@NotNull Promise<?> promise, @NotNull Runnable task, @NotNull PromiseExecutor<F> executor)
|
||||||
|
throws Exception {
|
||||||
F future = executor.run(task);
|
F future = executor.run(task);
|
||||||
promise.addDirectListener(_ -> executor.cancel(future));
|
promise.addDirectListener(_ -> executor.cancel(future));
|
||||||
}
|
}
|
||||||
|
|
||||||
private <F> void schedule(
|
private <F> void schedule(@NotNull Promise<?> promise, @NotNull Runnable task, long delay, @NotNull TimeUnit unit,
|
||||||
@NotNull Promise<?> promise, @NotNull Runnable task,
|
@NotNull PromiseScheduler<F> scheduler) throws Exception {
|
||||||
long delay, @NotNull TimeUnit unit, @NotNull PromiseScheduler<F> scheduler
|
|
||||||
) throws Exception {
|
|
||||||
F future = scheduler.schedule(task, delay, unit);
|
F future = scheduler.schedule(task, delay, unit);
|
||||||
promise.addDirectListener(_ -> scheduler.cancel(future));
|
promise.addDirectListener(_ -> scheduler.cancel(future));
|
||||||
}
|
}
|
||||||
|
|
||||||
private <V> @NotNull Promise<V> thenCompose(
|
private <V> @NotNull Promise<V> thenCompose(@NotNull ExceptionalFunction<T, Promise<V>> task,
|
||||||
@NotNull ExceptionalFunction<T, Promise<V>> task,
|
@NotNull PromiseExecutor<?> executor) {
|
||||||
@NotNull PromiseExecutor<?> executor
|
|
||||||
) {
|
|
||||||
CompletablePromise<V> promise = createLinked();
|
CompletablePromise<V> promise = createLinked();
|
||||||
thenApply(task, executor).addDirectListener(
|
thenApply(task, executor).addDirectListener(nestedPromise -> {
|
||||||
nestedPromise -> {
|
|
||||||
if (nestedPromise == null) {
|
if (nestedPromise == null) {
|
||||||
promise.complete(null);
|
promise.complete(null);
|
||||||
} else {
|
} else {
|
||||||
PromiseUtil.propagateCompletion(nestedPromise, promise);
|
PromiseUtil.propagateCompletion(nestedPromise, promise);
|
||||||
PromiseUtil.propagateCancel(promise, nestedPromise);
|
PromiseUtil.propagateCancel(promise, nestedPromise);
|
||||||
}
|
}
|
||||||
},
|
}, promise::completeExceptionally);
|
||||||
promise::completeExceptionally
|
|
||||||
);
|
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
@@ -288,7 +258,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
|
public @NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
|
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -298,7 +269,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<Void> thenConsumeDelayedSync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) {
|
public @NotNull Promise<Void> thenConsumeDelayedSync(@NotNull ExceptionalConsumer<T> task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
|
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -308,7 +280,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
|
public <V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
|
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -318,7 +291,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
|
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(task, delay, unit, getFactory().getSyncExecutor());
|
return thenApplyDelayed(task, delay, unit, getFactory().getSyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -333,7 +307,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<Void> thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
|
public @NotNull Promise<Void> thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
|
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -343,7 +318,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<Void> thenConsumeDelayedAsync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) {
|
public @NotNull Promise<Void> thenConsumeDelayedAsync(@NotNull ExceptionalConsumer<T> task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
|
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -353,7 +329,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
|
public <V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
|
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -363,7 +340,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
|
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(task, delay, unit, getFactory().getAsyncExecutor());
|
return thenApplyDelayed(task, delay, unit, getFactory().getAsyncExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -378,7 +356,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<Void> thenRunDelayedVirtual(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
|
public @NotNull Promise<Void> thenRunDelayedVirtual(@NotNull ExceptionalRunnable task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
|
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -388,7 +367,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<Void> thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit) {
|
public @NotNull Promise<Void> thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer<T> task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
|
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -398,7 +378,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
|
public <V> @NotNull Promise<V> thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier<V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
|
return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -408,7 +389,8 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> thenApplyDelayedVirtual(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
|
public <V> @NotNull Promise<V> thenApplyDelayedVirtual(@NotNull ExceptionalFunction<T, V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit) {
|
||||||
return thenApplyDelayed(task, delay, unit, getFactory().getVirtualExecutor());
|
return thenApplyDelayed(task, delay, unit, getFactory().getVirtualExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -436,12 +418,17 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
|
public @NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successListener,
|
||||||
|
@Nullable Consumer<Throwable> errorListener) {
|
||||||
return addAsyncListener(res -> {
|
return addAsyncListener(res -> {
|
||||||
if (res.isSuccess()) {
|
if (res.isSuccess()) {
|
||||||
if (successListener != null) successListener.accept(res.getResult());
|
if (successListener != null) {
|
||||||
|
successListener.accept(res.getResult());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (errorListener != null) errorListener.accept(res.getException());
|
if (errorListener != null) {
|
||||||
|
errorListener.accept(res.getException());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -452,12 +439,17 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
|
public @NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successListener,
|
||||||
|
@Nullable Consumer<Throwable> errorListener) {
|
||||||
return addDirectListener(res -> {
|
return addDirectListener(res -> {
|
||||||
if (res.isSuccess()) {
|
if (res.isSuccess()) {
|
||||||
if (successListener != null) successListener.accept(res.getResult());
|
if (successListener != null) {
|
||||||
|
successListener.accept(res.getResult());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (errorListener != null) errorListener.accept(res.getException());
|
if (errorListener != null) {
|
||||||
|
errorListener.accept(res.getException());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -477,7 +469,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
Exception wrapper = new DeferredExecutionException();
|
Exception wrapper = new DeferredExecutionException();
|
||||||
return onError(e -> {
|
return onError(e -> {
|
||||||
if (e instanceof CancellationException && e.getMessage() == null && e.getCause() == null) {
|
if (e instanceof CancellationException && e.getMessage() == null && e.getCause() == null) {
|
||||||
// Ignore cancellation exceptions without a message or cause
|
// ignore cancellation exceptions without a message or cause
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -512,15 +504,11 @@ public abstract class AbstractPromise<T> implements Promise<T> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> orDefault(@NotNull ExceptionalFunction<Throwable, T> function) {
|
public @NotNull Promise<T> orDefault(@NotNull ExceptionalFunction<Throwable, T> function) {
|
||||||
return useCompletion(
|
return useCompletion(() -> {
|
||||||
() -> {
|
|
||||||
CompletablePromise<T> promise = createLinked();
|
CompletablePromise<T> promise = createLinked();
|
||||||
addDirectListener(promise::complete, e -> runCompleter(promise, () -> promise.complete(function.apply(e))));
|
addDirectListener(promise::complete, e -> runCompleter(promise, () -> promise.complete(function.apply(e))));
|
||||||
return promise;
|
return promise;
|
||||||
},
|
}, getFactory()::resolve, getFactory()::error);
|
||||||
getFactory()::resolve,
|
|
||||||
getFactory()::error
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class DeferredExecutionException extends ExecutionException {
|
private static class DeferredExecutionException extends ExecutionException {
|
||||||
|
|||||||
@@ -47,9 +47,7 @@ public abstract class AbstractPromiseFactory implements PromiseFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(
|
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
|
||||||
@NotNull Promise<K> p1, @NotNull Promise<V> p2
|
|
||||||
) {
|
|
||||||
return all(p1, p2).thenApply(_ -> new AbstractMap.SimpleImmutableEntry<>(
|
return all(p1, p2).thenApply(_ -> new AbstractMap.SimpleImmutableEntry<>(
|
||||||
Objects.requireNonNull(p1.getCompletion()).getResult(),
|
Objects.requireNonNull(p1.getCompletion()).getResult(),
|
||||||
Objects.requireNonNull(p2.getCompletion()).getResult()
|
Objects.requireNonNull(p2.getCompletion()).getResult()
|
||||||
@@ -57,43 +55,45 @@ public abstract class AbstractPromiseFactory implements PromiseFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull <K, V> Promise<Map<K, V>> combineMapped(
|
public @NotNull <K, V> Promise<Map<K, V>> combineMapped(@NotNull Iterator<Map.Entry<K, Promise<V>>> promises,
|
||||||
@NotNull Iterator<Map.Entry<K, Promise<V>>> promises,
|
int expectedSize) {
|
||||||
int expectedSize
|
if (!promises.hasNext()) {
|
||||||
) {
|
return resolve(Collections.emptyMap());
|
||||||
if (!promises.hasNext()) return resolve(Collections.emptyMap());
|
}
|
||||||
|
|
||||||
return new MappedResultJoiner<>(this, promises, expectedSize).joined();
|
return new MappedResultJoiner<>(this, promises, expectedSize).joined();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<List<V>> combine(
|
public <V> @NotNull Promise<List<V>> combine(@NotNull Iterator<Promise<V>> promises, int expectedSize) {
|
||||||
@NotNull Iterator<Promise<V>> promises,
|
if (!promises.hasNext()) {
|
||||||
int expectedSize
|
return resolve(Collections.emptyList());
|
||||||
) {
|
}
|
||||||
if (!promises.hasNext()) return resolve(Collections.emptyList());
|
|
||||||
return new ResultJoiner<>(this, promises, expectedSize).joined();
|
return new ResultJoiner<>(this, promises, expectedSize).joined();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(
|
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterator<Promise<?>> promises,
|
||||||
@NotNull Iterator<Promise<?>> promises,
|
int expectedSize) {
|
||||||
int expectedSize
|
if (!promises.hasNext()) {
|
||||||
) {
|
return resolve(Collections.emptyList());
|
||||||
if (!promises.hasNext()) return resolve(Collections.emptyList());
|
}
|
||||||
|
|
||||||
return new CompletionJoiner(this, promises, expectedSize).joined();
|
return new CompletionJoiner(this, promises, expectedSize).joined();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<Void> all(@NotNull Iterator<Promise<?>> promises) {
|
public @NotNull Promise<Void> all(@NotNull Iterator<Promise<?>> promises) {
|
||||||
if (!promises.hasNext()) return resolve(null);
|
if (!promises.hasNext()) {
|
||||||
|
return resolve(null);
|
||||||
|
}
|
||||||
|
|
||||||
return new VoidJoiner(this, promises).joined();
|
return new VoidJoiner(this, promises).joined();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <V> @NotNull Promise<V> race(
|
public <V> @NotNull Promise<V> race(@NotNull Iterator<Promise<V>> promises, boolean ignoreErrors) {
|
||||||
@NotNull Iterator<Promise<V>> promises,
|
|
||||||
boolean ignoreErrors
|
|
||||||
) {
|
|
||||||
CompletablePromise<V> promise = unresolved();
|
CompletablePromise<V> promise = unresolved();
|
||||||
while (promises.hasNext()) {
|
while (promises.hasNext()) {
|
||||||
if (promise.isCompleted()) {
|
if (promise.isCompleted()) {
|
||||||
|
|||||||
@@ -5,5 +5,4 @@ package dev.tommyjs.futur.promise;
|
|||||||
* executed asynchronously by the {@link PromiseFactory} that created the completed promise.
|
* executed asynchronously by the {@link PromiseFactory} that created the completed promise.
|
||||||
*/
|
*/
|
||||||
public interface AsyncPromiseListener<T> extends PromiseListener<T> {
|
public interface AsyncPromiseListener<T> extends PromiseListener<T> {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ import java.lang.invoke.MethodHandles;
|
|||||||
import java.lang.invoke.VarHandle;
|
import java.lang.invoke.VarHandle;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
|
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
|
||||||
@@ -44,12 +43,16 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void handleCompletion(@NotNull PromiseCompletion<T> cmp) {
|
protected void handleCompletion(@NotNull PromiseCompletion<T> cmp) {
|
||||||
if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) return;
|
if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
sync.releaseShared(1);
|
sync.releaseShared(1);
|
||||||
callListeners(cmp);
|
callListeners(cmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected <F> Promise<T> completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit, PromiseScheduler<F> scheduler) {
|
protected <F> Promise<T> completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit,
|
||||||
|
PromiseScheduler<F> scheduler) {
|
||||||
runCompleter(this, () -> {
|
runCompleter(this, () -> {
|
||||||
F future = scheduler.schedule(() -> completeExceptionally(e), delay, unit);
|
F future = scheduler.schedule(() -> completeExceptionally(e), delay, unit);
|
||||||
addDirectListener(_ -> scheduler.cancel(future));
|
addDirectListener(_ -> scheduler.cancel(future));
|
||||||
@@ -60,7 +63,7 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
|
|||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected void callListeners(@NotNull PromiseCompletion<T> cmp) {
|
protected void callListeners(@NotNull PromiseCompletion<T> cmp) {
|
||||||
Iterator<PromiseListener<T>> iter = ((Iterable<PromiseListener<T>>) LISTENERS_HANDLE.getAndSet(this, null)).iterator();
|
var iter = ((Iterable<PromiseListener<T>>) LISTENERS_HANDLE.getAndSet(this, null)).iterator();
|
||||||
try {
|
try {
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
callListener(iter.next(), cmp);
|
callListener(iter.next(), cmp);
|
||||||
@@ -76,11 +79,14 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
|
|||||||
for (boolean haveNext = false; ; ) {
|
for (boolean haveNext = false; ; ) {
|
||||||
if (!haveNext) {
|
if (!haveNext) {
|
||||||
next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev;
|
next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev;
|
||||||
if (next != null) next.add(listener);
|
if (next != null) {
|
||||||
|
next.add(listener);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next))
|
if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next)) {
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
haveNext = (prev == (prev = listeners));
|
haveNext = (prev == (prev = listeners));
|
||||||
}
|
}
|
||||||
@@ -133,13 +139,15 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) {
|
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) {
|
||||||
Exception e = new CancellationException("Promise timed out after " + time + " " + unit.toString().toLowerCase());
|
Exception e = new CancellationException(
|
||||||
|
"Promise timed out after " + time + " " + unit.toString().toLowerCase());
|
||||||
return completeExceptionallyDelayed(e, time, unit, PromiseScheduler.getDefault());
|
return completeExceptionallyDelayed(e, time, unit, PromiseScheduler.getDefault());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
|
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
|
||||||
Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit.toString().toLowerCase());
|
Exception e = new TimeoutException(
|
||||||
|
"Promise stopped waiting after " + time + " " + unit.toString().toLowerCase());
|
||||||
return completeExceptionallyDelayed(e, time, unit, PromiseScheduler.getDefault());
|
return completeExceptionallyDelayed(e, time, unit, PromiseScheduler.getDefault());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -170,8 +178,7 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull CompletableFuture<T> toFuture() {
|
public @NotNull CompletableFuture<T> toFuture() {
|
||||||
return useCompletion(
|
return useCompletion(() -> {
|
||||||
() -> {
|
|
||||||
CompletableFuture<T> future = new CompletableFuture<>();
|
CompletableFuture<T> future = new CompletableFuture<>();
|
||||||
addDirectListener(future::complete, future::completeExceptionally);
|
addDirectListener(future::complete, future::completeExceptionally);
|
||||||
future.whenComplete((result, error) -> {
|
future.whenComplete((result, error) -> {
|
||||||
@@ -183,23 +190,16 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
|
|||||||
});
|
});
|
||||||
|
|
||||||
return future;
|
return future;
|
||||||
},
|
}, CompletableFuture::completedFuture, CompletableFuture::failedFuture);
|
||||||
CompletableFuture::completedFuture,
|
|
||||||
CompletableFuture::failedFuture
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull CompletionStage<T> toCompletionStage() {
|
public @NotNull CompletionStage<T> toCompletionStage() {
|
||||||
return useCompletion(
|
return useCompletion(() -> {
|
||||||
() -> {
|
|
||||||
CompletableFuture<T> future = new CompletableFuture<>();
|
CompletableFuture<T> future = new CompletableFuture<>();
|
||||||
addDirectListener(future::complete, future::completeExceptionally);
|
addDirectListener(future::complete, future::completeExceptionally);
|
||||||
return future;
|
return future;
|
||||||
},
|
}, CompletableFuture::completedStage, CompletableFuture::failedStage);
|
||||||
CompletableFuture::completedStage,
|
|
||||||
CompletableFuture::failedStage
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class Sync extends AbstractQueuedSynchronizer {
|
private static final class Sync extends AbstractQueuedSynchronizer {
|
||||||
|
|||||||
@@ -135,7 +135,8 @@ public interface Promise<T> {
|
|||||||
* @param unit the time unit of the delay
|
* @param unit the time unit of the delay
|
||||||
* @return a new promise that completes after the task is executed
|
* @return a new promise that completes after the task is executed
|
||||||
*/
|
*/
|
||||||
@NotNull Promise<Void> thenConsumeDelayedSync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit);
|
@NotNull Promise<Void> thenConsumeDelayedSync(@NotNull ExceptionalConsumer<T> task, long delay,
|
||||||
|
@NotNull TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chains a task to be executed after this promise completes. The task will be executed immediately
|
* Chains a task to be executed after this promise completes. The task will be executed immediately
|
||||||
@@ -159,7 +160,8 @@ public interface Promise<T> {
|
|||||||
* @param unit the time unit of the delay
|
* @param unit the time unit of the delay
|
||||||
* @return a new promise that completes, after the task is executed, with the task result
|
* @return a new promise that completes, after the task is executed, with the task result
|
||||||
*/
|
*/
|
||||||
<V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit);
|
<V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chains a task to be executed after this promise completes. The task will be executed by the sync
|
* Chains a task to be executed after this promise completes. The task will be executed by the sync
|
||||||
@@ -185,7 +187,8 @@ public interface Promise<T> {
|
|||||||
* @param unit the time unit of the delay
|
* @param unit the time unit of the delay
|
||||||
* @return a new promise that completes, after the task is executed, with the task result
|
* @return a new promise that completes, after the task is executed, with the task result
|
||||||
*/
|
*/
|
||||||
<V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit);
|
<V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chains a task to be executed after this promise completes. The task will be executed by the sync
|
* Chains a task to be executed after this promise completes. The task will be executed by the sync
|
||||||
@@ -245,7 +248,8 @@ public interface Promise<T> {
|
|||||||
* @param unit the time unit of the delay
|
* @param unit the time unit of the delay
|
||||||
* @return a new promise that completes after the task is executed
|
* @return a new promise that completes after the task is executed
|
||||||
*/
|
*/
|
||||||
@NotNull Promise<Void> thenConsumeDelayedAsync(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit);
|
@NotNull Promise<Void> thenConsumeDelayedAsync(@NotNull ExceptionalConsumer<T> task, long delay,
|
||||||
|
@NotNull TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chains a task to be executed after this promise completes. The task will be executed by the
|
* Chains a task to be executed after this promise completes. The task will be executed by the
|
||||||
@@ -269,7 +273,8 @@ public interface Promise<T> {
|
|||||||
* @param unit the time unit of the delay
|
* @param unit the time unit of the delay
|
||||||
* @return a new promise that completes, after the task is executed, with the task result
|
* @return a new promise that completes, after the task is executed, with the task result
|
||||||
*/
|
*/
|
||||||
<V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit);
|
<V> @NotNull Promise<V> thenSupplyDelayedAsync(@NotNull ExceptionalSupplier<V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chains a task to be executed after this promise completes. The task will be executed by the async
|
* Chains a task to be executed after this promise completes. The task will be executed by the async
|
||||||
@@ -295,7 +300,8 @@ public interface Promise<T> {
|
|||||||
* @param unit the time unit of the delay
|
* @param unit the time unit of the delay
|
||||||
* @return a new promise that completes, after the task is executed, with the task result
|
* @return a new promise that completes, after the task is executed, with the task result
|
||||||
*/
|
*/
|
||||||
<V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit);
|
<V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chains a task to be executed after this promise completes. The task will be executed by the async
|
* Chains a task to be executed after this promise completes. The task will be executed by the async
|
||||||
@@ -355,7 +361,8 @@ public interface Promise<T> {
|
|||||||
* @param unit the time unit of the delay
|
* @param unit the time unit of the delay
|
||||||
* @return a new promise that completes after the task is executed
|
* @return a new promise that completes after the task is executed
|
||||||
*/
|
*/
|
||||||
@NotNull Promise<Void> thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer<T> task, long delay, @NotNull TimeUnit unit);
|
@NotNull Promise<Void> thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer<T> task, long delay,
|
||||||
|
@NotNull TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chains a task to be executed after this promise completes. The task will be executed
|
* Chains a task to be executed after this promise completes. The task will be executed
|
||||||
@@ -379,7 +386,8 @@ public interface Promise<T> {
|
|||||||
* @param unit the time unit of the delay
|
* @param unit the time unit of the delay
|
||||||
* @return a new promise that completes, after the task is executed, with the task result
|
* @return a new promise that completes, after the task is executed, with the task result
|
||||||
*/
|
*/
|
||||||
<V> @NotNull Promise<V> thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit);
|
<V> @NotNull Promise<V> thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier<V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chains a task to be executed after this promise completes. The task will be executed
|
* Chains a task to be executed after this promise completes. The task will be executed
|
||||||
@@ -405,7 +413,8 @@ public interface Promise<T> {
|
|||||||
* @param unit the time unit of the delay
|
* @param unit the time unit of the delay
|
||||||
* @return a new promise that completes, after the task is executed, with the task result
|
* @return a new promise that completes, after the task is executed, with the task result
|
||||||
*/
|
*/
|
||||||
<V> @NotNull Promise<V> thenApplyDelayedVirtual(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit);
|
<V> @NotNull Promise<V> thenApplyDelayedVirtual(@NotNull ExceptionalFunction<T, V> task, long delay,
|
||||||
|
@NotNull TimeUnit unit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chains a task to be executed after this promise completes. The task will be executed
|
* Chains a task to be executed after this promise completes. The task will be executed
|
||||||
@@ -475,7 +484,8 @@ public interface Promise<T> {
|
|||||||
* @param errorHandler the function to call on error
|
* @param errorHandler the function to call on error
|
||||||
* @return continuation of the promise chain
|
* @return continuation of the promise chain
|
||||||
*/
|
*/
|
||||||
@NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
|
@NotNull Promise<T> addDirectListener(@Nullable Consumer<T> successHandler,
|
||||||
|
@Nullable Consumer<Throwable> errorHandler);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a listener to this promise that will be executed immediately when this promise completes,
|
* Adds a listener to this promise that will be executed immediately when this promise completes,
|
||||||
@@ -504,7 +514,8 @@ public interface Promise<T> {
|
|||||||
* @param successHandler the function to call on success
|
* @param successHandler the function to call on success
|
||||||
* @param errorHandler the function to call on error
|
* @param errorHandler the function to call on error
|
||||||
*/
|
*/
|
||||||
@NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
|
@NotNull Promise<T> addAsyncListener(@Nullable Consumer<T> successHandler,
|
||||||
|
@Nullable Consumer<Throwable> errorHandler);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a listener to this promise that will be called if the promise is completed successfully.
|
* Adds a listener to this promise that will be called if the promise is completed successfully.
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ public record PromiseCompletion<T>(@Nullable T result, @Nullable Throwable excep
|
|||||||
*
|
*
|
||||||
* @return {@code true} if the completion was cancelled, {@code false} otherwise
|
* @return {@code true} if the completion was cancelled, {@code false} otherwise
|
||||||
*/
|
*/
|
||||||
public boolean wasCancelled() {
|
public boolean isCancelled() {
|
||||||
return exception instanceof CancellationException;
|
return exception instanceof CancellationException;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -89,9 +89,12 @@ public record PromiseCompletion<T>(@Nullable T result, @Nullable Throwable excep
|
|||||||
* @throws CompletionException if the completion was exceptional
|
* @throws CompletionException if the completion was exceptional
|
||||||
*/
|
*/
|
||||||
public T get() {
|
public T get() {
|
||||||
if (isSuccess()) return getResult();
|
if (isSuccess()) {
|
||||||
|
return getResult();
|
||||||
|
} else {
|
||||||
throw new CompletionException(getException());
|
throw new CompletionException(getException());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the result or throws an {@link ExecutionException} if the completion was exceptional.
|
* Gets the result or throws an {@link ExecutionException} if the completion was exceptional.
|
||||||
@@ -100,8 +103,11 @@ public record PromiseCompletion<T>(@Nullable T result, @Nullable Throwable excep
|
|||||||
* @throws ExecutionException if the completion was exceptional
|
* @throws ExecutionException if the completion was exceptional
|
||||||
*/
|
*/
|
||||||
public T getChecked() throws ExecutionException {
|
public T getChecked() throws ExecutionException {
|
||||||
if (isSuccess()) return getResult();
|
if (isSuccess()) {
|
||||||
|
return getResult();
|
||||||
|
} else {
|
||||||
throw new ExecutionException(getException());
|
throw new ExecutionException(getException());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -108,7 +108,7 @@ public interface PromiseFactory {
|
|||||||
*/
|
*/
|
||||||
default <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
|
default <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
|
||||||
return wrap(future, future);
|
return wrap(future, future);
|
||||||
};
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Combines two promises into a single promise that resolves when both promises are completed.
|
* Combines two promises into a single promise that resolves when both promises are completed.
|
||||||
@@ -310,8 +310,7 @@ public interface PromiseFactory {
|
|||||||
* @param expectedSize the expected size of the iterator (used for optimization)
|
* @param expectedSize the expected size of the iterator (used for optimization)
|
||||||
* @return the combined promise
|
* @return the combined promise
|
||||||
*/
|
*/
|
||||||
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterator<Promise<?>> promises,
|
@NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterator<Promise<?>> promises, int expectedSize);
|
||||||
int expectedSize);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Combines multiple promises into a single promise that completes when all promises
|
* Combines multiple promises into a single promise that completes when all promises
|
||||||
|
|||||||
@@ -11,11 +11,8 @@ public class PromiseFactoryImpl extends AbstractPromiseFactory {
|
|||||||
private final @NotNull PromiseExecutor<?> syncExecutor;
|
private final @NotNull PromiseExecutor<?> syncExecutor;
|
||||||
private final @NotNull PromiseExecutor<?> asyncExecutor;
|
private final @NotNull PromiseExecutor<?> asyncExecutor;
|
||||||
|
|
||||||
public PromiseFactoryImpl(
|
public PromiseFactoryImpl(@NotNull Logger logger, @NotNull PromiseExecutor<?> syncExecutor,
|
||||||
@NotNull Logger logger,
|
@NotNull PromiseExecutor<?> asyncExecutor) {
|
||||||
@NotNull PromiseExecutor<?> syncExecutor,
|
|
||||||
@NotNull PromiseExecutor<?> asyncExecutor
|
|
||||||
) {
|
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.syncExecutor = syncExecutor;
|
this.syncExecutor = syncExecutor;
|
||||||
this.asyncExecutor = asyncExecutor;
|
this.asyncExecutor = asyncExecutor;
|
||||||
@@ -58,6 +55,9 @@ public class PromiseFactoryImpl extends AbstractPromiseFactory {
|
|||||||
|
|
||||||
private class PromiseImpl<T> extends BasePromise<T> {
|
private class PromiseImpl<T> extends BasePromise<T> {
|
||||||
|
|
||||||
|
PromiseImpl() {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull AbstractPromiseFactory getFactory() {
|
public @NotNull AbstractPromiseFactory getFactory() {
|
||||||
return PromiseFactoryImpl.this;
|
return PromiseFactoryImpl.this;
|
||||||
@@ -67,15 +67,15 @@ public class PromiseFactoryImpl extends AbstractPromiseFactory {
|
|||||||
|
|
||||||
private class CompletedPromiseImpl<T> extends CompletedPromise<T> {
|
private class CompletedPromiseImpl<T> extends CompletedPromise<T> {
|
||||||
|
|
||||||
public CompletedPromiseImpl(@Nullable T result) {
|
CompletedPromiseImpl(@Nullable T result) {
|
||||||
super(new PromiseCompletion<>(result));
|
super(new PromiseCompletion<>(result));
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletedPromiseImpl(@NotNull Throwable exception) {
|
CompletedPromiseImpl(@NotNull Throwable exception) {
|
||||||
super(new PromiseCompletion<>(exception));
|
super(new PromiseCompletion<>(exception));
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletedPromiseImpl() {
|
CompletedPromiseImpl() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user