Merge pull request #8 from tommyskeff/feat/error-propagation

add cancellation and refractor PromiseFactory
This commit is contained in:
Tommy
2024-04-04 17:25:36 +01:00
committed by GitHub
24 changed files with 670 additions and 445 deletions

View File

@@ -2,6 +2,8 @@
Futur4J is a powerful and intuitive open-source Java library that simplifies asynchronous task scheduling, inspired by the concept of JavaScript promises. Futur4J is a powerful and intuitive open-source Java library that simplifies asynchronous task scheduling, inspired by the concept of JavaScript promises.
**This documentation is outdated. Please don't read it.**
## Dependency ## Dependency
The Futur4J project is composed of multiple modules. It is required to include the `futur-api` module, and the other modules depend on it at runtime, however the others are optional and dependent on your use case. The Futur4J project is composed of multiple modules. It is required to include the `futur-api` module, and the other modules depend on it at runtime, however the others are optional and dependent on your use case.
### Gradle ### Gradle

View File

@@ -1,4 +1,5 @@
plugins { plugins {
id 'java'
id 'com.github.johnrengelman.shadow' version '8.1.1' id 'com.github.johnrengelman.shadow' version '8.1.1'
id 'io.github.gradle-nexus.publish-plugin' version '1.3.0' id 'io.github.gradle-nexus.publish-plugin' version '1.3.0'
} }
@@ -12,8 +13,8 @@ nexusPublishing {
} }
subprojects { subprojects {
group = "dev.tommyjs" group = 'dev.tommyjs'
version = "2.1.3" version = '2.2.0'
apply plugin: 'java' apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'com.github.johnrengelman.shadow'
@@ -27,4 +28,23 @@ subprojects {
repositories { repositories {
mavenCentral() mavenCentral()
} }
dependencies {
implementation 'org.jetbrains:annotations:24.1.0'
implementation 'org.slf4j:slf4j-api:2.0.12'
compileOnly 'io.projectreactor:reactor-core:3.6.4'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testImplementation 'io.projectreactor:reactor-core:3.6.4'
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
testImplementation 'org.slf4j:slf4j-api:2.0.12'
testImplementation 'ch.qos.logback:logback-classic:1.5.3'
}
test {
useJUnitPlatform()
testLogging {
exceptionFormat = 'full'
}
}
} }

View File

@@ -1,4 +0,0 @@
dependencies {
implementation("org.jetbrains:annotations:24.1.0")
implementation("org.slf4j:slf4j-api:2.0.12")
}

View File

@@ -4,9 +4,10 @@ import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class DualPoolExecutor implements PromiseExecutor { public class DualPoolExecutor implements PromiseExecutor<ScheduledFuture<?>> {
private final @NotNull ScheduledExecutorService syncSvc; private final @NotNull ScheduledExecutorService syncSvc;
private final @NotNull ScheduledExecutorService asyncSvc; private final @NotNull ScheduledExecutorService asyncSvc;
@@ -16,18 +17,23 @@ public class DualPoolExecutor implements PromiseExecutor {
this.asyncSvc = asyncSvc; this.asyncSvc = asyncSvc;
} }
@Override
public void runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
syncSvc.schedule(task, delay, unit);
}
@Override
public void runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
asyncSvc.schedule(task, delay, unit);
}
public static @NotNull DualPoolExecutor create(int asyncPoolSize) { public static @NotNull DualPoolExecutor create(int asyncPoolSize) {
return new DualPoolExecutor(Executors.newSingleThreadScheduledExecutor(), Executors.newScheduledThreadPool(asyncPoolSize)); return new DualPoolExecutor(Executors.newSingleThreadScheduledExecutor(), Executors.newScheduledThreadPool(asyncPoolSize));
} }
@Override
public ScheduledFuture<?> runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return syncSvc.schedule(task, delay, unit);
}
@Override
public ScheduledFuture<?> runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
return asyncSvc.schedule(task, delay, unit);
}
@Override
public void cancel(ScheduledFuture<?> task) {
task.cancel(true);
}
} }

View File

@@ -4,18 +4,20 @@ import org.jetbrains.annotations.NotNull;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface PromiseExecutor { public interface PromiseExecutor<T> {
void runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit); T runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
void runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit); T runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
default void runSync(@NotNull Runnable task) { default T runSync(@NotNull Runnable task) {
runSync(task, 0L, TimeUnit.MILLISECONDS); return runSync(task, 0L, TimeUnit.MILLISECONDS);
} }
default void runAsync(@NotNull Runnable task) { default T runAsync(@NotNull Runnable task) {
runAsync(task, 0L, TimeUnit.MILLISECONDS); return runAsync(task, 0L, TimeUnit.MILLISECONDS);
} }
void cancel(T task);
} }

View File

