diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index c66ba1d..b6a1bc1 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -9,47 +9,29 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.function.Consumer; -@SuppressWarnings({"FieldMayBeFinal", "unchecked"}) -public abstract class AbstractPromise implements CompletablePromise { - - private static final VarHandle COMPLETION_HANDLE; - private static final VarHandle LISTENERS_HANDLE; - - static { - try { - MethodHandles.Lookup lookup = MethodHandles.lookup(); - COMPLETION_HANDLE = lookup.findVarHandle(AbstractPromise.class, "completion", PromiseCompletion.class); - LISTENERS_HANDLE = lookup.findVarHandle(AbstractPromise.class, "listeners", Collection.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - } - - private final Sync sync; - - private volatile Collection> listeners; - private volatile PromiseCompletion completion; - - public AbstractPromise() { - this.sync = new Sync(); - this.listeners = Collections.EMPTY_LIST; - this.completion = null; - } +public abstract class AbstractPromise implements Promise { public abstract @NotNull AbstractPromiseFactory getFactory(); - private void runCompleter(@NotNull CompletablePromise promise, @NotNull ExceptionalRunnable completer) { + protected abstract @NotNull Promise addAnyListener(@NotNull PromiseListener listener); + + protected @NotNull Logger getLogger() { + return getFactory().getLogger(); + } + + protected void callListener(@NotNull PromiseListener listener, @NotNull PromiseCompletion cmp) { + if (listener instanceof AsyncPromiseListener) { + callListenerAsync(listener, cmp); + } else { + callListenerNow(listener, cmp); + } + } + + protected void runCompleter(@NotNull CompletablePromise promise, @NotNull ExceptionalRunnable completer) { try { completer.run(); } catch (Error e) { @@ -60,11 +42,8 @@ public abstract class AbstractPromise implements CompletablePromise @NotNull Runnable createCompleter( - T result, - @NotNull CompletablePromise promise, - @NotNull ExceptionalFunction completer - ) { + protected @NotNull Runnable createCompleter(T result, @NotNull CompletablePromise promise, + @NotNull ExceptionalFunction completer) { return () -> { if (!promise.isCompleted()) { runCompleter(promise, () -> promise.complete(completer.apply(result))); @@ -72,43 +51,37 @@ public abstract class AbstractPromise implements CompletablePromise @NotNull CompletablePromise createLinked() { + CompletablePromise promise = getFactory().unresolved(); + PromiseUtil.propagateCancel(promise, this); + return promise; } - @Override - public T get() throws InterruptedException, ExecutionException { - sync.acquireSharedInterruptibly(1); - return joinCompletion(); - } - - @Override - public T get(long time, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - boolean success = sync.tryAcquireSharedNanos(1, unit.toNanos(time)); - if (!success) { - throw new TimeoutException("Promise stopped waiting after " + time + " " + unit); - } - - return joinCompletion(); - } - - @Override - public T await() { + protected void callListenerAsync(PromiseListener listener, PromiseCompletion res) { try { - sync.acquireSharedInterruptibly(1); - } catch (InterruptedException e) { - throw new RuntimeException(e); + getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, res)); + } catch (RejectedExecutionException ignored) { + } catch (Exception e) { + getLogger().warn("Exception caught while running promise listener", e); } - - PromiseCompletion completion = Objects.requireNonNull(getCompletion()); - if (completion.isSuccess()) return completion.getResult(); - throw new CompletionException(completion.getException()); } - private T joinCompletion() throws ExecutionException { - PromiseCompletion completion = Objects.requireNonNull(getCompletion()); - if (completion.isSuccess()) return completion.getResult(); - throw new ExecutionException(completion.getException()); + protected void callListenerNow(PromiseListener listener, PromiseCompletion res) { + try { + listener.handle(res); + } catch (Error e) { + getLogger().error("Error caught in promise listener", e); + throw e; + } catch (Throwable e) { + getLogger().error("Exception caught in promise listener", e); + } + } + + protected void callListenerAsyncLastResort(PromiseListener listener, PromiseCompletion completion) { + try { + getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, completion)); + } catch (Throwable ignored) { + } } @Override @@ -141,33 +114,68 @@ public abstract class AbstractPromise implements CompletablePromise @NotNull Promise thenApply(@NotNull ExceptionalFunction task) { - CompletablePromise promise = getFactory().unresolved(); - addDirectListener( - res -> createCompleter(res, promise, task).run(), - promise::completeExceptionally - ); + PromiseCompletion completion = getCompletion(); + if (completion == null) { + CompletablePromise promise = createLinked(); + addDirectListener( + res -> createCompleter(res, promise, task).run(), + promise::completeExceptionally + ); - PromiseUtil.propagateCancel(promise, this); - return promise; + return promise; + } else if (completion.isSuccess()) { + try { + V result = task.apply(completion.getResult()); + return getFactory().resolve(result); + } catch (Exception e) { + return getFactory().error(e); + } + } else { + Throwable ex = completion.getException(); + assert ex != null; + return getFactory().error(ex); + } } @Override public @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task) { - CompletablePromise promise = getFactory().unresolved(); - thenApply(task).addDirectListener( - nestedPromise -> { - if (nestedPromise == null) { - promise.complete(null); - } else { - PromiseUtil.propagateCompletion(nestedPromise, promise); - PromiseUtil.propagateCancel(promise, nestedPromise); - } - }, - promise::completeExceptionally - ); + PromiseCompletion completion = getCompletion(); + if (completion == null) { + CompletablePromise promise = createLinked(); + thenApply(task).addDirectListener( + result -> { + if (result == null) { + promise.complete(null); + } else { + PromiseUtil.propagateCompletion(result, promise); + PromiseUtil.propagateCancel(promise, result); + } + }, + promise::completeExceptionally + ); - PromiseUtil.propagateCancel(promise, this); - return promise; + return promise; + } else if (completion.isSuccess()) { + try { + Promise result = task.apply(completion.getResult()); + if (result == null) { + return getFactory().resolve(null); + } else if (result.isCompleted()) { + return result; + } else { + CompletablePromise promise = createLinked(); + PromiseUtil.propagateCompletion(result, promise); + PromiseUtil.propagateCancel(promise, result); + return promise; + } + } catch (Exception e) { + return getFactory().error(e); + } + } else { + Throwable ex = completion.getException(); + assert ex != null; + return getFactory().error(ex); + } } @Override @@ -214,7 +222,7 @@ public abstract class AbstractPromise implements CompletablePromise @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { - CompletablePromise promise = getFactory().unresolved(); + CompletablePromise promise = createLinked(); addDirectListener( res -> runCompleter(promise, () -> { Runnable runnable = createCompleter(res, promise, task); @@ -224,13 +232,12 @@ public abstract class AbstractPromise implements CompletablePromise @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - CompletablePromise promise = getFactory().unresolved(); + CompletablePromise promise = createLinked(); addDirectListener( res -> runCompleter(promise, () -> { Runnable runnable = createCompleter(res, promise, task); @@ -240,13 +247,12 @@ public abstract class AbstractPromise implements CompletablePromise @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { - CompletablePromise promise = getFactory().unresolved(); + CompletablePromise promise = createLinked(); thenApplySync(task).addDirectListener( nestedPromise -> { if (nestedPromise == null) { @@ -259,7 +265,6 @@ public abstract class AbstractPromise implements CompletablePromise implements CompletablePromise @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { - CompletablePromise promise = getFactory().unresolved(); + CompletablePromise promise = createLinked(); addDirectListener( (res) -> runCompleter(promise, () -> { Runnable runnable = createCompleter(res, promise, task); @@ -317,13 +322,12 @@ public abstract class AbstractPromise implements CompletablePromise @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - CompletablePromise promise = getFactory().unresolved(); + CompletablePromise promise = createLinked(); addDirectListener( res -> runCompleter(promise, () -> { Runnable runnable = createCompleter(res, promise, task); @@ -333,13 +337,12 @@ public abstract class AbstractPromise implements CompletablePromise @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { - CompletablePromise promise = getFactory().unresolved(); + CompletablePromise promise = createLinked(); thenApplyAsync(task).addDirectListener( nestedPromise -> { if (nestedPromise == null) { @@ -352,7 +355,6 @@ public abstract class AbstractPromise implements CompletablePromise implements CompletablePromise addAnyListener(PromiseListener listener) { - Collection> prev = listeners, next = null; - for (boolean haveNext = false; ; ) { - if (!haveNext) { - next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev; - if (next != null) next.add(listener); - } - - if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next)) - break; - - haveNext = (prev == (prev = listeners)); - } - - if (next == null) { - if (listener instanceof AsyncPromiseListener) { - callListenerAsync(listener, Objects.requireNonNull(getCompletion())); - } else { - callListenerNow(listener, Objects.requireNonNull(getCompletion())); - } - } - - return this; - } - - private void callListenerAsync(PromiseListener listener, PromiseCompletion res) { - try { - getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, res)); - } catch (RejectedExecutionException ignored) { - } catch (Exception e) { - getLogger().warn("Exception caught while running promise listener", e); - } - } - - private void callListenerNow(PromiseListener listener, PromiseCompletion res) { - try { - listener.handle(res); - } catch (Error e) { - getLogger().error("Error caught in promise listener", e); - throw e; - } catch (Throwable e) { - getLogger().error("Exception caught in promise listener", e); - } - } - @Override public @NotNull Promise onSuccess(@NotNull Consumer listener) { return addAsyncListener(listener, null); @@ -489,92 +446,11 @@ public abstract class AbstractPromise implements CompletablePromise orDefault(@NotNull ExceptionalFunction function) { - CompletablePromise promise = getFactory().unresolved(); - addDirectListener(promise::complete, e -> { - try { - T result = function.apply(e); - promise.complete(result); - } catch (Exception ex) { - promise.completeExceptionally(ex); - } - }); - - PromiseUtil.propagateCancel(promise, this); + CompletablePromise promise = createLinked(); + addDirectListener(promise::complete, e -> runCompleter(promise, () -> promise.complete(function.apply(e)))); return promise; } - @Override - public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { - Exception e = new CancellationException("Promise timed out after " + time + " " + unit.toString().toLowerCase()); - return completeExceptionallyDelayed(e, time, unit); - } - - @Override - public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) { - Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit.toString().toLowerCase()); - return completeExceptionallyDelayed(e, time, unit); - } - - private Promise completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit) { - runCompleter(this, () -> { - FA future = getFactory().getAsyncExecutor().run(() -> completeExceptionally(e), delay, unit); - addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future)); - }); - return this; - } - - private void handleCompletion(@NotNull PromiseCompletion cmp) { - if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) return; - sync.releaseShared(1); - - - Iterator> iter = ((Iterable>) LISTENERS_HANDLE.getAndSet(this, null)).iterator(); - try { - while (iter.hasNext()) { - PromiseListener listener = iter.next(); - if (listener instanceof AsyncPromiseListener) { - callListenerAsync(listener, cmp); - } else { - callListenerNow(listener, cmp); - } - } - } finally { - iter.forEachRemaining(v -> callListenerAsyncLastResort(v, cmp)); - } - } - - private void callListenerAsyncLastResort(PromiseListener listener, PromiseCompletion completion) { - try { - getFactory().getAsyncExecutor().run(() -> callListenerNow(listener, completion)); - } catch (Throwable ignored) { - } - } - - @Override - public void cancel(@NotNull CancellationException e) { - completeExceptionally(e); - } - - @Override - public void complete(@Nullable T result) { - handleCompletion(new PromiseCompletion<>(result)); - } - - @Override - public void completeExceptionally(@NotNull Throwable result) { - handleCompletion(new PromiseCompletion<>(result)); - } - - @Override - public boolean isCompleted() { - return completion != null; - } - - @Override - public @Nullable PromiseCompletion getCompletion() { - return completion; - } - @Override public @NotNull CompletableFuture toFuture() { CompletableFuture future = new CompletableFuture<>(); @@ -591,31 +467,4 @@ public abstract class AbstractPromise implements CompletablePromise implements PromiseFactory { @@ -23,20 +22,6 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { public abstract @NotNull PromiseExecutor getAsyncExecutor(); - @Override - public @NotNull Promise resolve(T value) { - CompletablePromise promise = unresolved(); - promise.complete(value); - return promise; - } - - @Override - public @NotNull Promise error(@NotNull Throwable error) { - CompletablePromise promise = unresolved(); - promise.completeExceptionally(error); - return promise; - } - @Override public @NotNull Promise wrap(@NotNull CompletableFuture future) { return wrap(future, future); @@ -112,14 +97,4 @@ public abstract class AbstractPromiseFactory implements PromiseFactory { return promise; } - @Override - public @NotNull Promise race(@NotNull Iterable> promises, boolean cancelLosers) { - return race(promises.iterator(), cancelLosers); - } - - @Override - public @NotNull Promise race(@NotNull Stream> promises, boolean cancelLosers) { - return race(promises.iterator(), cancelLosers); - } - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java index c53f86d..799b6be 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AsyncPromiseListener.java @@ -5,5 +5,4 @@ package dev.tommyjs.futur.promise; * executed asynchronously by the {@link PromiseFactory} that created the completed promise. */ public interface AsyncPromiseListener extends PromiseListener { - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java new file mode 100644 index 0000000..f2a1a67 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java @@ -0,0 +1,194 @@ +package dev.tommyjs.futur.promise; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.*; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +@SuppressWarnings({"FieldMayBeFinal"}) +public abstract class BasePromise extends AbstractPromise implements CompletablePromise { + + private static final VarHandle COMPLETION_HANDLE; + private static final VarHandle LISTENERS_HANDLE; + + static { + try { + MethodHandles.Lookup lookup = MethodHandles.lookup(); + COMPLETION_HANDLE = lookup.findVarHandle(BasePromise.class, "completion", PromiseCompletion.class); + LISTENERS_HANDLE = lookup.findVarHandle(BasePromise.class, "listeners", Collection.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + private final Sync sync; + + private volatile PromiseCompletion completion; + + @SuppressWarnings("FieldMayBeFinal") + private volatile Collection> listeners; + + @SuppressWarnings("unchecked") + public BasePromise() { + this.sync = new Sync(); + this.completion = null; + this.listeners = Collections.EMPTY_LIST; + } + + protected T joinCompletion() throws ExecutionException { + PromiseCompletion completion = Objects.requireNonNull(getCompletion()); + if (completion.isSuccess()) return completion.getResult(); + throw new ExecutionException(completion.getException()); + } + + protected void handleCompletion(@NotNull PromiseCompletion cmp) { + if (!COMPLETION_HANDLE.compareAndSet(this, null, cmp)) return; + sync.releaseShared(1); + callListeners(cmp); + } + + protected Promise completeExceptionallyDelayed(Throwable e, long delay, TimeUnit unit) { + runCompleter(this, () -> { + FA future = getFactory().getAsyncExecutor().run(() -> completeExceptionally(e), delay, unit); + addDirectListener(_ -> getFactory().getAsyncExecutor().cancel(future)); + }); + + return this; + } + + @SuppressWarnings("unchecked") + protected void callListeners(@NotNull PromiseCompletion cmp) { + Iterator> iter = ((Iterable>) LISTENERS_HANDLE.getAndSet(this, null)).iterator(); + try { + while (iter.hasNext()) { + callListener(iter.next(), cmp); + } + } finally { + iter.forEachRemaining(v -> callListenerAsyncLastResort(v, cmp)); + } + } + + @Override + protected @NotNull Promise addAnyListener(@NotNull PromiseListener listener) { + Collection> prev = listeners, next = null; + for (boolean haveNext = false; ; ) { + if (!haveNext) { + next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev; + if (next != null) next.add(listener); + } + + if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next)) + break; + + haveNext = (prev == (prev = listeners)); + } + + if (next == null) { + callListener(listener, Objects.requireNonNull(getCompletion())); + } + + return this; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + sync.acquireSharedInterruptibly(1); + return joinCompletion(); + } + + @Override + public T get(long time, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + boolean success = sync.tryAcquireSharedNanos(1, unit.toNanos(time)); + if (!success) { + throw new TimeoutException("Promise stopped waiting after " + time + " " + unit); + } + + return joinCompletion(); + } + + @Override + public T await() { + try { + sync.acquireSharedInterruptibly(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + PromiseCompletion completion = Objects.requireNonNull(getCompletion()); + if (completion.isSuccess()) return completion.getResult(); + throw new CompletionException(completion.getException()); + } + + @Override + public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { + Exception e = new CancellationException("Promise timed out after " + time + " " + unit.toString().toLowerCase()); + return completeExceptionallyDelayed(e, time, unit); + } + + @Override + public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) { + Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit.toString().toLowerCase()); + return completeExceptionallyDelayed(e, time, unit); + } + + @Override + public void cancel(@NotNull CancellationException e) { + completeExceptionally(e); + } + + @Override + public void complete(@Nullable T result) { + handleCompletion(new PromiseCompletion<>(result)); + } + + @Override + public void completeExceptionally(@NotNull Throwable result) { + handleCompletion(new PromiseCompletion<>(result)); + } + + @Override + public boolean isCompleted() { + return completion != null; + } + + @Override + public @Nullable PromiseCompletion getCompletion() { + return completion; + } + + private static final class Sync extends AbstractQueuedSynchronizer { + + private Sync() { + setState(1); + } + + @Override + protected int tryAcquireShared(int acquires) { + return getState() == 0 ? 1 : -1; + } + + @Override + protected boolean tryReleaseShared(int releases) { + int c1, c2; + do { + c1 = getState(); + if (c1 == 0) { + return false; + } + + c2 = c1 - 1; + } while (!compareAndSetState(c1, c2)); + + return c2 == 0; + } + + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletedPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletedPromise.java new file mode 100644 index 0000000..18aaeee --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/CompletedPromise.java @@ -0,0 +1,68 @@ +package dev.tommyjs.futur.promise; + +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; + +public abstract class CompletedPromise extends AbstractPromise { + + private static final PromiseCompletion EMPTY = new PromiseCompletion<>(); + + private final @NotNull PromiseCompletion completion; + + public CompletedPromise(@NotNull PromiseCompletion completion) { + this.completion = completion; + } + + @SuppressWarnings("unchecked") + public CompletedPromise() { + this((PromiseCompletion) EMPTY); + } + + @Override + protected @NotNull Promise addAnyListener(@NotNull PromiseListener listener) { + callListener(listener, completion); + return this; + } + + @Override + public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { + return this; + } + + @Override + public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) { + return this; + } + + @Override + public void cancel(@NotNull CancellationException exception) { + } + + @Override + public T get() { + return null; + } + + @Override + public T get(long timeout, @NotNull TimeUnit unit) { + return null; + } + + @Override + public T await() { + return null; + } + + @Override + public @NotNull PromiseCompletion getCompletion() { + return completion; + } + + @Override + public boolean isCompleted() { + return true; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index f44c919..901721b 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -70,9 +70,7 @@ public interface PromiseFactory { * @return the new promise * @apiNote This method is often useful for starting promise chains. */ - default @NotNull Promise start() { - return resolve(null); - } + @NotNull Promise start(); /** * Creates a new promise, completed exceptionally with the given error. diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java index f5a2153..7604aeb 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactoryImpl.java @@ -2,6 +2,7 @@ package dev.tommyjs.futur.promise; import dev.tommyjs.futur.executor.PromiseExecutor; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; public class PromiseFactoryImpl extends AbstractPromiseFactory { @@ -25,6 +26,21 @@ public class PromiseFactoryImpl extends AbstractPromiseFactory { return new PromiseImpl<>(); } + @Override + public @NotNull Promise resolve(T value) { + return new CompletedPromiseImpl<>(value); + } + + @Override + public @NotNull Promise start() { + return new CompletedPromiseImpl<>(); + } + + @Override + public @NotNull Promise error(@NotNull Throwable error) { + return new CompletedPromiseImpl<>(error); + } + @Override public @NotNull Logger getLogger() { return logger; @@ -40,7 +56,28 @@ public class PromiseFactoryImpl extends AbstractPromiseFactory { return asyncExecutor; } - public class PromiseImpl extends AbstractPromise { + private class PromiseImpl extends BasePromise { + + @Override + public @NotNull AbstractPromiseFactory getFactory() { + return PromiseFactoryImpl.this; + } + + } + + private class CompletedPromiseImpl extends CompletedPromise { + + public CompletedPromiseImpl(@Nullable T result) { + super(new PromiseCompletion<>(result)); + } + + public CompletedPromiseImpl(@NotNull Throwable exception) { + super(new PromiseCompletion<>(exception)); + } + + public CompletedPromiseImpl() { + super(); + } @Override public @NotNull AbstractPromiseFactory getFactory() { diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index 9885398..8b65017 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -1,6 +1,7 @@ package dev.tommyjs.futur; import dev.tommyjs.futur.promise.CompletablePromise; +import dev.tommyjs.futur.promise.CompletedPromise; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; import org.junit.jupiter.api.Test; @@ -17,7 +18,7 @@ import java.util.stream.Stream; public final class PromiseTests { private final Logger logger = LoggerFactory.getLogger(PromiseTests.class); - private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(6); private final PromiseFactory promises = PromiseFactory.of(logger, executor); @Test @@ -229,4 +230,17 @@ public final class PromiseTests { assert res.get(3) == 6; } + @Test + public void testImmediate1() { + var promise = promises.start().thenSupply(() -> 10); + assert promise.isCompleted() && promise instanceof CompletedPromise; + } + + @Test + public void testImmediate2() { + var resolved = promises.resolve(10); + var promise = promises.start().thenCompose(_ -> resolved); + assert promise.isCompleted() && promise instanceof CompletedPromise; + } + }