Skip to content

Commit ef91abb

Browse files
author
Adrian Todt
committed
Response pump is now faster due to Response parsing being done in another thread instead on the read-loop.
1 parent 74be98a commit ef91abb

2 files changed

Lines changed: 32 additions & 23 deletions

File tree

src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,17 @@ public ThreadResponsePump(ConnectionSocket socket, boolean daemon) {
199199

200200
// read response and send it to whoever is waiting, if anyone
201201
try {
202-
final Response response = Response.readFromSocket(socket);
203-
final CompletableFuture<Response> awaiter = awaiting.remove(response.token);
204-
if (awaiter != null) {
205-
awaiter.complete(response);
206-
}
202+
CompletableFuture.supplyAsync(Response.readFromSocket(socket)).handle((response, t) -> {
203+
if (t != null) {
204+
shutdown(t);
205+
} else {
206+
final CompletableFuture<Response> awaiter = awaiting.remove(response.token);
207+
if (awaiter != null) {
208+
awaiter.complete(response);
209+
}
210+
}
211+
return null;
212+
});
207213
} catch (Exception e) {
208214
shutdown(e);
209215
return;
@@ -229,12 +235,12 @@ public boolean isAlive() {
229235
return thread.isAlive();
230236
}
231237

232-
private void shutdown(Exception e) {
238+
private void shutdown(Throwable t) {
233239
Map<Long, CompletableFuture<Response>> awaiting = this.awaiting;
234240
this.awaiting = null;
235241
thread.interrupt();
236242
if (awaiting != null) {
237-
awaiting.forEach((token, future) -> future.completeExceptionally(e));
243+
awaiting.forEach((token, future) -> future.completeExceptionally(t));
238244
}
239245
}
240246

src/main/java/com/rethinkdb/net/Response.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.Map;
2222
import java.util.Objects;
23+
import java.util.function.Supplier;
2324
import java.util.stream.Collectors;
2425

2526
public class Response {
@@ -83,7 +84,7 @@ public String toString() {
8384
}
8485

8586
@SuppressWarnings("unchecked")
86-
public static Response readFromSocket(ConnectionSocket socket) {
87+
public static Supplier<Response> readFromSocket(ConnectionSocket socket) {
8788
final ByteBuffer header = socket.read(12);
8889
final long token = header.getLong();
8990
final int responseLength = header.getInt();
@@ -100,20 +101,22 @@ public static Response readFromSocket(ConnectionSocket socket) {
100101
);
101102
}
102103

103-
Map<String, Object> json = Internals.readJson(buffer);
104-
return new Response(
105-
token,
106-
ResponseType.fromValue(((Long) json.get("t")).intValue()),
107-
(List<Object>) json.getOrDefault("r", Collections.emptyList()),
108-
((List<Long>) json.getOrDefault("n", Collections.emptyList()))
109-
.stream()
110-
.map(Long::intValue)
111-
.map(ResponseNote::maybeFromValue)
112-
.filter(Objects::nonNull)
113-
.collect(Collectors.toList()),
114-
Profile.fromList((List<Object>) json.get("p")),
115-
Backtrace.fromList((List<Object>) json.getOrDefault("b", null)),
116-
json.containsKey("e") ? ErrorType.maybeFromValue(((Long) json.get("e")).intValue()) : null
117-
);
104+
return () -> {
105+
Map<String, Object> json = Internals.readJson(buffer);
106+
return new Response(
107+
token,
108+
ResponseType.fromValue(((Long) json.get("t")).intValue()),
109+
(List<Object>) json.getOrDefault("r", Collections.emptyList()),
110+
((List<Long>) json.getOrDefault("n", Collections.emptyList()))
111+
.stream()
112+
.map(Long::intValue)
113+
.map(ResponseNote::maybeFromValue)
114+
.filter(Objects::nonNull)
115+
.collect(Collectors.toList()),
116+
Profile.fromList((List<Object>) json.get("p")),
117+
Backtrace.fromList((List<Object>) json.getOrDefault("b", null)),
118+
json.containsKey("e") ? ErrorType.maybeFromValue(((Long) json.get("e")).intValue()) : null
119+
);
120+
};
118121
}
119122
}

0 commit comments

Comments
 (0)