Merge pull request #6 from tommyskeff/patch/joining

better promise joining and combining
This commit is contained in:
Tommy
2024-03-23 21:43:38 +00:00
committed by GitHub
8 changed files with 109 additions and 73 deletions

View File

@@ -10,4 +10,12 @@ public interface PromiseExecutor {
void runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit); void runAsync(@NotNull Runnable task, long delay, @NotNull TimeUnit unit);
default void runSync(@NotNull Runnable task) {
runSync(task, 0L, TimeUnit.MILLISECONDS);
}
default void runAsync(@NotNull Runnable task) {
runAsync(task, 0L, TimeUnit.MILLISECONDS);
}
} }

View File

@@ -13,7 +13,6 @@ import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
public abstract class AbstractPromise<T> implements Promise<T> { public abstract class AbstractPromise<T> implements Promise<T> {
@@ -30,28 +29,42 @@ public abstract class AbstractPromise<T> implements Promise<T> {
protected abstract Logger getLogger(); protected abstract Logger getLogger();
@Deprecated
@Override @Override
public T join(long interval, long timeout) throws TimeoutException { public T join(long interval, long timeoutMillis) throws TimeoutException {
long start = System.currentTimeMillis(); return join(timeoutMillis);
while (!isCompleted()) { }
if (System.currentTimeMillis() > start + timeout)
throw new TimeoutException("Promise timed out after " + timeout + "ms");
try { @Override
Thread.sleep(interval); public T join(long timeoutMillis) throws TimeoutException {
} catch (InterruptedException e) { PromiseCompletion<T> completion = this.completion.get();
throw new RuntimeException(e); if (completion != null) return joinCompletion(completion);
long start = System.currentTimeMillis();
long remainingTimeout = timeoutMillis;
synchronized (this.completion) {
while (completion == null && remainingTimeout > 0){
try {
this.completion.wait(remainingTimeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
completion = this.completion.get();
remainingTimeout = timeoutMillis - (System.currentTimeMillis() - start);
} }
} }
PromiseCompletion<T> completion = getCompletion(); if (completion == null)
if (completion == null) { throw new TimeoutException("Promise timed out after " + timeoutMillis + "ms");
throw new IllegalStateException();
}
if (completion.isError()) { return joinCompletion(completion);
}
private T joinCompletion(PromiseCompletion<T> completion) {
if (completion.isError())
throw new RuntimeException(completion.getException()); throw new RuntimeException(completion.getException());
}
return completion.getResult(); return completion.getResult();
} }
@@ -275,26 +288,33 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override @Override
public @NotNull Promise<T> logExceptions() { public @NotNull Promise<T> logExceptions() {
return logExceptions("Exception caught in promise chain");
}
@Override
public @NotNull Promise<T> logExceptions(@NotNull String message) {
return addListener(ctx -> { return addListener(ctx -> {
if (ctx.isError()) { if (ctx.isError()) {
getLogger().error("Exception caught in promise chain", ctx.getException()); getLogger().error(message, ctx.getException());
} }
}); });
} }
@Override @Override
public @NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener) { public @NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener) {
if (isCompleted()) { synchronized (completion) {
getExecutor().runAsync(() -> { if (isCompleted()) {
try { getExecutor().runAsync(() -> {
//noinspection ConstantConditions try {
listener.handle(getCompletion()); //noinspection ConstantConditions
} catch (Exception e) { listener.handle(getCompletion());
getLogger().error("Exception caught in promise listener", e); } catch (Exception e) {
} getLogger().error("Exception caught in promise listener", e);
}, 0L, TimeUnit.MILLISECONDS); }
} else { });
getListeners().add(listener); } else {
getListeners().add(listener);
}
} }
return this; return this;
@@ -316,35 +336,27 @@ public abstract class AbstractPromise<T> implements Promise<T> {
return timeout(ms, TimeUnit.MILLISECONDS); return timeout(ms, TimeUnit.MILLISECONDS);
} }
protected void handleCompletion(@NotNull PromiseCompletion<T> ctx) { private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
AtomicBoolean success = new AtomicBoolean(); synchronized (completion) {
completion.getAndUpdate(c -> { if (!setCompletion(ctx)) return;
if (c == null) {
success.set(true);
return ctx;
} else {
success.set(false);
return c;
}
});
if (success.get()) { completion.notifyAll();
handleCompletion0(ctx); getExecutor().runAsync(() -> {
for (PromiseListener<T> listener : getListeners()) {
if (!ctx.isActive()) return;
try {
listener.handle(ctx);
} catch (Exception e) {
getLogger().error("Exception caught in promise listener", e);
}
}
});
} }
} }
protected void handleCompletion0(@NotNull PromiseCompletion<T> ctx) { private boolean setCompletion(PromiseCompletion<T> completion) {
getExecutor().runAsync(() -> { return this.completion.compareAndSet(null, completion);
for (PromiseListener<T> listener : getListeners()) {
if (!ctx.isActive()) return;
try {
listener.handle(ctx);
} catch (Exception e) {
getLogger().error("Exception caught in promise listener", e);
}
}
}, 0L, TimeUnit.MILLISECONDS);
} }
@Override @Override
@@ -367,7 +379,7 @@ public abstract class AbstractPromise<T> implements Promise<T> {
return completion.get(); return completion.get();
} }
protected Collection<PromiseListener<T>> getListeners() { private Collection<PromiseListener<T>> getListeners() {
return listeners; return listeners;
} }

View File

@@ -31,8 +31,11 @@ public interface Promise<T> {
PromiseFactory getFactory(); PromiseFactory getFactory();
@Deprecated
T join(long interval, long timeout) throws TimeoutException; T join(long interval, long timeout) throws TimeoutException;
T join(long timeout) throws TimeoutException;
@NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task); @NotNull Promise<Void> thenRunSync(@NotNull ExceptionalRunnable task);
@NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit); @NotNull Promise<Void> thenRunDelayedSync(@NotNull ExceptionalRunnable task, long delay, @NotNull TimeUnit unit);
@@ -73,6 +76,8 @@ public interface Promise<T> {
@NotNull Promise<T> logExceptions(); @NotNull Promise<T> logExceptions();
@NotNull Promise<T> logExceptions(@NotNull String message);
@NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener); @NotNull Promise<T> addListener(@NotNull PromiseListener<T> listener);
@NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit); @NotNull Promise<T> timeout(long time, @NotNull TimeUnit unit);

