Skip to content

Commit ff4d516

Browse files
authored
provides well documented lease example (rsocket#840)
1 parent 1181272 commit ff4d516

File tree

1 file changed

+143
-83
lines changed
  • rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/lease

1 file changed

+143
-83
lines changed

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/lease/LeaseExample.java

Lines changed: 143 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -16,103 +16,179 @@
1616

1717
package io.rsocket.examples.transport.tcp.lease;
1818

19-
import static java.time.Duration.ofSeconds;
20-
2119
import io.rsocket.Payload;
2220
import io.rsocket.RSocket;
23-
import io.rsocket.SocketAcceptor;
2421
import io.rsocket.core.RSocketConnector;
2522
import io.rsocket.core.RSocketServer;
23+
import io.rsocket.examples.transport.tcp.stream.StreamingClient;
2624
import io.rsocket.lease.Lease;
2725
import io.rsocket.lease.LeaseStats;
2826
import io.rsocket.lease.Leases;
27+
import io.rsocket.lease.MissingLeaseException;
2928
import io.rsocket.transport.netty.client.TcpClientTransport;
3029
import io.rsocket.transport.netty.server.CloseableChannel;
3130
import io.rsocket.transport.netty.server.TcpServerTransport;
32-
import io.rsocket.util.DefaultPayload;
33-
import java.util.Date;
31+
import io.rsocket.util.ByteBufPayload;
32+
import java.time.Duration;
33+
import java.util.Objects;
3434
import java.util.Optional;
35+
import java.util.concurrent.ArrayBlockingQueue;
36+
import java.util.concurrent.BlockingQueue;
3537
import java.util.function.Consumer;
3638
import java.util.function.Function;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
3741
import reactor.core.publisher.Flux;
3842
import reactor.core.publisher.Mono;
43+
import reactor.core.publisher.ReplayProcessor;
44+
import reactor.util.retry.Retry;
3945

4046
public class LeaseExample {
47+
48+
private static final Logger logger = LoggerFactory.getLogger(StreamingClient.class);
49+
4150
private static final String SERVER_TAG = "server";
4251
private static final String CLIENT_TAG = "client";
4352

4453
public static void main(String[] args) {
54+
// Queue for incoming messages represented as Flux
55+
// Imagine that every fireAndForget that is pushed is processed by a worker
56+
57+
int queueCapacity = 50;
58+
BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(queueCapacity);
59+
60+
// emulating a worker that process data from the queue
61+
Thread workerThread =
62+
new Thread(
63+
() -> {
64+
try {
65+
while (!Thread.currentThread().isInterrupted()) {
66+
String message = messagesQueue.take();
67+
logger.info("Process message {}", message);
68+
Thread.sleep(500); // emulating processing
69+
}
70+
} catch (InterruptedException e) {
71+
throw new RuntimeException(e);
72+
}
73+
});
74+
75+
workerThread.start();
4576

4677
CloseableChannel server =
4778
RSocketServer.create(
48-
(setup, sendingRSocket) -> Mono.just(new ServerRSocket(sendingRSocket)))
49-
.lease(
50-
() ->
51-
Leases.<NoopStats>create()
52-
.sender(new LeaseSender(SERVER_TAG, 7_000, 5))
53-
.receiver(new LeaseReceiver(SERVER_TAG))
54-
.stats(new NoopStats()))
55-
.bind(TcpServerTransport.create("localhost", 7000))
56-
.block();
57-
79+
(setup, sendingSocket) ->
80+
Mono.just(
81+
new RSocket() {
82+
@Override
83+
public Mono<Void> fireAndForget(Payload payload) {
84+
// add element. if overflows errors and terminates execution
85+
// specifically to show that lease can limit rate of fnf requests in
86+
// that example
87+
try {
88+
if (!messagesQueue.offer(payload.getDataUtf8())) {
89+
logger.error("Queue has been overflowed. Terminating execution");
90+
sendingSocket.dispose();
91+
workerThread.interrupt();
92+
}
93+
} finally {
94+
payload.release();
95+
}
96+
return Mono.empty();
97+
}
98+
}))
99+
.lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
100+
.bindNow(TcpServerTransport.create("localhost", 7000));
101+
102+
LeaseReceiver receiver = new LeaseReceiver(CLIENT_TAG);
58103
RSocket clientRSocket =
59104
RSocketConnector.create()
60-
.lease(
61-
() ->
62-
Leases.<NoopStats>create()
63-
.sender(new LeaseSender(CLIENT_TAG, 3_000, 5))
64-
.receiver(new LeaseReceiver(CLIENT_TAG)))
65-
.acceptor(
66-
SocketAcceptor.forRequestResponse(
67-
payload -> Mono.just(DefaultPayload.create("Client Response " + new Date()))))
105+
.lease(() -> Leases.create().receiver(receiver))
68106
.connect(TcpClientTransport.create(server.address()))
69107
.block();
70108

71-
Flux.interval(ofSeconds(1))
72-
.flatMap(
73-
signal -> {
74-
System.out.println("Client requester availability: " + clientRSocket.availability());
75-
return clientRSocket
76-
.requestResponse(DefaultPayload.create("Client request " + new Date()))
77-
.doOnError(err -> System.out.println("Client request error: " + err))
78-
.onErrorResume(err -> Mono.empty());
109+
Objects.requireNonNull(clientRSocket);
110+
111+
// generate stream of fnfs
112+
Flux.generate(
113+
() -> 0L,
114+
(state, sink) -> {
115+
sink.next(state);
116+
return state + 1;
117+
})
118+
// here we wait for the first lease for the responder side and start execution
119+
// on if there is allowance
120+
.delaySubscription(receiver.notifyWhenNewLease().then())
121+
.concatMap(
122+
tick -> {
123+
logger.info("Requesting FireAndForget({})", tick);
124+
return Mono.defer(() -> clientRSocket.fireAndForget(ByteBufPayload.create("" + tick)))
125+
.retryWhen(
126+
Retry.indefinitely()
127+
// ensures that error is the result of missed lease
128+
.filter(t -> t instanceof MissingLeaseException)
129+
.doBeforeRetryAsync(
130+
rs -> {
131+
// here we create a mechanism to delay the retry until
132+
// the new lease allowance comes in.
133+
logger.info("Ran out of leases {}", rs);
134+
return receiver.notifyWhenNewLease().then();
135+
}));
79136
})
80-
.subscribe(resp -> System.out.println("Client requester response: " + resp.getDataUtf8()));
137+
.blockLast();
81138

82139
clientRSocket.onClose().block();
83140
server.dispose();
84141
}
85142

86-
private static class LeaseSender implements Function<Optional<NoopStats>, Flux<Lease>> {
87-
private final String tag;
88-
private final int ttlMillis;
89-
private final int allowedRequests;
90-
91-
public LeaseSender(String tag, int ttlMillis, int allowedRequests) {
143+
/**
144+
* This is a class responsible for making decision on whether Responder is ready to receive new
145+
* FireAndForget or not base in the number of messages enqueued. <br>
146+
* In the nutshell this is responder-side rate-limiter logic which is created for every new
147+
* connection.<br>
148+
* In real-world projects this class has to issue leases based on real metrics
149+
*/
150+
private static class LeaseCalculator implements Function<Optional<LeaseStats>, Flux<Lease>> {
151+
final String tag;
152+
final BlockingQueue<?> queue;
153+
154+
public LeaseCalculator(String tag, BlockingQueue<?> queue) {
92155
this.tag = tag;
93-
this.ttlMillis = ttlMillis;
94-
this.allowedRequests = allowedRequests;
156+
this.queue = queue;
95157
}
96158

97159
@Override
98-
public Flux<Lease> apply(Optional<NoopStats> leaseStats) {
99-
System.out.println(
100-
String.format("%s stats are %s", tag, leaseStats.isPresent() ? "present" : "absent"));
101-
return Flux.interval(ofSeconds(1), ofSeconds(10))
102-
.onBackpressureLatest()
103-
.map(
104-
tick -> {
105-
System.out.println(
106-
String.format(
107-
"%s responder sends new leases: ttl: %d, requests: %d",
108-
tag, ttlMillis, allowedRequests));
109-
return Lease.create(ttlMillis, allowedRequests);
160+
public Flux<Lease> apply(Optional<LeaseStats> leaseStats) {
161+
logger.info("{} stats are {}", tag, leaseStats.isPresent() ? "present" : "absent");
162+
Duration ttlDuration = Duration.ofSeconds(5);
163+
// The interval function is used only for the demo purpose and should not be
164+
// considered as the way to issue leases.
165+
// For advanced RateLimiting with Leasing
166+
// consider adopting https://github.com/Netflix/concurrency-limits#server-limiter
167+
return Flux.interval(Duration.ZERO, ttlDuration.dividedBy(2))
168+
.handle(
169+
(__, sink) -> {
170+
// put queue.remainingCapacity() + 1 here if you want to observe that app is
171+
// terminated because of the queue overflowing
172+
int requests = queue.remainingCapacity();
173+
174+
// reissue new lease only if queue has remaining capacity to
175+
// accept more requests
176+
if (requests > 0) {
177+
long ttl = ttlDuration.toMillis();
178+
sink.next(Lease.create((int) ttl, requests));
179+
}
110180
});
111181
}
112182
}
113183

184+
/**
185+
* Requester-side Lease listener.<br>
186+
* In the nutshell this class implements mechanism to listen (and do appropriate actions as
187+
* needed) to incoming leases issued by the Responder
188+
*/
114189
private static class LeaseReceiver implements Consumer<Flux<Lease>> {
115-
private final String tag;
190+
final String tag;
191+
final ReplayProcessor<Lease> lastLeaseReplay = ReplayProcessor.cacheLast();
116192

117193
public LeaseReceiver(String tag) {
118194
this.tag = tag;
@@ -121,38 +197,22 @@ public LeaseReceiver(String tag) {
121197
@Override
122198
public void accept(Flux<Lease> receivedLeases) {
123199
receivedLeases.subscribe(
124-
l ->
125-
System.out.println(
126-
String.format(
127-
"%s received leases - ttl: %d, requests: %d",
128-
tag, l.getTimeToLiveMillis(), l.getAllowedRequests())));
200+
l -> {
201+
logger.info(
202+
"{} received leases - ttl: {}, requests: {}",
203+
tag,
204+
l.getTimeToLiveMillis(),
205+
l.getAllowedRequests());
206+
lastLeaseReplay.onNext(l);
207+
});
129208
}
130-
}
131209

132-
private static class NoopStats implements LeaseStats {
133-
134-
@Override
135-
public void onEvent(EventType eventType) {}
136-
}
137-
138-
private static class ServerRSocket implements RSocket {
139-
private final RSocket senderRSocket;
140-
141-
public ServerRSocket(RSocket senderRSocket) {
142-
this.senderRSocket = senderRSocket;
143-
}
144-
145-
@Override
146-
public Mono<Payload> requestResponse(Payload payload) {
147-
System.out.println("Server requester availability: " + senderRSocket.availability());
148-
senderRSocket
149-
.requestResponse(DefaultPayload.create("Server request " + new Date()))
150-
.doOnError(err -> System.out.println("Server request error: " + err))
151-
.onErrorResume(err -> Mono.empty())
152-
.subscribe(
153-
resp -> System.out.println("Server requester response: " + resp.getDataUtf8()));
154-
155-
return Mono.just(DefaultPayload.create("Server Response " + new Date()));
210+
/**
211+
* This method allows to listen to new incoming leases and delay some action (e.g . retry) until
212+
* new valid lease has come in
213+
*/
214+
public Mono<Lease> notifyWhenNewLease() {
215+
return lastLeaseReplay.filter(l -> l.isValid()).next();
156216
}
157217
}
158218
}

0 commit comments

Comments
 (0)