Skip to content

Commit aa4a9ec

Browse files
authored
reactor M3 upgrade (#364)
1 parent 64309d1 commit aa4a9ec

11 files changed

Lines changed: 45 additions & 50 deletions

File tree

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ subprojects {
6262
}
6363

6464
dependencies {
65-
compile 'io.projectreactor:reactor-core:3.1.0.M2'
65+
compile 'io.projectreactor:reactor-core:3.1.0.M3'
6666
compile 'io.netty:netty-buffer:4.1.12.Final'
6767
compile 'org.reactivestreams:reactive-streams:1.0.0'
6868
compile 'org.slf4j:slf4j-api:1.7.25'
@@ -72,7 +72,7 @@ subprojects {
7272
testCompile 'org.mockito:mockito-core:1.10.19'
7373
testCompile 'org.hamcrest:hamcrest-library:1.3'
7474
testCompile 'org.slf4j:slf4j-log4j12:1.7.25'
75-
testCompile 'io.projectreactor:reactor-test:3.1.0.M2'
75+
testCompile 'io.projectreactor:reactor-test:3.1.0.M3'
7676
}
7777

7878
publishing {

rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.reactivestreams.Publisher;
2222
import org.reactivestreams.Subscriber;
2323
import org.reactivestreams.Subscription;
24+
import reactor.core.CoreSubscriber;
2425
import reactor.core.publisher.Flux;
2526
import reactor.core.publisher.Operators;
2627

@@ -48,7 +49,7 @@ public static <T> LimitableRequestPublisher<T> wrap(Publisher<T> source) {
4849
}
4950

5051
@Override
51-
public void subscribe(Subscriber<? super T> destination) {
52+
public void subscribe(CoreSubscriber<? super T> destination) {
5253
synchronized (this) {
5354
if (subscribed) {
5455
throw new IllegalStateException("only one subscriber at a time");

rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package io.rsocket.client;
1717

18-
import static io.rsocket.util.ExceptionUtil.noStacktrace;
19-
2018
import io.rsocket.Availability;
2119
import io.rsocket.Closeable;
2220
import io.rsocket.Payload;
@@ -31,30 +29,27 @@
3129
import io.rsocket.stat.Quantile;
3230
import io.rsocket.util.Clock;
3331
import io.rsocket.util.RSocketProxy;
34-
import java.nio.channels.ClosedChannelException;
35-
import java.util.ArrayList;
36-
import java.util.Collection;
37-
import java.util.HashSet;
38-
import java.util.Iterator;
39-
import java.util.Random;
40-
import java.util.Set;
41-
import java.util.concurrent.ThreadLocalRandom;
42-
import java.util.concurrent.TimeUnit;
43-
import java.util.concurrent.atomic.AtomicBoolean;
44-
import java.util.concurrent.atomic.AtomicInteger;
45-
import java.util.concurrent.atomic.AtomicLong;
4632
import org.reactivestreams.Publisher;
4733
import org.reactivestreams.Subscriber;
4834
import org.reactivestreams.Subscription;
4935
import org.slf4j.Logger;
5036
import org.slf4j.LoggerFactory;
37+
import reactor.core.CoreSubscriber;
5138
import reactor.core.publisher.Flux;
52-
import reactor.core.publisher.FluxSource;
5339
import reactor.core.publisher.Mono;
5440
import reactor.core.publisher.MonoProcessor;
55-
import reactor.core.publisher.MonoSource;
5641
import reactor.core.publisher.Operators;
5742

43+
import java.nio.channels.ClosedChannelException;
44+
import java.util.*;
45+
import java.util.concurrent.ThreadLocalRandom;
46+
import java.util.concurrent.TimeUnit;
47+
import java.util.concurrent.atomic.AtomicBoolean;
48+
import java.util.concurrent.atomic.AtomicInteger;
49+
import java.util.concurrent.atomic.AtomicLong;
50+
51+
import static io.rsocket.util.ExceptionUtil.noStacktrace;
52+
5853
/**
5954
* An implementation of {@link Mono} that load balances across a pool of RSockets and emits one when
6055
* it is subscribed to
@@ -203,7 +198,7 @@ public static LoadBalancedRSocketMono create(
203198
maxAperture,
204199
maxRefreshPeriodMs) {
205200
@Override
206-
public void subscribe(Subscriber<? super RSocket> s) {
201+
public void subscribe(CoreSubscriber<? super RSocket> s) {
207202
started.thenMany(rSocketMono).subscribe(s);
208203
}
209204
};
@@ -493,7 +488,7 @@ public Mono<Void> onClose() {
493488

494489
@Override
495490
public Mono<Void> close() {
496-
return MonoSource.wrap(
491+
return Mono.from(
497492
subscriber -> {
498493
subscriber.onSubscribe(Operators.emptySubscription());
499494

@@ -778,35 +773,35 @@ private class WeightedSocket extends RSocketProxy implements LoadBalancerSocketM
778773

779774
@Override
780775
public Mono<Payload> requestResponse(Payload payload) {
781-
return MonoSource.wrap(
776+
return Mono.from(
782777
subscriber ->
783778
source.requestResponse(payload).subscribe(new LatencySubscriber<>(subscriber, this)));
784779
}
785780

786781
@Override
787782
public Flux<Payload> requestStream(Payload payload) {
788-
return FluxSource.wrap(
783+
return Flux.from(
789784
subscriber ->
790785
source.requestStream(payload).subscribe(new CountingSubscriber<>(subscriber, this)));
791786
}
792787

793788
@Override
794789
public Mono<Void> fireAndForget(Payload payload) {
795-
return MonoSource.wrap(
790+
return Mono.from(
796791
subscriber ->
797792
source.fireAndForget(payload).subscribe(new CountingSubscriber<>(subscriber, this)));
798793
}
799794

800795
@Override
801796
public Mono<Void> metadataPush(Payload payload) {
802-
return MonoSource.wrap(
797+
return Mono.from(
803798
subscriber ->
804799
source.metadataPush(payload).subscribe(new CountingSubscriber<>(subscriber, this)));
805800
}
806801

807802
@Override
808803
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
809-
return FluxSource.wrap(
804+
return Flux.from(
810805
subscriber ->
811806
source
812807
.requestChannel(payloads)

rsocket-load-balancer/src/main/java/io/rsocket/client/filter/BackupRequestSocket.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,18 @@
2020
import io.rsocket.stat.FrugalQuantile;
2121
import io.rsocket.stat.Quantile;
2222
import io.rsocket.util.Clock;
23+
import org.reactivestreams.Publisher;
24+
import org.reactivestreams.Subscriber;
25+
import org.reactivestreams.Subscription;
26+
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Mono;
28+
2329
import java.util.concurrent.Executors;
2430
import java.util.concurrent.ScheduledExecutorService;
2531
import java.util.concurrent.ScheduledFuture;
2632
import java.util.concurrent.TimeUnit;
2733
import java.util.concurrent.atomic.AtomicBoolean;
2834
import java.util.function.Supplier;
29-
import org.reactivestreams.Publisher;
30-
import org.reactivestreams.Subscriber;
31-
import org.reactivestreams.Subscription;
32-
import reactor.core.publisher.Flux;
33-
import reactor.core.publisher.Mono;
34-
import reactor.core.publisher.MonoSource;
3535

3636
public class BackupRequestSocket implements RSocket {
3737
private final ScheduledExecutorService executor;
@@ -59,7 +59,7 @@ public Mono<Void> fireAndForget(Payload payload) {
5959

6060
@Override
6161
public Mono<Payload> requestResponse(Payload payload) {
62-
return MonoSource.wrap(
62+
return Mono.from(
6363
subscriber -> {
6464
Subscriber<? super Payload> oneSubscriber = new OneSubscriber<>(subscriber);
6565
Subscriber<? super Payload> backupRequest =

rsocket-load-balancer/src/test/java/io/rsocket/client/TestingRSocket.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@
2424
import org.reactivestreams.Publisher;
2525
import org.reactivestreams.Subscriber;
2626
import org.reactivestreams.Subscription;
27-
import reactor.core.publisher.Flux;
28-
import reactor.core.publisher.Mono;
29-
import reactor.core.publisher.MonoProcessor;
30-
import reactor.core.publisher.MonoSource;
27+
import reactor.core.CoreSubscriber;
28+
import reactor.core.publisher.*;
3129

3230
public class TestingRSocket implements RSocket {
3331

@@ -60,7 +58,7 @@ public Mono<Void> fireAndForget(Payload payload) {
6058

6159
@Override
6260
public Mono<Payload> requestResponse(Payload payload) {
63-
return MonoSource.wrap(
61+
return Mono.from(
6462
subscriber ->
6563
subscriber.onSubscribe(
6664
new Subscription() {

rsocket-test/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ dependencies {
2020
compile 'org.mockito:mockito-core:1.10.19'
2121
compile 'org.hamcrest:hamcrest-library:1.3'
2222
compile 'org.hdrhistogram:HdrHistogram:latest.release'
23-
compile "io.projectreactor:reactor-test:3.1.0.M2"
23+
compile "io.projectreactor:reactor-test:3.1.0.M3"
2424
}

rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronClientChannelConnector.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,18 @@
2929
import io.rsocket.aeron.internal.reactivestreams.messages.ConnectEncoder;
3030
import io.rsocket.aeron.internal.reactivestreams.messages.MessageHeaderDecoder;
3131
import io.rsocket.aeron.internal.reactivestreams.messages.MessageHeaderEncoder;
32-
import java.nio.ByteBuffer;
33-
import java.util.concurrent.ConcurrentHashMap;
34-
import java.util.concurrent.atomic.AtomicLong;
35-
import java.util.function.IntConsumer;
3632
import org.agrona.DirectBuffer;
3733
import org.agrona.concurrent.UnsafeBuffer;
3834
import org.slf4j.Logger;
3935
import org.slf4j.LoggerFactory;
4036
import reactor.core.publisher.Mono;
41-
import reactor.core.publisher.MonoSource;
4237
import reactor.core.publisher.Operators;
4338

39+
import java.nio.ByteBuffer;
40+
import java.util.concurrent.ConcurrentHashMap;
41+
import java.util.concurrent.atomic.AtomicLong;
42+
import java.util.function.IntConsumer;
43+
4444
/** Brokers a connection to a remote Aeron server. */
4545
public class AeronClientChannelConnector
4646
implements ReactiveStreamsRemote.ClientChannelConnector<
@@ -149,7 +149,7 @@ private int poll() {
149149

150150
@Override
151151
public Mono<AeronChannel> apply(AeronClientConfig aeronClientConfig) {
152-
return MonoSource.wrap(
152+
return Mono.from(
153153
subscriber -> {
154154
subscriber.onSubscribe(Operators.emptySubscription());
155155
final long channelId = CHANNEL_ID_COUNTER.get();

rsocket-transport-aeron/src/main/java/io/rsocket/aeron/internal/reactivestreams/AeronOutPublisher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.reactivestreams.Subscription;
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
32+
import reactor.core.CoreSubscriber;
3233
import reactor.core.publisher.Flux;
3334

3435
/** */
@@ -61,7 +62,7 @@ public AeronOutPublisher(
6162
}
6263

6364
@Override
64-
public void subscribe(Subscriber<? super DirectBuffer> destination) {
65+
public void subscribe(CoreSubscriber<? super DirectBuffer> destination) {
6566
Objects.requireNonNull(destination);
6667
synchronized (this) {
6768
if (this.destination != null && subscription.canEmit()) {

rsocket-transport-netty/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
dependencies {
1818
compile project(':rsocket-core')
19-
compile 'io.projectreactor.ipc:reactor-netty:0.6.4.RELEASE'
19+
compile 'io.projectreactor.ipc:reactor-netty:0.7.0.M1'
2020
compile "io.netty:netty-handler:4.1.12.Final"
2121
compile "io.netty:netty-handler-proxy:4.1.12.Final"
2222
compile "io.netty:netty-codec-http:4.1.12.Final"

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public static WebsocketClientTransport create(URI uri) {
6060
private static HttpClient createClient(URI uri) {
6161
if (isSecureWebsocket(uri)) {
6262
return HttpClient.create(
63-
options -> options.sslSupport().connect(uri.getHost(), getPort(uri, 443)));
63+
options -> options.sslSupport().connectAddress(() -> InetSocketAddress.createUnresolved(uri.getHost(), getPort(uri, 443))));
6464
} else {
6565
return HttpClient.create(uri.getHost(), getPort(uri, 80));
6666
}

0 commit comments

Comments
 (0)