View File

@@ -42,15 +42,13 @@ public class Promises {
} }
public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) { public static <K, V> @NotNull Promise<Map<K, V>> combine(@NotNull Map<K, Promise<V>> promises, long timeout, @Nullable BiConsumer<K, Throwable> exceptionHandler, PromiseFactory factory) {
Map<K, V> map = new HashMap<>(); if (promises.isEmpty()) return factory.resolve(Collections.emptyMap());
ReentrantLock lock = new ReentrantLock();
Map<K, V> map = new HashMap<>();
Promise<Map<K, V>> promise = factory.unresolved(); Promise<Map<K, V>> promise = factory.unresolved();
for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) { for (Map.Entry<K, Promise<V>> entry : promises.entrySet()) {
entry.getValue().addListener((ctx) -> { entry.getValue().addListener((ctx) -> {
lock.lock(); synchronized (map) {
try {
if (ctx.isError()) { if (ctx.isError()) {
if (exceptionHandler == null) { if (exceptionHandler == null) {
//noinspection ConstantConditions //noinspection ConstantConditions
@@ -63,8 +61,6 @@ public class Promises {
map.put(entry.getKey(), ctx.getResult()); map.put(entry.getKey(), ctx.getResult());
} }
if (map.size() == promises.size()) promise.complete(map); if (map.size() == promises.size()) promise.complete(map);
} finally {
lock.unlock();
} }
}); });
} }
@@ -105,6 +101,8 @@ public class Promises {
} }
public static @NotNull Promise<Void> all(@NotNull List<Promise<?>> promises, PromiseFactory factory) { public static @NotNull Promise<Void> all(@NotNull List<Promise<?>> promises, PromiseFactory factory) {
if (promises.isEmpty()) return factory.resolve(null);
Promise<Void> promise = factory.unresolved(); Promise<Void> promise = factory.unresolved();
for (Promise<?> p : promises) { for (Promise<?> p : promises) {
p.addListener((ctx) -> { p.addListener((ctx) -> {

Binary file not shown.

View File

@@ -1,6 +1,6 @@
#Sun Nov 19 18:44:26 GMT 2023
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.0-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

18
gradlew vendored
View File

@@ -55,7 +55,7 @@
# Darwin, MinGW, and NonStop. # Darwin, MinGW, and NonStop.
# #
# (3) This script is generated from the Groovy template # (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project. # within the Gradle project.
# #
# You can find Gradle at https://github.com/gradle/gradle/. # You can find Gradle at https://github.com/gradle/gradle/.
@@ -80,10 +80,10 @@ do
esac esac
done done
APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit # This is normally unused
# shellcheck disable=SC2034
APP_NAME="Gradle"
APP_BASE_NAME=${0##*/} APP_BASE_NAME=${0##*/}
APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
@@ -143,12 +143,16 @@ fi
if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #( case $MAX_FD in #(
max*) max*)
# In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
MAX_FD=$( ulimit -H -n ) || MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit" warn "Could not query maximum file descriptor limit"
esac esac
case $MAX_FD in #( case $MAX_FD in #(
'' | soft) :;; #( '' | soft) :;; #(
*) *)
# In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
ulimit -n "$MAX_FD" || ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD" warn "Could not set maximum file descriptor limit to $MAX_FD"
esac esac
@@ -205,6 +209,12 @@ set -- \
org.gradle.wrapper.GradleWrapperMain \ org.gradle.wrapper.GradleWrapperMain \
"$@" "$@"
# Stop when "xargs" is not available.
if ! command -v xargs >/dev/null 2>&1
then
die "xargs is not available"
fi
# Use "xargs" to parse quoted args. # Use "xargs" to parse quoted args.
# #
# With -n1 it outputs one arg per line, with the quotes and backslashes removed. # With -n1 it outputs one arg per line, with the quotes and backslashes removed.

15
gradlew.bat vendored
View File

@@ -14,7 +14,7 @@
@rem limitations under the License. @rem limitations under the License.
@rem @rem
@if "%DEBUG%" == "" @echo off @if "%DEBUG%"=="" @echo off
@rem ########################################################################## @rem ##########################################################################
@rem @rem
@rem Gradle startup script for Windows @rem Gradle startup script for Windows
@@ -25,7 +25,8 @@
if "%OS%"=="Windows_NT" setlocal if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0 set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=. if "%DIRNAME%"=="" set DIRNAME=.
@rem This is normally unused
set APP_BASE_NAME=%~n0 set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME% set APP_HOME=%DIRNAME%
@@ -40,7 +41,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1 %JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute if %ERRORLEVEL% equ 0 goto execute
echo. echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@@ -75,13 +76,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
:end :end
@rem End local scope for the variables with windows NT shell @rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd if %ERRORLEVEL% equ 0 goto mainEnd
:fail :fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code! rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 set EXIT_CODE=%ERRORLEVEL%
exit /b 1 if %EXIT_CODE% equ 0 set EXIT_CODE=1
if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
exit /b %EXIT_CODE%
:mainEnd :mainEnd
if "%OS%"=="Windows_NT" endlocal if "%OS%"=="Windows_NT" endlocal