@@ -4,24 +4,11 @@ import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SinglePoolExecutor implements PromiseExecutor {
private final @NotNull ScheduledExecutorService service;
public class SinglePoolExecutor extends DualPoolExecutor {
public SinglePoolExecutor(@NotNull ScheduledExecutorService service) { public SinglePoolExecutor(@NotNull ScheduledExecutorService service) {
this.service = service; super(service, service);
}
@Override
public void runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
service.schedule(task, delay, unit);
}
@Override
public void runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
service.schedule(task, delay, unit);
} }
public static @NotNull SinglePoolExecutor create(int threadPoolSize) { public static @NotNull SinglePoolExecutor create(int threadPoolSize) {

View File

@@ -2,33 +2,25 @@ package dev.tommyjs.futur.impl;
import dev.tommyjs.futur.executor.PromiseExecutor; import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.promise.AbstractPromise; import dev.tommyjs.futur.promise.AbstractPromise;
import dev.tommyjs.futur.promise.PromiseFactory; import dev.tommyjs.futur.promise.AbstractPromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
public class SimplePromise<T> extends AbstractPromise<T> { public class SimplePromise<T, F> extends AbstractPromise<T, F> {
private final PromiseExecutor executor; private final @NotNull AbstractPromiseFactory<F> factory;
private final Logger logger;
private final PromiseFactory factory;
public SimplePromise(PromiseExecutor executor, Logger logger, PromiseFactory factory) { public SimplePromise(@NotNull AbstractPromiseFactory<F> factory) {
this.executor = executor;
this.logger = logger;
this.factory = factory; this.factory = factory;
} }
@Override @Deprecated
protected PromiseExecutor getExecutor() { public SimplePromise(@NotNull PromiseExecutor<F> executor, @NotNull Logger logger, @NotNull AbstractPromiseFactory<F> factory) {
return executor; this(factory);
} }
@Override @Override
protected Logger getLogger() { public @NotNull AbstractPromiseFactory<F> getFactory() {
return logger;
}
@Override
public PromiseFactory getFactory() {
return factory; return factory;
} }

View File

@@ -1,39 +1,34 @@
package dev.tommyjs.futur.impl; package dev.tommyjs.futur.impl;
import dev.tommyjs.futur.executor.PromiseExecutor; import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.promise.AbstractPromise; import dev.tommyjs.futur.promise.AbstractPromiseFactory;
import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
public class SimplePromiseFactory implements PromiseFactory { public class SimplePromiseFactory<F> extends AbstractPromiseFactory<F> {
private final PromiseExecutor executor; private final PromiseExecutor<F> executor;
private final Logger logger; private final Logger logger;
public SimplePromiseFactory(PromiseExecutor executor, Logger logger) { public SimplePromiseFactory(PromiseExecutor<F> executor, Logger logger) {
this.executor = executor; this.executor = executor;
this.logger = logger; this.logger = logger;
} }
@Override
public @NotNull <T> Promise<T> resolve(T value) {
AbstractPromise<T> promise = new SimplePromise<>(executor, logger, this);
promise.complete(value);
return promise;
}
@Override @Override
public @NotNull <T> Promise<T> unresolved() { public @NotNull <T> Promise<T> unresolved() {
return new SimplePromise<>(executor, logger, this); return new SimplePromise<>(this);
} }
@Override @Override
public @NotNull <T> Promise<T> error(Throwable error) { public @NotNull Logger getLogger() {
AbstractPromise<T> promise = new SimplePromise<>(executor, logger, this); return logger;
promise.completeExceptionally(error); }
return promise;
@Override
public @NotNull PromiseExecutor<F> getExecutor() {
return executor;
} }
} }

View File

@@ -1,25 +0,0 @@
package dev.tommyjs.futur.impl;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.promise.AbstractPromise;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.slf4j.Logger;
public class StaticPromise<T> extends AbstractPromise<T> {
@Override
protected PromiseExecutor getExecutor() {
return StaticPromiseFactory.EXECUTOR;
}
@Override
protected Logger getLogger() {
return StaticPromiseFactory.LOGGER;
}
@Override
public PromiseFactory getFactory() {
return StaticPromiseFactory.INSTANCE;
}
}

View File

@@ -1,48 +0,0 @@
package dev.tommyjs.futur.impl;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.executor.SinglePoolExecutor;
import dev.tommyjs.futur.promise.AbstractPromise;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executors;
public class StaticPromiseFactory implements PromiseFactory {
public static final @NotNull PromiseFactory INSTANCE;
public static final @NotNull PromiseExecutor EXECUTOR;
public static final @NotNull Logger LOGGER;
static {
INSTANCE = new StaticPromiseFactory();
EXECUTOR = SinglePoolExecutor.create(1);
LOGGER = LoggerFactory.getLogger(StaticPromiseFactory.class);
}
private StaticPromiseFactory() {
}
@Override
public @NotNull <T> Promise<T> resolve(T value) {
AbstractPromise<T> promise = new StaticPromise<>();
promise.complete(value);
return promise;
}
@Override
public @NotNull <T> Promise<T> unresolved() {
return new StaticPromise<>();
}
@Override
public @NotNull <T> Promise<T> error(Throwable error) {
AbstractPromise<T> promise = new StaticPromise<>();
promise.completeExceptionally(error);
return promise;
}
}

View File

@@ -10,12 +10,14 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
public abstract class AbstractPromise<T> implements Promise<T> { public abstract class AbstractPromise<T, F> implements Promise<T> {
private final Collection<PromiseListener<T>> listeners; private final Collection<PromiseListener<T>> listeners;
private final AtomicReference<PromiseCompletion<T>> completion; private final AtomicReference<PromiseCompletion<T>> completion;
@@ -25,14 +27,14 @@ public abstract class AbstractPromise<T> implements Promise<T> {
this.completion = new AtomicReference<>(); this.completion = new AtomicReference<>();
} }
protected abstract PromiseExecutor getExecutor(); public abstract @NotNull AbstractPromiseFactory<F> getFactory();
protected abstract Logger getLogger(); protected @NotNull PromiseExecutor<F> getExecutor() {
return getFactory().getExecutor();
}
@Deprecated protected @NotNull Logger getLogger() {
@Override return getFactory().getLogger();
public T join(long interval, long timeoutMillis) throws TimeoutException {
return join(timeoutMillis);
} }
@Override @Override
@@ -56,7 +58,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
} }
if (completion == null) if (completion == null)
throw new TimeoutException("Promise timed out after " + timeoutMillis + "ms"); throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms");
return joinCompletion(completion); return joinCompletion(completion);
} }
@@ -113,57 +115,47 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public <V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task) { public <V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved(); Promise<V> promise = getFactory().unresolved();
addListener(ctx -> { addListener(
if (ctx.isError()) { res -> {
//noinspection ConstantConditions Runnable runnable = createRunnable(res, promise, task);
promise.completeExceptionally(ctx.getException()); F future = getExecutor().runSync(runnable);
return; promise.onCancel((e) -> getExecutor().cancel(future));
} },
promise::completeExceptionally
Runnable runnable = createRunnable(ctx, promise, task); );
getExecutor().runSync(runnable, 0L, TimeUnit.MILLISECONDS);
});
addChild(promise);
return promise; return promise;
} }
@Override @Override
public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) { public <V> @NotNull Promise<V> thenApplyDelayedSync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
Promise<V> promise = getFactory().unresolved(); Promise<V> promise = getFactory().unresolved();
addListener(ctx -> { addListener(
if (ctx.isError()) { res -> {
//noinspection ConstantConditions Runnable runnable = createRunnable(res, promise, task);
promise.completeExceptionally(ctx.getException()); F future = getExecutor().runSync(runnable, delay, unit);
return; promise.onCancel((e) -> getExecutor().cancel(future));
} },
promise::completeExceptionally
Runnable runnable = createRunnable(ctx, promise, task); );
getExecutor().runSync(runnable, delay, unit);
});
addChild(promise);
return promise; return promise;
} }
@Override @Override
public <V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, @NotNull Promise<V>> task) { public <V> @NotNull Promise<V> thenComposeSync(@NotNull ExceptionalFunction<T, @NotNull Promise<V>> task) {
Promise<V> promise = getFactory().unresolved(); Promise<V> promise = getFactory().unresolved();
thenApplySync(task).thenConsumeAsync(nestedPromise -> { thenApplySync(task).addListener(
nestedPromise.addListener(ctx1 -> { nestedPromise -> {
if (ctx1.isError()) { nestedPromise.propagateResult(promise);
//noinspection ConstantConditions nestedPromise.addChild(promise);
promise.completeExceptionally(ctx1.getException()); },
return; promise::completeExceptionally
} );
promise.complete(ctx1.getResult());
});
}).addListener(ctx2 -> {
if (ctx2.isError()) {
//noinspection ConstantConditions
promise.completeExceptionally(ctx2.getException());
}
});
addChild(promise);
return promise; return promise;
} }
@@ -220,83 +212,66 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) { public <V> @NotNull Promise<V> thenApplyAsync(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved(); Promise<V> promise = getFactory().unresolved();
addListener(ctx -> { addListener(
if (ctx.isError()) { (res) -> {
//noinspection ConstantConditions Runnable runnable = createRunnable(res, promise, task);
promise.completeExceptionally(ctx.getException()); F future = getExecutor().runAsync(runnable);
return; promise.onCancel((e) -> getExecutor().cancel(future));
} },
promise::completeExceptionally
Runnable runnable = createRunnable(ctx, promise, task); );
getExecutor().runAsync(runnable, 0L, TimeUnit.MILLISECONDS);
});
addChild(promise);
return promise; return promise;
} }
@Override @Override
public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) { public <V> @NotNull Promise<V> thenApplyDelayedAsync(@NotNull ExceptionalFunction<T, V> task, long delay, @NotNull TimeUnit unit) {
Promise<V> promise = getFactory().unresolved(); Promise<V> promise = getFactory().unresolved();
addListener(ctx -> { addListener(
Runnable runnable = createRunnable(ctx, promise, task); res -> {
getExecutor().runAsync(runnable, delay, unit); Runnable runnable = createRunnable(res, promise, task);
}); F future = getExecutor().runAsync(runnable, delay, unit);
promise.onCancel((e) -> getExecutor().cancel(future));
},
promise::completeExceptionally
);
addChild(promise);
return promise; return promise;
} }
@Override @Override
public <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task) { public <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task) {
Promise<V> promise = getFactory().unresolved(); Promise<V> promise = getFactory().unresolved();
thenApplyAsync(task).thenConsumeAsync(nestedPromise -> { thenApplyAsync(task).addListener(
nestedPromise.addListener(ctx1 -> { nestedPromise -> {
if (ctx1.isError()) { nestedPromise.propagateResult(promise);
//noinspection ConstantConditions nestedPromise.addChild(promise);
promise.completeExceptionally(ctx1.getException()); },
return; promise::completeExceptionally
} );
promise.complete(ctx1.getResult());
});
}).addListener(ctx2 -> {
if (ctx2.isError()) {
//noinspection ConstantConditions
promise.completeExceptionally(ctx2.getException());
}
});
addChild(promise);
return promise; return promise;
} }
private <V> @NotNull Runnable createRunnable(@NotNull PromiseCompletion<T> ctx, @NotNull Promise<V> promise, @NotNull ExceptionalFunction<T, V> task) { private <V> @NotNull Runnable createRunnable(T result, @NotNull Promise<V> promise, @NotNull ExceptionalFunction<T, V> task) {
return () -> { return () -> {
if (ctx.isError()) { if (promise.isCompleted()) return;
//noinspection ConstantConditions
promise.completeExceptionally(ctx.getException());
return;
}
try { try {
V result = task.apply(ctx.getResult()); V nextResult = task.apply(result);
promise.complete(result); promise.complete(nextResult);
} catch (Throwable e) { } catch (Throwable e) {
promise.completeExceptionally(e); promise.completeExceptionally(e);
} }
}; };
} }
@Override
public @NotNull Promise<T> logExceptions() {
return logExceptions("Exception caught in promise chain");
}
@Override @Override
public @NotNull Promise<T> logExceptions(@NotNull String message) { public @NotNull Promise<T> logExceptions(@NotNull String message) {
return addListener(ctx -> { return onError(e -> getLogger().error(message, e));
if (ctx.isError()) {
getLogger().error(message, ctx.getException());
}
});
} }
@Override @Override
@@ -320,19 +295,51 @@ public abstract class AbstractPromise<T> implements Promise<T> {
} }
@Override @Override
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) { public @NotNull Promise<T> addListener(@Nullable Consumer<T> successListener, @Nullable Consumer<Throwable> errorListener) {
getExecutor().runAsync(() -> { return addListener((res) -> {
if (!isCompleted()) { if (res.isError()) {
completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit)); if (errorListener != null) errorListener.accept(res.getException());
} else {
if (successListener != null) successListener.accept(res.getResult());
} }
}, time, unit); });
return this;
} }
@Override @Override
public @NotNull Promise<T> timeout(long ms) { public @NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener) {
return timeout(ms, TimeUnit.MILLISECONDS); return addListener(listener, null);
}
@Override
public @NotNull Promise<T> onError(@NotNull Consumer<Throwable> listener) {
return addListener(null, listener);
}
@Override
public <E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> clazz, @NotNull Consumer<E> listener) {
return onError((e) -> {
if (clazz.isAssignableFrom(e.getClass())) {
//noinspection unchecked
listener.accept((E) e);
}
});
}
@Override
public @NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener) {
return onError(CancellationException.class, listener);
}
@Deprecated
@Override
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) {
return maxWaitTime(time, unit);
}
@Override
public @NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit) {
F future = getExecutor().runAsync(() -> completeExceptionally(new TimeoutException("Promise stopped waiting after " + time + " " + unit)), time, unit);
return onError(e -> getExecutor().cancel(future));
} }
private void handleCompletion(@NotNull PromiseCompletion<T> ctx) { private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
@@ -358,6 +365,26 @@ public abstract class AbstractPromise<T> implements Promise<T> {
return this.completion.compareAndSet(null, completion); return this.completion.compareAndSet(null, completion);
} }
@Override
public void addChild(@NotNull Promise<?> child) {
child.onCancel((e) -> this.cancel(e.getMessage()));
}
@Override
public void propagateResult(@NotNull Promise<T> target) {
addListener(target::complete, target::completeExceptionally);
}
@Override
public void cancel() {
completeExceptionally(new CancellationException());
}
@Override
public void cancel(@NotNull String message) {
completeExceptionally(new CancellationException(message));
}
@Override @Override
public void complete(@Nullable T result) { public void complete(@Nullable T result) {
handleCompletion(new PromiseCompletion<>(result)); handleCompletion(new PromiseCompletion<>(result));

View File

@@ -0,0 +1,181 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public abstract class AbstractPromiseFactory<F> implements PromiseFactory {
public abstract @NotNull PromiseExecutor<F> getExecutor();
@Override
public <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
List<Promise<?>> promises = List.of(p1, p2);
return all(promises)
.thenApplyAsync((res) -> new AbstractMap.SimpleImmutableEntry<>(
Objects.requireNonNull(p1.getCompletion()).getResult(),
Objects.requireNonNull(p2.getCompletion()).getResult()
));
}
@Override
public <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
if (promises.isEmpty()) return resolve(Collections.emptyMap());
Map<K, V> map = new HashMap<>();
Promise<Map<K, V>> promise = unresolved();
for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) {
entry.getValue().addListener((ctx) -> {
synchronized (map) {
if (ctx.getException() != null) {
if (exceptionHandler == null) {
promise.completeExceptionally(ctx.getException());
} else {
exceptionHandler.accept(entry.getKey(), ctx.getException());
map.put(entry.getKey(), null);
}
} else {
map.put(entry.getKey(), ctx.getResult());
}
if (map.size() == promises.size()) {
promise.complete(map);
}
}
});
}
return promise;
}
@Override
public <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable Consumer<Throwable> exceptionHandler) {
AtomicInteger index = new AtomicInteger();
return this.combine(
StreamSupport.stream(promises.spliterator(), false)
.collect(Collectors.toMap(k -> index.getAndIncrement(), v -> v)),
exceptionHandler != null ? (i, e) -> exceptionHandler.accept(e) : null
).thenApplyAsync(v ->
v.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toList())
);
}
@Override
public <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return combine(promises, null);
}
@Override
public @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
List<Promise<?>> promises = new ArrayList<>();
promiseIterable.iterator().forEachRemaining(promises::add);
if (promises.isEmpty()) return resolve(Collections.emptyList());
PromiseCompletion<?>[] results = new PromiseCompletion<?>[promises.size()];
Promise<List<PromiseCompletion<?>>> promise = unresolved();
var iter = promises.listIterator();
while (iter.hasNext()) {
int index = iter.nextIndex();
iter.next().addListener((res) -> {
synchronized (results) {
results[index] = res;
if (Arrays.stream(results).allMatch(Objects::nonNull))
promise.complete(Arrays.asList(results));
}
});
}
return promise;
}
@Override
public @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
List<Promise<?>> promises = new ArrayList<>();
promiseIterable.iterator().forEachRemaining(promises::add);
if (promises.isEmpty()) return resolve(null);
AtomicInteger completed = new AtomicInteger();
Promise<Void> promise = unresolved();
for (Promise<?> p : promises) {
p.addListener((res) -> {
if (res.getException() != null) {
promise.completeExceptionally(res.getException());
}
if (completed.incrementAndGet() == promises.size()) {
promise.complete(null);
}
});
}
return promise;
}
@Override
public <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future) {
Promise<T> promise = unresolved();
future.whenComplete((v, e) -> {
if (e != null) {
promise.completeExceptionally(e);
} else {
promise.complete(v);
}
});
promise.onCancel((e) -> future.cancel(true));
return promise;
}
@Override
public <T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono) {
Promise<T> promise = this.unresolved();
Disposable disposable = mono.subscribe(promise::complete, promise::completeExceptionally);
promise.onCancel((e) -> disposable.dispose());
return promise;
}
@Override
public <T> @NotNull Promise<T> resolve(T value) {
Promise<T> promise = unresolved();
promise.complete(value);
return promise;
}
@Override
public <T> @NotNull Promise<T> error(@NotNull Throwable error) {
Promise<T> promise = unresolved();
promise.completeExceptionally(error);
return promise;
}
@Override
public @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
Promise<Void> promise = unresolved();
p.addListener(ctx -> {
if (ctx.getException() != null) {
promise.completeExceptionally(ctx.getException());
} else {
promise.complete(null);
}
});
return promise;
}
}

