package dev.tommyjs.futur.promise; import dev.tommyjs.futur.function.ExceptionalConsumer; import dev.tommyjs.futur.function.ExceptionalFunction; import dev.tommyjs.futur.function.ExceptionalRunnable; import dev.tommyjs.futur.function.ExceptionalSupplier; import dev.tommyjs.futur.scheduler.Schedulers; import dev.tommyjs.futur.trace.ExecutorTrace; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; public class Promise { private static final String PACKAGE; private static final Logger LOGGER = LoggerFactory.getLogger(Promise.class); static { String[] packageElements = Promise.class.getPackageName().split("\\."); int i = 0; StringBuilder packageBuilder = new StringBuilder(); while (i < 3) { packageBuilder.append(packageElements[i]); i++; } PACKAGE = packageBuilder.toString(); } private final Collection> listeners; private final StackTraceElement[] stackTrace; private @Nullable PromiseCompletion completion; public Promise() { this.listeners = new ConcurrentLinkedQueue<>(); this.completion = null; this.stackTrace = Arrays.stream(Thread.currentThread().getStackTrace()) .filter(v -> !v.getClassName().startsWith(PACKAGE)) .toArray(StackTraceElement[]::new); } public T join(long interval, long timeout) throws TimeoutException { long start = System.currentTimeMillis(); while (!isCompleted()) { if (System.currentTimeMillis() > start + timeout) throw new TimeoutException("Promise timed out after " + timeout + "ms"); try { Thread.sleep(interval); } catch (InterruptedException e) { throw new RuntimeException(e); } } PromiseCompletion completion = getCompletion(); if (completion == null) { throw new IllegalStateException(); } if (completion.isError()) { throw new RuntimeException(completion.getException()); } return completion.getResult(); } public @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task) { return thenApplySync(result -> { task.run(); return null; }, Schedulers.getTrace(task)); } public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedSync(result -> { task.run(); return null; }, delay, unit, Schedulers.getTrace(task)); } public @NotNull Promise thenAcceptSync(@NotNull ExceptionalConsumer task) { return thenApplySync(result -> { task.accept(result); return null; }, Schedulers.getTrace(task)); } public @NotNull Promise thenAcceptDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedSync(result -> { task.accept(result); return null; }, delay, unit, Schedulers.getTrace(task)); } public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) { return thenApplySync(result -> task.get(), Schedulers.getTrace(task)); } public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedSync(result -> task.get(), delay, unit, Schedulers.getTrace(task)); } public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { Promise promise = new Promise<>(); addListener(ctx -> { if (ctx.isError()) { //noinspection ConstantConditions promise.completeExceptionally(ctx.getException()); return; } Runnable runnable = createRunnable(ctx, promise, task); Schedulers.runSync(runnable, trace); }); return promise; } public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { return thenApplySync(task, Schedulers.getTrace(task)); } public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { Promise promise = new Promise<>(); addListener(ctx -> { if (ctx.isError()) { //noinspection ConstantConditions promise.completeExceptionally(ctx.getException()); return; } Runnable runnable = createRunnable(ctx, promise, task); Schedulers.runDelayedSync(runnable, delay, unit, trace); }); return promise; } public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedSync(task, delay, unit, Schedulers.getTrace(task)); } public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { Promise promise = new Promise<>(); thenApplySync(task, Schedulers.getTrace(task)).thenAcceptAsync(nestedPromise -> { nestedPromise.addListener(ctx1 -> { if (ctx1.isError()) { //noinspection ConstantConditions promise.completeExceptionally(ctx1.getException()); return; } promise.complete(ctx1.getResult()); }); }).addListener(ctx2 -> { if (ctx2.isError()) { //noinspection ConstantConditions promise.completeExceptionally(ctx2.getException()); } }); return promise; } public @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task) { return thenApplyAsync(result -> { task.run(); return null; }, Schedulers.getTrace(task)); } public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedAsync(result -> { task.run(); return null; }, delay, unit, Schedulers.getTrace(task)); } public @NotNull Promise thenAcceptAsync(@NotNull ExceptionalConsumer task) { return thenApplyAsync(result -> { task.accept(result); return null; }, Schedulers.getTrace(task)); } public @NotNull Promise thenAcceptDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedAsync(result -> { task.accept(result); return null; }, delay, unit, Schedulers.getTrace(task)); } public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) { return thenApplyAsync(result -> task.get(), Schedulers.getTrace(task)); } public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedAsync(result -> task.get(), delay, unit, Schedulers.getTrace(task)); } public @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference) { return thenApplyAsync((result) -> { reference.set(result); return result; }); } public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { Promise promise = new Promise<>(); addListener(ctx -> { createRunnable(ctx, promise, task).run(); }); return promise; } public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { return thenApplyAsync(task, Schedulers.getTrace(task)); } public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { Promise promise = new Promise<>(); addListener(ctx -> { Runnable runnable = createRunnable(ctx, promise, task); Schedulers.runDelayedAsync(runnable, delay, unit, trace); }); return promise; } public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedAsync(task, delay, unit, Schedulers.getTrace(task)); } public @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task) { return this.thenComposeAsync(task); } public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { Promise promise = new Promise<>(); thenApplyAsync(task, Schedulers.getTrace(task)).thenAcceptAsync(nestedPromise -> { nestedPromise.addListener(ctx1 -> { if (ctx1.isError()) { //noinspection ConstantConditions promise.completeExceptionally(ctx1.getException()); return; } promise.complete(ctx1.getResult()); }); }).addListener(ctx2 -> { if (ctx2.isError()) { //noinspection ConstantConditions promise.completeExceptionally(ctx2.getException()); } }); return promise; } private @NotNull Runnable createRunnable(@NotNull PromiseCompletion ctx, @NotNull Promise promise, @NotNull ExceptionalFunction task) { return () -> { if (ctx.isError()) { //noinspection ConstantConditions promise.completeExceptionally(ctx.getException()); return; } try { V result = task.apply(ctx.getResult()); promise.complete(result); } catch (Exception e) { promise.completeExceptionally(e, true); } }; } public @NotNull Promise logExceptions() { return addListener(ctx -> { if (ctx.isError()) { LOGGER.error("Exception caught in promise pipeline", ctx.getException()); } }); } public @NotNull Promise addListener(@NotNull PromiseListener listener) { if (isCompleted()) { Schedulers.runAsync(() -> { try { listener.handle(getCompletion()); } catch (Exception e) { LOGGER.error("Exception caught in promise listener", e); } }, Schedulers.getTrace(listener)); } else { getListeners().add(listener); } return this; } public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { Schedulers.runDelayedAsync(() -> { if (!isCompleted()) { completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit), true); } }, time, unit); return this; } public @NotNull Promise timeout(long ms) { return timeout(ms, TimeUnit.MILLISECONDS); } protected void handleCompletion(@NotNull PromiseCompletion ctx) { if (this.isCompleted()) return; setCompletion(ctx); Schedulers.runAsync(() -> { for (PromiseListener listener : getListeners()) { if (!ctx.isActive()) return; try { listener.handle(ctx); } catch (Exception e) { LOGGER.error("Exception caught in promise listener", e); } } }); } public void complete(@Nullable T result) { handleCompletion(new PromiseCompletion<>(result)); } public void completeExceptionally(@NotNull Throwable result, boolean appendStacktrace) { if (appendStacktrace && this.stackTrace != null) { result.setStackTrace(Stream.of(result.getStackTrace(), this.stackTrace) .flatMap(Stream::of) .filter(v -> !v.getClassName().startsWith(PACKAGE)) .filter(v -> !v.getClassName().startsWith("java.lang.Thread")) .filter(v -> !v.getClassName().startsWith("java.util.concurrent")) .toArray(StackTraceElement[]::new)); } handleCompletion(new PromiseCompletion<>(result)); } public void completeExceptionally(@NotNull Throwable result) { completeExceptionally(result, false); } public boolean isCompleted() { return getCompletion() != null; } protected Collection> getListeners() { return listeners; } public @Nullable PromiseCompletion getCompletion() { return completion; } protected void setCompletion(@NotNull PromiseCompletion completion) { this.completion = completion; } public static @NotNull Promise resolve(T value) { Promise promise = new Promise<>(); promise.setCompletion(new PromiseCompletion<>(value)); return promise; } public static @NotNull Promise error(Throwable error) { Promise promise = new Promise<>(); promise.completeExceptionally(error); return promise; } public static @NotNull Promise start() { return Promise.resolve(null); } @Deprecated // use resolve() public static @NotNull Promise start(T start) { Promise promise = new Promise<>(); promise.complete(start); return promise; } }