basic 1.2.0 changes

This commit is contained in:
tommyskeff
2023-12-22 16:43:52 +00:00
parent 11ed692e75
commit 173e34810c
16 changed files with 799 additions and 500 deletions

View File

@@ -1,12 +1,12 @@
package dev.tommyjs.futur.reactivestreams;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.AbstractPromise;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
public class ReactiveTransformer {
public static <T> @NotNull Promise<T> wrapPublisher(@NotNull Publisher<T> publisher) {
public static <T> @NotNull AbstractPromise<T> wrapPublisher(@NotNull Publisher<T> publisher) {
SingleAccumulatorSubscriber<T> subscriber = SingleAccumulatorSubscriber.create();
publisher.subscribe(subscriber);
return subscriber.getPromise();

View File

@@ -1,14 +1,14 @@
package dev.tommyjs.futur.reactivestreams;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.AbstractPromise;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class SingleAccumulatorSubscriber<T> implements Subscriber<T> {
private final Promise<T> promise;
private final AbstractPromise<T> promise;
public SingleAccumulatorSubscriber(Promise<T> promise) {
public SingleAccumulatorSubscriber(AbstractPromise<T> promise) {
this.promise = promise;
}
@@ -32,16 +32,16 @@ public class SingleAccumulatorSubscriber<T> implements Subscriber<T> {
// ignore
}
public Promise<T> getPromise() {
public AbstractPromise<T> getPromise() {
return promise;
}
public static <T> SingleAccumulatorSubscriber<T> create(Promise<T> promise) {
public static <T> SingleAccumulatorSubscriber<T> create(AbstractPromise<T> promise) {
return new SingleAccumulatorSubscriber<>(promise);
}
public static <T> SingleAccumulatorSubscriber<T> create() {
return create(new Promise<>());
return create(new AbstractPromise<>());
}
}