diff --git a/futur-api/build.gradle.kts b/futur-api/build.gradle.kts index bc98c91..9495788 100644 --- a/futur-api/build.gradle.kts +++ b/futur-api/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "dev.tommyjs" -version = "2.0.0" +version = "2.1.0" repositories { mavenCentral() diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java new file mode 100644 index 0000000..bbb1dec --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/DualPoolExecutor.java @@ -0,0 +1,33 @@ +package dev.tommyjs.futur.executor; + +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class DualPoolExecutor implements PromiseExecutor { + + private final @NotNull ScheduledExecutorService syncSvc; + private final @NotNull ScheduledExecutorService asyncSvc; + + public DualPoolExecutor(@NotNull ScheduledExecutorService syncSvc, @NotNull ScheduledExecutorService asyncSvc) { + this.syncSvc = syncSvc; + this.asyncSvc = asyncSvc; + } + + @Override + public void runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + syncSvc.schedule(task, delay, unit); + } + + @Override + public void runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + asyncSvc.schedule(task, delay, unit); + } + + public static @NotNull DualPoolExecutor create(int asyncPoolSize) { + return new DualPoolExecutor(Executors.newSingleThreadScheduledExecutor(), Executors.newScheduledThreadPool(asyncPoolSize)); + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java new file mode 100644 index 0000000..db0b600 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/PromiseExecutor.java @@ -0,0 +1,13 @@ +package dev.tommyjs.futur.executor; + +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.TimeUnit; + +public interface PromiseExecutor { + + void runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit); + + void runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit); + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/executor/SinglePoolExecutor.java b/futur-api/src/main/java/dev/tommyjs/futur/executor/SinglePoolExecutor.java new file mode 100644 index 0000000..310ad11 --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/executor/SinglePoolExecutor.java @@ -0,0 +1,31 @@ +package dev.tommyjs.futur.executor; + +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class SinglePoolExecutor implements PromiseExecutor { + + private final @NotNull ScheduledExecutorService service; + + public SinglePoolExecutor(@NotNull ScheduledExecutorService service) { + this.service = service; + } + + @Override + public void runSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + service.schedule(task, delay, unit); + } + + @Override + public void runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) { + service.schedule(task, delay, unit); + } + + public static @NotNull SinglePoolExecutor create(int threadPoolSize) { + return new SinglePoolExecutor(Executors.newScheduledThreadPool(threadPoolSize)); + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java index e810563..df268fe 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java @@ -1,25 +1,24 @@ package dev.tommyjs.futur.impl; +import dev.tommyjs.futur.executor.PromiseExecutor; import dev.tommyjs.futur.promise.AbstractPromise; import dev.tommyjs.futur.promise.PromiseFactory; import org.slf4j.Logger; -import java.util.concurrent.ScheduledExecutorService; - public class SimplePromise extends AbstractPromise { - private final ScheduledExecutorService executor; + private final PromiseExecutor executor; private final Logger logger; private final PromiseFactory factory; - public SimplePromise(ScheduledExecutorService executor, Logger logger, PromiseFactory factory) { + public SimplePromise(PromiseExecutor executor, Logger logger, PromiseFactory factory) { this.executor = executor; this.logger = logger; this.factory = factory; } @Override - protected ScheduledExecutorService getExecutor() { + protected PromiseExecutor getExecutor() { return executor; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java index 0bcac55..47c8b7b 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java @@ -1,19 +1,18 @@ package dev.tommyjs.futur.impl; +import dev.tommyjs.futur.executor.PromiseExecutor; import dev.tommyjs.futur.promise.AbstractPromise; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; -import java.util.concurrent.ScheduledExecutorService; - public class SimplePromiseFactory implements PromiseFactory { - private final ScheduledExecutorService executor; + private final PromiseExecutor executor; private final Logger logger; - public SimplePromiseFactory(ScheduledExecutorService executor, Logger logger) { + public SimplePromiseFactory(PromiseExecutor executor, Logger logger) { this.executor = executor; this.logger = logger; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/StaticPromise.java similarity index 61% rename from futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromise.java rename to futur-api/src/main/java/dev/tommyjs/futur/impl/StaticPromise.java index 8a7986f..76e2324 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/impl/StaticPromise.java @@ -1,13 +1,14 @@ -package dev.tommyjs.futur.promise; +package dev.tommyjs.futur.impl; +import dev.tommyjs.futur.executor.PromiseExecutor; +import dev.tommyjs.futur.promise.AbstractPromise; +import dev.tommyjs.futur.promise.PromiseFactory; import org.slf4j.Logger; -import java.util.concurrent.ScheduledExecutorService; - public class StaticPromise extends AbstractPromise { @Override - protected ScheduledExecutorService getExecutor() { + protected PromiseExecutor getExecutor() { return StaticPromiseFactory.EXECUTOR; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/StaticPromiseFactory.java similarity index 69% rename from futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromiseFactory.java rename to futur-api/src/main/java/dev/tommyjs/futur/impl/StaticPromiseFactory.java index 7c73eec..a5e39ea 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/impl/StaticPromiseFactory.java @@ -1,21 +1,25 @@ -package dev.tommyjs.futur.promise; +package dev.tommyjs.futur.impl; +import dev.tommyjs.futur.executor.PromiseExecutor; +import dev.tommyjs.futur.executor.SinglePoolExecutor; +import dev.tommyjs.futur.promise.AbstractPromise; +import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseFactory; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; public class StaticPromiseFactory implements PromiseFactory { public static final @NotNull PromiseFactory INSTANCE; - public static final @NotNull ScheduledExecutorService EXECUTOR; + public static final @NotNull PromiseExecutor EXECUTOR; public static final @NotNull Logger LOGGER; static { INSTANCE = new StaticPromiseFactory(); - EXECUTOR = Executors.newSingleThreadScheduledExecutor(); + EXECUTOR = SinglePoolExecutor.create(1); LOGGER = LoggerFactory.getLogger(StaticPromiseFactory.class); } @@ -25,7 +29,7 @@ public class StaticPromiseFactory implements PromiseFactory { @Override public @NotNull Promise resolve(T value) { AbstractPromise promise = new StaticPromise<>(); - promise.setCompletion(new PromiseCompletion<>(value)); + promise.complete(value); return promise; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index b4b6c7b..f38e838 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -1,5 +1,6 @@ package dev.tommyjs.futur.promise; +import dev.tommyjs.futur.executor.PromiseExecutor; import dev.tommyjs.futur.function.ExceptionalConsumer; import dev.tommyjs.futur.function.ExceptionalFunction; import dev.tommyjs.futur.function.ExceptionalRunnable; @@ -10,7 +11,6 @@ import org.slf4j.Logger; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -18,15 +18,14 @@ import java.util.concurrent.atomic.AtomicReference; public abstract class AbstractPromise implements Promise { private final Collection> listeners; - - private @Nullable PromiseCompletion completion; + private final AtomicReference> completion; public AbstractPromise() { this.listeners = new ConcurrentLinkedQueue<>(); - this.completion = null; + this.completion = new AtomicReference<>(); } - - protected abstract ScheduledExecutorService getExecutor(); + + protected abstract PromiseExecutor getExecutor(); protected abstract Logger getLogger(); @@ -97,7 +96,7 @@ public abstract class AbstractPromise implements Promise { public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { return thenApplyDelayedSync(result -> task.get(), delay, unit); } - + @Override public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { Promise promise = getFactory().unresolved(); @@ -109,7 +108,7 @@ public abstract class AbstractPromise implements Promise { } Runnable runnable = createRunnable(ctx, promise, task); - getExecutor().submit(runnable); + getExecutor().runSync(runnable, 0L, TimeUnit.MILLISECONDS); }); return promise; @@ -126,7 +125,7 @@ public abstract class AbstractPromise implements Promise { } Runnable runnable = createRunnable(ctx, promise, task); - getExecutor().schedule(runnable, delay, unit); + getExecutor().runSync(runnable, delay, unit); }); return promise; @@ -216,7 +215,7 @@ public abstract class AbstractPromise implements Promise { } Runnable runnable = createRunnable(ctx, promise, task); - getExecutor().submit(runnable); + getExecutor().runAsync(runnable, 0L, TimeUnit.MILLISECONDS); }); return promise; @@ -227,7 +226,7 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); addListener(ctx -> { Runnable runnable = createRunnable(ctx, promise, task); - getExecutor().schedule(runnable, delay, unit); + getExecutor().runAsync(runnable, delay, unit); }); return promise; @@ -285,14 +284,14 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise addListener(@NotNull PromiseListener listener) { if (isCompleted()) { - getExecutor().submit(() -> { + getExecutor().runAsync(() -> { try { //noinspection ConstantConditions listener.handle(getCompletion()); } catch (Exception e) { getLogger().error("Exception caught in promise listener", e); } - }); + }, 0L, TimeUnit.MILLISECONDS); } else { getListeners().add(listener); } @@ -302,7 +301,7 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { - getExecutor().schedule(() -> { + getExecutor().runAsync(() -> { if (!isCompleted()) { completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit)); } @@ -317,10 +316,23 @@ public abstract class AbstractPromise implements Promise { } protected void handleCompletion(@NotNull PromiseCompletion ctx) { - if (this.isCompleted()) return; - setCompletion(ctx); + AtomicReference success = new AtomicReference<>(); + completion.getAndUpdate(c -> { + if (c == null) { + return null; + } else { + success.set(true); + return ctx; + } + }); - getExecutor().submit(() -> { + if (success.get()) { + handleCompletion0(ctx); + } + } + + protected void handleCompletion0(@NotNull PromiseCompletion ctx) { + getExecutor().runAsync(() -> { for (PromiseListener listener : getListeners()) { if (!ctx.isActive()) return; @@ -331,7 +343,7 @@ public abstract class AbstractPromise implements Promise { getLogger().error("Exception caught in promise listener", e); } } - }); + }, 0L, TimeUnit.MILLISECONDS); } @Override @@ -346,20 +358,16 @@ public abstract class AbstractPromise implements Promise { @Override public boolean isCompleted() { - return getCompletion() != null; + return completion.get() != null; + } + + @Override + public @Nullable PromiseCompletion getCompletion() { + return completion.get(); } protected Collection> getListeners() { return listeners; } - @Override - public @Nullable PromiseCompletion getCompletion() { - return completion; - } - - protected void setCompletion(@NotNull PromiseCompletion completion) { - this.completion = completion; - } - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index 923e967..e46e9ff 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -1,12 +1,13 @@ package dev.tommyjs.futur.promise; +import dev.tommyjs.futur.executor.PromiseExecutor; +import dev.tommyjs.futur.executor.SinglePoolExecutor; import dev.tommyjs.futur.impl.SimplePromiseFactory; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; public interface PromiseFactory { @@ -16,16 +17,16 @@ public interface PromiseFactory { @NotNull Promise error(Throwable error); - static PromiseFactory create(ScheduledExecutorService executor, Logger logger) { + static PromiseFactory create(PromiseExecutor executor, Logger logger) { return new SimplePromiseFactory(executor, logger); } - static PromiseFactory create(ScheduledExecutorService executor) { + static PromiseFactory create(PromiseExecutor executor) { return create(executor, LoggerFactory.getLogger(SimplePromiseFactory.class)); } static PromiseFactory create(int threadPoolSize) { - return create(Executors.newScheduledThreadPool(threadPoolSize)); + return create(SinglePoolExecutor.create(threadPoolSize)); } static PromiseFactory create() { diff --git a/futur-reactive-streams/build.gradle.kts b/futur-reactive-streams/build.gradle.kts index 2a05a9f..f2b84f3 100644 --- a/futur-reactive-streams/build.gradle.kts +++ b/futur-reactive-streams/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "dev.tommyjs" -version = "2.0.0" +version = "2.1.0" repositories { mavenCentral() diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java index 4b655ed..bb608c5 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java @@ -2,7 +2,6 @@ package dev.tommyjs.futur.reactivestreams; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; -import dev.tommyjs.futur.promise.StaticPromiseFactory; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java index 921d852..e45e55c 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java @@ -2,7 +2,7 @@ package dev.tommyjs.futur.reactivestreams; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; -import dev.tommyjs.futur.promise.StaticPromiseFactory; +import dev.tommyjs.futur.impl.StaticPromiseFactory; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; diff --git a/futur-reactor/build.gradle.kts b/futur-reactor/build.gradle.kts index 820caf0..804b69d 100644 --- a/futur-reactor/build.gradle.kts +++ b/futur-reactor/build.gradle.kts @@ -4,7 +4,7 @@ plugins { } group = "dev.tommyjs" -version = "2.0.0" +version = "2.1.0" repositories { mavenCentral() diff --git a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java index c6a65cc..bf934b5 100644 --- a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java +++ b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java @@ -2,7 +2,6 @@ package dev.tommyjs.futur.reactor; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; -import dev.tommyjs.futur.promise.StaticPromiseFactory; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono;