Merge pull request from patch/reactor

This commit is contained in:
Tommy
2024-03-31 20:40:31 +01:00
committed by GitHub
14 changed files with 53 additions and 248 deletions

View File

@@ -13,9 +13,8 @@ repositories {
}
dependencies {
compile 'dev.tommyjs:futur-api:2.1.2'
compile 'dev.tommyjs:futur-reactor:2.1.2'
compile 'dev.tommyjs:futur-reactive-streams:2.1.2'
compile 'dev.tommyjs:futur-api:2.1.3'
compile 'dev.tommyjs:futur-reactor:2.1.3'
}
```
### Gradle DSL
@@ -25,9 +24,8 @@ repositories {
}
dependencies {
implementation("dev.tommyjs:futur-api:2.1.2")
implementation("dev.tommyjs:futur-reactor:2.1.2")
implementation("dev.tommyjs:futur-reactive-streams:2.1.2")
implementation("dev.tommyjs:futur-api:2.1.3")
implementation("dev.tommyjs:futur-reactor:2.1.3")
}
```
### Maven
@@ -43,17 +41,12 @@ dependencies {
<dependency>
<groupId>dev.tommyjs</groupId>
<artifactId>futur-api</artifactId>
<version>2.1.2</version>
<version>2.1.3</version>
</dependency>
<dependency>
<groupId>dev.tommyjs</groupId>
<artifactId>futur-reactor</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>dev.tommyjs</groupId>
<artifactId>futur-reactive-streams</artifactId>
<version>2.1.1</version>
<version>2.1.3</version>
</dependency>
</dependencies>
```
@@ -164,10 +157,5 @@ This can also be applied to key-value scenarios in the same way, where you can p
### Future wrappers
External libaries provide asynchronous ways to interact with their output in all shapes and sizes. Futur4J currently has wrappers for the following libraries/frameworks:
- Reactive Streams (via futur-reactive-streams)
- Reactor Core (via futur-reactor)
- Java CompletableFuture (via `Promises.wrap` in futur-api)
Coming Soon:
- Lettuce (redis futures)
- Netty futures (maybe?)

30
build.gradle Normal file
View File

@@ -0,0 +1,30 @@
plugins {
id 'com.github.johnrengelman.shadow' version '8.1.1'
id 'io.github.gradle-nexus.publish-plugin' version '1.3.0'
}
nexusPublishing {
repositories {
tommyjs {
nexusUrl = uri("https://repo.tommyjs.dev/repository/maven-releases")
}
}
}
subprojects {
group = "dev.tommyjs"
version = "2.1.3"
apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow'
tasks {
build {
dependsOn(tasks.shadowJar)
}
}
repositories {
mavenCentral()
}
}

4
futur-api/build.gradle Normal file
View File

@@ -0,0 +1,4 @@
dependencies {
implementation("org.jetbrains:annotations:24.1.0")
implementation("org.slf4j:slf4j-api:2.0.12")
}

View File

@@ -1,34 +0,0 @@
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
plugins {
id("java")
id("com.github.johnrengelman.shadow") version "7.1.2"
}
group = "dev.tommyjs"
version = "2.1.2"
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains:annotations:24.1.0")
implementation("org.slf4j:slf4j-api:2.0.9")
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
}
tasks {
build {
dependsOn(shadowJar)
}
withType<ShadowJar> {
exclude("META-INF/**")
}
}
tasks.test {
useJUnitPlatform()
}

View File

@@ -37,13 +37,12 @@ public abstract class AbstractPromise<T> implements Promise<T> {
@Override
public T join(long timeoutMillis) throws TimeoutException {
PromiseCompletion<T> completion = this.completion.get();
if (completion != null) return joinCompletion(completion);
PromiseCompletion<T> completion;
long start = System.currentTimeMillis();
long remainingTimeout = timeoutMillis;
synchronized (this.completion) {
completion = this.completion.get();
while (completion == null && remainingTimeout > 0) {
try {
this.completion.wait(remainingTimeout);

View File

@@ -1,42 +0,0 @@
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

View File

@@ -1,35 +0,0 @@
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
plugins {
id("java")
id("com.github.johnrengelman.shadow") version "7.1.2"
}
group = "dev.tommyjs"
version = "2.1.2"
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains:annotations:24.1.0")
compileOnly(project(mapOf("path" to ":futur-api")))
compileOnly("org.reactivestreams:reactive-streams:1.0.4")
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
}
tasks {
build {
dependsOn(shadowJar)
}
withType<ShadowJar> {
exclude("META-INF/**")
}
}
tasks.test {
useJUnitPlatform()
}

View File

@@ -1,16 +0,0 @@
package dev.tommyjs.futur.reactivestreams;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
public class ReactiveTransformer {
public static <T> @NotNull Promise<T> wrapPublisher(@NotNull Publisher<T> publisher, PromiseFactory factory) {
SingleAccumulatorSubscriber<T> subscriber = SingleAccumulatorSubscriber.create(factory);
publisher.subscribe(subscriber);
return subscriber.getPromise();
}
}

View File

@@ -1,53 +0,0 @@
package dev.tommyjs.futur.reactivestreams;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseFactory;
import dev.tommyjs.futur.impl.StaticPromiseFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class SingleAccumulatorSubscriber<T> implements Subscriber<T> {
private final Promise<T> promise;
public SingleAccumulatorSubscriber(Promise<T> promise) {
this.promise = promise;
}
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onNext(T t) {
promise.complete(t);
}
@Override
public void onError(Throwable t) {
promise.completeExceptionally(t);
}
@Override
public void onComplete() {
// ignore
}
public Promise<T> getPromise() {
return promise;
}
public static <T> SingleAccumulatorSubscriber<T> create(Promise<T> promise) {
return new SingleAccumulatorSubscriber<>(promise);
}
public static <T> SingleAccumulatorSubscriber<T> create(PromiseFactory factory) {
return create(factory.unresolved());
}
public static <T> SingleAccumulatorSubscriber<T> create() {
return create(StaticPromiseFactory.INSTANCE);
}
}

View File

@@ -0,0 +1,5 @@
dependencies {
compileOnly(project(":futur-api"))
implementation("org.jetbrains:annotations:24.1.0")
implementation("io.projectreactor:reactor-core:3.6.4")
}

View File

@@ -1,24 +0,0 @@
plugins {
id("java")
id("com.github.johnrengelman.shadow") version "7.1.2"
}
group = "dev.tommyjs"
version = "2.1.2"
repositories {
mavenCentral()
}
dependencies {
implementation("org.jetbrains:annotations:24.1.0")
compileOnly(project(mapOf("path" to ":futur-api")))
implementation("io.projectreactor:reactor-core:3.6.0")
implementation(project(mapOf("path" to ":futur-reactive-streams")))
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
}
tasks.test {
useJUnitPlatform()
}

View File

@@ -3,29 +3,13 @@ package dev.tommyjs.futur.reactor;
import dev.tommyjs.futur.promise.Promise;
import dev.tommyjs.futur.promise.PromiseFactory;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public class ReactorTransformer {
public static <T> @NotNull Promise<T> wrapMono(@NotNull Mono<T> mono, PromiseFactory factory) {
Promise<T> promise = factory.unresolved();
mono.doOnSuccess(promise::complete).doOnError(promise::completeExceptionally).subscribe();
return promise;
}
public static <T> @NotNull Promise<@NotNull List<T>> wrapFlux(@NotNull Flux<T> flux, PromiseFactory factory) {
Promise<List<T>> promise = factory.unresolved();
AtomicReference<List<T>> out = new AtomicReference<>(new ArrayList<>());
flux.doOnNext(out.get()::add).subscribe();
flux.doOnComplete(() -> promise.complete(out.get())).subscribe();
flux.doOnError(promise::completeExceptionally).subscribe();
mono.subscribe(promise::complete, promise::completeExceptionally);
return promise;
}

4
settings.gradle Normal file
View File

@@ -0,0 +1,4 @@
rootProject.name = 'futur'
include 'futur-api'
include 'futur-reactor'

View File

@@ -1,5 +0,0 @@
rootProject.name = "futur"
include("futur-api")
include("futur-standalone")
include("futur-reactive-streams")
include("futur-reactor")