release 2.1.0

This commit is contained in:
tommyskeff
2024-03-12 07:27:51 +00:00
parent 49b701c736
commit 795ce04eb4
15 changed files with 143 additions and 56 deletions

View File

@@ -1,5 +1,6 @@
package dev.tommyjs.futur.promise;
import dev.tommyjs.futur.executor.PromiseExecutor;
import dev.tommyjs.futur.function.ExceptionalConsumer;
import dev.tommyjs.futur.function.ExceptionalFunction;
import dev.tommyjs.futur.function.ExceptionalRunnable;
@@ -10,7 +11,6 @@ import org.slf4j.Logger;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -18,15 +18,14 @@ import java.util.concurrent.atomic.AtomicReference;
public abstract class AbstractPromise<T> implements Promise<T> {
private final Collection<PromiseListener<T>> listeners;
private @Nullable PromiseCompletion<T> completion;
private final AtomicReference<PromiseCompletion<T>> completion;
public AbstractPromise() {
this.listeners = new ConcurrentLinkedQueue<>();
this.completion = null;
this.completion = new AtomicReference<>();
}
protected abstract ScheduledExecutorService getExecutor();
protected abstract PromiseExecutor getExecutor();
protected abstract Logger getLogger();
@@ -97,7 +96,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
public <V> @NotNull Promise<V> thenSupplyDelayedSync(@NotNull ExceptionalSupplier<V> task, long delay, @NotNull TimeUnit unit) {
return thenApplyDelayedSync(result -> task.get(), delay, unit);
}
@Override
public <V> @NotNull Promise<V> thenApplySync(@NotNull ExceptionalFunction<T, V> task) {
Promise<V> promise = getFactory().unresolved();
@@ -109,7 +108,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
}
Runnable runnable = createRunnable(ctx, promise, task);
getExecutor().submit(runnable);
getExecutor().runSync(runnable, 0L, TimeUnit.MILLISECONDS);
});
return promise;
@@ -126,7 +125,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
}
Runnable runnable = createRunnable(ctx, promise, task);
getExecutor().schedule(runnable, delay, unit);
getExecutor().runSync(runnable, delay, unit);
});
return promise;
@@ -216,7 +215,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
}
Runnable runnable = createRunnable(ctx, promise, task);
getExecutor().submit(runnable);
getExecutor().runAsync(runnable, 0L, TimeUnit.MILLISECONDS);
});
return promise;
@@ -227,7 +226,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
Promise<V> promise = getFactory().unresolved();
addListener(ctx -> {
Runnable runnable = createRunnable(ctx, promise, task);
getExecutor().schedule(runnable, delay, unit);
getExecutor().runAsync(runnable, delay, unit);
});
return promise;
@@ -285,14 +284,14 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override
public @NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener) {
if (isCompleted()) {
getExecutor().submit(() -> {
getExecutor().runAsync(() -> {
try {
//noinspection ConstantConditions
listener.handle(getCompletion());
} catch (Exception e) {
getLogger().error("Exception caught in promise listener", e);
}
});
}, 0L, TimeUnit.MILLISECONDS);
} else {
getListeners().add(listener);
}
@@ -302,7 +301,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override
public @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit) {
getExecutor().schedule(() -> {
getExecutor().runAsync(() -> {
if (!isCompleted()) {
completeExceptionally(new TimeoutException("Promise timed out after " + time + " " + unit));
}
@@ -317,10 +316,23 @@ public abstract class AbstractPromise<T> implements Promise<T> {
}
protected void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
if (this.isCompleted()) return;
setCompletion(ctx);
AtomicReference<Boolean> success = new AtomicReference<>();
completion.getAndUpdate(c -> {
if (c == null) {
return null;
} else {
success.set(true);
return ctx;
}
});
getExecutor().submit(() -> {
if (success.get()) {
handleCompletion0(ctx);
}
}
protected void handleCompletion0(@NotNull PromiseCompletion<T> ctx) {
getExecutor().runAsync(() -> {
for (PromiseListener<T> listener : getListeners()) {
if (!ctx.isActive()) return;
@@ -331,7 +343,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
getLogger().error("Exception caught in promise listener", e);
}
}
});
}, 0L, TimeUnit.MILLISECONDS);
}
@Override
@@ -346,20 +358,16 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override
public boolean isCompleted() {
return getCompletion() != null;
return completion.get() != null;
}
@Override
public @Nullable PromiseCompletion<T> getCompletion() {
return completion.get();
}
protected Collection<PromiseListener<T>> getListeners() {
return listeners;
}
@Override
public @Nullable PromiseCompletion<T> getCompletion() {
return completion;
}
protected void setCompletion(@NotNull PromiseCompletion<T> completion) {
this.completion = completion;
}
}