package dev.tommyjs.futur.promise;

import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.executor.PromiseScheduler;
import dev.tommyjs.futur.function.*;
import dev.tommyjs.futur.util.PromiseUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Skeleton {@link Promise} implementation. Every chaining, listener and recovery
 * operation below is expressed in terms of {@link #getFactory()},
 * {@link #addAnyListener(PromiseListener)} and the {@code getCompletion()} /
 * {@code isCompleted()} accessors, which this class calls but does not define.
 *
 * <p>NOTE(review): the type variables {@code T}, {@code V}, {@code F} and {@code E} are
 * used throughout this file but no declaration of them is visible here; the generic
 * parameter lists appear to have been lost from this copy. Confirm against the upstream
 * source before compiling.
 */
public abstract class AbstractPromise implements Promise {

    /** Returns the factory that supplies executors, the logger and new promise instances. */
    public abstract @NotNull AbstractPromiseFactory getFactory();

    /**
     * Registers a listener of any kind. Both {@code addDirectListener} and
     * {@code addAsyncListener} delegate here.
     *
     * @param listener the listener to register
     * @return the promise returned to the caller of the public listener methods
     */
    protected abstract @NotNull Promise addAnyListener(@NotNull PromiseListener listener);

    /** Returns the logger obtained from {@link #getFactory()}. */
    protected @NotNull Logger getLogger() {
        return getFactory().getLogger();
    }

    /**
     * Invokes a listener with a completion, choosing the dispatch mode from the listener's
     * type: {@code AsyncPromiseListener} instances go through
     * {@link #callListenerAsync}, everything else is run by {@link #callListenerNow}.
     *
     * @param listener the listener to notify
     * @param cmp      the completion to hand to the listener
     */
    protected void callListener(@NotNull PromiseListener listener, @NotNull PromiseCompletion cmp) {
        if (listener instanceof AsyncPromiseListener) {
            callListenerAsync(listener, cmp);
        } else {
            callListenerNow(listener, cmp);
        }
    }

    /**
     * Evaluates {@code supplier}, converting a thrown exception into a value.
     *
     * @param supplier the computation to run
     * @param handler  maps any non-{@link Error} throwable to the value to return
     * @return the supplier's result, or the handler's result if the supplier threw
     * @throws Error if the supplier throws one; the handler is not consulted
     */
    protected V supplySafe(@NotNull ExceptionalSupplier supplier, @NotNull Function handler) {
        try {
            return supplier.get();
        } catch (Error error) {
            // rethrow unrecoverable errors
            throw error;
        } catch (Throwable e) {
            return handler.apply(e);
        }
    }

    /**
     * Runs {@code runnable}, reporting anything it throws to {@code handler}.
     *
     * @param runnable the action to run
     * @param handler  receives every throwable raised by the action, including {@link Error}s
     * @throws Error if the action throws one; it is rethrown after the handler has seen it
     */
    protected void runSafe(@NotNull ExceptionalRunnable runnable, @NotNull Consumer handler) {
        try {
            runnable.run();
        } catch (Error error) {
            handler.accept(error);
            // rethrow unrecoverable errors
            throw error;
        } catch (Throwable e) {
            handler.accept(e);
        }
    }

    /**
     * Runs {@code completer} via {@link #runSafe}; anything it throws is passed to
     * {@code promise.completeExceptionally}.
     *
     * @param promise   the promise to fail if the completer throws
     * @param completer the action to run
     */
    protected void runCompleter(@NotNull CompletablePromise promise, @NotNull ExceptionalRunnable completer) {
        runSafe(completer, promise::completeExceptionally);
    }

    /**
     * Reads {@code getCompletion()} once and dispatches on the state it finds.
     *
     * @param unresolved invoked when there is no completion yet
     * @param completed  invoked with the result when the completion is a success
     * @param failed     invoked with the exception when the completion is a failure
     * @return whatever the selected callback returns
     */
    protected V useCompletion(Supplier unresolved, Function completed, Function failed) {
        PromiseCompletion completion = getCompletion();
        if (completion == null) {
            return unresolved.get();
        } else if (completion.isSuccess()) {
            return completed.apply(completion.getResult());
        } else {
            return failed.apply(completion.getException());
        }
    }

    /**
     * Builds a task that completes {@code promise} with {@code completer.apply(result)}.
     * When the task runs it does nothing if {@code promise} is already completed; if the
     * function throws, the promise is completed exceptionally instead.
     *
     * @param result    the value to feed to {@code completer}
     * @param promise   the promise to complete
     * @param completer the transformation to apply
     * @return the deferred completion task
     */
    protected @NotNull Runnable createCompleter(T result, @NotNull CompletablePromise promise, @NotNull ExceptionalFunction completer) {
        return () -> {
            if (!promise.isCompleted()) {
                runCompleter(promise, () -> promise.complete(completer.apply(result)));
            }
        };
    }

    /**
     * Creates an unresolved promise from the factory and passes it, together with this
     * promise, to {@code PromiseUtil.propagateCancel(promise, this)}.
     * NOTE(review): presumably this forwards cancellation of the new promise to this
     * one; confirm against {@code PromiseUtil}.
     *
     * @return the new, linked promise
     */
    protected @NotNull CompletablePromise createLinked() {
        CompletablePromise promise = getFactory().unresolved();
        PromiseUtil.propagateCancel(promise, this);
        return promise;
    }

    /**
     * Submits the listener invocation to the factory's async executor.
     * A {@link RejectedExecutionException} from the executor is dropped silently; any
     * other {@link Exception} raised while submitting is logged at warn level.
     *
     * @param listener the listener to notify
     * @param res      the completion to hand to the listener
     */
    protected void callListenerAsync(PromiseListener listener, PromiseCompletion res) {
        try {
            getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, res));
        } catch (RejectedExecutionException ignored) {
            // NOTE(review): the listener is never invoked in this case; presumably the
            // executor is shutting down and dropping the call is intended. Confirm.
        } catch (Exception e) {
            getLogger().warn("Exception caught while running promise listener", e);
        }
    }

    /**
     * Invokes the listener on the calling thread. Anything the listener throws is logged
     * at error level; {@link Error}s are rethrown afterwards by {@link #runSafe}.
     *
     * @param listener the listener to notify
     * @param res      the completion to hand to the listener
     */
    protected void callListenerNow(PromiseListener listener, PromiseCompletion res) {
        runSafe(() -> listener.handle(res), e -> getLogger().error("Exception caught in promise listener", e));
    }

    /**
     * Same submission as {@link #callListenerAsync}, but every {@link Throwable} raised
     * while submitting is swallowed without logging. Not called from within this class.
     *
     * @param listener   the listener to notify
     * @param completion the completion to hand to the listener
     */
    protected void callListenerAsyncLastResort(PromiseListener listener, PromiseCompletion completion) {
        try {
            getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, completion));
        } catch (Throwable ignored) {
            // NOTE(review): deliberately best-effort as far as this file shows; nothing
            // is reported if submission fails.
        }
    }

    /**
     * Returns {@code getChecked()} of the current completion.
     *
     * @return the value produced by {@code completion.getChecked()}
     * @throws IllegalStateException if this promise has no completion yet
     * @throws ExecutionException    as declared by {@code getChecked()}
     */
    protected T joinCompletionChecked() throws ExecutionException {
        PromiseCompletion completion = getCompletion();
        if (completion == null) {
            throw new IllegalStateException("Promise is not completed yet.");
        }

        return completion.getChecked();
    }

    /**
     * Returns {@code get()} of the current completion.
     *
     * @return the value produced by {@code completion.get()}
     * @throws IllegalStateException if this promise has no completion yet
     */
    protected T joinCompletionUnchecked() {
        PromiseCompletion completion = getCompletion();
        if (completion == null) {
            throw new IllegalStateException("Promise is not completed yet.");
        }

        return completion.get();
    }

    /**
     * Returns this promise if it is already completed; otherwise returns a fresh
     * unresolved promise wired up with {@code PromiseUtil.propagateCompletion(this, fork)}.
     * The fork is created with {@code unresolved()} rather than {@link #createLinked()},
     * so no {@code propagateCancel} call is made for it.
     */
    @Override
    public @NotNull Promise fork() {
        if (isCompleted()) {
            return this;
        }

        CompletablePromise fork = getFactory().unresolved();
        PromiseUtil.propagateCompletion(this, fork);
        return fork;
    }

    /** Runs {@code task} via {@link #thenApply(ExceptionalFunction)}; the mapped value is {@code null}. */
    @Override
    public @NotNull Promise thenRun(@NotNull ExceptionalRunnable task) {
        return thenApply(v -> {
            task.run();
            return null;
        });
    }

    /** Passes the result to {@code task} via {@link #thenApply(ExceptionalFunction)}; the mapped value is {@code null}. */
    @Override
    public @NotNull Promise thenConsume(@NotNull ExceptionalConsumer task) {
        return thenApply(result -> {
            task.accept(result);
            return null;
        });
    }

    /** Ignores the result and maps to {@code task.get()} via {@link #thenApply(ExceptionalFunction)}. */
    @Override
    public @NotNull Promise thenSupply(@NotNull ExceptionalSupplier task) {
        return thenApply(v -> task.get());
    }

    /**
     * Maps the result of this promise through {@code task} without an executor.
     *
     * <ul>
     *   <li>Not yet completed: returns a linked promise; a direct listener applies the task
     *       on success and forwards the exception on failure.</li>
     *   <li>Already succeeded: applies the task immediately and returns
     *       {@code getFactory().resolve(...)}, or {@code getFactory().error(...)} if the
     *       task throws a non-{@link Error} throwable.</li>
     *   <li>Already failed: returns {@code getFactory().error(...)} with the existing
     *       exception; the task is not run.</li>
     * </ul>
     */
    @Override
    public @NotNull Promise thenApply(@NotNull ExceptionalFunction task) {
        return useCompletion(() -> {
            CompletablePromise promise = createLinked();
            addDirectListener(res -> createCompleter(res, promise, task).run(), promise::completeExceptionally);
            return promise;
        }, result -> supplySafe(() -> getFactory().resolve(task.apply(result)), getFactory()::error), getFactory()::error);
    }

    /**
     * Like {@link #thenApply(ExceptionalFunction)}, but the task's return value is treated
     * as a nested promise whose outcome becomes the outcome of the returned promise.
     * A {@code null} nested promise yields a {@code null} result. In the already-succeeded
     * case a nested promise that is already completed is returned as-is.
     */
    @Override
    public @NotNull Promise thenCompose(@NotNull ExceptionalFunction task) {
        return useCompletion(() -> {
            CompletablePromise promise = createLinked();
            thenApply(task).addDirectListener(result -> {
                if (result == null) {
                    promise.complete(null);
                } else {
                    PromiseUtil.propagateCompletion(result, promise);
                    PromiseUtil.propagateCancel(promise, result);
                }
            }, promise::completeExceptionally);

            return promise;
        }, result -> supplySafe(() -> {
            Promise nested = task.apply(result);
            if (nested == null) {
                return getFactory().resolve(null);
            } else if (nested.isCompleted()) {
                return nested;
            } else {
                CompletablePromise promise = createLinked();
                PromiseUtil.propagateCompletion(nested, promise);
                PromiseUtil.propagateCancel(promise, nested);
                return promise;
            }
        }, getFactory()::error), getFactory()::error);
    }

    /**
     * Executor-backed variant of {@code thenApply}: once this promise succeeds, the mapping
     * task is submitted to {@code executor}; on failure the exception is forwarded to the
     * returned promise. A failure to submit completes the returned promise exceptionally.
     */
    private @NotNull Promise thenApply(@NotNull ExceptionalFunction task, @NotNull PromiseExecutor executor) {
        CompletablePromise promise = createLinked();
        addDirectListener(res -> runCompleter(promise, () -> {
            Runnable completer = createCompleter(res, promise, task);
            execute(promise, completer, executor);
        }), promise::completeExceptionally);

        return promise;
    }

    /**
     * Delayed variant of the executor-backed {@code thenApply}. If the executor exposes a
     * scheduler, the mapping task is scheduled on it directly. Otherwise
     * {@code PromiseScheduler.getDefault()} is used only to wait out the delay, after which
     * the task is handed to {@code executor}.
     */
    private @NotNull Promise thenApplyDelayed(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull PromiseExecutor executor) {
        CompletablePromise promise = createLinked();
        addDirectListener(res -> runCompleter(promise, () -> {
            Runnable completer = createCompleter(res, promise, task);
            PromiseScheduler scheduler = executor.scheduler();
            if (scheduler == null) {
                schedule(promise, () -> runCompleter(promise, () -> execute(promise, completer, executor)), delay, unit, PromiseScheduler.getDefault());
            } else {
                schedule(promise, completer, delay, unit, scheduler);
            }
        }), promise::completeExceptionally);

        return promise;
    }

    /**
     * Submits {@code task} to {@code executor} and registers a direct listener on
     * {@code promise} that calls {@code executor.cancel(future)} when the promise completes.
     */
    private void execute(@NotNull Promise promise, @NotNull Runnable task, @NotNull PromiseExecutor executor) throws Exception {
        F future = executor.run(task);
        promise.addDirectListener(v -> executor.cancel(future));
    }

    /**
     * Schedules {@code task} on {@code scheduler} after the given delay and registers a
     * direct listener on {@code promise} that calls {@code scheduler.cancel(future)} when
     * the promise completes.
     */
    private void schedule(@NotNull Promise promise, @NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull PromiseScheduler scheduler) throws Exception {
        F future = scheduler.schedule(task, delay, unit);
        promise.addDirectListener(v -> scheduler.cancel(future));
    }

    /**
     * Executor-backed variant of {@code thenCompose}: runs the task on {@code executor}
     * and then adopts the outcome of the nested promise it returns ({@code null} nested
     * promise yields a {@code null} result).
     */
    private @NotNull Promise thenCompose(@NotNull ExceptionalFunction task, @NotNull PromiseExecutor executor) {
        CompletablePromise promise = createLinked();
        thenApply(task, executor).addDirectListener(nestedPromise -> {
            if (nestedPromise == null) {
                promise.complete(null);
            } else {
                PromiseUtil.propagateCompletion(nestedPromise, promise);
                PromiseUtil.propagateCancel(promise, nestedPromise);
            }
        }, promise::completeExceptionally);

        return promise;
    }

    // ---- Variants that run the task on getFactory().getSyncExecutor() ----

    /** Runs {@code task} on the factory's sync executor after this promise succeeds. */
    @Override
    public @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task) {
        return thenApply(FunctionAdapter.adapt(task), getFactory().getSyncExecutor());
    }

    /** Runs {@code task} on the factory's sync executor after this promise succeeds and the delay elapses. */
    @Override
    public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
    }

    /** Passes the result to {@code task} on the factory's sync executor. */
    @Override
    public @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task) {
        return thenApply(FunctionAdapter.adapt(task), getFactory().getSyncExecutor());
    }

    /** Passes the result to {@code task} on the factory's sync executor after the delay elapses. */
    @Override
    public @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
    }

    /** Evaluates {@code task} on the factory's sync executor after this promise succeeds. */
    @Override
    public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) {
        return thenApply(FunctionAdapter.adapt(task), getFactory().getSyncExecutor());
    }

    /** Evaluates {@code task} on the factory's sync executor after this promise succeeds and the delay elapses. */
    @Override
    public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getSyncExecutor());
    }

    /** Maps the result through {@code task} on the factory's sync executor. */
    @Override
    public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) {
        return thenApply(task, getFactory().getSyncExecutor());
    }

    /** Maps the result through {@code task} on the factory's sync executor after the delay elapses. */
    @Override
    public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(task, delay, unit, getFactory().getSyncExecutor());
    }

    /** Composes with the nested promise returned by {@code task}, run on the factory's sync executor. */
    @Override
    public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction task) {
        return thenCompose(task, getFactory().getSyncExecutor());
    }

    // ---- Variants that run the task on getFactory().getAsyncExecutor() ----

    /** Runs {@code task} on the factory's async executor after this promise succeeds. */
    @Override
    public @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task) {
        return thenApply(FunctionAdapter.adapt(task), getFactory().getAsyncExecutor());
    }

    /** Runs {@code task} on the factory's async executor after this promise succeeds and the delay elapses. */
    @Override
    public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
    }

    /** Passes the result to {@code task} on the factory's async executor. */
    @Override
    public @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task) {
        return thenApply(FunctionAdapter.adapt(task), getFactory().getAsyncExecutor());
    }

    /** Passes the result to {@code task} on the factory's async executor after the delay elapses. */
    @Override
    public @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
    }

    /** Evaluates {@code task} on the factory's async executor after this promise succeeds. */
    @Override
    public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) {
        return thenApply(FunctionAdapter.adapt(task), getFactory().getAsyncExecutor());
    }

    /** Evaluates {@code task} on the factory's async executor after this promise succeeds and the delay elapses. */
    @Override
    public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getAsyncExecutor());
    }

    /** Maps the result through {@code task} on the factory's async executor. */
    @Override
    public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) {
        return thenApply(task, getFactory().getAsyncExecutor());
    }

    /** Maps the result through {@code task} on the factory's async executor after the delay elapses. */
    @Override
    public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(task, delay, unit, getFactory().getAsyncExecutor());
    }

    /** Composes with the nested promise returned by {@code task}, run on the factory's async executor. */
    @Override
    public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction task) {
        return thenCompose(task, getFactory().getAsyncExecutor());
    }

    // ---- Variants that run the task on getFactory().getVirtualExecutor() ----

    /** Runs {@code task} on the factory's virtual executor after this promise succeeds. */
    @Override
    public @NotNull Promise thenRunVirtual(@NotNull ExceptionalRunnable task) {
        return thenApply(FunctionAdapter.adapt(task), getFactory().getVirtualExecutor());
    }

    /** Runs {@code task} on the factory's virtual executor after this promise succeeds and the delay elapses. */
    @Override
    public @NotNull Promise thenRunDelayedVirtual(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
    }

    /** Passes the result to {@code task} on the factory's virtual executor. */
    @Override
    public @NotNull Promise thenConsumeVirtual(@NotNull ExceptionalConsumer task) {
        return thenApply(FunctionAdapter.adapt(task), getFactory().getVirtualExecutor());
    }

    /** Passes the result to {@code task} on the factory's virtual executor after the delay elapses. */
    @Override
    public @NotNull Promise thenConsumeDelayedVirtual(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
    }

    /** Evaluates {@code task} on the factory's virtual executor after this promise succeeds. */
    @Override
    public @NotNull Promise thenSupplyVirtual(@NotNull ExceptionalSupplier task) {
        return thenApply(FunctionAdapter.adapt(task), getFactory().getVirtualExecutor());
    }

    /** Evaluates {@code task} on the factory's virtual executor after this promise succeeds and the delay elapses. */
    @Override
    public @NotNull Promise thenSupplyDelayedVirtual(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(FunctionAdapter.adapt(task), delay, unit, getFactory().getVirtualExecutor());
    }

    /** Maps the result through {@code task} on the factory's virtual executor. */
    @Override
    public @NotNull Promise thenApplyVirtual(@NotNull ExceptionalFunction task) {
        return thenApply(task, getFactory().getVirtualExecutor());
    }

    /** Maps the result through {@code task} on the factory's virtual executor after the delay elapses. */
    @Override
    public @NotNull Promise thenApplyDelayedVirtual(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) {
        return thenApplyDelayed(task, delay, unit, getFactory().getVirtualExecutor());
    }

    /** Composes with the nested promise returned by {@code task}, run on the factory's virtual executor. */
    @Override
    public @NotNull Promise thenComposeVirtual(@NotNull ExceptionalFunction task) {
        return thenCompose(task, getFactory().getVirtualExecutor());
    }

    /** Stores the result in {@code reference} and passes the same result on unchanged. */
    @Override
    public @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference) {
        return thenApply(result -> {
            reference.set(result);
            return result;
        });
    }

    /** Replaces a successful result with {@code null}. */
    @Override
    public @NotNull Promise erase() {
        return thenSupply(() -> null);
    }

    /** Registers an async listener by delegating to {@link #addAnyListener(PromiseListener)}. */
    @Override
    public @NotNull Promise addAsyncListener(@NotNull AsyncPromiseListener listener) {
        return addAnyListener(listener);
    }

    /**
     * Registers an async listener that routes a successful completion to
     * {@code successListener} and a failed one to {@code errorListener}.
     * Either callback may be {@code null}, in which case that outcome is ignored.
     */
    @Override
    public @NotNull Promise addAsyncListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) {
        return addAsyncListener(res -> {
            if (res.isSuccess()) {
                if (successListener != null) {
                    successListener.accept(res.getResult());
                }
            } else {
                if (errorListener != null) {
                    errorListener.accept(res.getException());
                }
            }
        });
    }

    /** Registers a direct listener by delegating to {@link #addAnyListener(PromiseListener)}. */
    @Override
    public @NotNull Promise addDirectListener(@NotNull PromiseListener listener) {
        return addAnyListener(listener);
    }

    /**
     * Registers a direct listener that routes a successful completion to
     * {@code successListener} and a failed one to {@code errorListener}.
     * Either callback may be {@code null}, in which case that outcome is ignored.
     */
    @Override
    public @NotNull Promise addDirectListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) {
        return addDirectListener(res -> {
            if (res.isSuccess()) {
                if (successListener != null) {
                    successListener.accept(res.getResult());
                }
            } else {
                if (errorListener != null) {
                    errorListener.accept(res.getException());
                }
            }
        });
    }

    /** Registers {@code listener} as an async listener for successful completion only. */
    @Override
    public @NotNull Promise onSuccess(@NotNull Consumer listener) {
        return addAsyncListener(listener, null);
    }

    /** Registers {@code listener} as an async listener for failed completion only. */
    @Override
    public @NotNull Promise onError(@NotNull Consumer listener) {
        return addAsyncListener(null, listener);
    }

    /**
     * Logs failures of this promise at error level under {@code message}.
     * A {@link CancellationException} that has neither a message nor a cause is skipped.
     *
     * @param message the log message to use
     */
    @Override
    public @NotNull Promise logExceptions(@NotNull String message) {
        // Constructed now rather than inside the listener, so the wrapper's stack trace
        // is captured at the point where logExceptions was called.
        Exception wrapper = new DeferredExecutionException();
        return onError(e -> {
            if (e instanceof CancellationException && e.getMessage() == null && e.getCause() == null) {
                // ignore cancellation exceptions without a message or cause
                return;
            }

            getLogger().error(message, wrapper.initCause(e));
        });
    }

    /**
     * Registers an error listener that only fires when the exception is an instance of
     * {@code type}.
     *
     * @param type     the exception class to match (subclasses match too)
     * @param listener receives the exception, cast to {@code type}
     */
    @Override
    public @NotNull Promise onError(@NotNull Class type, @NotNull Consumer listener) {
        return onError(e -> {
            if (type.isInstance(e)) {
                //noinspection unchecked
                listener.accept((E) e);
            }
        });
    }

    /** Registers a listener that fires only for {@link CancellationException}s. */
    @Override
    public @NotNull Promise onCancel(@NotNull Consumer listener) {
        return onError(CancellationException.class, listener);
    }

    /** Recovers from failure with a fixed value. */
    @Override
    public @NotNull Promise orDefault(@Nullable T defaultValue) {
        return orDefault(v -> defaultValue);
    }

    /** Recovers from failure with the value produced by {@code supplier}. */
    @Override
    public @NotNull Promise orDefault(@NotNull ExceptionalSupplier supplier) {
        return orDefault(v -> supplier.get());
    }

    /**
     * Recovers from failure by mapping the exception through {@code function}.
     *
     * <ul>
     *   <li>Not yet completed: returns a linked promise that receives the result on
     *       success, or {@code function.apply(exception)} on failure.</li>
     *   <li>Already succeeded: returns {@code factory.resolve(result)}.</li>
     *   <li>Already failed: applies the function immediately and returns
     *       {@code factory.resolve(...)}, or {@code factory.error(...)} if the function
     *       itself throws a non-{@link Error} throwable.</li>
     * </ul>
     */
    @Override
    public @NotNull Promise orDefault(@NotNull ExceptionalFunction function) {
        PromiseFactory factory = getFactory();
        return useCompletion(() -> {
            CompletablePromise promise = createLinked();
            addDirectListener(promise::complete, e -> runCompleter(promise, () -> promise.complete(function.apply(e))));
            return promise;
        }, factory::resolve, e -> supplySafe(() -> factory.resolve(function.apply(e)), factory::error));
    }

    /** Wrapper exception used by {@link #logExceptions(String)}; its cause is attached later via {@code initCause}. */
    private static class DeferredExecutionException extends ExecutionException {

    }

}