From a7d71293a5f532aabf57c77d2d146600a6539c54 Mon Sep 17 00:00:00 2001 From: tommyskeff Date: Fri, 13 Mar 2026 16:15:07 +0000 Subject: [PATCH] fix: address concurrency issues and add stress tests --- .github/workflows/stress.yml | 34 +++++ build.gradle | 2 +- .../tommyjs/futur/promise/BasePromise.java | 70 +++++---- .../futur/util/ConcurrentResultArray.java | 9 +- futur-jcstress/build.gradle | 24 +++ .../dev/tommyjs/futur/stress/FuturStress.java | 140 ++++++++++++++++++ settings.gradle | 1 + 7 files changed, 246 insertions(+), 34 deletions(-) create mode 100644 .github/workflows/stress.yml create mode 100644 futur-jcstress/build.gradle create mode 100644 futur-jcstress/src/jcstress/java/dev/tommyjs/futur/stress/FuturStress.java diff --git a/.github/workflows/stress.yml b/.github/workflows/stress.yml new file mode 100644 index 0000000..c81ac7e --- /dev/null +++ b/.github/workflows/stress.yml @@ -0,0 +1,34 @@ +name: Stress tests + +on: + push: + branches: + - main + pull_request: + +jobs: + jcstress: + name: JCStress + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Java + uses: actions/setup-java@v4 + with: + distribution: corretto + java-version: 23 + + - name: Make Gradle executable + run: chmod +x ./gradlew + + - name: Run JCStress tests + run: ./gradlew :futur-jcstress:jcstress + + - name: Upload JCStress report + if: always() + uses: actions/upload-artifact@v4 + with: + name: jcstress-report + path: futur-jcstress/build/reports/jcstress/ diff --git a/build.gradle b/build.gradle index 59932c5..1f74535 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { subprojects { group = 'dev.tommyjs' - version = '2.5.4' + version = '2.5.5' apply plugin: 'java-library' apply plugin: 'com.github.johnrengelman.shadow' diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java index 8560d7f..7acdf77 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/BasePromise.java @@ -18,11 +18,20 @@ public abstract class BasePromise extends AbstractPromise implements Compl private static final VarHandle COMPLETION_HANDLE; private static final VarHandle LISTENERS_HANDLE; + private static final class ListenerNode { + final PromiseListener listener; + ListenerNode next; + ListenerNode(PromiseListener listener) { this.listener = listener; } + } + + @SuppressWarnings("rawtypes") + private static final ListenerNode COMPLETED_NODE = new ListenerNode<>(null); + static { try { MethodHandles.Lookup lookup = MethodHandles.lookup(); COMPLETION_HANDLE = lookup.findVarHandle(BasePromise.class, "completion", PromiseCompletion.class); - LISTENERS_HANDLE = lookup.findVarHandle(BasePromise.class, "listeners", Collection.class); + LISTENERS_HANDLE = lookup.findVarHandle(BasePromise.class, "listeners", ListenerNode.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } @@ -32,14 +41,12 @@ public abstract class BasePromise extends AbstractPromise implements Compl private volatile PromiseCompletion completion; - @SuppressWarnings("FieldMayBeFinal") - private volatile Collection> listeners; + private volatile ListenerNode listeners; - @SuppressWarnings("unchecked") public BasePromise() { this.sync = new Sync(); this.completion = null; - this.listeners = Collections.EMPTY_LIST; + this.listeners = null; } protected void handleCompletion(@NotNull PromiseCompletion cmp) { @@ -63,37 +70,46 @@ public abstract class BasePromise extends AbstractPromise implements Compl @SuppressWarnings("unchecked") protected void callListeners(@NotNull PromiseCompletion cmp) { - var iter = ((Iterable>) LISTENERS_HANDLE.getAndSet(this, null)).iterator(); + ListenerNode node = (ListenerNode) LISTENERS_HANDLE.getAndSet(this, COMPLETED_NODE); + if (node == null || node == COMPLETED_NODE) { + return; + } + + ListenerNode prev = null; + while (node != null) { + ListenerNode next = node.next; + node.next = prev; + prev = node; + node = next; + } + + ListenerNode curr = prev; try { - while (iter.hasNext()) { - callListener(iter.next(), cmp); + while (curr != null) { + callListener(curr.listener, cmp); + curr = curr.next; } } finally { - iter.forEachRemaining(v -> callListenerAsyncLastResort(v, cmp)); + while (curr != null) { + callListenerAsyncLastResort(curr.listener, cmp); + curr = curr.next; + } } } @Override + @SuppressWarnings("unchecked") protected @NotNull Promise addAnyListener(@NotNull PromiseListener listener) { - Collection> prev = listeners, next = null; - for (boolean haveNext = false; ; ) { - if (!haveNext) { - next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev; - if (next != null) { - next.add(listener); - } + ListenerNode node = new ListenerNode<>(listener); + ListenerNode prev; + do { + prev = listeners; + if (prev == COMPLETED_NODE) { + callListener(listener, Objects.requireNonNull(getCompletion())); + return this; } - - if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next)) { - break; - } - - haveNext = (prev == (prev = listeners)); - } - - if (next == null) { - callListener(listener, Objects.requireNonNull(getCompletion())); - } + node.next = prev; + } while (!LISTENERS_HANDLE.weakCompareAndSet(this, prev, node)); return this; } diff --git a/futur-api/src/main/java/dev/tommyjs/futur/util/ConcurrentResultArray.java b/futur-api/src/main/java/dev/tommyjs/futur/util/ConcurrentResultArray.java index a59e9ae..c7a6bc8 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/util/ConcurrentResultArray.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/util/ConcurrentResultArray.java @@ -9,21 +9,20 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; public class ConcurrentResultArray { - private final T[] expected; private final AtomicInteger size; private T @Nullable [] unexpected; + @SuppressWarnings("unchecked") public ConcurrentResultArray(int expectedSize) { - //noinspection unchecked this.expected = (T[]) new Object[expectedSize]; this.size = new AtomicInteger(0); } public void set(int index, T element) { - size.updateAndGet(v -> Math.max(v, index + 1)); if (index < expected.length) { expected[index] = element; + size.updateAndGet(v -> Math.max(v, index + 1)); return; } @@ -38,9 +37,9 @@ public class ConcurrentResultArray { int newLength = unexpected.length + Math.max(minGrowth, prefGrowth); unexpected = Arrays.copyOf(unexpected, newLength); } - unexpected[altIndex] = element; } + size.updateAndGet(v -> Math.max(v, index + 1)); } public @NotNull List toList() { @@ -49,10 +48,8 @@ public class ConcurrentResultArray { if (size <= expected.length) { return Arrays.asList(result); } - System.arraycopy(Objects.requireNonNull(unexpected), 0, result, expected.length, size - expected.length); return Arrays.asList(result); } - } diff --git a/futur-jcstress/build.gradle b/futur-jcstress/build.gradle new file mode 100644 index 0000000..54a14cd --- /dev/null +++ b/futur-jcstress/build.gradle @@ -0,0 +1,24 @@ +plugins { + id 'java' + id 'io.github.reyerizo.gradle.jcstress' version '0.8.15' +} + +group = 'dev.tommyjs' +version = '2.5.5' + +repositories { + mavenCentral() +} + +java { + sourceCompatibility = JavaVersion.VERSION_21 + targetCompatibility = JavaVersion.VERSION_21 +} + +dependencies { + implementation project(':futur-api') +} + +jcstress { + jcstressDependency = 'org.openjdk.jcstress:jcstress-core:0.16' +} diff --git a/futur-jcstress/src/jcstress/java/dev/tommyjs/futur/stress/FuturStress.java b/futur-jcstress/src/jcstress/java/dev/tommyjs/futur/stress/FuturStress.java new file mode 100644 index 0000000..828cac9 --- /dev/null +++ b/futur-jcstress/src/jcstress/java/dev/tommyjs/futur/stress/FuturStress.java @@ -0,0 +1,140 @@ +package dev.tommyjs.futur.stress; + +import dev.tommyjs.futur.promise.CompletablePromise; +import dev.tommyjs.futur.promise.PromiseFactory; +import dev.tommyjs.futur.util.ConcurrentResultArray; +import org.openjdk.jcstress.annotations.*; +import org.openjdk.jcstress.infra.results.I_Result; +import org.openjdk.jcstress.infra.results.L_Result; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +public class FuturStress { + + private static PromiseFactory factory() { + return PromiseFactory.of( + LoggerFactory.getLogger(FuturStress.class), + Executors.newScheduledThreadPool(1) + ); + } + + @JCStressTest + @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Listener called exactly once") + @Outcome(expect = Expect.FORBIDDEN, desc = "Unexpected call count") + @State + public static class ListenerVersusComplete { + + final CompletablePromise promise; + final AtomicInteger callCount = new AtomicInteger(); + + public ListenerVersusComplete() { + promise = factory().unresolved(); + promise.addDirectListener(v -> {}); + } + + @Actor + public void adder() { + promise.addDirectListener(v -> callCount.incrementAndGet()); + } + + @Actor + public void completer() { + promise.complete(42); + } + + @Arbiter + public void arbiter(I_Result r) { + r.r1 = callCount.get(); + } + + } + + @JCStressTest + @Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Listener called exactly once") + @Outcome(expect = Expect.FORBIDDEN, desc = "Unexpected call count") + @State + public static class ConcurrentComplete { + + final CompletablePromise promise; + final AtomicInteger callCount = new AtomicInteger(); + + public ConcurrentComplete() { + promise = factory().unresolved(); + promise.addDirectListener(v -> callCount.incrementAndGet()); + } + + @Actor + public void completer1() { + promise.complete(1); + } + + @Actor + public void completer2() { + promise.complete(2); + } + + @Arbiter + public void arbiter(I_Result r) { + r.r1 = callCount.get(); + } + + } + + @JCStressTest + @Outcome(id = "2", expect = Expect.ACCEPTABLE, desc = "Both listeners called exactly once") + @Outcome(expect = Expect.FORBIDDEN, desc = "Unexpected call count") + @State + public static class ConcurrentListenerAdders { + + final CompletablePromise promise; + final AtomicInteger callCount = new AtomicInteger(); + + public ConcurrentListenerAdders() { + promise = factory().unresolved(); + } + + @Actor + public void adder1() { + promise.addDirectListener(v -> callCount.incrementAndGet()); + } + + @Actor + public void adder2() { + promise.addDirectListener(v -> callCount.incrementAndGet()); + } + + @Arbiter + public void arbiter(I_Result r) { + promise.complete(42); + r.r1 = callCount.get(); + } + + } + + @JCStressTest + @Outcome(id = "42, 1", expect = Expect.ACCEPTABLE, desc = "Write visible") + @Outcome(id = "null, 0", expect = Expect.ACCEPTABLE, desc = "Neither visible yet") + @Outcome(id = "42, 0", expect = Expect.ACCEPTABLE, desc = "Element visible but size not yet") + @Outcome(id = "null, 1", expect = Expect.FORBIDDEN, desc = "Size visible but element not") + @State + public static class ArrayWriteRead { + + final ConcurrentResultArray array = new ConcurrentResultArray<>(4); + + @Actor + public void writer() { + array.set(0, 42); + } + + @Actor + public void reader(L_Result r) { + int size = array.toList().size(); + Integer elem = size > 0 ? array.toList().get(0) : null; + r.r1 = elem + ", " + size; + } + + } + +} diff --git a/settings.gradle b/settings.gradle index a0e7d05..602bcf2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -2,3 +2,4 @@ rootProject.name = 'futur' include 'futur-api' include 'futur-lazy' +include 'futur-jcstress'