diff --git a/futur-api/src/test/java/dev/tommyjs/test/Test.java b/futur-api/src/test/java/dev/tommyjs/test/Test.java deleted file mode 100644 index e698aad..0000000 --- a/futur-api/src/test/java/dev/tommyjs/test/Test.java +++ /dev/null @@ -1,35 +0,0 @@ -package dev.tommyjs.test; - -import dev.tommyjs.futur.promise.PooledPromiseFactory; -import dev.tommyjs.futur.promise.Promise; -import dev.tommyjs.futur.promise.PromiseFactory; -import dev.tommyjs.futur.scheduler.Scheduler; -import dev.tommyjs.futur.scheduler.SingleExecutorScheduler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -public class Test { - - public static void main(String[] args) throws InterruptedException { - Scheduler scheduler = new SingleExecutorScheduler(Executors.newScheduledThreadPool(4)); - Logger logger = LoggerFactory.getLogger(Test.class); - PromiseFactory factory = new PooledPromiseFactory(scheduler, logger); - - Thread.sleep(2000); - - Promise.start(factory) - .thenRunAsync(() -> { - System.out.println("HI"); - }) - .thenApplyDelayedAsync(_v -> { - return "ABC"; - }, 1L, TimeUnit.SECONDS) - .thenConsumeSync(t -> { - System.out.println(t); - }); - } - -} diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java index 5c68a38..f8b9fdf 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java @@ -1,15 +1,22 @@ package dev.tommyjs.futur.reactivestreams; import dev.tommyjs.futur.promise.AbstractPromise; +import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.promise.UnpooledPromiseFactory; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; public class ReactiveTransformer { - public static @NotNull AbstractPromise wrapPublisher(@NotNull Publisher publisher) { - SingleAccumulatorSubscriber subscriber = SingleAccumulatorSubscriber.create(); + public static @NotNull Promise wrapPublisher(@NotNull Publisher publisher, PromiseFactory factory) { + SingleAccumulatorSubscriber subscriber = SingleAccumulatorSubscriber.create(factory); publisher.subscribe(subscriber); return subscriber.getPromise(); } + public static @NotNull Promise wrapPublisher(@NotNull Publisher publisher) { + return wrapPublisher(publisher, UnpooledPromiseFactory.INSTANCE); + } + } diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java index a9a1e33..7c49083 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java @@ -1,14 +1,17 @@ package dev.tommyjs.futur.reactivestreams; import dev.tommyjs.futur.promise.AbstractPromise; +import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.promise.UnpooledPromiseFactory; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; public class SingleAccumulatorSubscriber implements Subscriber { - private final AbstractPromise promise; + private final Promise promise; - public SingleAccumulatorSubscriber(AbstractPromise promise) { + public SingleAccumulatorSubscriber(Promise promise) { this.promise = promise; } @@ -32,16 +35,20 @@ public class SingleAccumulatorSubscriber implements Subscriber { // ignore } - public AbstractPromise getPromise() { + public Promise getPromise() { return promise; } - public static SingleAccumulatorSubscriber create(AbstractPromise promise) { + public static SingleAccumulatorSubscriber create(Promise promise) { return new SingleAccumulatorSubscriber<>(promise); } + public static SingleAccumulatorSubscriber create(PromiseFactory factory) { + return create(factory.unresolved()); + } + public static SingleAccumulatorSubscriber create() { - return create(new AbstractPromise<>()); + return create(UnpooledPromiseFactory.INSTANCE); } } diff --git a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java index 782c7f3..16b90b1 100644 --- a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java +++ b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java @@ -1,6 +1,9 @@ package dev.tommyjs.futur.reactor; import dev.tommyjs.futur.promise.AbstractPromise; +import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.UnpooledPromiseFactory; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -11,14 +14,18 @@ import java.util.concurrent.atomic.AtomicReference; public class ReactorTransformer { - public static @NotNull AbstractPromise wrapMono(@NotNull Mono mono) { - AbstractPromise promise = new AbstractPromise<>(); + public static @NotNull Promise wrapMono(@NotNull Mono mono, PromiseFactory factory) { + Promise promise = factory.unresolved(); mono.doOnSuccess(promise::complete).doOnError(promise::completeExceptionally).subscribe(); return promise; } - public static @NotNull AbstractPromise<@NotNull List> wrapFlux(@NotNull Flux flux) { - AbstractPromise> promise = new AbstractPromise<>(); + public static @NotNull Promise wrapMono(@NotNull Mono mono) { + return wrapMono(mono, UnpooledPromiseFactory.INSTANCE); + } + + public static @NotNull Promise<@NotNull List> wrapFlux(@NotNull Flux flux, PromiseFactory factory) { + Promise> promise = factory.unresolved(); AtomicReference> out = new AtomicReference<>(new ArrayList<>()); flux.doOnNext(out.get()::add).subscribe(); @@ -28,4 +35,8 @@ public class ReactorTransformer { return promise; } + public static @NotNull Promise<@NotNull List> wrapFlux(@NotNull Flux flux) { + return wrapFlux(flux, UnpooledPromiseFactory.INSTANCE); + } + } diff --git a/futur-standalone/build.gradle.kts b/futur-standalone/build.gradle.kts index e637a80..a60091b 100644 --- a/futur-standalone/build.gradle.kts +++ b/futur-standalone/build.gradle.kts @@ -14,6 +14,7 @@ repositories { dependencies { implementation("org.jetbrains:annotations:24.1.0") + implementation("org.slf4j:slf4j-api:2.0.9") compileOnly(project(mapOf("path" to ":futur-api"))) testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation("org.junit.jupiter:junit-jupiter") diff --git a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java index 6bb4399..9bc92b1 100644 --- a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java +++ b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java @@ -33,17 +33,17 @@ public class ExclusiveThreadPoolScheduler implements Scheduler { @Override public void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - executor.submit(wrapExceptions(task, trace)); + executor.submit(Scheduler.wrapExceptions(task, trace)); } @Override public void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - executor.schedule(wrapExceptions(task, trace), delay, unit); + executor.schedule(Scheduler.wrapExceptions(task, trace), delay, unit); } @Override public void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - executor.scheduleAtFixedRate(wrapExceptions(task, trace), 0L, interval, unit); + executor.scheduleAtFixedRate(Scheduler.wrapExceptions(task, trace), 0L, interval, unit); } public @NotNull ScheduledExecutorService getExecutor() { diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromise.java b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromise.java similarity index 83% rename from futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromise.java rename to futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromise.java index eb7c45e..00637b3 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromise.java +++ b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromise.java @@ -1,5 +1,7 @@ -package dev.tommyjs.futur.promise; +package dev.tommyjs.futur.standalone; +import dev.tommyjs.futur.promise.AbstractPromise; +import dev.tommyjs.futur.promise.PromiseFactory; import dev.tommyjs.futur.scheduler.Scheduler; import org.slf4j.Logger; diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromiseFactory.java b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromiseFactory.java similarity index 79% rename from futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromiseFactory.java rename to futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromiseFactory.java index 881dd49..5c8cdf0 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PooledPromiseFactory.java +++ b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromiseFactory.java @@ -1,5 +1,9 @@ -package dev.tommyjs.futur.promise; +package dev.tommyjs.futur.standalone; +import dev.tommyjs.futur.promise.AbstractPromise; +import dev.tommyjs.futur.promise.Promise; +import dev.tommyjs.futur.promise.PromiseCompletion; +import dev.tommyjs.futur.promise.PromiseFactory; import dev.tommyjs.futur.scheduler.Scheduler; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -17,7 +21,7 @@ public class PooledPromiseFactory implements PromiseFactory { @Override public @NotNull Promise resolve(T value) { AbstractPromise promise = new PooledPromise<>(scheduler, logger, this); - promise.setCompletion(new PromiseCompletion<>(value)); + promise.complete(value); return promise; } diff --git a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java index 4272c49..af9b17c 100644 --- a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java +++ b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java @@ -20,32 +20,32 @@ public class ThreadPoolScheduler implements Scheduler { @Override public void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - syncExecutor.submit(wrapExceptions(task, trace)); + syncExecutor.submit(Scheduler.wrapExceptions(task, trace)); } @Override public void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - syncExecutor.schedule(wrapExceptions(task, trace), delay, unit); + syncExecutor.schedule(Scheduler.wrapExceptions(task, trace), delay, unit); } @Override public void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - syncExecutor.scheduleAtFixedRate(wrapExceptions(task, trace), 0L, interval, unit); + syncExecutor.scheduleAtFixedRate(Scheduler.wrapExceptions(task, trace), 0L, interval, unit); } @Override public void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - asyncExecutor.submit(wrapExceptions(task, trace)); + asyncExecutor.submit(Scheduler.wrapExceptions(task, trace)); } @Override public void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - asyncExecutor.schedule(wrapExceptions(task, trace), delay, unit); + asyncExecutor.schedule(Scheduler.wrapExceptions(task, trace), delay, unit); } @Override public void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - asyncExecutor.scheduleAtFixedRate(wrapExceptions(task, trace), 0L, interval, unit); + asyncExecutor.scheduleAtFixedRate(Scheduler.wrapExceptions(task, trace), 0L, interval, unit); } public @NotNull ScheduledExecutorService getSyncExecutor() {