package dev.tommyjs.futur.promise; import dev.tommyjs.futur.function.ExceptionalConsumer; import dev.tommyjs.futur.function.ExceptionalFunction; import dev.tommyjs.futur.function.ExceptionalRunnable; import dev.tommyjs.futur.function.ExceptionalSupplier; import dev.tommyjs.futur.util.PromiseUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.function.Consumer; @SuppressWarnings({"FieldMayBeFinal", "unchecked"}) public abstract class AbstractPromise implements CompletablePromise { private static final VarHandle COMPLETION_HANDLE; private static final VarHandle LISTENERS_HANDLE; static { try { MethodHandles.Lookup lookup = MethodHandles.lookup(); COMPLETION_HANDLE = lookup.findVarHandle(AbstractPromise.class, "completion", PromiseCompletion.class); LISTENERS_HANDLE = lookup.findVarHandle(AbstractPromise.class, "listeners", Collection.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } private final Sync sync; private volatile Collection> listeners; private volatile PromiseCompletion completion; public AbstractPromise() { this.sync = new Sync(); this.listeners = Collections.EMPTY_LIST; this.completion = null; } public abstract @NotNull AbstractPromiseFactory getFactory(); private void runCompleter(@NotNull CompletablePromise promise, @NotNull ExceptionalRunnable completer) { try { completer.run(); } catch (Error e) { promise.completeExceptionally(e); throw e; } catch (Throwable e) { promise.completeExceptionally(e); } } private @NotNull Runnable createCompleter( T result, @NotNull CompletablePromise promise, @NotNull ExceptionalFunction completer ) { return () -> { if (!promise.isCompleted()) { runCompleter(promise, () -> promise.complete(completer.apply(result))); } }; } protected @NotNull Logger getLogger() { return getFactory().getLogger(); } @Override public T get() throws InterruptedException, ExecutionException { sync.acquireSharedInterruptibly(1); return joinCompletion(); } @Override public T get(long time, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { boolean success = sync.tryAcquireSharedNanos(1, unit.toNanos(time)); if (!success) { throw new TimeoutException("Promise stopped waiting after " + time + " " + unit); } return joinCompletion(); } @Override public T await() { try { sync.acquireSharedInterruptibly(1); } catch (InterruptedException e) { throw new RuntimeException(e); } PromiseCompletion completion = Objects.requireNonNull(getCompletion()); if (completion.isSuccess()) return completion.getResult(); throw new CompletionException(completion.getException()); } private T joinCompletion() throws ExecutionException { PromiseCompletion completion = Objects.requireNonNull(getCompletion()); if (completion.isSuccess()) return completion.getResult(); throw new ExecutionException(completion.getException()); } @Override public @NotNull Promise fork() { CompletablePromise fork = getFactory().unresolved(); PromiseUtil.propagateCompletion(this, fork); return fork; } @Override public @NotNull Promise thenRun(@NotNull ExceptionalRunnable task) { return thenApply(_ -> { task.run(); return null; }); } @Override public @NotNull Promise thenConsume(@NotNull ExceptionalConsumer task) { return thenApply(result -> { task.accept(result); return null; }); } @Override public @NotNull Promise thenSupply(@NotNull ExceptionalSupplier task) { return thenApply(_ -> task.get()); } @Override public @NotNull Promise thenApply(@NotNull ExceptionalFunction task) { CompletablePromise promise = getFactory().unresolved(); addDirectListener( res -> createCompleter(res, promise, task).run(), promise::completeExceptionally ); PromiseUtil.propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task) { CompletablePromise promise = getFactory().unresolved(); thenApply(task).addDirectListener( nestedPromise -> { if (nestedPromise == null) { promise.complete(null); } else { PromiseUtil.propagateCompletion(nestedPromise, promise); PromiseUtil.propagateCancel(promise, nestedPromise); } }, promise::completeExceptionally ); PromiseUtil.propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task) { return thenApplySync(_ -> { task.run(); return null; }); } @Override public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedSync(_ -> { task.run(); return null; }, delay, unit); } @Override public @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task) { return thenApplySync(result -> { task.accept(result); return null; }); } @Override public @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedSync(result -> { task.accept(result); return null; }, delay, unit); } @Override public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) { return thenApplySync(_ -> task.get()); } @Override public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedSync(_ -> task.get(), delay, unit); } @Override public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { CompletablePromise promise = getFactory().unresolved(); addDirectListener( res -> runCompleter(promise, () -> { Runnable runnable = createCompleter(res, promise, task); FS future = getFactory().getSyncExecutor().run(runnable); promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future)); }), promise::completeExceptionally ); PromiseUtil.propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { CompletablePromise promise = getFactory().unresolved(); addDirectListener( res -> runCompleter(promise, () -> { Runnable runnable = createCompleter(res, promise, task); FS future = getFactory().getSyncExecutor().run(runnable, delay, unit); promise.addDirectListener(_ -> getFactory().getSyncExecutor().cancel(future)); }), promise::completeExceptionally ); PromiseUtil.propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { CompletablePromise promise = getFactory().unresolved(); thenApplySync(task).addDirectListener( nestedPromise -> { if (nestedPromise == null) { promise.complete(null); } else { PromiseUtil.propagateCompletion(nestedPromise, promise); PromiseUtil.propagateCancel(promise, nestedPromise); } }, promise::completeExceptionally ); PromiseUtil.propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task) { return thenApplyAsync(_ -> { task.run(); return null; }); } @Override public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedAsync(_ -> { task.run(); return null; }, delay, unit); } @Override public @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task) { return thenApplyAsync(result -> { task.accept(result); return null; }); } @Override public @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedAsync(result -> { task.accept(result); return null; }, delay, unit); } @Override public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) { return thenApplyAsync(_ -> task.get()); } @Override public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedAsync(_ -> task.get(), delay, unit); } @Override public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { CompletablePromise promise = getFactory().unresolved(); addDirectListener( (res) -> runCompleter(promise, () -> { Runnable runnable = createCompleter(res, promise, task); FA future = getFactory().getAsyncExecutor().run(runnable); promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future)); }), promise::completeExceptionally ); PromiseUtil.propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { CompletablePromise promise = getFactory().unresolved(); addDirectListener( res -> runCompleter(promise, () -> { Runnable runnable = createCompleter(res, promise, task); FA future = getFactory().getAsyncExecutor().run(runnable, delay, unit); promise.addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future)); }), promise::completeExceptionally ); PromiseUtil.propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { CompletablePromise promise = getFactory().unresolved(); thenApplyAsync(task).addDirectListener( nestedPromise -> { if (nestedPromise == null) { promise.complete(null); } else { PromiseUtil.propagateCompletion(nestedPromise, promise); PromiseUtil.propagateCancel(promise, nestedPromise); } }, promise::completeExceptionally ); PromiseUtil.propagateCancel(promise, this); return promise; } @Override public @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference) { return thenApplyAsync(result -> { reference.set(result); return result; }); } @Override public @NotNull Promise erase() { return thenSupply(() -> null); } @Override public @NotNull Promise addAsyncListener(@NotNull AsyncPromiseListener listener) { return addAnyListener(listener); } @Override public @NotNull Promise addAsyncListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { return addAsyncListener(res -> { if (res.isSuccess()) { if (successListener != null) successListener.accept(res.getResult()); } else { if (errorListener != null) errorListener.accept(res.getException()); } }); } @Override public @NotNull Promise addDirectListener(@NotNull PromiseListener listener) { return addAnyListener(listener); } @Override public @NotNull Promise addDirectListener(@Nullable Consumer successListener, @Nullable Consumer errorListener) { return addDirectListener(res -> { if (res.isSuccess()) { if (successListener != null) successListener.accept(res.getResult()); } else { if (errorListener != null) errorListener.accept(res.getException()); } }); } private @NotNull Promise addAnyListener(PromiseListener listener) { Collection> prev = listeners, next = null; for (boolean haveNext = false;;) { if (!haveNext) { next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev; if (next != null) next.add(listener); } if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next)) break; haveNext = (prev == (prev = listeners)); } if (next == null) { if (listener instanceof AsyncPromiseListener) { callListenerAsync(listener, Objects.requireNonNull(getCompletion())); } else { callListenerNow(listener, Objects.requireNonNull(getCompletion())); } } return this; } private void callListenerAsync(PromiseListener listener, PromiseCompletion res) { try { getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, res)); } catch (RejectedExecutionException ignored) { } catch (Exception e) { getLogger().warn("Exception caught while running promise listener", e); } } private void callListenerNow(PromiseListener listener, PromiseCompletion res) { try { listener.handle(res); } catch (Error e) { getLogger().error("Error caught in promise listener", e); throw e; } catch (Throwable e) { getLogger().error("Exception caught in promise listener", e); } } @Override public @NotNull Promise onSuccess(@NotNull Consumer listener) { return addAsyncListener(listener, null); } @Override public @NotNull Promise onError(@NotNull Consumer listener) { return addAsyncListener(null, listener); } @Override public @NotNull Promise logExceptions(@NotNull String message) { Exception wrapper = new DeferredExecutionException(); return onError(e -> getLogger().error(message, wrapper.initCause(e))); } @Override public @NotNull Promise onError(@NotNull Class type, @NotNull Consumer listener) { return onError(e -> { if (type.isAssignableFrom(e.getClass())) { //noinspection unchecked listener.accept((E) e); } }); } @Override public @NotNull Promise onCancel(@NotNull Consumer listener) { return onError(CancellationException.class, listener); } @Override public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { Exception e = new CancellationException("Promise timed out after " + time + " " + unit.toString().toLowerCase()); return completeExceptionallyDelayed(e, time, unit); } @Override public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) { Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit.toString().toLowerCase()); return completeExceptionallyDelayed(e, time, unit); } private Promise completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit) { runCompleter(this, () -> { FA future = getFactory().getAsyncExecutor().run(() -> completeExceptionally(e), delay, unit); addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future)); }); return this; } private void handleCompletion(@NotNull PromiseCompletion cmp) { if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) return; sync.releaseShared(1); Iterator> iter = ((Iterable>) LISTENERS_HANDLE.getAndSet(this, null)).iterator(); try { while (iter.hasNext()) { PromiseListener listener = iter.next(); if (listener instanceof AsyncPromiseListener) { callListenerAsync(listener, cmp); } else { callListenerNow(listener, cmp); } } } finally { iter.forEachRemaining(v -> callListenerAsyncLastResort(v, cmp)); } } private void callListenerAsyncLastResort(PromiseListener listener, PromiseCompletion completion) { try { getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, completion)); } catch (Throwable ignored) {} } @Override public void cancel(@NotNull CancellationException e) { completeExceptionally(e); } @Override public void complete(@Nullable T result) { handleCompletion(new PromiseCompletion<>(result)); } @Override public void completeExceptionally(@NotNull Throwable result) { handleCompletion(new PromiseCompletion<>(result)); } @Override public boolean isCompleted() { return completion != null; } @Override public @Nullable PromiseCompletion getCompletion() { return completion; } @Override public @NotNull CompletableFuture toFuture() { CompletableFuture future = new CompletableFuture<>(); addDirectListener(future::complete, future::completeExceptionally); future.whenComplete((_, e) -> { if (e instanceof CancellationException) { this.cancel(); } }); return future; } private static class DeferredExecutionException extends ExecutionException { } private static final class Sync extends AbstractQueuedSynchronizer { private Sync() { setState(1); } @Override protected int tryAcquireShared(int acquires) { return getState() == 0 ? 1 : -1; } @Override protected boolean tryReleaseShared(int releases) { int c1, c2; do { c1 = getState(); if (c1 == 0) { return false; } c2 = c1 - 1; } while(!compareAndSetState(c1, c2)); return c2 == 0; } } }