1616
1717package io .rsocket .examples .transport .tcp .lease ;
1818
19- import static java .time .Duration .ofSeconds ;
20-
2119import io .rsocket .Payload ;
2220import io .rsocket .RSocket ;
23- import io .rsocket .SocketAcceptor ;
2421import io .rsocket .core .RSocketConnector ;
2522import io .rsocket .core .RSocketServer ;
23+ import io .rsocket .examples .transport .tcp .stream .StreamingClient ;
2624import io .rsocket .lease .Lease ;
2725import io .rsocket .lease .LeaseStats ;
2826import io .rsocket .lease .Leases ;
27+ import io .rsocket .lease .MissingLeaseException ;
2928import io .rsocket .transport .netty .client .TcpClientTransport ;
3029import io .rsocket .transport .netty .server .CloseableChannel ;
3130import io .rsocket .transport .netty .server .TcpServerTransport ;
32- import io .rsocket .util .DefaultPayload ;
33- import java .util .Date ;
31+ import io .rsocket .util .ByteBufPayload ;
32+ import java .time .Duration ;
33+ import java .util .Objects ;
3434import java .util .Optional ;
35+ import java .util .concurrent .ArrayBlockingQueue ;
36+ import java .util .concurrent .BlockingQueue ;
3537import java .util .function .Consumer ;
3638import java .util .function .Function ;
39+ import org .slf4j .Logger ;
40+ import org .slf4j .LoggerFactory ;
3741import reactor .core .publisher .Flux ;
3842import reactor .core .publisher .Mono ;
43+ import reactor .core .publisher .ReplayProcessor ;
44+ import reactor .util .retry .Retry ;
3945
4046public class LeaseExample {
47+
48+ private static final Logger logger = LoggerFactory .getLogger (StreamingClient .class );
49+
4150 private static final String SERVER_TAG = "server" ;
4251 private static final String CLIENT_TAG = "client" ;
4352
4453 public static void main (String [] args ) {
54+ // Queue for incoming messages represented as Flux
55+ // Imagine that every fireAndForget that is pushed is processed by a worker
56+
57+ int queueCapacity = 50 ;
58+ BlockingQueue <String > messagesQueue = new ArrayBlockingQueue <>(queueCapacity );
59+
60+ // emulating a worker that process data from the queue
61+ Thread workerThread =
62+ new Thread (
63+ () -> {
64+ try {
65+ while (!Thread .currentThread ().isInterrupted ()) {
66+ String message = messagesQueue .take ();
67+ logger .info ("Process message {}" , message );
68+ Thread .sleep (500 ); // emulating processing
69+ }
70+ } catch (InterruptedException e ) {
71+ throw new RuntimeException (e );
72+ }
73+ });
74+
75+ workerThread .start ();
4576
4677 CloseableChannel server =
4778 RSocketServer .create (
48- (setup , sendingRSocket ) -> Mono .just (new ServerRSocket (sendingRSocket )))
49- .lease (
50- () ->
51- Leases .<NoopStats >create ()
52- .sender (new LeaseSender (SERVER_TAG , 7_000 , 5 ))
53- .receiver (new LeaseReceiver (SERVER_TAG ))
54- .stats (new NoopStats ()))
55- .bind (TcpServerTransport .create ("localhost" , 7000 ))
56- .block ();
57-
79+ (setup , sendingSocket ) ->
80+ Mono .just (
81+ new RSocket () {
82+ @ Override
83+ public Mono <Void > fireAndForget (Payload payload ) {
84+ // add element. if overflows errors and terminates execution
85+ // specifically to show that lease can limit rate of fnf requests in
86+ // that example
87+ try {
88+ if (!messagesQueue .offer (payload .getDataUtf8 ())) {
89+ logger .error ("Queue has been overflowed. Terminating execution" );
90+ sendingSocket .dispose ();
91+ workerThread .interrupt ();
92+ }
93+ } finally {
94+ payload .release ();
95+ }
96+ return Mono .empty ();
97+ }
98+ }))
99+ .lease (() -> Leases .create ().sender (new LeaseCalculator (SERVER_TAG , messagesQueue )))
100+ .bindNow (TcpServerTransport .create ("localhost" , 7000 ));
101+
102+ LeaseReceiver receiver = new LeaseReceiver (CLIENT_TAG );
58103 RSocket clientRSocket =
59104 RSocketConnector .create ()
60- .lease (
61- () ->
62- Leases .<NoopStats >create ()
63- .sender (new LeaseSender (CLIENT_TAG , 3_000 , 5 ))
64- .receiver (new LeaseReceiver (CLIENT_TAG )))
65- .acceptor (
66- SocketAcceptor .forRequestResponse (
67- payload -> Mono .just (DefaultPayload .create ("Client Response " + new Date ()))))
105+ .lease (() -> Leases .create ().receiver (receiver ))
68106 .connect (TcpClientTransport .create (server .address ()))
69107 .block ();
70108
71- Flux .interval (ofSeconds (1 ))
72- .flatMap (
73- signal -> {
74- System .out .println ("Client requester availability: " + clientRSocket .availability ());
75- return clientRSocket
76- .requestResponse (DefaultPayload .create ("Client request " + new Date ()))
77- .doOnError (err -> System .out .println ("Client request error: " + err ))
78- .onErrorResume (err -> Mono .empty ());
109+ Objects .requireNonNull (clientRSocket );
110+
111+ // generate stream of fnfs
112+ Flux .generate (
113+ () -> 0L ,
114+ (state , sink ) -> {
115+ sink .next (state );
116+ return state + 1 ;
117+ })
118+ // here we wait for the first lease for the responder side and start execution
119+ // on if there is allowance
120+ .delaySubscription (receiver .notifyWhenNewLease ().then ())
121+ .concatMap (
122+ tick -> {
123+ logger .info ("Requesting FireAndForget({})" , tick );
124+ return Mono .defer (() -> clientRSocket .fireAndForget (ByteBufPayload .create ("" + tick )))
125+ .retryWhen (
126+ Retry .indefinitely ()
127+ // ensures that error is the result of missed lease
128+ .filter (t -> t instanceof MissingLeaseException )
129+ .doBeforeRetryAsync (
130+ rs -> {
131+ // here we create a mechanism to delay the retry until
132+ // the new lease allowance comes in.
133+ logger .info ("Ran out of leases {}" , rs );
134+ return receiver .notifyWhenNewLease ().then ();
135+ }));
79136 })
80- .subscribe ( resp -> System . out . println ( "Client requester response: " + resp . getDataUtf8 ()) );
137+ .blockLast ( );
81138
82139 clientRSocket .onClose ().block ();
83140 server .dispose ();
84141 }
85142
86- private static class LeaseSender implements Function <Optional <NoopStats >, Flux <Lease >> {
87- private final String tag ;
88- private final int ttlMillis ;
89- private final int allowedRequests ;
90-
91- public LeaseSender (String tag , int ttlMillis , int allowedRequests ) {
143+ /**
144+ * This is a class responsible for making decision on whether Responder is ready to receive new
145+ * FireAndForget or not base in the number of messages enqueued. <br>
146+ * In the nutshell this is responder-side rate-limiter logic which is created for every new
147+ * connection.<br>
148+ * In real-world projects this class has to issue leases based on real metrics
149+ */
150+ private static class LeaseCalculator implements Function <Optional <LeaseStats >, Flux <Lease >> {
151+ final String tag ;
152+ final BlockingQueue <?> queue ;
153+
154+ public LeaseCalculator (String tag , BlockingQueue <?> queue ) {
92155 this .tag = tag ;
93- this .ttlMillis = ttlMillis ;
94- this .allowedRequests = allowedRequests ;
156+ this .queue = queue ;
95157 }
96158
97159 @ Override
98- public Flux <Lease > apply (Optional <NoopStats > leaseStats ) {
99- System .out .println (
100- String .format ("%s stats are %s" , tag , leaseStats .isPresent () ? "present" : "absent" ));
101- return Flux .interval (ofSeconds (1 ), ofSeconds (10 ))
102- .onBackpressureLatest ()
103- .map (
104- tick -> {
105- System .out .println (
106- String .format (
107- "%s responder sends new leases: ttl: %d, requests: %d" ,
108- tag , ttlMillis , allowedRequests ));
109- return Lease .create (ttlMillis , allowedRequests );
160+ public Flux <Lease > apply (Optional <LeaseStats > leaseStats ) {
161+ logger .info ("{} stats are {}" , tag , leaseStats .isPresent () ? "present" : "absent" );
162+ Duration ttlDuration = Duration .ofSeconds (5 );
163+ // The interval function is used only for the demo purpose and should not be
164+ // considered as the way to issue leases.
165+ // For advanced RateLimiting with Leasing
166+ // consider adopting https://github.com/Netflix/concurrency-limits#server-limiter
167+ return Flux .interval (Duration .ZERO , ttlDuration .dividedBy (2 ))
168+ .handle (
169+ (__ , sink ) -> {
170+ // put queue.remainingCapacity() + 1 here if you want to observe that app is
171+ // terminated because of the queue overflowing
172+ int requests = queue .remainingCapacity ();
173+
174+ // reissue new lease only if queue has remaining capacity to
175+ // accept more requests
176+ if (requests > 0 ) {
177+ long ttl = ttlDuration .toMillis ();
178+ sink .next (Lease .create ((int ) ttl , requests ));
179+ }
110180 });
111181 }
112182 }
113183
184+ /**
185+ * Requester-side Lease listener.<br>
186+ * In the nutshell this class implements mechanism to listen (and do appropriate actions as
187+ * needed) to incoming leases issued by the Responder
188+ */
114189 private static class LeaseReceiver implements Consumer <Flux <Lease >> {
115- private final String tag ;
190+ final String tag ;
191+ final ReplayProcessor <Lease > lastLeaseReplay = ReplayProcessor .cacheLast ();
116192
117193 public LeaseReceiver (String tag ) {
118194 this .tag = tag ;
@@ -121,38 +197,22 @@ public LeaseReceiver(String tag) {
121197 @ Override
122198 public void accept (Flux <Lease > receivedLeases ) {
123199 receivedLeases .subscribe (
124- l ->
125- System .out .println (
126- String .format (
127- "%s received leases - ttl: %d, requests: %d" ,
128- tag , l .getTimeToLiveMillis (), l .getAllowedRequests ())));
200+ l -> {
201+ logger .info (
202+ "{} received leases - ttl: {}, requests: {}" ,
203+ tag ,
204+ l .getTimeToLiveMillis (),
205+ l .getAllowedRequests ());
206+ lastLeaseReplay .onNext (l );
207+ });
129208 }
130- }
131209
132- private static class NoopStats implements LeaseStats {
133-
134- @ Override
135- public void onEvent (EventType eventType ) {}
136- }
137-
138- private static class ServerRSocket implements RSocket {
139- private final RSocket senderRSocket ;
140-
141- public ServerRSocket (RSocket senderRSocket ) {
142- this .senderRSocket = senderRSocket ;
143- }
144-
145- @ Override
146- public Mono <Payload > requestResponse (Payload payload ) {
147- System .out .println ("Server requester availability: " + senderRSocket .availability ());
148- senderRSocket
149- .requestResponse (DefaultPayload .create ("Server request " + new Date ()))
150- .doOnError (err -> System .out .println ("Server request error: " + err ))
151- .onErrorResume (err -> Mono .empty ())
152- .subscribe (
153- resp -> System .out .println ("Server requester response: " + resp .getDataUtf8 ()));
154-
155- return Mono .just (DefaultPayload .create ("Server Response " + new Date ()));
210+ /**
211+ * This method allows to listen to new incoming leases and delay some action (e.g . retry) until
212+ * new valid lease has come in
213+ */
214+ public Mono <Lease > notifyWhenNewLease () {
215+ return lastLeaseReplay .filter (l -> l .isValid ()).next ();
156216 }
157217 }
158218}
0 commit comments