mirror of
https://github.com/tommyskeff/futur4j.git
synced 2026-03-19 02:01:22 +00:00
fix join concurrency issue
This commit is contained in:
@@ -1,35 +1,58 @@
|
||||
package dev.tommyjs.futur.util;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class ConcurrentResultArray<T> {
|
||||
|
||||
private static final float RESIZE_FACTOR = 1.2F;
|
||||
|
||||
private final AtomicReference<T[]> ref;
|
||||
private final T[] expected;
|
||||
private final AtomicInteger size;
|
||||
private T @Nullable [] unexpected;
|
||||
|
||||
public ConcurrentResultArray(int expectedSize) {
|
||||
//noinspection unchecked
|
||||
this.ref = new AtomicReference<>((T[]) new Object[expectedSize]);
|
||||
this.expected = (T[]) new Object[expectedSize];
|
||||
this.size = new AtomicInteger(0);
|
||||
}
|
||||
|
||||
public void set(int index, T element) {
|
||||
ref.updateAndGet(array -> {
|
||||
if (array.length <= index) {
|
||||
array = Arrays.copyOf(array, (int) (array.length * RESIZE_FACTOR));
|
||||
size.updateAndGet(v -> Math.max(v, index + 1));
|
||||
if (index < expected.length) {
|
||||
expected[index] = element;
|
||||
return;
|
||||
}
|
||||
|
||||
int altIndex = index - expected.length;
|
||||
synchronized (this) {
|
||||
if (unexpected == null) {
|
||||
//noinspection unchecked
|
||||
unexpected = (T[]) new Object[Math.max(10, altIndex + 1)];
|
||||
} else if (altIndex >= unexpected.length) {
|
||||
int minGrowth = altIndex - unexpected.length + 1;
|
||||
int prefGrowth = Math.max(1, unexpected.length >> 1);
|
||||
int newLength = unexpected.length + Math.max(minGrowth, prefGrowth);
|
||||
unexpected = Arrays.copyOf(unexpected, newLength);
|
||||
}
|
||||
|
||||
array[index] = element;
|
||||
return array;
|
||||
});
|
||||
unexpected[altIndex] = element;
|
||||
}
|
||||
}
|
||||
|
||||
public @NotNull List<T> toList() {
|
||||
return Arrays.asList(ref.get());
|
||||
int size = this.size.get();
|
||||
T[] result = Arrays.copyOf(expected, size);
|
||||
if (size <= expected.length) {
|
||||
return Arrays.asList(result);
|
||||
}
|
||||
|
||||
System.arraycopy(Objects.requireNonNull(unexpected), 0,
|
||||
result, expected.length, size - expected.length);
|
||||
return Arrays.asList(result);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -8,11 +8,14 @@ import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public final class PromiseTests {
|
||||
@@ -83,6 +86,36 @@ public final class PromiseTests {
|
||||
assert !finished.get();
|
||||
}
|
||||
|
||||
public IntStream unsizedIntStream(int size) {
|
||||
AtomicInteger i = new AtomicInteger();
|
||||
return IntStream.generate(i::getAndIncrement).limit(size);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnsizedIntStream() {
|
||||
assert unsizedIntStream(1000).spliterator().estimateSize() == Long.MAX_VALUE;
|
||||
assert Arrays.equals(unsizedIntStream(1000).toArray(), IntStream.range(0, 1000).toArray());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicCombine() {
|
||||
var result = promises.combine(unsizedIntStream(1000).mapToObj(promises::resolve)).await();
|
||||
assert result.equals(unsizedIntStream(1000).boxed().toList());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicCombine1() {
|
||||
var result = promises.combine(IntStream.range(0, 1000).mapToObj(promises::resolve)).await();
|
||||
assert result.equals(IntStream.range(0, 1000).boxed().toList());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicCombine2() {
|
||||
var result = promises.combine(unsizedIntStream(1000)
|
||||
.mapToObj(i -> promises.start().thenSupplyDelayedAsync(() -> i, 1000 - i, TimeUnit.MILLISECONDS))).await();
|
||||
assert result.equals(unsizedIntStream(1000).boxed().toList());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCombineUtil() throws TimeoutException, ExecutionException, InterruptedException {
|
||||
promises.all(
|
||||
|
||||
Reference in New Issue
Block a user