View File

@@ -7,35 +7,16 @@ import dev.tommyjs.futur.function.ExceptionalSupplier;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
public interface Promise<T> { public interface Promise<T> {
static <T> @NotNull Promise<T> resolve(T value, PromiseFactory factory) {
return factory.resolve(value);
}
static <T> @NotNull Promise<T> error(Throwable error, PromiseFactory factory) {
return factory.error(error);
}
static <T> @NotNull Promise<T> unresolved(PromiseFactory factory) {
return factory.unresolved();
}
static @NotNull Promise<Void> start(PromiseFactory factory) {
return factory.resolve(null);
}
PromiseFactory getFactory(); PromiseFactory getFactory();
@Deprecated
T join(long interval, long timeout) throws TimeoutException;
T join(long timeout) throws TimeoutException;
@NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task); @NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task);
@NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit); @NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit);
@@ -74,22 +55,56 @@ public interface Promise<T> {
<V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task); <V> @NotNull Promise<V> thenComposeAsync(@NotNull ExceptionalFunction<T, Promise<V>> task);
@NotNull Promise<T> logExceptions();
@NotNull Promise<T> logExceptions(@NotNull String message); @NotNull Promise<T> logExceptions(@NotNull String message);
default @NotNull Promise<T> logExceptions() {
return logExceptions("Exception caught in promise chain");
}
@NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener); @NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener);
@NotNull Promise<T> addListener(@Nullable Consumer<T> successHandler, @Nullable Consumer<Throwable> errorHandler);
@NotNull Promise<T> onSuccess(@NotNull Consumer<T> listener);
@NotNull Promise<T> onError(@NotNull Consumer<Throwable> listener);
<E extends Throwable> @NotNull Promise<T> onError(@NotNull Class<E> clazz, @NotNull Consumer<E> listener);
@NotNull Promise<T> onCancel(@NotNull Consumer<CancellationException> listener);
@Deprecated
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit); @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);
@NotNull Promise<T> timeout(long ms); @Deprecated
default @NotNull Promise<T> timeout(long ms) {
return timeout(ms, TimeUnit.MILLISECONDS);
}
@NotNull Promise<T> maxWaitTime(long time, @NotNull TimeUnit unit);
default @NotNull Promise<T> maxWaitTime(long ms) {
return maxWaitTime(ms, TimeUnit.MILLISECONDS);
}
void addChild(@NotNull Promise<?> child);
void propagateResult(@NotNull Promise<T> target);
void cancel(@Nullable String reason);
default void cancel() {
cancel(null);
}
void complete(@Nullable T result); void complete(@Nullable T result);
void completeExceptionally(@NotNull Throwable result); void completeExceptionally(@NotNull Throwable result);
boolean isCompleted(); T join(long timeout) throws TimeoutException;
@Nullable PromiseCompletion<T> getCompletion(); @Nullable PromiseCompletion<T> getCompletion();
boolean isCompleted();
} }

