Skip to content

Commit 16bc03f

Browse files
committed
resumption: update long running test
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent f55cf8f commit 16bc03f

1 file changed

Lines changed: 15 additions & 4 deletions

File tree

rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import reactor.core.publisher.Flux;
4141
import reactor.core.publisher.Mono;
4242
import reactor.core.publisher.ReplayProcessor;
43+
import reactor.core.scheduler.Schedulers;
4344
import reactor.test.StepVerifier;
4445

4546
@SlowTest
@@ -83,7 +84,7 @@ public void reconnectOnDisconnect() {
8384
StepVerifier.create(
8485
rSocket
8586
.requestChannel(testRequest())
86-
.take(Duration.ofSeconds(120))
87+
.take(Duration.ofSeconds(600))
8788
.map(Payload::getDataUtf8)
8889
.timeout(Duration.ofSeconds(12))
8990
.doOnNext(x -> throwOnNonContinuous(counter, x))
@@ -210,7 +211,8 @@ private static Mono<RSocket> newClientRSocket(
210211
return RSocketFactory.connect()
211212
.resume()
212213
.resumeSessionDuration(Duration.ofSeconds(sessionDurationSeconds))
213-
.keepAliveTickPeriod(Duration.ofSeconds(1))
214+
.keepAliveTickPeriod(Duration.ofSeconds(30))
215+
.keepAliveAckTimeout(Duration.ofMinutes(5))
214216
.errorConsumer(errConsumer)
215217
.resumeStrategy(() -> new PeriodicResumeStrategy(Duration.ofSeconds(1)))
216218
.transport(clientTransport)
@@ -224,6 +226,7 @@ private static Mono<CloseableChannel> newServerRSocket() {
224226
private static Mono<CloseableChannel> newServerRSocket(int sessionDurationSeconds) {
225227
return RSocketFactory.receive()
226228
.resume()
229+
.resumeStore(t -> new InMemoryResumableFramesStore("server",100_000))
227230
.resumeSessionDuration(Duration.ofSeconds(sessionDurationSeconds))
228231
.acceptor((setupPayload, rSocket) -> Mono.just(new TestResponderRSocket()))
229232
.transport(serverTransport(SERVER_HOST, SERVER_PORT))
@@ -236,10 +239,18 @@ private static class TestResponderRSocket extends AbstractRSocket {
236239

237240
@Override
238241
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
239-
return Flux.interval(Duration.ofMillis(1))
240-
.onBackpressureLatest()
242+
return duplicate(Flux.interval(Duration.ofMillis(1))
243+
.onBackpressureLatest().publishOn(Schedulers.elastic()), 20)
241244
.map(v -> DefaultPayload.create(String.valueOf(counter.getAndIncrement())))
242245
.takeUntilOther(Flux.from(payloads).then());
243246
}
247+
248+
private <T> Flux<T> duplicate(Flux<T> f, int n) {
249+
Flux<T> r =Flux.empty();
250+
for (int i = 0; i < n; i++) {
251+
r = r.mergeWith(f);
252+
}
253+
return r;
254+
}
244255
}
245256
}

0 commit comments

Comments
 (0)