fix: address concurrency issues and add stress tests

This commit is contained in:
2026-03-13 16:15:07 +00:00
parent 28cf21f89a
commit a7d71293a5
7 changed files with 246 additions and 34 deletions

34
.github/workflows/stress.yml vendored Normal file
View File

@@ -0,0 +1,34 @@
name: Stress tests
on:
push:
branches:
- main
pull_request:
jobs:
jcstress:
name: JCStress
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Java
uses: actions/setup-java@v4
with:
distribution: corretto
java-version: 23
- name: Make Gradle executable
run: chmod +x ./gradlew
- name: Run JCStress tests
run: ./gradlew :futur-jcstress:jcstress
- name: Upload JCStress report
if: always()
uses: actions/upload-artifact@v4
with:
name: jcstress-report
path: futur-jcstress/build/reports/jcstress/

View File

@@ -6,7 +6,7 @@ plugins {
subprojects {
group = 'dev.tommyjs'
version = '2.5.4'
version = '2.5.5'
apply plugin: 'java-library'
apply plugin: 'com.github.johnrengelman.shadow'

View File

@@ -18,11 +18,20 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
private static final VarHandle COMPLETION_HANDLE;
private static final VarHandle LISTENERS_HANDLE;
private static final class ListenerNode<T> {
final PromiseListener<T> listener;
ListenerNode<T> next;
ListenerNode(PromiseListener<T> listener) { this.listener = listener; }
}
@SuppressWarnings("rawtypes")
private static final ListenerNode COMPLETED_NODE = new ListenerNode<>(null);
static {
try {
MethodHandles.Lookup lookup = MethodHandles.lookup();
COMPLETION_HANDLE = lookup.findVarHandle(BasePromise.class, "completion", PromiseCompletion.class);
LISTENERS_HANDLE = lookup.findVarHandle(BasePromise.class, "listeners", Collection.class);
LISTENERS_HANDLE = lookup.findVarHandle(BasePromise.class, "listeners", ListenerNode.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
@@ -32,14 +41,12 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
private volatile PromiseCompletion<T> completion;
@SuppressWarnings("FieldMayBeFinal")
private volatile Collection<PromiseListener<T>> listeners;
private volatile ListenerNode<T> listeners;
@SuppressWarnings("unchecked")
public BasePromise() {
this.sync = new Sync();
this.completion = null;
this.listeners = Collections.EMPTY_LIST;
this.listeners = null;
}
protected void handleCompletion(@NotNull PromiseCompletion<T> cmp) {
@@ -63,37 +70,46 @@ public abstract class BasePromise<T> extends AbstractPromise<T> implements Compl
@SuppressWarnings("unchecked")
protected void callListeners(@NotNull PromiseCompletion<T> cmp) {
var iter = ((Iterable<PromiseListener<T>>) LISTENERS_HANDLE.getAndSet(this, null)).iterator();
ListenerNode<T> node = (ListenerNode<T>) LISTENERS_HANDLE.getAndSet(this, COMPLETED_NODE);
if (node == null || node == COMPLETED_NODE) {
return;
}
ListenerNode<T> prev = null;
while (node != null) {
ListenerNode<T> next = node.next;
node.next = prev;
prev = node;
node = next;
}
ListenerNode<T> curr = prev;
try {
while (iter.hasNext()) {
callListener(iter.next(), cmp);
while (curr != null) {
callListener(curr.listener, cmp);
curr = curr.next;
}
} finally {
iter.forEachRemaining(v -> callListenerAsyncLastResort(v, cmp));
while (curr != null) {
callListenerAsyncLastResort(curr.listener, cmp);
curr = curr.next;
}
}
}
@Override
@SuppressWarnings("unchecked")
protected @NotNull Promise<T> addAnyListener(@NotNull PromiseListener<T> listener) {
Collection<PromiseListener<T>> prev = listeners, next = null;
for (boolean haveNext = false; ; ) {
if (!haveNext) {
next = prev == Collections.EMPTY_LIST ? new ConcurrentLinkedQueue<>() : prev;
if (next != null) {
next.add(listener);
}
ListenerNode<T> node = new ListenerNode<>(listener);
ListenerNode<T> prev;
do {
prev = listeners;
if (prev == COMPLETED_NODE) {
callListener(listener, Objects.requireNonNull(getCompletion()));
return this;
}
if (LISTENERS_HANDLE.weakCompareAndSet(this, prev, next)) {
break;
}
haveNext = (prev == (prev = listeners));
}
if (next == null) {
callListener(listener, Objects.requireNonNull(getCompletion()));
}
node.next = prev;
} while (!LISTENERS_HANDLE.weakCompareAndSet(this, prev, node));
return this;
}

View File

@@ -9,21 +9,20 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
public class ConcurrentResultArray<T> {
private final T[] expected;
private final AtomicInteger size;
private T @Nullable [] unexpected;
@SuppressWarnings("unchecked")
public ConcurrentResultArray(int expectedSize) {
//noinspection unchecked
this.expected = (T[]) new Object[expectedSize];
this.size = new AtomicInteger(0);
}
public void set(int index, T element) {
size.updateAndGet(v -> Math.max(v, index + 1));
if (index < expected.length) {
expected[index] = element;
size.updateAndGet(v -> Math.max(v, index + 1));
return;
}
@@ -38,9 +37,9 @@ public class ConcurrentResultArray<T> {
int newLength = unexpected.length + Math.max(minGrowth, prefGrowth);
unexpected = Arrays.copyOf(unexpected, newLength);
}
unexpected[altIndex] = element;
}
size.updateAndGet(v -> Math.max(v, index + 1));
}
public @NotNull List<T> toList() {
@@ -49,10 +48,8 @@ public class ConcurrentResultArray<T> {
if (size <= expected.length) {
return Arrays.asList(result);
}
System.arraycopy(Objects.requireNonNull(unexpected), 0,
result, expected.length, size - expected.length);
return Arrays.asList(result);
}
}

View File

@@ -0,0 +1,24 @@
plugins {
id 'java'
id 'io.github.reyerizo.gradle.jcstress' version '0.8.15'
}
group = 'dev.tommyjs'
version = '2.5.5'
repositories {
mavenCentral()
}
java {
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21
}
dependencies {
implementation project(':futur-api')
}
jcstress {
jcstressDependency = 'org.openjdk.jcstress:jcstress-core:0.16'
}

View File

@@ -0,0 +1,140 @@
package dev.tommyjs.futur.stress;
import dev.tommyjs.futur.promise.CompletablePromise;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.util.ConcurrentResultArray;
import org.openjdk.jcstress.annotations.*;
import org.openjdk.jcstress.infra.results.I_Result;
import org.openjdk.jcstress.infra.results.L_Result;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class FuturStress {
private static PromiseFactory factory() {
return PromiseFactory.of(
LoggerFactory.getLogger(FuturStress.class),
Executors.newScheduledThreadPool(1)
);
}
@JCStressTest
@Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Listener called exactly once")
@Outcome(expect = Expect.FORBIDDEN, desc = "Unexpected call count")
@State
public static class ListenerVersusComplete {
final CompletablePromise<Integer> promise;
final AtomicInteger callCount = new AtomicInteger();
public ListenerVersusComplete() {
promise = factory().unresolved();
promise.addDirectListener(v -> {});
}
@Actor
public void adder() {
promise.addDirectListener(v -> callCount.incrementAndGet());
}
@Actor
public void completer() {
promise.complete(42);
}
@Arbiter
public void arbiter(I_Result r) {
r.r1 = callCount.get();
}
}
@JCStressTest
@Outcome(id = "1", expect = Expect.ACCEPTABLE, desc = "Listener called exactly once")
@Outcome(expect = Expect.FORBIDDEN, desc = "Unexpected call count")
@State
public static class ConcurrentComplete {
final CompletablePromise<Integer> promise;
final AtomicInteger callCount = new AtomicInteger();
public ConcurrentComplete() {
promise = factory().unresolved();
promise.addDirectListener(v -> callCount.incrementAndGet());
}
@Actor
public void completer1() {
promise.complete(1);
}
@Actor
public void completer2() {
promise.complete(2);
}
@Arbiter
public void arbiter(I_Result r) {
r.r1 = callCount.get();
}
}
@JCStressTest
@Outcome(id = "2", expect = Expect.ACCEPTABLE, desc = "Both listeners called exactly once")
@Outcome(expect = Expect.FORBIDDEN, desc = "Unexpected call count")
@State
public static class ConcurrentListenerAdders {
final CompletablePromise<Integer> promise;
final AtomicInteger callCount = new AtomicInteger();
public ConcurrentListenerAdders() {
promise = factory().unresolved();
}
@Actor
public void adder1() {
promise.addDirectListener(v -> callCount.incrementAndGet());
}
@Actor
public void adder2() {
promise.addDirectListener(v -> callCount.incrementAndGet());
}
@Arbiter
public void arbiter(I_Result r) {
promise.complete(42);
r.r1 = callCount.get();
}
}
@JCStressTest
@Outcome(id = "42, 1", expect = Expect.ACCEPTABLE, desc = "Write visible")
@Outcome(id = "null, 0", expect = Expect.ACCEPTABLE, desc = "Neither visible yet")
@Outcome(id = "42, 0", expect = Expect.ACCEPTABLE, desc = "Element visible but size not yet")
@Outcome(id = "null, 1", expect = Expect.FORBIDDEN, desc = "Size visible but element not")
@State
public static class ArrayWriteRead {
final ConcurrentResultArray<Integer> array = new ConcurrentResultArray<>(4);
@Actor
public void writer() {
array.set(0, 42);
}
@Actor
public void reader(L_Result r) {
int size = array.toList().size();
Integer elem = size > 0 ? array.toList().get(0) : null;
r.r1 = elem + ", " + size;
}
}
}

View File

@@ -2,3 +2,4 @@ rootProject.name = 'futur'
include 'futur-api'
include 'futur-lazy'
include 'futur-jcstress'