View File

@@ -3,6 +3,8 @@ package dev.tommyjs.futur.promise;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.util.concurrent.CancellationException;
public class PromiseCompletion<T> { public class PromiseCompletion<T> {
private @Nullable T result; private @Nullable T result;
@@ -36,6 +38,10 @@ public class PromiseCompletion<T> {
return getException() != null; return getException() != null;
} }
public boolean wasCanceled() {
return getException() instanceof CancellationException;
}
public @Nullable T getResult() { public @Nullable T getResult() {
return result; return result;
} }

View File

@@ -1,36 +1,61 @@
package dev.tommyjs.futur.promise; package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.executor.SinglePoolExecutor;
import dev.tommyjs.futur.impl.SimplePromiseFactory;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono;
import java.util.concurrent.Executors; import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public interface PromiseFactory { public interface PromiseFactory {
<T> @NotNull Promise<T> resolve(T value); @NotNull Logger getLogger();
<T> @NotNull Promise<T> unresolved(); <T> @NotNull Promise<T> unresolved();
<T> @NotNull Promise<T> error(Throwable error); <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2);
static PromiseFactory create(PromiseExecutor executor, Logger logger) { <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler);
return new SimplePromiseFactory(executor, logger);
default <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return combine(promises, null);
} }
static PromiseFactory create(PromiseExecutor executor) { <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable Consumer<Throwable> exceptionHandler);
return create(executor, LoggerFactory.getLogger(SimplePromiseFactory.class));
default <V> @NotNull Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return combine(promises, null);
} }
static PromiseFactory create(int threadPoolSize) { @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable);
return create(SinglePoolExecutor.create(threadPoolSize));
default @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
return allSettled(Arrays.asList(promiseArray));
} }
static PromiseFactory create() { @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable);
return create(Runtime.getRuntime().availableProcessors());
default @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
return all(Arrays.asList(promiseArray));
} }
<T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future);
<T> @NotNull Promise<T> wrap(@NotNull Mono<T> mono);
<T> @NotNull Promise<T> resolve(T value);
default @NotNull Promise<Void> start() {
return resolve(null);
}
<T> @NotNull Promise<T> error(@NotNull Throwable error);
@NotNull Promise<Void> erase(@NotNull Promise<?> p);
} }

