make other modules compatible with 1.2.0

This commit is contained in:
tommyskeff
2023-12-22 17:59:04 +00:00
parent 173e34810c
commit 6fc6c9f236
9 changed files with 55 additions and 58 deletions

View File

@@ -1,15 +1,22 @@
package dev.tommyjs.futur.reactivestreams;
import dev.tommyjs.futur.promise.AbstractPromise;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.promise.UnpooledPromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
public class ReactiveTransformer {
public static <T> @NotNull AbstractPromise<T> wrapPublisher(@NotNull Publisher<T> publisher) {
SingleAccumulatorSubscriber<T> subscriber = SingleAccumulatorSubscriber.create();
public static <T> @NotNull Promise<T> wrapPublisher(@NotNull Publisher<T> publisher, PromiseFactory factory) {
SingleAccumulatorSubscriber<T> subscriber = SingleAccumulatorSubscriber.create(factory);
publisher.subscribe(subscriber);
return subscriber.getPromise();
}
public static <T> @NotNull Promise<T> wrapPublisher(@NotNull Publisher<T> publisher) {
return wrapPublisher(publisher, UnpooledPromiseFactory.INSTANCE);
}
}

View File

@@ -1,14 +1,17 @@
package dev.tommyjs.futur.reactivestreams;
import dev.tommyjs.futur.promise.AbstractPromise;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.promise.UnpooledPromiseFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class SingleAccumulatorSubscriber<T> implements Subscriber<T> {
private final AbstractPromise<T> promise;
private final Promise<T> promise;
public SingleAccumulatorSubscriber(AbstractPromise<T> promise) {
public SingleAccumulatorSubscriber(Promise<T> promise) {
this.promise = promise;
}
@@ -32,16 +35,20 @@ public class SingleAccumulatorSubscriber<T> implements Subscriber<T> {
// ignore
}
public AbstractPromise<T> getPromise() {
public Promise<T> getPromise() {
return promise;
}
public static <T> SingleAccumulatorSubscriber<T> create(AbstractPromise<T> promise) {
public static <T> SingleAccumulatorSubscriber<T> create(Promise<T> promise) {
return new SingleAccumulatorSubscriber<>(promise);
}
public static <T> SingleAccumulatorSubscriber<T> create(PromiseFactory factory) {
return create(factory.unresolved());
}
public static <T> SingleAccumulatorSubscriber<T> create() {
return create(new AbstractPromise<>());
return create(UnpooledPromiseFactory.INSTANCE);
}
}