diff --git a/futur-api/build.gradle.kts b/futur-api/build.gradle.kts index c4edb65..bc98c91 100644 --- a/futur-api/build.gradle.kts +++ b/futur-api/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "dev.tommyjs" -version = "1.0.1" +version = "2.0.0" repositories { mavenCentral() diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java index a5f57c3..3998e57 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java @@ -1,7 +1,8 @@ package dev.tommyjs.futur.function; +@FunctionalInterface public interface ExceptionalConsumer { - void accept(T value) throws Exception; + void accept(T value) throws Throwable; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java index 4a7d575..ebebf5f 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java @@ -1,7 +1,8 @@ package dev.tommyjs.futur.function; +@FunctionalInterface public interface ExceptionalFunction { - V apply(K value) throws Exception; + V apply(K value) throws Throwable; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java index 6c11ba5..c4b8002 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java @@ -1,7 +1,8 @@ package dev.tommyjs.futur.function; +@FunctionalInterface public interface ExceptionalRunnable { - void run() throws Exception; + void run() throws Throwable; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java index f82800c..e47f977 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java @@ -1,7 +1,8 @@ package dev.tommyjs.futur.function; +@FunctionalInterface public interface ExceptionalSupplier { - T get() throws Exception; + T get() throws Throwable; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java new file mode 100644 index 0000000..e810563 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java @@ -0,0 +1,36 @@ +package dev.tommyjs.futur.impl; + +import dev.tommyjs.futur.promise.AbstractPromise; +import dev.tommyjs.futur.promise.PromiseFactory; +import org.slf4j.Logger; + +import java.util.concurrent.ScheduledExecutorService; + +public class SimplePromise extends AbstractPromise { + + private final ScheduledExecutorService executor; + private final Logger logger; + private final PromiseFactory factory; + + public SimplePromise(ScheduledExecutorService executor, Logger logger, PromiseFactory factory) { + this.executor = executor; + this.logger = logger; + this.factory = factory; + } + + @Override + protected ScheduledExecutorService getExecutor() { + return executor; + } + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + public PromiseFactory getFactory() { + return factory; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java new file mode 100644 index 0000000..0bcac55 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java @@ -0,0 +1,40 @@ +package dev.tommyjs.futur.impl; + +import dev.tommyjs.futur.promise.AbstractPromise; +import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseFactory; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; + +import java.util.concurrent.ScheduledExecutorService; + +public class SimplePromiseFactory implements PromiseFactory { + + private final ScheduledExecutorService executor; + private final Logger logger; + + public SimplePromiseFactory(ScheduledExecutorService executor, Logger logger) { + this.executor = executor; + this.logger = logger; + } + + @Override + public @NotNull Promise resolve(T value) { + AbstractPromise promise = new SimplePromise<>(executor, logger, this); + promise.complete(value); + return promise; + } + + @Override + public @NotNull Promise unresolved() { + return new SimplePromise<>(executor, logger, this); + } + + @Override + public @NotNull Promise error(Throwable error) { + AbstractPromise promise = new SimplePromise<>(executor, logger, this); + promise.completeExceptionally(error); + return promise; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java new file mode 100644 index 0000000..b4b6c7b --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -0,0 +1,365 @@ +package dev.tommyjs.futur.promise; + +import dev.tommyjs.futur.function.ExceptionalConsumer; +import dev.tommyjs.futur.function.ExceptionalFunction; +import dev.tommyjs.futur.function.ExceptionalRunnable; +import dev.tommyjs.futur.function.ExceptionalSupplier; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractPromise implements Promise { + + private final Collection> listeners; + + private @Nullable PromiseCompletion completion; + + public AbstractPromise() { + this.listeners = new ConcurrentLinkedQueue<>(); + this.completion = null; + } + + protected abstract ScheduledExecutorService getExecutor(); + + protected abstract Logger getLogger(); + + @Override + public T join(long interval, long timeout) throws TimeoutException { + long start = System.currentTimeMillis(); + while (!isCompleted()) { + if (System.currentTimeMillis() > start + timeout) + throw new TimeoutException("Promise timed out after " + timeout + "ms"); + + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + PromiseCompletion completion = getCompletion(); + if (completion == null) { + throw new IllegalStateException(); + } + + if (completion.isError()) { + throw new RuntimeException(completion.getException()); + } + + return completion.getResult(); + } + + @Override + public @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task) { + return thenApplySync(result -> { + task.run(); + return null; + }); + } + + @Override + public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedSync(result -> { + task.run(); + return null; + }, delay, unit); + } + + @Override + public @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task) { + return thenApplySync(result -> { + task.accept(result); + return null; + }); + } + + @Override + public @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedSync(result -> { + task.accept(result); + return null; + }, delay, unit); + } + + @Override + public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) { + return thenApplySync(result -> task.get()); + } + + @Override + public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedSync(result -> task.get(), delay, unit); + } + + @Override + public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + if (ctx.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx.getException()); + return; + } + + Runnable runnable = createRunnable(ctx, promise, task); + getExecutor().submit(runnable); + }); + + return promise; + } + + @Override + public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + if (ctx.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx.getException()); + return; + } + + Runnable runnable = createRunnable(ctx, promise, task); + getExecutor().schedule(runnable, delay, unit); + }); + + return promise; + } + + @Override + public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { + Promise promise = getFactory().unresolved(); + thenApplySync(task).thenConsumeAsync(nestedPromise -> { + nestedPromise.addListener(ctx1 -> { + if (ctx1.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx1.getException()); + return; + } + + promise.complete(ctx1.getResult()); + }); + }).addListener(ctx2 -> { + if (ctx2.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx2.getException()); + } + }); + + return promise; + } + + @Override + public @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task) { + return thenApplyAsync(result -> { + task.run(); + return null; + }); + } + + @Override + public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedAsync(result -> { + task.run(); + return null; + }, delay, unit); + } + + @Override + public @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task) { + return thenApplyAsync(result -> { + task.accept(result); + return null; + }); + } + + @Override + public @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedAsync(result -> { + task.accept(result); + return null; + }, delay, unit); + } + + @Override + public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) { + return thenApplyAsync(result -> task.get()); + } + + @Override + public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { + return thenApplyDelayedAsync(result -> task.get(), delay, unit); + } + + @Override + public @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference) { + return thenApplyAsync((result) -> { + reference.set(result); + return result; + }); + } + + @Override + public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + if (ctx.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx.getException()); + return; + } + + Runnable runnable = createRunnable(ctx, promise, task); + getExecutor().submit(runnable); + }); + + return promise; + } + + @Override + public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + Runnable runnable = createRunnable(ctx, promise, task); + getExecutor().schedule(runnable, delay, unit); + }); + + return promise; + } + + @Override + public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { + Promise promise = getFactory().unresolved(); + thenApplyAsync(task).thenConsumeAsync(nestedPromise -> { + nestedPromise.addListener(ctx1 -> { + if (ctx1.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx1.getException()); + return; + } + + promise.complete(ctx1.getResult()); + }); + }).addListener(ctx2 -> { + if (ctx2.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx2.getException()); + } + }); + + return promise; + } + + private @NotNull Runnable createRunnable(@NotNull PromiseCompletion ctx, @NotNull Promise promise, @NotNull ExceptionalFunction task) { + return () -> { + if (ctx.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx.getException()); + return; + } + + try { + V result = task.apply(ctx.getResult()); + promise.complete(result); + } catch (Throwable e) { + promise.completeExceptionally(e); + } + }; + } + + @Override + public @NotNull Promise logExceptions() { + return addListener(ctx -> { + if (ctx.isError()) { + getLogger().error("Exception caught in promise chain", ctx.getException()); + } + }); + } + + @Override + public @NotNull Promise addListener(@NotNull PromiseListener listener) { + if (isCompleted()) { + getExecutor().submit(() -> { + try { + //noinspection ConstantConditions + listener.handle(getCompletion()); + } catch (Exception e) { + getLogger().error("Exception caught in promise listener", e); + } + }); + } else { + getListeners().add(listener); + } + + return this; + } + + @Override + public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { + getExecutor().schedule(() -> { + if (!isCompleted()) { + completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit)); + } + }, time, unit); + + return this; + } + + @Override + public @NotNull Promise timeout(long ms) { + return timeout(ms, TimeUnit.MILLISECONDS); + } + + protected void handleCompletion(@NotNull PromiseCompletion ctx) { + if (this.isCompleted()) return; + setCompletion(ctx); + + getExecutor().submit(() -> { + for (PromiseListener listener : getListeners()) { + if (!ctx.isActive()) return; + + try { + listener.handle(ctx); + } catch (Exception e) { + e.printStackTrace(); + getLogger().error("Exception caught in promise listener", e); + } + } + }); + } + + @Override + public void complete(@Nullable T result) { + handleCompletion(new PromiseCompletion<>(result)); + } + + @Override + public void completeExceptionally(@NotNull Throwable result) { + handleCompletion(new PromiseCompletion<>(result)); + } + + @Override + public boolean isCompleted() { + return getCompletion() != null; + } + + protected Collection> getListeners() { + return listeners; + } + + @Override + public @Nullable PromiseCompletion getCompletion() { + return completion; + } + + protected void setCompletion(@NotNull PromiseCompletion completion) { + this.completion = completion; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index 6047ad0..a188e82 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -4,406 +4,87 @@ import dev.tommyjs.futur.function.ExceptionalConsumer; import dev.tommyjs.futur.function.ExceptionalFunction; import dev.tommyjs.futur.function.ExceptionalRunnable; import dev.tommyjs.futur.function.ExceptionalSupplier; -import dev.tommyjs.futur.scheduler.Schedulers; -import dev.tommyjs.futur.trace.ExecutorTrace; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; -public class Promise { +public interface Promise { - private static final String PACKAGE; - private static final Logger LOGGER = LoggerFactory.getLogger(Promise.class); - - static { - String[] packageElements = Promise.class.getPackageName().split("\\."); - int i = 0; - - StringBuilder packageBuilder = new StringBuilder(); - while (i < 3) { - packageBuilder.append(packageElements[i]); - i++; - } - - PACKAGE = packageBuilder.toString(); + static @NotNull Promise resolve(T value, PromiseFactory factory) { + return factory.resolve(value); } - private final Collection> listeners; - private final StackTraceElement[] stackTrace; - - private @Nullable PromiseCompletion completion; - - public Promise() { - this.listeners = new ConcurrentLinkedQueue<>(); - this.completion = null; - this.stackTrace = Arrays.stream(Thread.currentThread().getStackTrace()) - .filter(v -> !v.getClassName().startsWith(PACKAGE)) - .toArray(StackTraceElement[]::new); + static @NotNull Promise error(Throwable error, PromiseFactory factory) { + return factory.error(error); } - public T join(long interval, long timeout) throws TimeoutException { - long start = System.currentTimeMillis(); - while (!isCompleted()) { - if (System.currentTimeMillis() > start + timeout) - throw new TimeoutException("Promise timed out after " + timeout + "ms"); - - try { - Thread.sleep(interval); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - PromiseCompletion completion = getCompletion(); - if (completion == null) { - throw new IllegalStateException(); - } - - if (completion.isError()) { - throw new RuntimeException(completion.getException()); - } - - return completion.getResult(); + static @NotNull Promise unresolved(PromiseFactory factory) { + return factory.unresolved(); } - public @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task) { - return thenApplySync(result -> { - task.run(); - return null; - }, Schedulers.getTrace(task)); + static @NotNull Promise start(PromiseFactory factory) { + return factory.resolve(null); } - public @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(result -> { - task.run(); - return null; - }, delay, unit, Schedulers.getTrace(task)); - } + PromiseFactory getFactory(); - public @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task) { - return thenApplySync(result -> { - task.accept(result); - return null; - }, Schedulers.getTrace(task)); - } + T join(long interval, long timeout) throws TimeoutException; - public @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(result -> { - task.accept(result); - return null; - }, delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise thenRunSync(@NotNull ExceptionalRunnable task); - public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) { - return thenApplySync(result -> task.get(), Schedulers.getTrace(task)); - } + @NotNull Promise thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit); - public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(result -> task.get(), delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise thenConsumeSync(@NotNull ExceptionalConsumer task); - public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { - Promise promise = new Promise<>(); - addListener(ctx -> { - if (ctx.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx.getException()); - return; - } + @NotNull Promise thenConsumeDelayedSync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit); - Runnable runnable = createRunnable(ctx, promise, task); - Schedulers.runSync(runnable, trace); - }); + @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task); - return promise; - } + @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit); - public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { - return thenApplySync(task, Schedulers.getTrace(task)); - } + @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task); - public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - Promise promise = new Promise<>(); - addListener(ctx -> { - if (ctx.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx.getException()); - return; - } + @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); - Runnable runnable = createRunnable(ctx, promise, task); - Schedulers.runDelayedSync(runnable, delay, unit, trace); - }); + @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task); - return promise; - } + @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task); - public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(task, delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit); - public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { - Promise promise = new Promise<>(); - thenApplySync(task, Schedulers.getTrace(task)).thenConsumeAsync(nestedPromise -> { - nestedPromise.addListener(ctx1 -> { - if (ctx1.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx1.getException()); - return; - } + @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task); - promise.complete(ctx1.getResult()); - }); - }).addListener(ctx2 -> { - if (ctx2.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx2.getException()); - } - }); + @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit); - return promise; - } + @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task); - public @NotNull Promise thenRunAsync(@NotNull ExceptionalRunnable task) { - return thenApplyAsync(result -> { - task.run(); - return null; - }, Schedulers.getTrace(task)); - } + @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit); - public @NotNull Promise thenRunDelayedAsync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(result -> { - task.run(); - return null; - }, delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference); - public @NotNull Promise thenConsumeAsync(@NotNull ExceptionalConsumer task) { - return thenApplyAsync(result -> { - task.accept(result); - return null; - }, Schedulers.getTrace(task)); - } + @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task); - public @NotNull Promise thenConsumeDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(result -> { - task.accept(result); - return null; - }, delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); - @Deprecated(forRemoval = true) - public @NotNull Promise thenConsumerDelayedAsync(@NotNull ExceptionalConsumer task, long delay, @NotNull TimeUnit unit) { - return thenConsumeDelayedAsync(task, delay, unit); - } + @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task); - public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) { - return thenApplyAsync(result -> task.get(), Schedulers.getTrace(task)); - } + @NotNull Promise logExceptions(); - public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(result -> task.get(), delay, unit, Schedulers.getTrace(task)); - } + @NotNull Promise addListener(@NotNull PromiseListener listener); - public @NotNull Promise thenPopulateReference(@NotNull AtomicReference reference) { - return thenApplyAsync((result) -> { - reference.set(result); - return result; - }); - } + @NotNull Promise timeout(long time, @NotNull TimeUnit unit); - public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { - Promise promise = new Promise<>(); - addListener(ctx -> { - createRunnable(ctx, promise, task).run(); - }); + @NotNull Promise timeout(long ms); - return promise; - } + void complete(@Nullable T result); - public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { - return thenApplyAsync(task, Schedulers.getTrace(task)); - } + void completeExceptionally(@NotNull Throwable result); - public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - Promise promise = new Promise<>(); - addListener(ctx -> { - Runnable runnable = createRunnable(ctx, promise, task); - Schedulers.runDelayedAsync(runnable, delay, unit, trace); - }); + boolean isCompleted(); - return promise; - } - - public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(task, delay, unit, Schedulers.getTrace(task)); - } - - public @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task) { - return this.thenComposeAsync(task); - } - - public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { - Promise promise = new Promise<>(); - thenApplyAsync(task, Schedulers.getTrace(task)).thenConsumeAsync(nestedPromise -> { - nestedPromise.addListener(ctx1 -> { - if (ctx1.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx1.getException()); - return; - } - - promise.complete(ctx1.getResult()); - }); - }).addListener(ctx2 -> { - if (ctx2.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx2.getException()); - } - }); - - return promise; - } - - private @NotNull Runnable createRunnable(@NotNull PromiseCompletion ctx, @NotNull Promise promise, @NotNull ExceptionalFunction task) { - return () -> { - if (ctx.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx.getException()); - return; - } - - try { - V result = task.apply(ctx.getResult()); - promise.complete(result); - } catch (Exception e) { - promise.completeExceptionally(e, true); - } - }; - } - - public @NotNull Promise logExceptions() { - return addListener(ctx -> { - if (ctx.isError()) { - LOGGER.error("Exception caught in promise pipeline", ctx.getException()); - } - }); - } - - public @NotNull Promise addListener(@NotNull PromiseListener listener) { - if (isCompleted()) { - Schedulers.runAsync(() -> { - try { - listener.handle(getCompletion()); - } catch (Exception e) { - LOGGER.error("Exception caught in promise listener", e); - } - }, Schedulers.getTrace(listener)); - } else { - getListeners().add(listener); - } - - return this; - } - - public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { - Schedulers.runDelayedAsync(() -> { - if (!isCompleted()) { - completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit), true); - } - }, time, unit); - - return this; - } - - public @NotNull Promise timeout(long ms) { - return timeout(ms, TimeUnit.MILLISECONDS); - } - - protected void handleCompletion(@NotNull PromiseCompletion ctx) { - if (this.isCompleted()) return; - setCompletion(ctx); - - Schedulers.runAsync(() -> { - for (PromiseListener listener : getListeners()) { - if (!ctx.isActive()) return; - - try { - listener.handle(ctx); - } catch (Exception e) { - LOGGER.error("Exception caught in promise listener", e); - } - } - }); - } - - public void complete(@Nullable T result) { - handleCompletion(new PromiseCompletion<>(result)); - } - - public void completeExceptionally(@NotNull Throwable result, boolean appendStacktrace) { - if (appendStacktrace && this.stackTrace != null) { - result.setStackTrace(Stream.of(result.getStackTrace(), this.stackTrace) - .flatMap(Stream::of) - .filter(v -> !v.getClassName().startsWith(PACKAGE)) - .filter(v -> !v.getClassName().startsWith("java.lang.Thread")) - .filter(v -> !v.getClassName().startsWith("java.util.concurrent")) - .toArray(StackTraceElement[]::new)); - } - - handleCompletion(new PromiseCompletion<>(result)); - } - - public void completeExceptionally(@NotNull Throwable result) { - completeExceptionally(result, false); - } - - public boolean isCompleted() { - return getCompletion() != null; - } - - protected Collection> getListeners() { - return listeners; - } - - public @Nullable PromiseCompletion getCompletion() { - return completion; - } - - protected void setCompletion(@NotNull PromiseCompletion completion) { - this.completion = completion; - } - - public static @NotNull Promise resolve(T value) { - Promise promise = new Promise<>(); - promise.setCompletion(new PromiseCompletion<>(value)); - return promise; - } - - public static @NotNull Promise error(Throwable error) { - Promise promise = new Promise<>(); - promise.completeExceptionally(error); - return promise; - } - - public static @NotNull Promise start() { - return Promise.resolve(null); - } - - @Deprecated // use resolve() - public static @NotNull Promise start(T start) { - Promise promise = new Promise<>(); - promise.complete(start); - return promise; - } + @Nullable PromiseCompletion getCompletion(); } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java new file mode 100644 index 0000000..923e967 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -0,0 +1,35 @@ +package dev.tommyjs.futur.promise; + +import dev.tommyjs.futur.impl.SimplePromiseFactory; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +public interface PromiseFactory { + + @NotNull Promise resolve(T value); + + @NotNull Promise unresolved(); + + @NotNull Promise error(Throwable error); + + static PromiseFactory create(ScheduledExecutorService executor, Logger logger) { + return new SimplePromiseFactory(executor, logger); + } + + static PromiseFactory create(ScheduledExecutorService executor) { + return create(executor, LoggerFactory.getLogger(SimplePromiseFactory.class)); + } + + static PromiseFactory create(int threadPoolSize) { + return create(Executors.newScheduledThreadPool(threadPoolSize)); + } + + static PromiseFactory create() { + return create(Runtime.getRuntime().availableProcessors()); + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java index 7ecf0c4..ba0fb00 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java @@ -13,8 +13,8 @@ import java.util.stream.Collectors; public class Promises { - public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { - Promise> promise = new Promise<>(); + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2, PromiseFactory factory) { + Promise> promise = factory.unresolved(); p1.addListener(ctx -> { if (ctx.isError()) { //noinspection ConstantConditions @@ -37,13 +37,15 @@ public class Promises { return promise; } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler) { - Map map = new HashMap<>(); - if (promises.isEmpty()) return Promise.resolve(map); + public static @NotNull Promise> combine(@NotNull Promise p1, @NotNull Promise p2) { + return combine(p1, p2, p1.getFactory()); + } + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler, PromiseFactory factory) { + Map map = new HashMap<>(); ReentrantLock lock = new ReentrantLock(); - Promise> promise = new Promise<>(); + Promise> promise = factory.unresolved(); for (Map.Entry> entry : promises.entrySet()) { entry.getValue().addListener((ctx) -> { lock.lock(); @@ -69,25 +71,23 @@ public class Promises { return promise.timeout(timeout); } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict) { - return combine(promises, timeout, strict ? null : (_k, _v) -> {}); + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict, PromiseFactory factory) { + return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory); } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout) { - return combine(promises, timeout, true); + public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, PromiseFactory factory) { + return combine(promises, timeout, true, factory); } - public static @NotNull Promise> combine(@NotNull Map> promises) { - return combine(promises, 1500L, true); + public static @NotNull Promise> combine(@NotNull Map> promises, PromiseFactory factory) { + return combine(promises, 1500L, true, factory); } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict) { + public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict, PromiseFactory factory) { AtomicInteger index = new AtomicInteger(); return combine( - promises.stream() - .collect(Collectors.toMap(s -> index.getAndIncrement(), v -> v)), - timeout, - strict + promises.stream().collect(Collectors.toMap(s -> index.getAndIncrement(), v -> v)), + timeout, strict, factory ).thenApplySync(v -> v.entrySet().stream() .sorted(Map.Entry.comparingByKey()) @@ -96,18 +96,16 @@ public class Promises { ); } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout) { - return combine(promises, timeout, true); + public static @NotNull Promise> combine(@NotNull List> promises, long timeout, PromiseFactory factory) { + return combine(promises, timeout, true, factory); } - public static @NotNull Promise> combine(@NotNull List> promises) { - return combine(promises, 1500L, true); + public static @NotNull Promise> combine(@NotNull List> promises, PromiseFactory factory) { + return combine(promises, 1500L, true, factory); } - public static @NotNull Promise all(@NotNull List> promises) { - if (promises.isEmpty()) return Promise.start(); - - Promise promise = new Promise<>(); + public static @NotNull Promise all(@NotNull List> promises, PromiseFactory factory) { + Promise promise = factory.unresolved(); for (Promise p : promises) { p.addListener((ctx) -> { if (ctx.isError()) { @@ -121,30 +119,26 @@ public class Promises { return promise; } - public static @NotNull Promise all(@NotNull Promise... promises) { - return all(Arrays.asList(promises)); - } - - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict) { + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict, PromiseFactory factory) { Map> promises = new HashMap<>(); for (K key : keys) { - Promise promise = Promise.resolve(key).thenApplyAsync(mapper); + Promise promise = factory.resolve(key).thenApplyAsync(mapper); promises.put(key, promise); } - return combine(promises, timeout, strict); + return combine(promises, timeout, strict, factory); } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout) { - return combine(keys, mapper, timeout, true); + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, PromiseFactory factory) { + return combine(keys, mapper, timeout, true, factory); } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper) { - return combine(keys, mapper, 1500L, true); + public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, PromiseFactory factory) { + return combine(keys, mapper, 1500L, true, factory); } - public static @NotNull Promise erase(@NotNull Promise p) { - Promise promise = new Promise<>(); + public static @NotNull Promise erase(@NotNull Promise p, PromiseFactory factory) { + Promise promise = factory.unresolved(); p.addListener(ctx -> { if (ctx.isError()) { //noinspection ConstantConditions @@ -157,8 +151,12 @@ public class Promises { return promise; } - public static @NotNull Promise wrap(@NotNull CompletableFuture future) { - Promise promise = new Promise<>(); + public static @NotNull Promise erase(@NotNull Promise p) { + return erase(p, p.getFactory()); + } + + public static @NotNull Promise wrap(@NotNull CompletableFuture future, PromiseFactory factory) { + Promise promise = factory.unresolved(); future.whenComplete((result, e) -> { if (e != null) { promise.completeExceptionally(e); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromise.java new file mode 100644 index 0000000..8a7986f --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromise.java @@ -0,0 +1,24 @@ +package dev.tommyjs.futur.promise; + +import org.slf4j.Logger; + +import java.util.concurrent.ScheduledExecutorService; + +public class StaticPromise extends AbstractPromise { + + @Override + protected ScheduledExecutorService getExecutor() { + return StaticPromiseFactory.EXECUTOR; + } + + @Override + protected Logger getLogger() { + return StaticPromiseFactory.LOGGER; + } + + @Override + public PromiseFactory getFactory() { + return StaticPromiseFactory.INSTANCE; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromiseFactory.java new file mode 100644 index 0000000..7c73eec --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromiseFactory.java @@ -0,0 +1,44 @@ +package dev.tommyjs.futur.promise; + +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +public class StaticPromiseFactory implements PromiseFactory { + + public static final @NotNull PromiseFactory INSTANCE; + public static final @NotNull ScheduledExecutorService EXECUTOR; + public static final @NotNull Logger LOGGER; + + static { + INSTANCE = new StaticPromiseFactory(); + EXECUTOR = Executors.newSingleThreadScheduledExecutor(); + LOGGER = LoggerFactory.getLogger(StaticPromiseFactory.class); + } + + private StaticPromiseFactory() { + } + + @Override + public @NotNull Promise resolve(T value) { + AbstractPromise promise = new StaticPromise<>(); + promise.setCompletion(new PromiseCompletion<>(value)); + return promise; + } + + @Override + public @NotNull Promise unresolved() { + return new StaticPromise<>(); + } + + @Override + public @NotNull Promise error(Throwable error) { + AbstractPromise promise = new StaticPromise<>(); + promise.completeExceptionally(error); + return promise; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java deleted file mode 100644 index 16889f1..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java +++ /dev/null @@ -1,37 +0,0 @@ -package dev.tommyjs.futur.scheduler; - -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -public interface Scheduler { - - Logger LOGGER = LoggerFactory.getLogger(Scheduler.class); - - void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace); - - void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - - void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - - void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace); - - void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - - void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - - default @NotNull Runnable wrapExceptions(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - return () -> { - try { - task.run(); - } catch (Exception e) { - LOGGER.error("Exception in scheduled task: {}", e.getClass().getName()); - LOGGER.error(trace.toString()); - } - }; - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Schedulers.java b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Schedulers.java deleted file mode 100644 index 5416e2d..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Schedulers.java +++ /dev/null @@ -1,105 +0,0 @@ -package dev.tommyjs.futur.scheduler; - -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -@SuppressWarnings("ConstantConditions") -public class Schedulers { - - private static final Logger LOGGER = LoggerFactory.getLogger(Schedulers.class); - - private static @Nullable Scheduler scheduler; - - public static void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runSync(task, trace); - } - - public static void runSync(@NotNull Runnable task) { - ensureLoaded(); - getScheduler().runSync(task, Schedulers.getTrace(task)); - } - - public static void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runDelayedSync(task, delay, unit, trace); - } - - public static void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { - ensureLoaded(); - getScheduler().runDelayedSync(task, delay, unit, Schedulers.getTrace(task)); - } - - public static void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runRepeatingSync(task, interval, unit, trace); - } - - public static void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit) { - ensureLoaded(); - getScheduler().runRepeatingSync(task, interval, unit, Schedulers.getTrace(task)); - } - - public static void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runAsync(task, trace); - } - - public static void runAsync(@NotNull Runnable task) { - ensureLoaded(); - getScheduler().runAsync(task, Schedulers.getTrace(task)); - } - - public static void runDelayedAsync(@NotNull Runnable task, long delay, TimeUnit unit, ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runDelayedAsync(task, delay, unit, trace); - } - - public static void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { - ensureLoaded(); - getScheduler().runDelayedAsync(task, delay, unit, Schedulers.getTrace(task)); - } - - public static void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - ensureLoaded(); - getScheduler().runRepeatingAsync(task, interval, unit, trace); - } - - public static void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit) { - ensureLoaded(); - getScheduler().runRepeatingAsync(task, interval, unit, Schedulers.getTrace(task)); - } - - public static ExecutorTrace getTrace(@NotNull Object function) { - return new ExecutorTrace(function.getClass(), Thread.currentThread().getStackTrace()); - } - - public static void ensureLoaded() { - if (getScheduler() == null) { - LOGGER.warn("No scheduler loaded, falling back to default single threaded scheduler"); - setScheduler(SingleExecutorScheduler.create()); - } - } - - public static void loadDefaultScheduler() { - - } - - public static boolean isLoaded() { - return getScheduler() != null; - } - - public static @Nullable Scheduler getScheduler() { - return scheduler; - } - - public static void setScheduler(@NotNull Scheduler scheduler) { - Schedulers.scheduler = scheduler; - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java deleted file mode 100644 index 1c2164d..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java +++ /dev/null @@ -1,52 +0,0 @@ -package dev.tommyjs.futur.scheduler; - -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class SingleExecutorScheduler implements Scheduler { - - private final ScheduledExecutorService service; - - protected SingleExecutorScheduler(ScheduledExecutorService service) { - this.service = service; - } - - @Override - public void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - service.submit(wrapExceptions(task, trace)); - } - - @Override - public void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - service.schedule(wrapExceptions(task, trace), delay, unit); - } - - @Override - public void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - service.scheduleAtFixedRate(wrapExceptions(task, trace), 0L, interval, unit); - } - - @Override - public void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - runSync(task, trace); - } - - @Override - public void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - runDelayedSync(task, delay, unit, trace); - } - - @Override - public void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - runRepeatingSync(task, interval, unit, trace); - } - - public static SingleExecutorScheduler create() { - return new SingleExecutorScheduler(Executors.newSingleThreadScheduledExecutor()); - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/trace/ExecutorTrace.java b/futur-api/src/main/java/dev/tommyjs/futur/trace/ExecutorTrace.java deleted file mode 100644 index 47d4503..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/trace/ExecutorTrace.java +++ /dev/null @@ -1,31 +0,0 @@ -package dev.tommyjs.futur.trace; - -import org.jetbrains.annotations.NotNull; - -import java.util.Arrays; -import java.util.stream.Collectors; - -public class ExecutorTrace { - - private final @NotNull Class clazz; - private final @NotNull StackTraceElement[] trace; - - public ExecutorTrace(@NotNull Class clazz, @NotNull StackTraceElement[] trace) { - this.clazz = clazz; - this.trace = trace; - } - - public @NotNull Class getClazz() { - return clazz; - } - - public @NotNull StackTraceElement[] getTrace() { - return trace; - } - - @Override - public String toString() { - return Arrays.stream(trace).map(StackTraceElement::toString).collect(Collectors.joining("\n")); - } - -} diff --git a/futur-reactive-streams/build.gradle.kts b/futur-reactive-streams/build.gradle.kts index 147c32f..2a05a9f 100644 --- a/futur-reactive-streams/build.gradle.kts +++ b/futur-reactive-streams/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "dev.tommyjs" -version = "1.0.1" +version = "2.0.0" repositories { mavenCentral() diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java index cdb9129..4b655ed 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java @@ -1,13 +1,15 @@ package dev.tommyjs.futur.reactivestreams; import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.promise.StaticPromiseFactory; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; public class ReactiveTransformer { - public static @NotNull Promise wrapPublisher(@NotNull Publisher publisher) { - SingleAccumulatorSubscriber subscriber = SingleAccumulatorSubscriber.create(); + public static @NotNull Promise wrapPublisher(@NotNull Publisher publisher, PromiseFactory factory) { + SingleAccumulatorSubscriber subscriber = SingleAccumulatorSubscriber.create(factory); publisher.subscribe(subscriber); return subscriber.getPromise(); } diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java index ee752ef..921d852 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java @@ -1,6 +1,8 @@ package dev.tommyjs.futur.reactivestreams; import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.promise.StaticPromiseFactory; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -40,8 +42,12 @@ public class SingleAccumulatorSubscriber implements Subscriber { return new SingleAccumulatorSubscriber<>(promise); } + public static SingleAccumulatorSubscriber create(PromiseFactory factory) { + return create(factory.unresolved()); + } + public static SingleAccumulatorSubscriber create() { - return create(new Promise<>()); + return create(StaticPromiseFactory.INSTANCE); } } diff --git a/futur-reactor/build.gradle.kts b/futur-reactor/build.gradle.kts index dc4b6e3..820caf0 100644 --- a/futur-reactor/build.gradle.kts +++ b/futur-reactor/build.gradle.kts @@ -4,7 +4,7 @@ plugins { } group = "dev.tommyjs" -version = "1.0.1" +version = "2.0.0" repositories { mavenCentral() diff --git a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java index 9f6817d..c6a65cc 100644 --- a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java +++ b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java @@ -1,6 +1,8 @@ package dev.tommyjs.futur.reactor; import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.promise.StaticPromiseFactory; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -11,14 +13,14 @@ import java.util.concurrent.atomic.AtomicReference; public class ReactorTransformer { - public static @NotNull Promise wrapMono(@NotNull Mono mono) { - Promise promise = new Promise<>(); + public static @NotNull Promise wrapMono(@NotNull Mono mono, PromiseFactory factory) { + Promise promise = factory.unresolved(); mono.doOnSuccess(promise::complete).doOnError(promise::completeExceptionally).subscribe(); return promise; } - public static @NotNull Promise<@NotNull List> wrapFlux(@NotNull Flux flux) { - Promise> promise = new Promise<>(); + public static @NotNull Promise<@NotNull List> wrapFlux(@NotNull Flux flux, PromiseFactory factory) { + Promise> promise = factory.unresolved(); AtomicReference> out = new AtomicReference<>(new ArrayList<>()); flux.doOnNext(out.get()::add).subscribe(); diff --git a/futur-standalone/.gitignore b/futur-standalone/.gitignore deleted file mode 100644 index b63da45..0000000 --- a/futur-standalone/.gitignore +++ /dev/null @@ -1,42 +0,0 @@ -.gradle -build/ -!gradle/wrapper/gradle-wrapper.jar -!**/src/main/**/build/ -!**/src/test/**/build/ - -### IntelliJ IDEA ### -.idea/modules.xml -.idea/jarRepositories.xml -.idea/compiler.xml -.idea/libraries/ -*.iws -*.iml -*.ipr -out/ -!**/src/main/**/out/ -!**/src/test/**/out/ - -### Eclipse ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache -bin/ -!**/src/main/**/bin/ -!**/src/test/**/bin/ - -### NetBeans ### -/nbproject/private/ -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ - -### VS Code ### -.vscode/ - -### Mac OS ### -.DS_Store \ No newline at end of file diff --git a/futur-standalone/build.gradle.kts b/futur-standalone/build.gradle.kts deleted file mode 100644 index e637a80..0000000 --- a/futur-standalone/build.gradle.kts +++ /dev/null @@ -1,34 +0,0 @@ -import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar - -plugins { - id("java") - id("com.github.johnrengelman.shadow") version "7.1.2" -} - -group = "dev.tommyjs" -version = "1.0.1" - -repositories { - mavenCentral() -} - -dependencies { - implementation("org.jetbrains:annotations:24.1.0") - compileOnly(project(mapOf("path" to ":futur-api"))) - testImplementation(platform("org.junit:junit-bom:5.9.1")) - testImplementation("org.junit.jupiter:junit-jupiter") -} - -tasks { - build { - dependsOn(shadowJar) - } - - withType { - exclude("META-INF/**") - } -} - -tasks.test { - useJUnitPlatform() -} \ No newline at end of file diff --git a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java deleted file mode 100644 index 6bb4399..0000000 --- a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java +++ /dev/null @@ -1,61 +0,0 @@ -package dev.tommyjs.futur.standalone; - -import dev.tommyjs.futur.scheduler.Scheduler; -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ExclusiveThreadPoolScheduler implements Scheduler { - - private final ScheduledExecutorService executor; - - protected ExclusiveThreadPoolScheduler(ScheduledExecutorService executor) { - this.executor = executor; - } - - @Override - public void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - throw new UnsupportedOperationException("Sync task invoked on asynchronous environment"); - } - - @Override - public void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - throw new UnsupportedOperationException("Sync task invoked on asynchronous environment"); - } - - @Override - public void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - throw new UnsupportedOperationException("Sync task invoked on asynchronous environment"); - } - - @Override - public void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - executor.submit(wrapExceptions(task, trace)); - } - - @Override - public void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - executor.schedule(wrapExceptions(task, trace), delay, unit); - } - - @Override - public void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - executor.scheduleAtFixedRate(wrapExceptions(task, trace), 0L, interval, unit); - } - - public @NotNull ScheduledExecutorService getExecutor() { - return executor; - } - - public static ExclusiveThreadPoolScheduler create(ScheduledExecutorService executor) { - return new ExclusiveThreadPoolScheduler(executor); - } - - public static ExclusiveThreadPoolScheduler create(int nThreads) { - return create(Executors.newScheduledThreadPool(nThreads)); - } - -} diff --git a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java deleted file mode 100644 index 4272c49..0000000 --- a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java +++ /dev/null @@ -1,63 +0,0 @@ -package dev.tommyjs.futur.standalone; - -import dev.tommyjs.futur.scheduler.Scheduler; -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ThreadPoolScheduler implements Scheduler { - - private final ScheduledExecutorService syncExecutor; - private final ScheduledExecutorService asyncExecutor; - - protected ThreadPoolScheduler(ScheduledExecutorService syncExecutor, ScheduledExecutorService asyncExecutor) { - this.syncExecutor = syncExecutor; - this.asyncExecutor = asyncExecutor; - } - - @Override - public void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - syncExecutor.submit(wrapExceptions(task, trace)); - } - - @Override - public void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - syncExecutor.schedule(wrapExceptions(task, trace), delay, unit); - } - - @Override - public void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - syncExecutor.scheduleAtFixedRate(wrapExceptions(task, trace), 0L, interval, unit); - } - - @Override - public void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - asyncExecutor.submit(wrapExceptions(task, trace)); - } - - @Override - public void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - asyncExecutor.schedule(wrapExceptions(task, trace), delay, unit); - } - - @Override - public void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - asyncExecutor.scheduleAtFixedRate(wrapExceptions(task, trace), 0L, interval, unit); - } - - public @NotNull ScheduledExecutorService getSyncExecutor() { - return syncExecutor; - } - - public @NotNull ScheduledExecutorService getAsyncExecutor() { - return asyncExecutor; - } - - public static ThreadPoolScheduler create(int nThreads) { - return new ThreadPoolScheduler(Executors.newSingleThreadScheduledExecutor(), Executors.newScheduledThreadPool(nThreads)); - } - -}