View File

@@ -4,37 +4,21 @@ import dev.tommyjs.futur.function.ExceptionalFunction;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.util.*; import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.stream.Collectors;
/**
* @deprecated Use PromiseFactory instance methods instead.
*/
@Deprecated
public class Promises { public class Promises {
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2, PromiseFactory factory) {
Promise<Map.Entry<K, V>> promise = factory.unresolved(); return factory.combine(p1, p2);
p1.addListener(ctx -> {
if (ctx.isError()) {
//noinspection ConstantConditions
promise.completeExceptionally(ctx.getException());
return;
}
p2.addListener(ctx1 -> {
if (ctx1.isError()) {
//noinspection ConstantConditions
promise.completeExceptionally(ctx1.getException());
return;
}
Map.Entry<K, V> result = new AbstractMap.SimpleEntry<>(ctx.getResult(), ctx1.getResult());
promise.complete(result);
});
});
return promise;
} }
public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) { public static <K, V> @NotNull Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
@@ -42,29 +26,7 @@ public class Promises {
} }
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) {
if (promises.isEmpty()) return factory.resolve(Collections.emptyMap()); return factory.combine(promises, exceptionHandler).timeout(timeout);
Map<K, V> map = new HashMap<>();
Promise<Map<K, V>> promise = factory.unresolved();
for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) {
entry.getValue().addListener((ctx) -> {
synchronized (map) {
if (ctx.isError()) {
if (exceptionHandler == null) {
//noinspection ConstantConditions
promise.completeExceptionally(ctx.getException());
} else {
exceptionHandler.accept(entry.getKey(), ctx.getException());
map.put(entry.getKey(), null);
}
} else {
map.put(entry.getKey(), ctx.getResult());
}
if (map.size() == promises.size()) promise.complete(map);
}
});
}
return promise.timeout(timeout);
} }
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
@@ -80,16 +42,7 @@ public class Promises {
} }
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) { public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, boolean strict, PromiseFactory factory) {
AtomicInteger index = new AtomicInteger(); return factory.combine(promises, strict ? null : (_v) -> {}).timeout(timeout);
return combine(
promises.stream().collect(Collectors.toMap(s -> index.getAndIncrement(), v -> v)),
timeout, strict, factory
).thenApplySync(v ->
v.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(Collectors.toList())
);
} }
public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, PromiseFactory factory) { public static <V> @NotNull Promise<List<V>> combine(@NotNull List<Promise<V>> promises, long timeout, PromiseFactory factory) {
@@ -101,20 +54,7 @@ public class Promises {
} }
public static @NotNull Promise<Void> all(@NotNull List<Promise<?>> promises, PromiseFactory factory) { public static @NotNull Promise<Void> all(@NotNull List<Promise<?>> promises, PromiseFactory factory) {
if (promises.isEmpty()) return factory.resolve(null); return factory.all(promises);
Promise<Void> promise = factory.unresolved();
for (Promise<?> p : promises) {
p.addListener((ctx) -> {
if (ctx.isError()) {
//noinspection ConstantConditions
promise.completeExceptionally(ctx.getException());
} else if (promises.stream().allMatch(Promise::isCompleted)) {
promise.complete(null);
}
});
}
return promise;
} }
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, boolean strict, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Collection<K> keys, @NotNull ExceptionalFunction<K, V> mapper, long timeout, boolean strict, PromiseFactory factory) {
@@ -136,17 +76,7 @@ public class Promises {
} }
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p, PromiseFactory factory) { public static @NotNull Promise<Void> erase(@NotNull Promise<?> p, PromiseFactory factory) {
Promise<Void> promise = factory.unresolved(); return factory.erase(p);
p.addListener(ctx -> {
if (ctx.isError()) {
//noinspection ConstantConditions
promise.completeExceptionally(ctx.getException());
} else {
promise.complete(null);
}
});
return promise;
} }
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) { public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
@@ -154,16 +84,7 @@ public class Promises {
} }
public static <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future, PromiseFactory factory) { public static <T> @NotNull Promise<T> wrap(@NotNull CompletableFuture<T> future, PromiseFactory factory) {
Promise<T> promise = factory.unresolved(); return factory.wrap(future);
future.whenComplete((result, e) -> {
if (e != null) {
promise.completeExceptionally(e);
} else {
promise.complete(result);
}
});
return promise;
} }
} }

