From 18dff51617909804aafeecb3206c3165f4640e58 Mon Sep 17 00:00:00 2001 From: tommyskeff Date: Thu, 9 Jan 2025 10:17:13 +0000 Subject: [PATCH] fix stream closed issue --- .../tommyjs/futur/promise/PromiseFactory.java | 13 ++++++++----- .../dev/tommyjs/futur/util/PromiseUtil.java | 6 +++--- .../java/dev/tommyjs/futur/PromiseTests.java | 19 +++++++++++++++++++ .../java/dev/tommyjs/futur/lazy/Promises.java | 2 -- 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java index 095f8f1..f44c919 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/promise/PromiseFactory.java @@ -195,7 +195,8 @@ public interface PromiseFactory { */ default @NotNull Promise> combineMapped(@NotNull Stream>> promises, boolean link) { - return combineMapped(promises.iterator(), PromiseUtil.estimateSize(promises), link); + Spliterator>> spliterator = promises.spliterator(); + return combineMapped(Spliterators.iterator(spliterator), PromiseUtil.estimateSize(spliterator), link); } /** @@ -302,7 +303,8 @@ public interface PromiseFactory { * @return the combined promise */ default @NotNull Promise> combine(@NotNull Stream> promises, boolean link) { - return combine(promises.iterator(), PromiseUtil.estimateSize(promises), link); + Spliterator> spliterator = promises.spliterator(); + return combine(Spliterators.iterator(spliterator), PromiseUtil.estimateSize(spliterator), link); } /** @@ -314,7 +316,7 @@ public interface PromiseFactory { * @return the combined promise */ default @NotNull Promise> combine(@NotNull Stream> promises) { - return combine(promises.iterator(), PromiseUtil.estimateSize(promises), true); + return combine(promises, true); } /** @@ -369,7 +371,8 @@ public interface PromiseFactory { */ default @NotNull Promise>> allSettled(@NotNull Stream> promises, boolean link) { - return allSettled(promises.iterator(), PromiseUtil.estimateSize(promises), link); + Spliterator> spliterator = promises.spliterator(); + return allSettled(Spliterators.iterator(spliterator), PromiseUtil.estimateSize(spliterator), link); } /** @@ -380,7 +383,7 @@ public interface PromiseFactory { * @return the combined promise */ default @NotNull Promise>> allSettled(@NotNull Stream> promises) { - return allSettled(promises.iterator(), PromiseUtil.estimateSize(promises), true); + return allSettled(promises, true); } /** diff --git a/futur-api/src/main/java/dev/tommyjs/futur/util/PromiseUtil.java b/futur-api/src/main/java/dev/tommyjs/futur/util/PromiseUtil.java index 25895b8..f2a87bd 100644 --- a/futur-api/src/main/java/dev/tommyjs/futur/util/PromiseUtil.java +++ b/futur-api/src/main/java/dev/tommyjs/futur/util/PromiseUtil.java @@ -4,7 +4,7 @@ import dev.tommyjs.futur.promise.CompletablePromise; import dev.tommyjs.futur.promise.Promise; import org.jetbrains.annotations.NotNull; -import java.util.stream.Stream; +import java.util.Spliterator; public class PromiseUtil { @@ -44,8 +44,8 @@ public class PromiseUtil { * @param stream the stream * @return the estimated size */ - public static int estimateSize(@NotNull Stream stream) { - long estimate = stream.spliterator().estimateSize(); + public static int estimateSize(@NotNull Spliterator stream) { + long estimate = stream.estimateSize(); return estimate == Long.MAX_VALUE ? 10 : (int) estimate; } diff --git a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java index 37ff3ad..9885398 100644 --- a/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java +++ b/futur-api/src/test/java/dev/tommyjs/futur/PromiseTests.java @@ -12,6 +12,7 @@ import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; public final class PromiseTests { @@ -210,4 +211,22 @@ public final class PromiseTests { assert promise2.getCompletion() != null && promise2.getCompletion().getException() instanceof IllegalStateException; } + @Test + public void testStream() { + var res = promises.combine(Stream.of(1, 2, 3).map(promises::resolve)).await(); + assert res.size() == 3; + } + + @Test + public void combineMappedTest() { + var res = promises.combineMapped(List.of(1, 2, 3), + n -> promises.start().thenSupplyDelayedAsync(() -> n * 2, 50, TimeUnit.MILLISECONDS) + ).await(); + + assert res.size() == 3; + assert res.get(1) == 2; + assert res.get(2) == 4; + assert res.get(3) == 6; + } + } diff --git a/futur-lazy/src/main/java/dev/tommyjs/futur/lazy/Promises.java b/futur-lazy/src/main/java/dev/tommyjs/futur/lazy/Promises.java index 633328f..288e983 100644 --- a/futur-lazy/src/main/java/dev/tommyjs/futur/lazy/Promises.java +++ b/futur-lazy/src/main/java/dev/tommyjs/futur/lazy/Promises.java @@ -6,7 +6,6 @@ import dev.tommyjs.futur.promise.Promise; import dev.tommyjs.futur.promise.PromiseCompletion; import dev.tommyjs.futur.promise.PromiseFactory; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,7 +14,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Stream;