From 363669d2c6c56c587873c6cab4572d106c46626d Mon Sep 17 00:00:00 2001 From: WhatCats Date: Sat, 25 May 2024 21:13:20 +0200 Subject: [PATCH] catch RejectedExecutionException --- .../futur/promise/AbstractPromise.java | 56 ++++++++++++++----- .../java/dev/tommyjs/futur/PromiseTests.java | 20 +++++-- 2 files changed, 56 insertions(+), 20 deletions(-) diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java index 0fa1265..351e1b4 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/AbstractPromise.java @@ -150,9 +150,13 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); addDirectListener( res -> { - Runnable runnable = createRunnable(res, promise, task); - F future = getExecutor().runSync(runnable); - promise.onCancel((e) -> getExecutor().cancel(future)); + try { + Runnable runnable = createRunnable(res, promise, task); + F future = getExecutor().runSync(runnable); + promise.onCancel((e) -> getExecutor().cancel(future)); + } catch (RejectedExecutionException e) { + promise.completeExceptionally(e); + } }, promise::completeExceptionally ); @@ -166,9 +170,13 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); addDirectListener( res -> { - Runnable runnable = createRunnable(res, promise, task); - F future = getExecutor().runSync(runnable, delay, unit); - promise.onCancel((e) -> getExecutor().cancel(future)); + try { + Runnable runnable = createRunnable(res, promise, task); + F future = getExecutor().runSync(runnable, delay, unit); + promise.onCancel((e) -> getExecutor().cancel(future)); + } catch (RejectedExecutionException e) { + promise.completeExceptionally(e); + } }, promise::completeExceptionally ); @@ -251,9 +259,13 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); addDirectListener( (res) -> { - Runnable runnable = createRunnable(res, promise, task); - F future = getExecutor().runAsync(runnable); - promise.onCancel((e) -> getExecutor().cancel(future)); + try { + Runnable runnable = createRunnable(res, promise, task); + F future = getExecutor().runAsync(runnable); + promise.onCancel((e) -> getExecutor().cancel(future)); + } catch (RejectedExecutionException e) { + promise.completeExceptionally(e); + } }, promise::completeExceptionally ); @@ -267,9 +279,13 @@ public abstract class AbstractPromise implements Promise { Promise promise = getFactory().unresolved(); addDirectListener( res -> { - Runnable runnable = createRunnable(res, promise, task); - F future = getExecutor().runAsync(runnable, delay, unit); - promise.onCancel((e) -> getExecutor().cancel(future)); + try { + Runnable runnable = createRunnable(res, promise, task); + F future = getExecutor().runAsync(runnable, delay, unit); + promise.onCancel((e) -> getExecutor().cancel(future)); + } catch (RejectedExecutionException e) { + promise.completeExceptionally(e); + } }, promise::completeExceptionally ); @@ -355,7 +371,11 @@ public abstract class AbstractPromise implements Promise { private void callListener(PromiseListener listener, PromiseCompletion ctx) { if (listener instanceof AsyncPromiseListener) { - getExecutor().runAsync(() -> callListenerNow(listener, ctx)); + try { + getExecutor().runAsync(() -> callListenerNow(listener, ctx)); + } catch (RejectedExecutionException ignored) { + + } } else { callListenerNow(listener, ctx); } @@ -407,8 +427,14 @@ public abstract class AbstractPromise implements Promise { @Override public @NotNull Promise maxWaitTime(long time, @NotNull TimeUnit unit) { - F future = getExecutor().runAsync(() -> completeExceptionally(new TimeoutException("Promise stopped waiting after " + time + " " + unit)), time, unit); - return addListener((_v) -> getExecutor().cancel(future)); + try { + Exception e = new TimeoutException("Promise stopped waiting after " + time + " " + unit); + F future = getExecutor().runAsync(() -> completeExceptionally(e), time, unit); + return addDirectListener((_v) -> getExecutor().cancel(future)); + } catch (RejectedExecutionException e) { + completeExceptionally(e); + return this; + } } private void handleCompletion(@NotNull PromiseCompletion ctx) { diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index ffe6f01..3a9cb9e 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -3,6 +3,7 @@ package dev.tommyjs.futur; import dev.tommyjs.futur.executor.PromiseExecutor; import dev.tommyjs.futur.executor.SinglePoolExecutor; import dev.tommyjs.futur.impl.SimplePromiseFactory; +import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseFactory; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -12,16 +13,14 @@ import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; public final class PromiseTests { private final Logger logger = LoggerFactory.getLogger(PromiseTests.class); - private final PromiseExecutor> executor = SinglePoolExecutor.create(5); - private final PromiseFactory pfac = new SimplePromiseFactory<>(executor, logger); + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); + private final PromiseFactory pfac = new SimplePromiseFactory<>(new SinglePoolExecutor(executor), logger); @Test public void testMono() { @@ -36,6 +35,17 @@ public final class PromiseTests { assert resolved.getCompletion().getResult() == value; } + @Test + public void testShutdown() { + executor.close(); + Promise promise = pfac.resolve(null).thenSupplyAsync(() -> null); + try { + promise.await(); + } catch (RuntimeException e) { + assert e.getCause() instanceof RejectedExecutionException; + } + } + @Test public void testErrorCancellation() throws InterruptedException { var finished = new AtomicBoolean();