View File

@@ -0,0 +1,48 @@
package dev.tommyjs.futur;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.executor.SinglePoolExecutor;
import dev.tommyjs.futur.impl.SimplePromiseFactory;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public final class PromiseTests {
private final Logger logger = LoggerFactory.getLogger(PromiseTests.class);
private final PromiseExecutor<ScheduledFuture<?>> executor = SinglePoolExecutor.create(1);
private final PromiseFactory pfac = new SimplePromiseFactory<>(executor, logger);
@Test
void testMono() {
Exception value = new Exception("Test Error");
var error = pfac.wrap(Mono.error(value));
assert Objects.requireNonNull(error.getCompletion()).isError();
assert error.getCompletion().getException() == value;
var resolved = pfac.wrap(Mono.just(value));
assert !Objects.requireNonNull(resolved.getCompletion()).isError();
assert resolved.getCompletion().getResult() == value;
}
@Test
void testErrorCancellation() throws InterruptedException {
var finish = new AtomicBoolean();
pfac.start()
.thenRunDelayedAsync(() -> finish.set(true), 50, TimeUnit.MILLISECONDS)
.thenRunAsync(() -> {})
.cancel();
Thread.sleep(100L);
assert !finish.get();
}
}

View File

@@ -1,42 +0,0 @@
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

