optimization for virtual threads

This commit is contained in:
WhatCats
2024-04-09 19:17:29 +02:00
parent e8512df504
commit 29c614f5d7
3 changed files with 24 additions and 25 deletions

View File

@@ -38,7 +38,6 @@ subprojects {
testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testImplementation 'io.projectreactor:reactor-core:3.6.4' testImplementation 'io.projectreactor:reactor-core:3.6.4'
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
testImplementation 'org.slf4j:slf4j-api:2.0.12'
testImplementation 'ch.qos.logback:logback-classic:1.5.3' testImplementation 'ch.qos.logback:logback-classic:1.5.3'
} }

View File

@@ -12,16 +12,21 @@ import org.slf4j.Logger;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
public abstract class AbstractPromise<T, F> implements Promise<T> { public abstract class AbstractPromise<T, F> implements Promise<T> {
private final AtomicReference<Collection<PromiseListener<T>>> listeners; private final AtomicReference<Collection<PromiseListener<T>>> listeners;
private final AtomicReference<PromiseCompletion<T>> completion; private final AtomicReference<PromiseCompletion<T>> completion;
private final CountDownLatch latch;
private final ReentrantLock lock;
public AbstractPromise() { public AbstractPromise() {
this.listeners = new AtomicReference<>(); this.listeners = new AtomicReference<>();
this.completion = new AtomicReference<>(); this.completion = new AtomicReference<>();
this.latch = new CountDownLatch(1);
this.lock = new ReentrantLock();
} }
protected static <V> void propagateResult(Promise<V> from, Promise<V> to) { protected static <V> void propagateResult(Promise<V> from, Promise<V> to) {
@@ -57,24 +62,14 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
@Override @Override
public T join(long timeoutMillis) throws TimeoutException { public T join(long timeoutMillis) throws TimeoutException {
PromiseCompletion<T> completion; try {
long start = System.currentTimeMillis(); //noinspection ResultOfMethodCallIgnored
long remainingTimeout = timeoutMillis; this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
synchronized (this.completion) { throw new RuntimeException(e);
completion = this.completion.get();
while (completion == null && remainingTimeout > 0) {
try {
this.completion.wait(remainingTimeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
completion = this.completion.get();
remainingTimeout = timeoutMillis - (System.currentTimeMillis() - start);
}
} }
PromiseCompletion<T> completion = getCompletion();
if (completion == null) if (completion == null)
throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms"); throw new TimeoutException("Promise stopped waiting after " + timeoutMillis + "ms");
@@ -320,7 +315,8 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
} }
private @NotNull Promise<T> addAnyListener(PromiseListener<T> listener) { private @NotNull Promise<T> addAnyListener(PromiseListener<T> listener) {
synchronized (completion) { lock.lock();
try {
PromiseCompletion<T> completion = getCompletion(); PromiseCompletion<T> completion = getCompletion();
if (completion != null) { if (completion != null) {
callListener(listener, completion); callListener(listener, completion);
@@ -328,6 +324,8 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
listeners.compareAndSet(null, new ConcurrentLinkedQueue<>()); listeners.compareAndSet(null, new ConcurrentLinkedQueue<>());
listeners.get().add(listener); listeners.get().add(listener);
} }
} finally {
lock.unlock();
} }
return this; return this;
@@ -392,17 +390,19 @@ public abstract class AbstractPromise<T, F> implements Promise<T> {
} }
private void handleCompletion(@NotNull PromiseCompletion<T> ctx) { private void handleCompletion(@NotNull PromiseCompletion<T> ctx) {
synchronized (completion) { if (!setCompletion(ctx)) return;
if (!setCompletion(ctx)) return;
completion.notifyAll();
lock.lock();
try {
this.latch.countDown();
Collection<PromiseListener<T>> listeners = this.listeners.get(); Collection<PromiseListener<T>> listeners = this.listeners.get();
if (listeners != null) { if (listeners != null) {
for (PromiseListener<T> listener : listeners) { for (PromiseListener<T> listener : listeners) {
callListener(listener, ctx); callListener(listener, ctx);
} }
} }
} finally {
lock.unlock();
} }
} }

View File

@@ -160,10 +160,10 @@ public final class PromiseTests {
public void testRace() throws TimeoutException { public void testRace() throws TimeoutException {
assert pfac.race( assert pfac.race(
List.of( List.of(
pfac.start().thenSupplyDelayedAsync(() -> true, 50, TimeUnit.MILLISECONDS), pfac.start().thenSupplyDelayedAsync(() -> true, 150, TimeUnit.MILLISECONDS),
pfac.start().thenSupplyDelayedAsync(() -> false, 150, TimeUnit.MILLISECONDS) pfac.start().thenSupplyDelayedAsync(() -> false, 200, TimeUnit.MILLISECONDS)
) )
).join(100L); ).join(300L);
} }
} }