4040import reactor .core .publisher .Flux ;
4141import reactor .core .publisher .Mono ;
4242import reactor .core .publisher .ReplayProcessor ;
43+ import reactor .core .scheduler .Schedulers ;
4344import reactor .test .StepVerifier ;
4445
4546@ SlowTest
@@ -83,7 +84,7 @@ public void reconnectOnDisconnect() {
8384 StepVerifier .create (
8485 rSocket
8586 .requestChannel (testRequest ())
86- .take (Duration .ofSeconds (120 ))
87+ .take (Duration .ofSeconds (600 ))
8788 .map (Payload ::getDataUtf8 )
8889 .timeout (Duration .ofSeconds (12 ))
8990 .doOnNext (x -> throwOnNonContinuous (counter , x ))
@@ -210,7 +211,8 @@ private static Mono<RSocket> newClientRSocket(
210211 return RSocketFactory .connect ()
211212 .resume ()
212213 .resumeSessionDuration (Duration .ofSeconds (sessionDurationSeconds ))
213- .keepAliveTickPeriod (Duration .ofSeconds (1 ))
214+ .keepAliveTickPeriod (Duration .ofSeconds (30 ))
215+ .keepAliveAckTimeout (Duration .ofMinutes (5 ))
214216 .errorConsumer (errConsumer )
215217 .resumeStrategy (() -> new PeriodicResumeStrategy (Duration .ofSeconds (1 )))
216218 .transport (clientTransport )
@@ -224,6 +226,7 @@ private static Mono<CloseableChannel> newServerRSocket() {
224226 private static Mono <CloseableChannel > newServerRSocket (int sessionDurationSeconds ) {
225227 return RSocketFactory .receive ()
226228 .resume ()
229+ .resumeStore (t -> new InMemoryResumableFramesStore ("server" ,100_000 ))
227230 .resumeSessionDuration (Duration .ofSeconds (sessionDurationSeconds ))
228231 .acceptor ((setupPayload , rSocket ) -> Mono .just (new TestResponderRSocket ()))
229232 .transport (serverTransport (SERVER_HOST , SERVER_PORT ))
@@ -236,10 +239,18 @@ private static class TestResponderRSocket extends AbstractRSocket {
236239
237240 @ Override
238241 public Flux <Payload > requestChannel (Publisher <Payload > payloads ) {
239- return Flux .interval (Duration .ofMillis (1 ))
240- .onBackpressureLatest ()
242+ return duplicate ( Flux .interval (Duration .ofMillis (1 ))
243+ .onBackpressureLatest (). publishOn ( Schedulers . elastic ()), 20 )
241244 .map (v -> DefaultPayload .create (String .valueOf (counter .getAndIncrement ())))
242245 .takeUntilOther (Flux .from (payloads ).then ());
243246 }
247+
248+ private <T > Flux <T > duplicate (Flux <T > f , int n ) {
249+ Flux <T > r =Flux .empty ();
250+ for (int i = 0 ; i < n ; i ++) {
251+ r = r .mergeWith (f );
252+ }
253+ return r ;
254+ }
244255 }
245256}
0 commit comments