View File

@@ -1,5 +0,0 @@
dependencies {
compileOnly(project(":futur-api"))
implementation("org.jetbrains:annotations:24.1.0")
implementation("io.projectreactor:reactor-core:3.6.4")
}

View File

@@ -1,16 +0,0 @@
package dev.tommyjs.futur.reactor;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
public class ReactorTransformer {
public static <T> @NotNull Promise<T> wrapMono(@NotNull Mono<T> mono, PromiseFactory factory) {
Promise<T> promise = factory.unresolved();
mono.subscribe(promise::complete, promise::completeExceptionally);
return promise;
}
}

View File

@@ -0,0 +1,6 @@
apply plugin: 'java-library'
dependencies {
api project(':futur-api')
testImplementation project(':futur-api')
}

View File

@@ -0,0 +1,93 @@
package dev.tommyjs.futur.lazy;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseCompletion;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public final class PromiseUtil {
private static PromiseFactory pfac = StaticPromiseFactory.INSTANCE;
public static @NotNull Logger getLogger() {
return pfac.getLogger();
}
public static void setPromiseFactory(PromiseFactory pfac) {
PromiseUtil.pfac = pfac;
}
public static @NotNull <T> Promise<T> unresolved() {
return pfac.unresolved();
}
public static @NotNull <K, V> Promise<Map.Entry<K, V>> combine(@NotNull Promise<K> p1, @NotNull Promise<V> p2) {
return pfac.combine(p1, p2);
}
public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, @Nullable BiConsumer<K, Throwable> exceptionHandler) {
return pfac.combine(promises, exceptionHandler);
}
public static @NotNull <K, V> Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises) {
return pfac.combine(promises);
}
public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises, @Nullable Consumer<Throwable> exceptionHandler) {
return pfac.combine(promises, exceptionHandler);
}
public static @NotNull <V> Promise<List<V>> combine(@NotNull Iterable<Promise<V>> promises) {
return pfac.combine(promises);
}
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Iterable<Promise<?>> promiseIterable) {
return pfac.allSettled(promiseIterable);
}
public static @NotNull Promise<List<PromiseCompletion<?>>> allSettled(@NotNull Promise<?>... promiseArray) {
return pfac.allSettled(promiseArray);
}
public static @NotNull Promise<Void> all(@NotNull Iterable<Promise<?>> promiseIterable) {
return pfac.all(promiseIterable);
}
public static @NotNull Promise<Void> all(@NotNull Promise<?>... promiseArray) {
return pfac.all(promiseArray);
}
public static @NotNull <T> Promise<T> wrap(@NotNull CompletableFuture<T> future) {
return pfac.wrap(future);
}
public static @NotNull <T> Promise<T> wrap(@NotNull Mono<T> mono) {
return pfac.wrap(mono);
}
public static @NotNull <T> Promise<T> resolve(T value) {
return pfac.resolve(value);
}
public static @NotNull <T> Promise<T> error(@NotNull Throwable error) {
return pfac.error(error);
}
public static @NotNull Promise<Void> erase(@NotNull Promise<?> p) {
return pfac.erase(p);
}
public static @NotNull Promise<Void> start() {
return pfac.start();
}
}

