From 173e34810c3a972eeea93f96f48030f3c42854fa Mon Sep 17 00:00:00 2001 From: tommyskeff Date: Fri, 22 Dec 2023 16:43:52 +0000 Subject: [PATCH] basic 1.2.0 changes --- .../futur/promise/AbstractPromise.java | 429 ++++++++++++++++++ .../tommyjs/futur/promise/PooledPromise.java | 33 ++ .../futur/promise/PooledPromiseFactory.java | 41 ++ .../dev/tommyjs/futur/promise/Promise.java | 395 ++-------------- .../tommyjs/futur/promise/PromiseFactory.java | 16 + .../dev/tommyjs/futur/promise/Promises.java | 120 +++-- .../futur/promise/UnpooledPromise.java | 23 + .../futur/promise/UnpooledPromiseFactory.java | 50 ++ .../tommyjs/futur/scheduler/Scheduler.java | 2 +- .../tommyjs/futur/scheduler/Schedulers.java | 105 ----- .../scheduler/SingleExecutorScheduler.java | 13 +- .../dev/tommyjs/futur/trace/TraceUtil.java | 11 + .../src/test/java/dev/tommyjs/test/Test.java | 35 ++ .../reactivestreams/ReactiveTransformer.java | 4 +- .../SingleAccumulatorSubscriber.java | 12 +- .../futur/reactor/ReactorTransformer.java | 10 +- 16 files changed, 799 insertions(+), 500 deletions(-) create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromise.java create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromiseFactory.java create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromise.java create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromiseFactory.java delete mode 100644 futur-api/src/main/java/dev/tommyjs/futur/scheduler/Schedulers.java create mode 100644 futur-api/src/main/java/dev/tommyjs/futur/trace/TraceUtil.java create mode 100644 futur-api/src/test/java/dev/tommyjs/test/Test.java diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java new file mode 100644 index 0000000..38c21cd --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -0,0 +1,429 @@ +package dev.tommyjs.futur.promise; + +import dev.tommyjs.futur.function.ExceptionalConsumer; +import dev.tommyjs.futur.function.ExceptionalFunction; +import dev.tommyjs.futur.function.ExceptionalRunnable; +import dev.tommyjs.futur.function.ExceptionalSupplier; +import dev.tommyjs.futur.scheduler.Scheduler; +import dev.tommyjs.futur.trace.ExecutorTrace; +import dev.tommyjs.futur.trace.TraceUtil; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +public abstract class AbstractPromise implements Promise { + + private static final String PACKAGE; + + static { + String[] packageElements = AbstractPromise.class.getPackageName().split("\\."); + int i = 0; + + StringBuilder packageBuilder = new StringBuilder(); + while (i < 3) { + packageBuilder.append(packageElements[i]); + i++; + } + + PACKAGE = packageBuilder.toString(); + } + + private final Collection> listeners; + private final StackTraceElement[] stackTrace; + + private @Nullable PromiseCompletion completion; + + public AbstractPromise() { + this.listeners = new ConcurrentLinkedQueue<>(); + this.completion = null; + this.stackTrace = Arrays.stream(Thread.currentThread().getStackTrace()) + .filter(v -> !v.getClassName().startsWith(PACKAGE)) + .toArray(StackTraceElement[]::new); + } + + protected abstract Scheduler getScheduler(); + + protected abstract Logger getLogger(); + + @Override + public T join(long interval, long timeout) throws TimeoutException { + long start = System.currentTimeMillis(); + while (!isCompleted()) { + if (System.currentTimeMillis() > start + timeout) + throw new TimeoutException("Promise timed out after " + timeout + "ms"); + + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + PromiseCompletion completion = getCompletion(); + if (completion == null) { + throw new IllegalStateException(); + } + + if (completion.isError()) { + throw new RuntimeException(completion.getException()); + } + + return completion.getResult(); + } + + @Override + public @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task) { + return thenApplySync(result -> { + task.run(); + return null; + }, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedSync(result -> { + task.run(); + return null; + }, delay, unit, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task) { + return thenApplySync(result -> { + task.accept(result); + return null; + }, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedSync(result -> { + task.accept(result); + return null; + }, delay, unit, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) { + return thenApplySync(result -> task.get(), TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedSync(result -> task.get(), delay, unit, TraceUtil.getTrace(task)); + } + + protected @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + if (ctx.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx.getException()); + return; + } + + Runnable runnable = createRunnable(ctx, promise, task); + getScheduler().runSync(runnable, trace); + }); + + return promise; + } + + @Override + public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { + return thenApplySync(task, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + if (ctx.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx.getException()); + return; + } + + Runnable runnable = createRunnable(ctx, promise, task); + getScheduler().runDelayedSync(runnable, delay, unit, trace); + }); + + return promise; + } + + @Override + public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedSync(task, delay, unit, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { + Promise promise = getFactory().unresolved(); + thenApplySync(task, TraceUtil.getTrace(task)).thenConsumeAsync(nestedPromise -> { + nestedPromise.addListener(ctx1 -> { + if (ctx1.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx1.getException()); + return; + } + + promise.complete(ctx1.getResult()); + }); + }).addListener(ctx2 -> { + if (ctx2.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx2.getException()); + } + }); + + return promise; + } + + @Override + public @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task) { + return thenApplyAsync(result -> { + task.run(); + return null; + }, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedAsync(result -> { + task.run(); + return null; + }, delay, unit, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task) { + return thenApplyAsync(result -> { + task.accept(result); + return null; + }, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedAsync(result -> { + task.accept(result); + return null; + }, delay, unit, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) { + return thenApplyAsync(result -> task.get(), TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedAsync(result -> task.get(), delay, unit, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference) { + return thenApplyAsync((result) -> { + reference.set(result); + return result; + }); + } + + protected @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + if (ctx.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx.getException()); + return; + } + + Runnable runnable = createRunnable(ctx, promise, task); + getScheduler().runAsync(runnable, trace); + }); + + return promise; + } + + @Override + public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { + return thenApplyAsync(task, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + Runnable runnable = createRunnable(ctx, promise, task); + getScheduler().runDelayedAsync(runnable, delay, unit, trace); + }); + + return promise; + } + + @Override + public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedAsync(task, delay, unit, TraceUtil.getTrace(task)); + } + + @Override + public @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task) { + return this.thenComposeAsync(task); + } + + @Override + public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { + Promise promise = getFactory().unresolved(); + thenApplyAsync(task, TraceUtil.getTrace(task)).thenConsumeAsync(nestedPromise -> { + nestedPromise.addListener(ctx1 -> { + if (ctx1.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx1.getException()); + return; + } + + promise.complete(ctx1.getResult()); + }); + }).addListener(ctx2 -> { + if (ctx2.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx2.getException()); + } + }); + + return promise; + } + + private @NotNull Runnable createRunnable(@NotNull PromiseCompletion ctx, @NotNull Promise promise, @NotNull ExceptionalFunction task) { + return () -> { + if (ctx.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx.getException()); + return; + } + + try { + V result = task.apply(ctx.getResult()); + promise.complete(result); + } catch (Exception e) { + promise.completeExceptionally(e, true); + } + }; + } + + @Override + public @NotNull Promise logExceptions() { + return addListener(ctx -> { + if (ctx.isError()) { + getLogger().error("Exception caught in promise chain", ctx.getException()); + } + }); + } + + @Override + public @NotNull Promise addListener(@NotNull PromiseListener listener) { + if (isCompleted()) { + getScheduler().runAsync(() -> { + try { + //noinspection ConstantConditions + listener.handle(getCompletion()); + } catch (Exception e) { + getLogger().error("Exception caught in promise listener", e); + } + }, TraceUtil.getTrace(listener)); + } else { + getListeners().add(listener); + } + + return this; + } + + @Override + public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { + Runnable func = () -> { + if (!isCompleted()) { + completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit), true); + } + }; + + getScheduler().runDelayedAsync(func, time, unit, TraceUtil.getTrace(func)); + + return this; + } + + @Override + public @NotNull Promise timeout(long ms) { + return timeout(ms, TimeUnit.MILLISECONDS); + } + + protected void handleCompletion(@NotNull PromiseCompletion ctx) { + if (this.isCompleted()) return; + setCompletion(ctx); + + Runnable func = () -> { + for (PromiseListener listener : getListeners()) { + if (!ctx.isActive()) return; + + try { + listener.handle(ctx); + } catch (Exception e) { + getLogger().error("Exception caught in promise listener", e); + } + } + }; + + getScheduler().runAsync(func, TraceUtil.getTrace(func)); + } + + @Override + public void complete(@Nullable T result) { + handleCompletion(new PromiseCompletion<>(result)); + } + + @Override + public void completeExceptionally(@NotNull Throwable result, boolean appendStacktrace) { + if (appendStacktrace && this.stackTrace != null) { + result.setStackTrace(Stream.of(result.getStackTrace(), this.stackTrace) + .flatMap(Stream::of) + .filter(v -> !v.getClassName().startsWith(PACKAGE)) + .filter(v -> !v.getClassName().startsWith("java.lang.Thread")) + .filter(v -> !v.getClassName().startsWith("java.util.concurrent")) + .toArray(StackTraceElement[]::new)); + } + + handleCompletion(new PromiseCompletion<>(result)); + } + + @Override + public void completeExceptionally(@NotNull Throwable result) { + completeExceptionally(result, false); + } + + @Override + public boolean isCompleted() { + return getCompletion() != null; + } + + protected Collection> getListeners() { + return listeners; + } + + @Override + public @Nullable PromiseCompletion getCompletion() { + return completion; + } + + protected void setCompletion(@NotNull PromiseCompletion completion) { + this.completion = completion; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromise.java new file mode 100644 index 0000000..eb7c45e --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromise.java @@ -0,0 +1,33 @@ +package dev.tommyjs.futur.promise; + +import dev.tommyjs.futur.scheduler.Scheduler; +import org.slf4j.Logger; + +public class PooledPromise extends AbstractPromise { + + private final Scheduler scheduler; + private final Logger logger; + private final PromiseFactory factory; + + public PooledPromise(Scheduler scheduler, Logger logger, PromiseFactory factory) { + this.scheduler = scheduler; + this.logger = logger; + this.factory = factory; + } + + @Override + protected Scheduler getScheduler() { + return scheduler; + } + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + public PromiseFactory getFactory() { + return factory; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromiseFactory.java new file mode 100644 index 0000000..881dd49 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromiseFactory.java @@ -0,0 +1,41 @@ +package dev.tommyjs.futur.promise; + +import dev.tommyjs.futur.scheduler.Scheduler; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; + +public class PooledPromiseFactory implements PromiseFactory { + + private final Scheduler scheduler; + private final Logger logger; + + public PooledPromiseFactory(Scheduler scheduler, Logger logger) { + this.scheduler = scheduler; + this.logger = logger; + } + + @Override + public @NotNull Promise resolve(T value) { + AbstractPromise promise = new PooledPromise<>(scheduler, logger, this); + promise.setCompletion(new PromiseCompletion<>(value)); + return promise; + } + + @Override + public @NotNull Promise unresolved() { + return new PooledPromise<>(scheduler, logger, this); + } + + @Override + public @NotNull Promise error(Throwable error) { + AbstractPromise promise = new PooledPromise<>(scheduler, logger, this); + promise.completeExceptionally(error); + return promise; + } + + @Override + public @NotNull Promise start() { + return resolve(null); + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index 6047ad0..c446372 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -4,406 +4,109 @@ import dev.tommyjs.futur.function.ExceptionalConsumer; import dev.tommyjs.futur.function.ExceptionalFunction; import dev.tommyjs.futur.function.ExceptionalRunnable; import dev.tommyjs.futur.function.ExceptionalSupplier; -import dev.tommyjs.futur.scheduler.Schedulers; import dev.tommyjs.futur.trace.ExecutorTrace; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; -public class Promise { +public interface Promise { - private static final String PACKAGE; - private static final Logger LOGGER = LoggerFactory.getLogger(Promise.class); - - static { - String[] packageElements = Promise.class.getPackageName().split("\\."); - int i = 0; - - StringBuilder packageBuilder = new StringBuilder(); - while (i < 3) { - packageBuilder.append(packageElements[i]); - i++; - } - - PACKAGE = packageBuilder.toString(); + static @NotNull Promise resolve(T value, PromiseFactory factory) { + return factory.resolve(value); } - private final Collection> listeners; - private final StackTraceElement[] stackTrace; - - private @Nullable PromiseCompletion completion; - - public Promise() { - this.listeners = new ConcurrentLinkedQueue<>(); - this.completion = null; - this.stackTrace = Arrays.stream(Thread.currentThread().getStackTrace()) - .filter(v -> !v.getClassName().startsWith(PACKAGE)) - .toArray(StackTraceElement[]::new); + static @NotNull Promise error(Throwable error, PromiseFactory factory) { + return factory.error(error); } - public T join(long interval, long timeout) throws TimeoutException { - long start = System.currentTimeMillis(); - while (!isCompleted()) { - if (System.currentTimeMillis() > start + timeout) - throw new TimeoutException("Promise timed out after " + timeout + "ms"); - - try { - Thread.sleep(interval); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - PromiseCompletion completion = getCompletion(); - if (completion == null) { - throw new IllegalStateException(); - } - - if (completion.isError()) { - throw new RuntimeException(completion.getException()); - } - - return completion.getResult(); + static @NotNull Promise start(PromiseFactory factory) { + return factory.start(); } - public @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task) { - return thenApplySync(result -> { - task.run(); - return null; - }, Schedulers.getTrace(task)); + static @NotNull Promise resolve(T value) { + return resolve(value, UnpooledPromiseFactory.INSTANCE); } - public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(result -> { - task.run(); - return null; - }, delay, unit, Schedulers.getTrace(task)); + static @NotNull Promise error(Throwable error) { + return error(error, UnpooledPromiseFactory.INSTANCE); } - public @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task) { - return thenApplySync(result -> { - task.accept(result); - return null; - }, Schedulers.getTrace(task)); + static @NotNull Promise start() { + return start(UnpooledPromiseFactory.INSTANCE); } - public @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(result -> { - task.accept(result); - return null; - }, delay, unit, Schedulers.getTrace(task)); + @Deprecated + static @NotNull Promise start(T start) { + return resolve(start); } - public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) { - return thenApplySync(result -> task.get(), Schedulers.getTrace(task)); - } + PromiseFactory getFactory(); - public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(result -> task.get(), delay, unit, Schedulers.getTrace(task)); - } + T join(long interval, long timeout) throws TimeoutException; - public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { - Promise promise = new Promise<>(); - addListener(ctx -> { - if (ctx.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx.getException()); - return; - } + @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task); - Runnable runnable = createRunnable(ctx, promise, task); - Schedulers.runSync(runnable, trace); - }); + @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit); - return promise; - } + @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task); - public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { - return thenApplySync(task, Schedulers.getTrace(task)); - } + @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit); - public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - Promise promise = new Promise<>(); - addListener(ctx -> { - if (ctx.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx.getException()); - return; - } + @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task); - Runnable runnable = createRunnable(ctx, promise, task); - Schedulers.runDelayedSync(runnable, delay, unit, trace); - }); + @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit); - return promise; - } + @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task); - public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(task, delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { - Promise promise = new Promise<>(); - thenApplySync(task, Schedulers.getTrace(task)).thenConsumeAsync(nestedPromise -> { - nestedPromise.addListener(ctx1 -> { - if (ctx1.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx1.getException()); - return; - } + @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); - promise.complete(ctx1.getResult()); - }); - }).addListener(ctx2 -> { - if (ctx2.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx2.getException()); - } - }); + @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task); - return promise; - } + @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task); - public @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task) { - return thenApplyAsync(result -> { - task.run(); - return null; - }, Schedulers.getTrace(task)); - } + @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit); - public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(result -> { - task.run(); - return null; - }, delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task); - public @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task) { - return thenApplyAsync(result -> { - task.accept(result); - return null; - }, Schedulers.getTrace(task)); - } + @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit); - public @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(result -> { - task.accept(result); - return null; - }, delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task); - @Deprecated(forRemoval = true) - public @NotNull Promise thenConsumerDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { - return thenConsumeDelayedAsync(task, delay, unit); - } + @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit); - public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) { - return thenApplyAsync(result -> task.get(), Schedulers.getTrace(task)); - } + @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference); - public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(result -> task.get(), delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task); - public @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference) { - return thenApplyAsync((result) -> { - reference.set(result); - return result; - }); - } + @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { - Promise promise = new Promise<>(); - addListener(ctx -> { - createRunnable(ctx, promise, task).run(); - }); + @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); - return promise; - } + @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task); - public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { - return thenApplyAsync(task, Schedulers.getTrace(task)); - } + @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task); - public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - Promise promise = new Promise<>(); - addListener(ctx -> { - Runnable runnable = createRunnable(ctx, promise, task); - Schedulers.runDelayedAsync(runnable, delay, unit, trace); - }); + @NotNull Promise logExceptions(); - return promise; - } + @NotNull Promise addListener(@NotNull PromiseListener listener); - public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(task, delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise timeout(long time, @NotNull TimeUnit unit); - public @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task) { - return this.thenComposeAsync(task); - } + @NotNull Promise timeout(long ms); - public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { - Promise promise = new Promise<>(); - thenApplyAsync(task, Schedulers.getTrace(task)).thenConsumeAsync(nestedPromise -> { - nestedPromise.addListener(ctx1 -> { - if (ctx1.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx1.getException()); - return; - } + void complete(@Nullable T result); - promise.complete(ctx1.getResult()); - }); - }).addListener(ctx2 -> { - if (ctx2.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx2.getException()); - } - }); + void completeExceptionally(@NotNull Throwable result, boolean appendStacktrace); - return promise; - } + void completeExceptionally(@NotNull Throwable result); - private @NotNull Runnable createRunnable(@NotNull PromiseCompletion ctx, @NotNull Promise promise, @NotNull ExceptionalFunction task) { - return () -> { - if (ctx.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx.getException()); - return; - } + boolean isCompleted(); - try { - V result = task.apply(ctx.getResult()); - promise.complete(result); - } catch (Exception e) { - promise.completeExceptionally(e, true); - } - }; - } - - public @NotNull Promise logExceptions() { - return addListener(ctx -> { - if (ctx.isError()) { - LOGGER.error("Exception caught in promise pipeline", ctx.getException()); - } - }); - } - - public @NotNull Promise addListener(@NotNull PromiseListener listener) { - if (isCompleted()) { - Schedulers.runAsync(() -> { - try { - listener.handle(getCompletion()); - } catch (Exception e) { - LOGGER.error("Exception caught in promise listener", e); - } - }, Schedulers.getTrace(listener)); - } else { - getListeners().add(listener); - } - - return this; - } - - public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { - Schedulers.runDelayedAsync(() -> { - if (!isCompleted()) { - completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit), true); - } - }, time, unit); - - return this; - } - - public @NotNull Promise timeout(long ms) { - return timeout(ms, TimeUnit.MILLISECONDS); - } - - protected void handleCompletion(@NotNull PromiseCompletion ctx) { - if (this.isCompleted()) return; - setCompletion(ctx); - - Schedulers.runAsync(() -> { - for (PromiseListener listener : getListeners()) { - if (!ctx.isActive()) return; - - try { - listener.handle(ctx); - } catch (Exception e) { - LOGGER.error("Exception caught in promise listener", e); - } - } - }); - } - - public void complete(@Nullable T result) { - handleCompletion(new PromiseCompletion<>(result)); - } - - public void completeExceptionally(@NotNull Throwable result, boolean appendStacktrace) { - if (appendStacktrace && this.stackTrace != null) { - result.setStackTrace(Stream.of(result.getStackTrace(), this.stackTrace) - .flatMap(Stream::of) - .filter(v -> !v.getClassName().startsWith(PACKAGE)) - .filter(v -> !v.getClassName().startsWith("java.lang.Thread")) - .filter(v -> !v.getClassName().startsWith("java.util.concurrent")) - .toArray(StackTraceElement[]::new)); - } - - handleCompletion(new PromiseCompletion<>(result)); - } - - public void completeExceptionally(@NotNull Throwable result) { - completeExceptionally(result, false); - } - - public boolean isCompleted() { - return getCompletion() != null; - } - - protected Collection> getListeners() { - return listeners; - } - - public @Nullable PromiseCompletion getCompletion() { - return completion; - } - - protected void setCompletion(@NotNull PromiseCompletion completion) { - this.completion = completion; - } - - public static @NotNull Promise resolve(T value) { - Promise promise = new Promise<>(); - promise.setCompletion(new PromiseCompletion<>(value)); - return promise; - } - - public static @NotNull Promise error(Throwable error) { - Promise promise = new Promise<>(); - promise.completeExceptionally(error); - return promise; - } - - public static @NotNull Promise start() { - return Promise.resolve(null); - } - - @Deprecated // use resolve() - public static @NotNull Promise start(T start) { - Promise promise = new Promise<>(); - promise.complete(start); - return promise; - } + @Nullable PromiseCompletion getCompletion(); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java new file mode 100644 index 0000000..86ff3eb --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -0,0 +1,16 @@ +package dev.tommyjs.futur.promise; + +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; + +public interface PromiseFactory { + + @NotNull Promise resolve(T value); + + @NotNull Promise unresolved(); + + @NotNull Promise error(Throwable error); + + @NotNull Promise start(); + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java index 7ecf0c4..196e228 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java @@ -3,6 +3,8 @@ package dev.tommyjs.futur.promise; import dev.tommyjs.futur.function.ExceptionalFunction; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -13,8 +15,8 @@ import java.util.stream.Collectors; public class Promises { - public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { - Promise> promise = new Promise<>(); + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, PromiseFactory factory) { + Promise> promise = factory.unresolved(); p1.addListener(ctx -> { if (ctx.isError()) { //noinspection ConstantConditions @@ -37,13 +39,15 @@ public class Promises { return promise; } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler) { - Map map = new HashMap<>(); - if (promises.isEmpty()) return Promise.resolve(map); + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { + return combine(p1, p2, p1.getFactory()); + } + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler, PromiseFactory factory) { + Map map = new HashMap<>(); ReentrantLock lock = new ReentrantLock(); - Promise> promise = new Promise<>(); + Promise> promise = factory.unresolved(); for (Map.Entry> entry : promises.entrySet()) { entry.getValue().addListener((ctx) -> { lock.lock(); @@ -69,19 +73,35 @@ public class Promises { return promise.timeout(timeout); } + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler) { + return combine(promises, timeout, exceptionHandler, obtainFactory(promises.values())); + } + + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict, PromiseFactory factory) { + return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory); + } + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict) { - return combine(promises, timeout, strict ? null : (_k, _v) -> {}); + return combine(promises, timeout, strict, obtainFactory(promises.values())); + } + + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, PromiseFactory factory) { + return combine(promises, timeout, true, factory); } public static @NotNull Promise> combine(@NotNull Map> promises, long timeout) { - return combine(promises, timeout, true); + return combine(promises, timeout, true, obtainFactory(promises.values())); + } + + public static @NotNull Promise> combine(@NotNull Map> promises, PromiseFactory factory) { + return combine(promises, 1500L, true, factory); } public static @NotNull Promise> combine(@NotNull Map> promises) { - return combine(promises, 1500L, true); + return combine(promises, obtainFactory(promises.values())); } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict) { + public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict, PromiseFactory factory) { AtomicInteger index = new AtomicInteger(); return combine( promises.stream() @@ -96,18 +116,28 @@ public class Promises { ); } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout) { - return combine(promises, timeout, true); + public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict) { + return combine(promises, timeout, strict, obtainFactory(promises)); } - public static @NotNull Promise> combine(@NotNull List> promises) { + public static @NotNull Promise> combine(@NotNull List> promises, long timeout, PromiseFactory factory) { + return combine(promises, timeout, true, factory); + } + + public static @NotNull Promise> combine(@NotNull List> promises, long timeout) { + return combine(promises, timeout, obtainFactory(promises)); + } + + public static @NotNull Promise> combine(@NotNull List> promises, PromiseFactory factory) { return combine(promises, 1500L, true); } - public static @NotNull Promise all(@NotNull List> promises) { - if (promises.isEmpty()) return Promise.start(); + public static @NotNull Promise> combine(@NotNull List> promises) { + return combine(promises, obtainFactory(promises)); + } - Promise promise = new Promise<>(); + public static @NotNull Promise all(@NotNull List> promises, PromiseFactory factory) { + Promise promise = factory.unresolved(); for (Promise p : promises) { p.addListener((ctx) -> { if (ctx.isError()) { @@ -121,30 +151,49 @@ public class Promises { return promise; } - public static @NotNull Promise all(@NotNull Promise... promises) { - return all(Arrays.asList(promises)); + public static @NotNull Promise all(@NotNull List> promises) { + PromiseFactory factory; + if (promises.isEmpty()) { + factory = UnpooledPromiseFactory.INSTANCE; + } else { + factory = promises.get(0).getFactory(); + } + + return all(promises, factory); } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict) { + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict, PromiseFactory factory) { Map> promises = new HashMap<>(); for (K key : keys) { - Promise promise = Promise.resolve(key).thenApplyAsync(mapper); + Promise promise = factory.resolve(key).thenApplyAsync(mapper); promises.put(key, promise); } return combine(promises, timeout, strict); } + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict) { + return combine(keys, mapper, timeout, strict, UnpooledPromiseFactory.INSTANCE); + } + + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, PromiseFactory factory) { + return combine(keys, mapper, timeout, true, factory); + } + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout) { - return combine(keys, mapper, timeout, true); + return combine(keys, mapper, timeout, UnpooledPromiseFactory.INSTANCE); + } + + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, PromiseFactory factory) { + return combine(keys, mapper, 1500L, true, factory); } public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper) { - return combine(keys, mapper, 1500L, true); + return combine(keys, mapper, UnpooledPromiseFactory.INSTANCE); } - public static @NotNull Promise erase(@NotNull Promise p) { - Promise promise = new Promise<>(); + public static @NotNull Promise erase(@NotNull Promise p, PromiseFactory factory) { + Promise promise = factory.unresolved(); p.addListener(ctx -> { if (ctx.isError()) { //noinspection ConstantConditions @@ -157,8 +206,12 @@ public class Promises { return promise; } - public static @NotNull Promise wrap(@NotNull CompletableFuture future) { - Promise promise = new Promise<>(); + public static @NotNull Promise erase(@NotNull Promise p) { + return erase(p, p.getFactory()); + } + + public static @NotNull Promise wrap(@NotNull CompletableFuture future, PromiseFactory factory) { + Promise promise = factory.unresolved(); future.whenComplete((result, e) -> { if (e != null) { promise.completeExceptionally(e); @@ -170,4 +223,19 @@ public class Promises { return promise; } + public static @NotNull Promise wrap(@NotNull CompletableFuture future) { + return wrap(future, UnpooledPromiseFactory.INSTANCE); + } + + public static PromiseFactory obtainFactory(Collection> promises) { + PromiseFactory factory; + if (promises.isEmpty()) { + factory = UnpooledPromiseFactory.INSTANCE; + } else { + factory = promises.stream().findFirst().get().getFactory(); + } + + return factory; + } + } \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromise.java new file mode 100644 index 0000000..75a4abf --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromise.java @@ -0,0 +1,23 @@ +package dev.tommyjs.futur.promise; + +import dev.tommyjs.futur.scheduler.Scheduler; +import org.slf4j.Logger; + +public class UnpooledPromise extends AbstractPromise { + + @Override + protected Scheduler getScheduler() { + return UnpooledPromiseFactory.SCHEDULER; + } + + @Override + protected Logger getLogger() { + return UnpooledPromiseFactory.LOGGER; + } + + @Override + public PromiseFactory getFactory() { + return UnpooledPromiseFactory.INSTANCE; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromiseFactory.java new file mode 100644 index 0000000..a1238af --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromiseFactory.java @@ -0,0 +1,50 @@ +package dev.tommyjs.futur.promise; + +import dev.tommyjs.futur.scheduler.Scheduler; +import dev.tommyjs.futur.scheduler.SingleExecutorScheduler; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; + +public class UnpooledPromiseFactory implements PromiseFactory { + + public static final @NotNull PromiseFactory INSTANCE; + public static final @NotNull Scheduler SCHEDULER; + public static final @NotNull Logger LOGGER; + + static { + INSTANCE = new UnpooledPromiseFactory(); + SCHEDULER = new SingleExecutorScheduler(Executors.newSingleThreadScheduledExecutor()); + LOGGER = LoggerFactory.getLogger(UnpooledPromiseFactory.class); + } + + private UnpooledPromiseFactory() { + } + + @Override + public @NotNull Promise resolve(T value) { + AbstractPromise promise = new UnpooledPromise<>(); + promise.setCompletion(new PromiseCompletion<>(value)); + return promise; + } + + @Override + public @NotNull Promise unresolved() { + return new UnpooledPromise<>(); + } + + @Override + public @NotNull Promise error(Throwable error) { + AbstractPromise promise = new UnpooledPromise<>(); + promise.completeExceptionally(error); + return promise; + } + + @Override + public @NotNull Promise start() { + return resolve(null); + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java index 16889f1..e9efe7f 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java @@ -23,7 +23,7 @@ public interface Scheduler { void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - default @NotNull Runnable wrapExceptions(@NotNull Runnable task, @NotNull ExecutorTrace trace) { + static @NotNull Runnable wrapExceptions(@NotNull Runnable task, @NotNull ExecutorTrace trace) { return () -> { try { task.run(); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Schedulers.java b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Schedulers.java deleted file mode 100644 index 5416e2d..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Schedulers.java +++ /dev/null @@ -1,105 +0,0 @@ -package dev.tommyjs.futur.scheduler; - -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -@SuppressWarnings("ConstantConditions") -public class Schedulers { - - private static final Logger LOGGER = LoggerFactory.getLogger(Schedulers.class); - - private static @Nullable Scheduler scheduler; - - public static void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runSync(task, trace); - } - - public static void runSync(@NotNull Runnable task) { - ensureLoaded(); - getScheduler().runSync(task, Schedulers.getTrace(task)); - } - - public static void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runDelayedSync(task, delay, unit, trace); - } - - public static void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { - ensureLoaded(); - getScheduler().runDelayedSync(task, delay, unit, Schedulers.getTrace(task)); - } - - public static void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runRepeatingSync(task, interval, unit, trace); - } - - public static void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit) { - ensureLoaded(); - getScheduler().runRepeatingSync(task, interval, unit, Schedulers.getTrace(task)); - } - - public static void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runAsync(task, trace); - } - - public static void runAsync(@NotNull Runnable task) { - ensureLoaded(); - getScheduler().runAsync(task, Schedulers.getTrace(task)); - } - - public static void runDelayedAsync(@NotNull Runnable task, long delay, TimeUnit unit, ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runDelayedAsync(task, delay, unit, trace); - } - - public static void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { - ensureLoaded(); - getScheduler().runDelayedAsync(task, delay, unit, Schedulers.getTrace(task)); - } - - public static void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runRepeatingAsync(task, interval, unit, trace); - } - - public static void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit) { - ensureLoaded(); - getScheduler().runRepeatingAsync(task, interval, unit, Schedulers.getTrace(task)); - } - - public static ExecutorTrace getTrace(@NotNull Object function) { - return new ExecutorTrace(function.getClass(), Thread.currentThread().getStackTrace()); - } - - public static void ensureLoaded() { - if (getScheduler() == null) { - LOGGER.warn("No scheduler loaded, falling back to default single threaded scheduler"); - setScheduler(SingleExecutorScheduler.create()); - } - } - - public static void loadDefaultScheduler() { - - } - - public static boolean isLoaded() { - return getScheduler() != null; - } - - public static @Nullable Scheduler getScheduler() { - return scheduler; - } - - public static void setScheduler(@NotNull Scheduler scheduler) { - Schedulers.scheduler = scheduler; - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java index 1c2164d..d228059 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java @@ -3,7 +3,6 @@ package dev.tommyjs.futur.scheduler; import dev.tommyjs.futur.trace.ExecutorTrace; import org.jetbrains.annotations.NotNull; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -11,23 +10,23 @@ public class SingleExecutorScheduler implements Scheduler { private final ScheduledExecutorService service; - protected SingleExecutorScheduler(ScheduledExecutorService service) { + public SingleExecutorScheduler(ScheduledExecutorService service) { this.service = service; } @Override public void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - service.submit(wrapExceptions(task, trace)); + service.submit(Scheduler.wrapExceptions(task, trace)); } @Override public void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - service.schedule(wrapExceptions(task, trace), delay, unit); + service.schedule(Scheduler.wrapExceptions(task, trace), delay, unit); } @Override public void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - service.scheduleAtFixedRate(wrapExceptions(task, trace), 0L, interval, unit); + service.scheduleAtFixedRate(Scheduler.wrapExceptions(task, trace), 0L, interval, unit); } @Override @@ -45,8 +44,4 @@ public class SingleExecutorScheduler implements Scheduler { runRepeatingSync(task, interval, unit, trace); } - public static SingleExecutorScheduler create() { - return new SingleExecutorScheduler(Executors.newSingleThreadScheduledExecutor()); - } - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/trace/TraceUtil.java b/futur-api/src/main/java/dev/tommyjs/futur/trace/TraceUtil.java new file mode 100644 index 0000000..694d162 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/trace/TraceUtil.java @@ -0,0 +1,11 @@ +package dev.tommyjs.futur.trace; + +import org.jetbrains.annotations.NotNull; + +public class TraceUtil { + + public static ExecutorTrace getTrace(@NotNull Object function) { + return new ExecutorTrace(function.getClass(), Thread.currentThread().getStackTrace()); + } + +} diff --git a/futur-api/src/test/java/dev/tommyjs/test/Test.java b/futur-api/src/test/java/dev/tommyjs/test/Test.java new file mode 100644 index 0000000..e698aad --- /dev/null +++ b/futur-api/src/test/java/dev/tommyjs/test/Test.java @@ -0,0 +1,35 @@ +package dev.tommyjs.test; + +import dev.tommyjs.futur.promise.PooledPromiseFactory; +import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.scheduler.Scheduler; +import dev.tommyjs.futur.scheduler.SingleExecutorScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class Test { + + public static void main(String[] args) throws InterruptedException { + Scheduler scheduler = new SingleExecutorScheduler(Executors.newScheduledThreadPool(4)); + Logger logger = LoggerFactory.getLogger(Test.class); + PromiseFactory factory = new PooledPromiseFactory(scheduler, logger); + + Thread.sleep(2000); + + Promise.start(factory) + .thenRunAsync(() -> { + System.out.println("HI"); + }) + .thenApplyDelayedAsync(_v -> { + return "ABC"; + }, 1L, TimeUnit.SECONDS) + .thenConsumeSync(t -> { + System.out.println(t); + }); + } + +} diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java index cdb9129..5c68a38 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java @@ -1,12 +1,12 @@ package dev.tommyjs.futur.reactivestreams; -import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.AbstractPromise; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; public class ReactiveTransformer { - public static @NotNull Promise wrapPublisher(@NotNull Publisher publisher) { + public static @NotNull AbstractPromise wrapPublisher(@NotNull Publisher publisher) { SingleAccumulatorSubscriber subscriber = SingleAccumulatorSubscriber.create(); publisher.subscribe(subscriber); return subscriber.getPromise(); diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java index ee752ef..a9a1e33 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java @@ -1,14 +1,14 @@ package dev.tommyjs.futur.reactivestreams; -import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.AbstractPromise; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; public class SingleAccumulatorSubscriber implements Subscriber { - private final Promise promise; + private final AbstractPromise promise; - public SingleAccumulatorSubscriber(Promise promise) { + public SingleAccumulatorSubscriber(AbstractPromise promise) { this.promise = promise; } @@ -32,16 +32,16 @@ public class SingleAccumulatorSubscriber implements Subscriber { // ignore } - public Promise getPromise() { + public AbstractPromise getPromise() { return promise; } - public static SingleAccumulatorSubscriber create(Promise promise) { + public static SingleAccumulatorSubscriber create(AbstractPromise promise) { return new SingleAccumulatorSubscriber<>(promise); } public static SingleAccumulatorSubscriber create() { - return create(new Promise<>()); + return create(new AbstractPromise<>()); } } diff --git a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java index 9f6817d..782c7f3 100644 --- a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java +++ b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java @@ -1,6 +1,6 @@ package dev.tommyjs.futur.reactor; -import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.AbstractPromise; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -11,14 +11,14 @@ import java.util.concurrent.atomic.AtomicReference; public class ReactorTransformer { - public static @NotNull Promise wrapMono(@NotNull Mono mono) { - Promise promise = new Promise<>(); + public static @NotNull AbstractPromise wrapMono(@NotNull Mono mono) { + AbstractPromise promise = new AbstractPromise<>(); mono.doOnSuccess(promise::complete).doOnError(promise::completeExceptionally).subscribe(); return promise; } - public static @NotNull Promise<@NotNull List> wrapFlux(@NotNull Flux flux) { - Promise> promise = new Promise<>(); + public static @NotNull AbstractPromise<@NotNull List> wrapFlux(@NotNull Flux flux) { + AbstractPromise> promise = new AbstractPromise<>(); AtomicReference> out = new AtomicReference<>(new ArrayList<>()); flux.doOnNext(out.get()::add).subscribe();