diff --git a/futur-api/build.gradle.kts b/futur-api/build.gradle.kts index 75f4109..bc98c91 100644 --- a/futur-api/build.gradle.kts +++ b/futur-api/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "dev.tommyjs" -version = "1.2.0" +version = "2.0.0" repositories { mavenCentral() diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java index a5f57c3..3998e57 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalConsumer.java @@ -1,7 +1,8 @@ package dev.tommyjs.futur.function; +@FunctionalInterface public interface ExceptionalConsumer { - void accept(T value) throws Exception; + void accept(T value) throws Throwable; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java index 4a7d575..ebebf5f 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalFunction.java @@ -1,7 +1,8 @@ package dev.tommyjs.futur.function; +@FunctionalInterface public interface ExceptionalFunction { - V apply(K value) throws Exception; + V apply(K value) throws Throwable; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java index 6c11ba5..c4b8002 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalRunnable.java @@ -1,7 +1,8 @@ package dev.tommyjs.futur.function; +@FunctionalInterface public interface ExceptionalRunnable { - void run() throws Exception; + void run() throws Throwable; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java index f82800c..e47f977 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/function/ExceptionalSupplier.java @@ -1,7 +1,8 @@ package dev.tommyjs.futur.function; +@FunctionalInterface public interface ExceptionalSupplier { - T get() throws Exception; + T get() throws Throwable; } diff --git a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java similarity index 52% rename from futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromise.java rename to futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java index 00637b3..e810563 100644 --- a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromise.java @@ -1,25 +1,26 @@ -package dev.tommyjs.futur.standalone; +package dev.tommyjs.futur.impl; import dev.tommyjs.futur.promise.AbstractPromise; import dev.tommyjs.futur.promise.PromiseFactory; -import dev.tommyjs.futur.scheduler.Scheduler; import org.slf4j.Logger; -public class PooledPromise extends AbstractPromise { +import java.util.concurrent.ScheduledExecutorService; - private final Scheduler scheduler; +public class SimplePromise extends AbstractPromise { + + private final ScheduledExecutorService executor; private final Logger logger; private final PromiseFactory factory; - public PooledPromise(Scheduler scheduler, Logger logger, PromiseFactory factory) { - this.scheduler = scheduler; + public SimplePromise(ScheduledExecutorService executor, Logger logger, PromiseFactory factory) { + this.executor = executor; this.logger = logger; this.factory = factory; } @Override - protected Scheduler getScheduler() { - return scheduler; + protected ScheduledExecutorService getExecutor() { + return executor; } @Override diff --git a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java similarity index 51% rename from futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromiseFactory.java rename to futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java index 27cb8c4..0bcac55 100644 --- a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/PooledPromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/impl/SimplePromiseFactory.java @@ -1,44 +1,40 @@ -package dev.tommyjs.futur.standalone; +package dev.tommyjs.futur.impl; import dev.tommyjs.futur.promise.AbstractPromise; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; -import dev.tommyjs.futur.scheduler.Scheduler; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; -public class PooledPromiseFactory implements PromiseFactory { +import java.util.concurrent.ScheduledExecutorService; - private final Scheduler scheduler; +public class SimplePromiseFactory implements PromiseFactory { + + private final ScheduledExecutorService executor; private final Logger logger; - public PooledPromiseFactory(Scheduler scheduler, Logger logger) { - this.scheduler = scheduler; + public SimplePromiseFactory(ScheduledExecutorService executor, Logger logger) { + this.executor = executor; this.logger = logger; } @Override public @NotNull Promise resolve(T value) { - AbstractPromise promise = new PooledPromise<>(scheduler, logger, this); + AbstractPromise promise = new SimplePromise<>(executor, logger, this); promise.complete(value); return promise; } @Override public @NotNull Promise unresolved() { - return new PooledPromise<>(scheduler, logger, this); + return new SimplePromise<>(executor, logger, this); } @Override public @NotNull Promise error(Throwable error) { - AbstractPromise promise = new PooledPromise<>(scheduler, logger, this); + AbstractPromise promise = new SimplePromise<>(executor, logger, this); promise.completeExceptionally(error); return promise; } - @Override - public @NotNull Promise start() { - return resolve(null); - } - } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 06d2dac..b4b6c7b 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -4,52 +4,29 @@ import dev.tommyjs.futur.function.ExceptionalConsumer; import dev.tommyjs.futur.function.ExceptionalFunction; import dev.tommyjs.futur.function.ExceptionalRunnable; import dev.tommyjs.futur.function.ExceptionalSupplier; -import dev.tommyjs.futur.scheduler.Scheduler; -import dev.tommyjs.futur.trace.ExecutorTrace; -import dev.tommyjs.futur.trace.TraceUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; -import java.util.Arrays; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; public abstract class AbstractPromise implements Promise { - private static final String PACKAGE; - - static { - String[] packageElements = AbstractPromise.class.getPackageName().split("\\."); - int i = 0; - - StringBuilder packageBuilder = new StringBuilder(); - while (i < 3) { - packageBuilder.append(packageElements[i]); - i++; - } - - PACKAGE = packageBuilder.toString(); - } - private final Collection> listeners; - private final StackTraceElement[] stackTrace; private @Nullable PromiseCompletion completion; public AbstractPromise() { this.listeners = new ConcurrentLinkedQueue<>(); this.completion = null; - this.stackTrace = Arrays.stream(Thread.currentThread().getStackTrace()) - .filter(v -> !v.getClassName().startsWith(PACKAGE)) - .toArray(StackTraceElement[]::new); } - protected abstract Scheduler getScheduler(); + protected abstract ScheduledExecutorService getExecutor(); protected abstract Logger getLogger(); @@ -84,7 +61,7 @@ public abstract class AbstractPromise implements Promise { return thenApplySync(result -> { task.run(); return null; - }, TraceUtil.getTrace(task)); + }); } @Override @@ -92,7 +69,7 @@ public abstract class AbstractPromise implements Promise { return thenApplyDelayedSync(result -> { task.run(); return null; - }, delay, unit, TraceUtil.getTrace(task)); + }, delay, unit); } @Override @@ -100,7 +77,7 @@ public abstract class AbstractPromise implements Promise { return thenApplySync(result -> { task.accept(result); return null; - }, TraceUtil.getTrace(task)); + }); } @Override @@ -108,42 +85,21 @@ public abstract class AbstractPromise implements Promise { return thenApplyDelayedSync(result -> { task.accept(result); return null; - }, delay, unit, TraceUtil.getTrace(task)); + }, delay, unit); } @Override public @NotNull Promise thenSupplySync(@NotNull ExceptionalSupplier task) { - return thenApplySync(result -> task.get(), TraceUtil.getTrace(task)); + return thenApplySync(result -> task.get()); } @Override public @NotNull Promise thenSupplyDelayedSync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(result -> task.get(), delay, unit, TraceUtil.getTrace(task)); + return thenApplyDelayedSync(result -> task.get(), delay, unit); } - protected @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { - Promise promise = getFactory().unresolved(); - addListener(ctx -> { - if (ctx.isError()) { - //noinspection ConstantConditions - promise.completeExceptionally(ctx.getException()); - return; - } - - Runnable runnable = createRunnable(ctx, promise, task); - getScheduler().runSync(runnable, trace); - }); - - return promise; - } - @Override public @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task) { - return thenApplySync(task, TraceUtil.getTrace(task)); - } - - @Override - public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { Promise promise = getFactory().unresolved(); addListener(ctx -> { if (ctx.isError()) { @@ -153,7 +109,7 @@ public abstract class AbstractPromise implements Promise { } Runnable runnable = createRunnable(ctx, promise, task); - getScheduler().runDelayedSync(runnable, delay, unit, trace); + getExecutor().submit(runnable); }); return promise; @@ -161,13 +117,25 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedSync(task, delay, unit, TraceUtil.getTrace(task)); + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + if (ctx.isError()) { + //noinspection ConstantConditions + promise.completeExceptionally(ctx.getException()); + return; + } + + Runnable runnable = createRunnable(ctx, promise, task); + getExecutor().schedule(runnable, delay, unit); + }); + + return promise; } @Override public @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task) { Promise promise = getFactory().unresolved(); - thenApplySync(task, TraceUtil.getTrace(task)).thenConsumeAsync(nestedPromise -> { + thenApplySync(task).thenConsumeAsync(nestedPromise -> { nestedPromise.addListener(ctx1 -> { if (ctx1.isError()) { //noinspection ConstantConditions @@ -192,7 +160,7 @@ public abstract class AbstractPromise implements Promise { return thenApplyAsync(result -> { task.run(); return null; - }, TraceUtil.getTrace(task)); + }); } @Override @@ -200,7 +168,7 @@ public abstract class AbstractPromise implements Promise { return thenApplyDelayedAsync(result -> { task.run(); return null; - }, delay, unit, TraceUtil.getTrace(task)); + }, delay, unit); } @Override @@ -208,7 +176,7 @@ public abstract class AbstractPromise implements Promise { return thenApplyAsync(result -> { task.accept(result); return null; - }, TraceUtil.getTrace(task)); + }); } @Override @@ -216,17 +184,17 @@ public abstract class AbstractPromise implements Promise { return thenApplyDelayedAsync(result -> { task.accept(result); return null; - }, delay, unit, TraceUtil.getTrace(task)); + }, delay, unit); } @Override public @NotNull Promise thenSupplyAsync(@NotNull ExceptionalSupplier task) { - return thenApplyAsync(result -> task.get(), TraceUtil.getTrace(task)); + return thenApplyAsync(result -> task.get()); } @Override public @NotNull Promise thenSupplyDelayedAsync(@NotNull ExceptionalSupplier task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(result -> task.get(), delay, unit, TraceUtil.getTrace(task)); + return thenApplyDelayedAsync(result -> task.get(), delay, unit); } @Override @@ -236,8 +204,9 @@ public abstract class AbstractPromise implements Promise { return result; }); } - - protected @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task, @NotNull ExecutorTrace trace) { + + @Override + public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { Promise promise = getFactory().unresolved(); addListener(ctx -> { if (ctx.isError()) { @@ -247,23 +216,7 @@ public abstract class AbstractPromise implements Promise { } Runnable runnable = createRunnable(ctx, promise, task); - getScheduler().runAsync(runnable, trace); - }); - - return promise; - } - - @Override - public @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task) { - return thenApplyAsync(task, TraceUtil.getTrace(task)); - } - - @Override - public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - Promise promise = getFactory().unresolved(); - addListener(ctx -> { - Runnable runnable = createRunnable(ctx, promise, task); - getScheduler().runDelayedAsync(runnable, delay, unit, trace); + getExecutor().submit(runnable); }); return promise; @@ -271,18 +224,19 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit) { - return thenApplyDelayedAsync(task, delay, unit, TraceUtil.getTrace(task)); - } + Promise promise = getFactory().unresolved(); + addListener(ctx -> { + Runnable runnable = createRunnable(ctx, promise, task); + getExecutor().schedule(runnable, delay, unit); + }); - @Override - public @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task) { - return this.thenComposeAsync(task); + return promise; } @Override public @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task) { Promise promise = getFactory().unresolved(); - thenApplyAsync(task, TraceUtil.getTrace(task)).thenConsumeAsync(nestedPromise -> { + thenApplyAsync(task).thenConsumeAsync(nestedPromise -> { nestedPromise.addListener(ctx1 -> { if (ctx1.isError()) { //noinspection ConstantConditions @@ -313,8 +267,8 @@ public abstract class AbstractPromise implements Promise { try { V result = task.apply(ctx.getResult()); promise.complete(result); - } catch (Exception e) { - promise.completeExceptionally(e, true); + } catch (Throwable e) { + promise.completeExceptionally(e); } }; } @@ -331,14 +285,14 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise addListener(@NotNull PromiseListener listener) { if (isCompleted()) { - getScheduler().runAsync(() -> { + getExecutor().submit(() -> { try { //noinspection ConstantConditions listener.handle(getCompletion()); } catch (Exception e) { getLogger().error("Exception caught in promise listener", e); } - }, TraceUtil.getTrace(listener)); + }); } else { getListeners().add(listener); } @@ -348,13 +302,11 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise timeout(long time, @NotNull TimeUnit unit) { - Runnable func = () -> { + getExecutor().schedule(() -> { if (!isCompleted()) { - completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit), true); + completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit)); } - }; - - getScheduler().runDelayedAsync(func, time, unit, TraceUtil.getTrace(func)); + }, time, unit); return this; } @@ -368,19 +320,18 @@ public abstract class AbstractPromise implements Promise { if (this.isCompleted()) return; setCompletion(ctx); - Runnable func = () -> { + getExecutor().submit(() -> { for (PromiseListener listener : getListeners()) { if (!ctx.isActive()) return; try { listener.handle(ctx); } catch (Exception e) { + e.printStackTrace(); getLogger().error("Exception caught in promise listener", e); } } - }; - - getScheduler().runAsync(func, TraceUtil.getTrace(func)); + }); } @Override @@ -388,23 +339,9 @@ public abstract class AbstractPromise implements Promise { handleCompletion(new PromiseCompletion<>(result)); } - @Override - public void completeExceptionally(@NotNull Throwable result, boolean appendStacktrace) { - if (appendStacktrace && this.stackTrace != null) { - result.setStackTrace(Stream.of(result.getStackTrace(), this.stackTrace) - .flatMap(Stream::of) - .filter(v -> !v.getClassName().startsWith(PACKAGE)) - .filter(v -> !v.getClassName().startsWith("java.lang.Thread")) - .filter(v -> !v.getClassName().startsWith("java.util.concurrent")) - .toArray(StackTraceElement[]::new)); - } - - handleCompletion(new PromiseCompletion<>(result)); - } - @Override public void completeExceptionally(@NotNull Throwable result) { - completeExceptionally(result, false); + handleCompletion(new PromiseCompletion<>(result)); } @Override diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java index c446372..a188e82 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promise.java @@ -4,7 +4,6 @@ import dev.tommyjs.futur.function.ExceptionalConsumer; import dev.tommyjs.futur.function.ExceptionalFunction; import dev.tommyjs.futur.function.ExceptionalRunnable; import dev.tommyjs.futur.function.ExceptionalSupplier; -import dev.tommyjs.futur.trace.ExecutorTrace; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -22,25 +21,12 @@ public interface Promise { return factory.error(error); } + static @NotNull Promise unresolved(PromiseFactory factory) { + return factory.unresolved(); + } + static @NotNull Promise start(PromiseFactory factory) { - return factory.start(); - } - - static @NotNull Promise resolve(T value) { - return resolve(value, UnpooledPromiseFactory.INSTANCE); - } - - static @NotNull Promise error(Throwable error) { - return error(error, UnpooledPromiseFactory.INSTANCE); - } - - static @NotNull Promise start() { - return start(UnpooledPromiseFactory.INSTANCE); - } - - @Deprecated - static @NotNull Promise start(T start) { - return resolve(start); + return factory.resolve(null); } PromiseFactory getFactory(); @@ -61,8 +47,6 @@ public interface Promise { @NotNull Promise thenApplySync(@NotNull ExceptionalFunction task); - @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - @NotNull Promise thenApplyDelayedSync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); @NotNull Promise thenComposeSync(@NotNull ExceptionalFunction> task); @@ -83,12 +67,8 @@ public interface Promise { @NotNull Promise thenApplyAsync(@NotNull ExceptionalFunction task); - @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - @NotNull Promise thenApplyDelayedAsync(@NotNull ExceptionalFunction task, long delay, @NotNull TimeUnit unit); - @NotNull Promise thenCompose(@NotNull ExceptionalFunction> task); - @NotNull Promise thenComposeAsync(@NotNull ExceptionalFunction> task); @NotNull Promise logExceptions(); @@ -101,8 +81,6 @@ public interface Promise { void complete(@Nullable T result); - void completeExceptionally(@NotNull Throwable result, boolean appendStacktrace); - void completeExceptionally(@NotNull Throwable result); boolean isCompleted(); diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index a608dd8..923e967 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -1,6 +1,12 @@ package dev.tommyjs.futur.promise; +import dev.tommyjs.futur.impl.SimplePromiseFactory; import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; public interface PromiseFactory { @@ -10,6 +16,20 @@ public interface PromiseFactory { @NotNull Promise error(Throwable error); - @NotNull Promise start(); + static PromiseFactory create(ScheduledExecutorService executor, Logger logger) { + return new SimplePromiseFactory(executor, logger); + } + + static PromiseFactory create(ScheduledExecutorService executor) { + return create(executor, LoggerFactory.getLogger(SimplePromiseFactory.class)); + } + + static PromiseFactory create(int threadPoolSize) { + return create(Executors.newScheduledThreadPool(threadPoolSize)); + } + + static PromiseFactory create() { + return create(Runtime.getRuntime().availableProcessors()); + } } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java index 9f6b84d..ba0fb00 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/Promises.java @@ -71,41 +71,23 @@ public class Promises { return promise.timeout(timeout); } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, @Nullable BiConsumer exceptionHandler) { - return combine(promises, timeout, exceptionHandler, obtainFactory(promises.values())); - } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict, PromiseFactory factory) { return combine(promises, timeout, strict ? null : (_k, _v) -> {}, factory); } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, boolean strict) { - return combine(promises, timeout, strict, obtainFactory(promises.values())); - } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout, PromiseFactory factory) { return combine(promises, timeout, true, factory); } - public static @NotNull Promise> combine(@NotNull Map> promises, long timeout) { - return combine(promises, timeout, true, obtainFactory(promises.values())); - } - public static @NotNull Promise> combine(@NotNull Map> promises, PromiseFactory factory) { return combine(promises, 1500L, true, factory); } - public static @NotNull Promise> combine(@NotNull Map> promises) { - return combine(promises, obtainFactory(promises.values())); - } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict, PromiseFactory factory) { AtomicInteger index = new AtomicInteger(); return combine( - promises.stream() - .collect(Collectors.toMap(s -> index.getAndIncrement(), v -> v)), - timeout, - strict + promises.stream().collect(Collectors.toMap(s -> index.getAndIncrement(), v -> v)), + timeout, strict, factory ).thenApplySync(v -> v.entrySet().stream() .sorted(Map.Entry.comparingByKey()) @@ -114,24 +96,12 @@ public class Promises { ); } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, boolean strict) { - return combine(promises, timeout, strict, obtainFactory(promises)); - } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout, PromiseFactory factory) { return combine(promises, timeout, true, factory); } - public static @NotNull Promise> combine(@NotNull List> promises, long timeout) { - return combine(promises, timeout, obtainFactory(promises)); - } - public static @NotNull Promise> combine(@NotNull List> promises, PromiseFactory factory) { - return combine(promises, 1500L, true); - } - - public static @NotNull Promise> combine(@NotNull List> promises) { - return combine(promises, obtainFactory(promises)); + return combine(promises, 1500L, true, factory); } public static @NotNull Promise all(@NotNull List> promises, PromiseFactory factory) { @@ -149,17 +119,6 @@ public class Promises { return promise; } - public static @NotNull Promise all(@NotNull List> promises) { - PromiseFactory factory; - if (promises.isEmpty()) { - factory = UnpooledPromiseFactory.INSTANCE; - } else { - factory = promises.get(0).getFactory(); - } - - return all(promises, factory); - } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict, PromiseFactory factory) { Map> promises = new HashMap<>(); for (K key : keys) { @@ -167,29 +126,17 @@ public class Promises { promises.put(key, promise); } - return combine(promises, timeout, strict); - } - - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, boolean strict) { - return combine(keys, mapper, timeout, strict, UnpooledPromiseFactory.INSTANCE); + return combine(promises, timeout, strict, factory); } public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout, PromiseFactory factory) { return combine(keys, mapper, timeout, true, factory); } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, long timeout) { - return combine(keys, mapper, timeout, UnpooledPromiseFactory.INSTANCE); - } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper, PromiseFactory factory) { return combine(keys, mapper, 1500L, true, factory); } - public static @NotNull Promise> combine(@NotNull Collection keys, @NotNull ExceptionalFunction mapper) { - return combine(keys, mapper, UnpooledPromiseFactory.INSTANCE); - } - public static @NotNull Promise erase(@NotNull Promise p, PromiseFactory factory) { Promise promise = factory.unresolved(); p.addListener(ctx -> { @@ -221,19 +168,4 @@ public class Promises { return promise; } - public static @NotNull Promise wrap(@NotNull CompletableFuture future) { - return wrap(future, UnpooledPromiseFactory.INSTANCE); - } - - public static PromiseFactory obtainFactory(Collection> promises) { - PromiseFactory factory; - if (promises.isEmpty()) { - factory = UnpooledPromiseFactory.INSTANCE; - } else { - factory = promises.stream().findFirst().get().getFactory(); - } - - return factory; - } - } \ No newline at end of file diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromise.java new file mode 100644 index 0000000..8a7986f --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromise.java @@ -0,0 +1,24 @@ +package dev.tommyjs.futur.promise; + +import org.slf4j.Logger; + +import java.util.concurrent.ScheduledExecutorService; + +public class StaticPromise extends AbstractPromise { + + @Override + protected ScheduledExecutorService getExecutor() { + return StaticPromiseFactory.EXECUTOR; + } + + @Override + protected Logger getLogger() { + return StaticPromiseFactory.LOGGER; + } + + @Override + public PromiseFactory getFactory() { + return StaticPromiseFactory.INSTANCE; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromiseFactory.java new file mode 100644 index 0000000..7c73eec --- /dev/null +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/StaticPromiseFactory.java @@ -0,0 +1,44 @@ +package dev.tommyjs.futur.promise; + +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +public class StaticPromiseFactory implements PromiseFactory { + + public static final @NotNull PromiseFactory INSTANCE; + public static final @NotNull ScheduledExecutorService EXECUTOR; + public static final @NotNull Logger LOGGER; + + static { + INSTANCE = new StaticPromiseFactory(); + EXECUTOR = Executors.newSingleThreadScheduledExecutor(); + LOGGER = LoggerFactory.getLogger(StaticPromiseFactory.class); + } + + private StaticPromiseFactory() { + } + + @Override + public @NotNull Promise resolve(T value) { + AbstractPromise promise = new StaticPromise<>(); + promise.setCompletion(new PromiseCompletion<>(value)); + return promise; + } + + @Override + public @NotNull Promise unresolved() { + return new StaticPromise<>(); + } + + @Override + public @NotNull Promise error(Throwable error) { + AbstractPromise promise = new StaticPromise<>(); + promise.completeExceptionally(error); + return promise; + } + +} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromise.java deleted file mode 100644 index 75a4abf..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromise.java +++ /dev/null @@ -1,23 +0,0 @@ -package dev.tommyjs.futur.promise; - -import dev.tommyjs.futur.scheduler.Scheduler; -import org.slf4j.Logger; - -public class UnpooledPromise extends AbstractPromise { - - @Override - protected Scheduler getScheduler() { - return UnpooledPromiseFactory.SCHEDULER; - } - - @Override - protected Logger getLogger() { - return UnpooledPromiseFactory.LOGGER; - } - - @Override - public PromiseFactory getFactory() { - return UnpooledPromiseFactory.INSTANCE; - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromiseFactory.java deleted file mode 100644 index a1238af..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/UnpooledPromiseFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -package dev.tommyjs.futur.promise; - -import dev.tommyjs.futur.scheduler.Scheduler; -import dev.tommyjs.futur.scheduler.SingleExecutorScheduler; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executors; - -public class UnpooledPromiseFactory implements PromiseFactory { - - public static final @NotNull PromiseFactory INSTANCE; - public static final @NotNull Scheduler SCHEDULER; - public static final @NotNull Logger LOGGER; - - static { - INSTANCE = new UnpooledPromiseFactory(); - SCHEDULER = new SingleExecutorScheduler(Executors.newSingleThreadScheduledExecutor()); - LOGGER = LoggerFactory.getLogger(UnpooledPromiseFactory.class); - } - - private UnpooledPromiseFactory() { - } - - @Override - public @NotNull Promise resolve(T value) { - AbstractPromise promise = new UnpooledPromise<>(); - promise.setCompletion(new PromiseCompletion<>(value)); - return promise; - } - - @Override - public @NotNull Promise unresolved() { - return new UnpooledPromise<>(); - } - - @Override - public @NotNull Promise error(Throwable error) { - AbstractPromise promise = new UnpooledPromise<>(); - promise.completeExceptionally(error); - return promise; - } - - @Override - public @NotNull Promise start() { - return resolve(null); - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java deleted file mode 100644 index e9efe7f..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/Scheduler.java +++ /dev/null @@ -1,37 +0,0 @@ -package dev.tommyjs.futur.scheduler; - -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -public interface Scheduler { - - Logger LOGGER = LoggerFactory.getLogger(Scheduler.class); - - void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace); - - void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - - void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - - void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace); - - void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - - void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace); - - static @NotNull Runnable wrapExceptions(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - return () -> { - try { - task.run(); - } catch (Exception e) { - LOGGER.error("Exception in scheduled task: {}", e.getClass().getName()); - LOGGER.error(trace.toString()); - } - }; - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java b/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java deleted file mode 100644 index d228059..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/scheduler/SingleExecutorScheduler.java +++ /dev/null @@ -1,47 +0,0 @@ -package dev.tommyjs.futur.scheduler; - -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class SingleExecutorScheduler implements Scheduler { - - private final ScheduledExecutorService service; - - public SingleExecutorScheduler(ScheduledExecutorService service) { - this.service = service; - } - - @Override - public void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - service.submit(Scheduler.wrapExceptions(task, trace)); - } - - @Override - public void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - service.schedule(Scheduler.wrapExceptions(task, trace), delay, unit); - } - - @Override - public void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - service.scheduleAtFixedRate(Scheduler.wrapExceptions(task, trace), 0L, interval, unit); - } - - @Override - public void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - runSync(task, trace); - } - - @Override - public void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - runDelayedSync(task, delay, unit, trace); - } - - @Override - public void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - runRepeatingSync(task, interval, unit, trace); - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/trace/ExecutorTrace.java b/futur-api/src/main/java/dev/tommyjs/futur/trace/ExecutorTrace.java deleted file mode 100644 index 47d4503..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/trace/ExecutorTrace.java +++ /dev/null @@ -1,31 +0,0 @@ -package dev.tommyjs.futur.trace; - -import org.jetbrains.annotations.NotNull; - -import java.util.Arrays; -import java.util.stream.Collectors; - -public class ExecutorTrace { - - private final @NotNull Class clazz; - private final @NotNull StackTraceElement[] trace; - - public ExecutorTrace(@NotNull Class clazz, @NotNull StackTraceElement[] trace) { - this.clazz = clazz; - this.trace = trace; - } - - public @NotNull Class getClazz() { - return clazz; - } - - public @NotNull StackTraceElement[] getTrace() { - return trace; - } - - @Override - public String toString() { - return Arrays.stream(trace).map(StackTraceElement::toString).collect(Collectors.joining("\n")); - } - -} diff --git a/futur-api/src/main/java/dev/tommyjs/futur/trace/TraceUtil.java b/futur-api/src/main/java/dev/tommyjs/futur/trace/TraceUtil.java deleted file mode 100644 index 694d162..0000000 --- a/futur-api/src/main/java/dev/tommyjs/futur/trace/TraceUtil.java +++ /dev/null @@ -1,11 +0,0 @@ -package dev.tommyjs.futur.trace; - -import org.jetbrains.annotations.NotNull; - -public class TraceUtil { - - public static ExecutorTrace getTrace(@NotNull Object function) { - return new ExecutorTrace(function.getClass(), Thread.currentThread().getStackTrace()); - } - -} diff --git a/futur-reactive-streams/build.gradle.kts b/futur-reactive-streams/build.gradle.kts index 13cf185..2a05a9f 100644 --- a/futur-reactive-streams/build.gradle.kts +++ b/futur-reactive-streams/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } group = "dev.tommyjs" -version = "1.2.0" +version = "2.0.0" repositories { mavenCentral() diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java index 2ee3683..4b655ed 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/ReactiveTransformer.java @@ -2,7 +2,7 @@ package dev.tommyjs.futur.reactivestreams; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; -import dev.tommyjs.futur.promise.UnpooledPromiseFactory; +import dev.tommyjs.futur.promise.StaticPromiseFactory; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; @@ -14,8 +14,4 @@ public class ReactiveTransformer { return subscriber.getPromise(); } - public static @NotNull Promise wrapPublisher(@NotNull Publisher publisher) { - return wrapPublisher(publisher, UnpooledPromiseFactory.INSTANCE); - } - } diff --git a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java index 363da96..921d852 100644 --- a/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java +++ b/futur-reactive-streams/src/main/java/dev/tommyjs/futur/reactivestreams/SingleAccumulatorSubscriber.java @@ -2,7 +2,7 @@ package dev.tommyjs.futur.reactivestreams; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; -import dev.tommyjs.futur.promise.UnpooledPromiseFactory; +import dev.tommyjs.futur.promise.StaticPromiseFactory; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -47,7 +47,7 @@ public class SingleAccumulatorSubscriber implements Subscriber { } public static SingleAccumulatorSubscriber create() { - return create(UnpooledPromiseFactory.INSTANCE); + return create(StaticPromiseFactory.INSTANCE); } } diff --git a/futur-reactor/build.gradle.kts b/futur-reactor/build.gradle.kts index 57a8c76..820caf0 100644 --- a/futur-reactor/build.gradle.kts +++ b/futur-reactor/build.gradle.kts @@ -4,7 +4,7 @@ plugins { } group = "dev.tommyjs" -version = "1.2.0" +version = "2.0.0" repositories { mavenCentral() diff --git a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java index 5dc34ba..c6a65cc 100644 --- a/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java +++ b/futur-reactor/src/main/java/dev/tommyjs/futur/reactor/ReactorTransformer.java @@ -2,7 +2,7 @@ package dev.tommyjs.futur.reactor; import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; -import dev.tommyjs.futur.promise.UnpooledPromiseFactory; +import dev.tommyjs.futur.promise.StaticPromiseFactory; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -19,10 +19,6 @@ public class ReactorTransformer { return promise; } - public static @NotNull Promise wrapMono(@NotNull Mono mono) { - return wrapMono(mono, UnpooledPromiseFactory.INSTANCE); - } - public static @NotNull Promise<@NotNull List> wrapFlux(@NotNull Flux flux, PromiseFactory factory) { Promise> promise = factory.unresolved(); AtomicReference> out = new AtomicReference<>(new ArrayList<>()); @@ -34,8 +30,4 @@ public class ReactorTransformer { return promise; } - public static @NotNull Promise<@NotNull List> wrapFlux(@NotNull Flux flux) { - return wrapFlux(flux, UnpooledPromiseFactory.INSTANCE); - } - } diff --git a/futur-standalone/.gitignore b/futur-standalone/.gitignore deleted file mode 100644 index b63da45..0000000 --- a/futur-standalone/.gitignore +++ /dev/null @@ -1,42 +0,0 @@ -.gradle -build/ -!gradle/wrapper/gradle-wrapper.jar -!**/src/main/**/build/ -!**/src/test/**/build/ - -### IntelliJ IDEA ### -.idea/modules.xml -.idea/jarRepositories.xml -.idea/compiler.xml -.idea/libraries/ -*.iws -*.iml -*.ipr -out/ -!**/src/main/**/out/ -!**/src/test/**/out/ - -### Eclipse ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache -bin/ -!**/src/main/**/bin/ -!**/src/test/**/bin/ - -### NetBeans ### -/nbproject/private/ -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ - -### VS Code ### -.vscode/ - -### Mac OS ### -.DS_Store \ No newline at end of file diff --git a/futur-standalone/build.gradle.kts b/futur-standalone/build.gradle.kts deleted file mode 100644 index 3016b93..0000000 --- a/futur-standalone/build.gradle.kts +++ /dev/null @@ -1,35 +0,0 @@ -import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar - -plugins { - id("java") - id("com.github.johnrengelman.shadow") version "7.1.2" -} - -group = "dev.tommyjs" -version = "1.2.0" - -repositories { - mavenCentral() -} - -dependencies { - implementation("org.jetbrains:annotations:24.1.0") - implementation("org.slf4j:slf4j-api:2.0.9") - compileOnly(project(mapOf("path" to ":futur-api"))) - testImplementation(platform("org.junit:junit-bom:5.9.1")) - testImplementation("org.junit.jupiter:junit-jupiter") -} - -tasks { - build { - dependsOn(shadowJar) - } - - withType { - exclude("META-INF/**") - } -} - -tasks.test { - useJUnitPlatform() -} \ No newline at end of file diff --git a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java deleted file mode 100644 index 9bc92b1..0000000 --- a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ExclusiveThreadPoolScheduler.java +++ /dev/null @@ -1,61 +0,0 @@ -package dev.tommyjs.futur.standalone; - -import dev.tommyjs.futur.scheduler.Scheduler; -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ExclusiveThreadPoolScheduler implements Scheduler { - - private final ScheduledExecutorService executor; - - protected ExclusiveThreadPoolScheduler(ScheduledExecutorService executor) { - this.executor = executor; - } - - @Override - public void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - throw new UnsupportedOperationException("Sync task invoked on asynchronous environment"); - } - - @Override - public void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - throw new UnsupportedOperationException("Sync task invoked on asynchronous environment"); - } - - @Override - public void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - throw new UnsupportedOperationException("Sync task invoked on asynchronous environment"); - } - - @Override - public void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - executor.submit(Scheduler.wrapExceptions(task, trace)); - } - - @Override - public void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - executor.schedule(Scheduler.wrapExceptions(task, trace), delay, unit); - } - - @Override - public void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - executor.scheduleAtFixedRate(Scheduler.wrapExceptions(task, trace), 0L, interval, unit); - } - - public @NotNull ScheduledExecutorService getExecutor() { - return executor; - } - - public static ExclusiveThreadPoolScheduler create(ScheduledExecutorService executor) { - return new ExclusiveThreadPoolScheduler(executor); - } - - public static ExclusiveThreadPoolScheduler create(int nThreads) { - return create(Executors.newScheduledThreadPool(nThreads)); - } - -} diff --git a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java b/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java deleted file mode 100644 index af9b17c..0000000 --- a/futur-standalone/src/main/java/dev/tommyjs/futur/standalone/ThreadPoolScheduler.java +++ /dev/null @@ -1,63 +0,0 @@ -package dev.tommyjs.futur.standalone; - -import dev.tommyjs.futur.scheduler.Scheduler; -import dev.tommyjs.futur.trace.ExecutorTrace; -import org.jetbrains.annotations.NotNull; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ThreadPoolScheduler implements Scheduler { - - private final ScheduledExecutorService syncExecutor; - private final ScheduledExecutorService asyncExecutor; - - protected ThreadPoolScheduler(ScheduledExecutorService syncExecutor, ScheduledExecutorService asyncExecutor) { - this.syncExecutor = syncExecutor; - this.asyncExecutor = asyncExecutor; - } - - @Override - public void runSync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - syncExecutor.submit(Scheduler.wrapExceptions(task, trace)); - } - - @Override - public void runDelayedSync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - syncExecutor.schedule(Scheduler.wrapExceptions(task, trace), delay, unit); - } - - @Override - public void runRepeatingSync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - syncExecutor.scheduleAtFixedRate(Scheduler.wrapExceptions(task, trace), 0L, interval, unit); - } - - @Override - public void runAsync(@NotNull Runnable task, @NotNull ExecutorTrace trace) { - asyncExecutor.submit(Scheduler.wrapExceptions(task, trace)); - } - - @Override - public void runDelayedAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - asyncExecutor.schedule(Scheduler.wrapExceptions(task, trace), delay, unit); - } - - @Override - public void runRepeatingAsync(@NotNull Runnable task, long interval, @NotNull TimeUnit unit, @NotNull ExecutorTrace trace) { - asyncExecutor.scheduleAtFixedRate(Scheduler.wrapExceptions(task, trace), 0L, interval, unit); - } - - public @NotNull ScheduledExecutorService getSyncExecutor() { - return syncExecutor; - } - - public @NotNull ScheduledExecutorService getAsyncExecutor() { - return asyncExecutor; - } - - public static ThreadPoolScheduler create(int nThreads) { - return new ThreadPoolScheduler(Executors.newSingleThreadScheduledExecutor(), Executors.newScheduledThreadPool(nThreads)); - } - -}