View File

@@ -0,0 +1,39 @@
package dev.tommyjs.futur.lazy;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.executor.SinglePoolExecutor;
import dev.tommyjs.futur.impl.SimplePromise;
import dev.tommyjs.futur.promise.AbstractPromiseFactory;
import dev.tommyjs.futur.promise.Promise;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledFuture;
public final class StaticPromiseFactory extends AbstractPromiseFactory<ScheduledFuture<?>> {
public final static StaticPromiseFactory INSTANCE = new StaticPromiseFactory();
private final static @NotNull SinglePoolExecutor EXECUTOR = SinglePoolExecutor.create(1);
private final static @NotNull Logger LOGGER = LoggerFactory.getLogger(StaticPromiseFactory.class);
private StaticPromiseFactory() {
}
@Override
public @NotNull <T> Promise<T> unresolved() {
return new SimplePromise<>(this);
}
@Override
public @NotNull Logger getLogger() {
return LOGGER;
}
@Override
public @NotNull PromiseExecutor<ScheduledFuture<?>> getExecutor() {
return EXECUTOR;
}
}

View File

@@ -1,4 +1,4 @@
rootProject.name = 'futur' rootProject.name = 'futur'
include 'futur-api' include 'futur-api'
include 'futur-reactor' include 'futur-static'