Skip to content

Commit 6fc82f5

Browse files
author
Adrian Todt
committed
Fixed tests, non-breaking support for async connection factories.
1 parent 3f50bdf commit 6fc82f5

5 files changed

Lines changed: 65 additions & 41 deletions

File tree

src/main/java/com/rethinkdb/net/Connection.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,24 @@
4242
* This object is thread-safe.
4343
*/
4444
public class Connection implements Closeable {
45-
private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class);
45+
private static final @NotNull Logger LOGGER = LoggerFactory.getLogger(Connection.class);
4646

47-
protected final String hostname;
47+
protected final @NotNull String hostname;
4848
protected final int port;
4949
protected final @Nullable String user;
5050
protected final @Nullable String password;
5151
protected final @Nullable Long timeout;
5252
protected final @Nullable SSLContext sslContext;
5353
//java-only
54-
protected final ConnectionSocket.Factory socketFactory;
55-
protected final ResponsePump.Factory pumpFactory;
56-
protected final FetchMode defaultFetchMode;
54+
protected final @NotNull ConnectionSocket.Factory socketFactory;
55+
protected final @NotNull ResponsePump.Factory pumpFactory;
56+
protected final @NotNull FetchMode defaultFetchMode;
5757
protected final boolean unwrapLists;
5858
protected final boolean persistentThreads;
5959

60-
protected final AtomicLong nextToken = new AtomicLong();
61-
protected final Set<Result<?>> tracked = ConcurrentHashMap.newKeySet();
62-
protected final Lock writeLock = new ReentrantLock();
60+
protected final @NotNull AtomicLong nextToken = new AtomicLong();
61+
protected final @NotNull Set<Result<?>> tracked = ConcurrentHashMap.newKeySet();
62+
protected final @NotNull Lock writeLock = new ReentrantLock();
6363

6464
protected @Nullable String dbname;
6565
protected @Nullable ConnectionSocket socket;
@@ -126,7 +126,7 @@ public boolean isOpen() {
126126
if (socket != null) {
127127
throw new ReqlDriverError("Client already connected!");
128128
}
129-
return socketFactory.newSocketAsync(hostname, port, sslContext, timeout).thenApply(socket -> {
129+
return createSocketAsync().thenApply(socket -> {
130130
this.socket = socket;
131131
HandshakeProtocol.doHandshake(socket, user, password, timeout);
132132
this.pump = pumpFactory.newPump(socket, !persistentThreads);
@@ -516,6 +516,18 @@ protected void handleOptArgs(@NotNull OptArgs optArgs) {
516516
}
517517
}
518518

519+
/**
520+
* Detects if the connection socket supports async creation or wraps it before returning.
521+
*
522+
* @return a {@link CompletableFuture} which will complete with a new {@link ConnectionSocket}.
523+
*/
524+
protected @NotNull CompletableFuture<ConnectionSocket> createSocketAsync() {
525+
if (socketFactory instanceof ConnectionSocket.AsyncFactory) {
526+
return ((ConnectionSocket.AsyncFactory) socketFactory).newSocketAsync(hostname, port, sslContext, timeout);
527+
}
528+
return CompletableFuture.supplyAsync(() -> socketFactory.newSocket(hostname, port, sslContext, timeout));
529+
}
530+
519531
// builder
520532

521533
/**

src/main/java/com/rethinkdb/net/ConnectionSocket.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,27 @@ interface Factory {
2929
int port,
3030
@Nullable SSLContext sslContext,
3131
@Nullable Long timeoutMs);
32+
}
33+
34+
/**
35+
* An asynchronous factory of sockets.
36+
*/
37+
interface AsyncFactory extends Factory {
38+
/**
39+
* Creates a new connection socket into the server.
40+
*
41+
* @param hostname the hostname
42+
* @param port the post
43+
* @param sslContext an {@link SSLContext}, if any
44+
* @param timeoutMs a timeout, in milliseconds, if any
45+
* @return a new {@link ConnectionSocket}.
46+
*/
47+
default @NotNull ConnectionSocket newSocket(@NotNull String hostname,
48+
int port,
49+
@Nullable SSLContext sslContext,
50+
@Nullable Long timeoutMs) {
51+
return newSocketAsync(hostname, port, sslContext, timeoutMs).join();
52+
}
3253

3354
/**
3455
* Creates a new connection socket asynchronously into the server.
@@ -39,12 +60,10 @@ interface Factory {
3960
* @param timeoutMs a timeout, in milliseconds, if any
4061
* @return a {@link CompletableFuture} which will complete with a new {@link ConnectionSocket}.
4162
*/
42-
default CompletableFuture<ConnectionSocket> newSocketAsync(@NotNull String hostname,
43-
int port,
44-
@Nullable SSLContext sslContext,
45-
@Nullable Long timeoutMs) {
46-
return CompletableFuture.supplyAsync(() -> newSocket(hostname, port, sslContext, timeoutMs));
47-
}
63+
@NotNull CompletableFuture<ConnectionSocket> newSocketAsync(@NotNull String hostname,
64+
int port,
65+
@Nullable SSLContext sslContext,
66+
@Nullable Long timeoutMs);
4867
}
4968

5069
/**

src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,18 @@
2222
/**
2323
* The default {@link ConnectionSocket.Factory} and {@link ResponsePump.Factory} for any default connections.
2424
*/
25-
public class DefaultConnectionFactory implements ConnectionSocket.Factory, ResponsePump.Factory {
25+
public class DefaultConnectionFactory implements ConnectionSocket.AsyncFactory, ResponsePump.Factory {
2626
public static final DefaultConnectionFactory INSTANCE = new DefaultConnectionFactory();
2727

2828
private DefaultConnectionFactory() {
2929
}
3030

3131
@Override
32-
public @NotNull ConnectionSocket newSocket(@NotNull String hostname, int port, SSLContext sslContext, Long timeoutMs) {
33-
SocketWrapper s = new SocketWrapper(hostname, port, sslContext, timeoutMs);
34-
s.connect();
35-
return s;
36-
}
37-
38-
@Override
39-
public @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket) {
40-
return newPump(socket, false);
32+
public @NotNull CompletableFuture<ConnectionSocket> newSocketAsync(@NotNull String hostname,
33+
int port,
34+
@Nullable SSLContext sslContext,
35+
@Nullable Long timeoutMs) {
36+
return CompletableFuture.supplyAsync(() -> new SocketWrapper(hostname, port, sslContext, timeoutMs).connect());
4137
}
4238

4339
@Override
@@ -67,7 +63,7 @@ private static class SocketWrapper implements ConnectionSocket {
6763
this.timeoutMs = timeoutMs;
6864
}
6965

70-
void connect() {
66+
SocketWrapper connect() {
7167
try {
7268
// establish connection
7369
final InetSocketAddress addr = new InetSocketAddress(hostname, port);
@@ -97,6 +93,7 @@ void connect() {
9793
} catch (IOException e) {
9894
throw new ReqlDriverError("Connection timed out.", e);
9995
}
96+
return this;
10097
}
10198

10299
@Override

src/main/java/com/rethinkdb/net/ResponsePump.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ interface Factory {
2121
* <b><i>(Will be removed on v2.5.0)</i></b>
2222
*/
2323
@Deprecated
24-
@NotNull ResponsePump newPump(@NotNull ConnectionSocket socket);
24+
default @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket) {
25+
throw new UnsupportedOperationException();
26+
}
2527

2628
/**
2729
* Creates a new response pump using the provided connection socket.
@@ -31,9 +33,7 @@ interface Factory {
3133
* @return a new {@link ResponsePump}.
3234
*/
3335
@NotNull
34-
default ResponsePump newPump(@NotNull ConnectionSocket socket, boolean daemonThreads) {
35-
return newPump(socket);
36-
}
36+
ResponsePump newPump(@NotNull ConnectionSocket socket, boolean daemonThreads);
3737
}
3838

3939
/**

src/test/java/com/rethinkdb/DbUrlTest.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111
public class DbUrlTest {
1212
public static final RethinkDB r = RethinkDB.r;
1313
private static final String DB_URL_STANDARD =
14-
"rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?auth_key=mykey&timeout=30";
14+
"rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?timeout=30";
1515
private static final String DB_URL_NON_STANDARD =
16-
"rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?auth_key=mykey&timeout=30&java.default_fetch_mode=lazy&java.unwrap_lists=true&java.persistent_threads=true";
16+
"rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?timeout=30&java.default_fetch_mode=lazy&java.unwrap_lists=true&java.persistent_threads=true";
1717
private static final String DB_URL_NON_STANDARD_ALTERNATE =
18-
"rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?authKey=mykey&timeout=30&java.defaultFetchMode=lazy&java.unwrapLists=enabled&java.persistentThreads=enabled";
18+
"rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?timeout=30&java.defaultFetchMode=lazy&java.unwrapLists=enabled&java.persistentThreads=enabled";
1919

2020
@Test
2121
public void testStandardDbUrl() {
@@ -28,7 +28,7 @@ public void testStandardDbUrl() {
2828
.hostname("myhost")
2929
.port(1234)
3030
.db("mydb")
31-
.authKey("mykey")
31+
3232
.timeout(30L)
3333
);
3434
assertEquals(
@@ -38,7 +38,7 @@ public void testStandardDbUrl() {
3838
.hostname("myhost")
3939
.port(1234)
4040
.db("mydb")
41-
.authKey("mykey")
41+
4242
.timeout(30L)
4343
.dbUrlString()
4444
);
@@ -55,7 +55,6 @@ public void testNonStandardDbUrl() {
5555
.hostname("myhost")
5656
.port(1234)
5757
.db("mydb")
58-
.authKey("mykey")
5958
.timeout(30L)
6059
.defaultFetchMode(Result.FetchMode.LAZY)
6160
.unwrapLists(true)
@@ -68,7 +67,6 @@ public void testNonStandardDbUrl() {
6867
.hostname("myhost")
6968
.port(1234)
7069
.db("mydb")
71-
.authKey("mykey")
7270
.timeout(30L)
7371
.defaultFetchMode(Result.FetchMode.LAZY)
7472
.unwrapLists(true)
@@ -86,13 +84,13 @@ public void testNonStandardAlternateDbUrl() {
8684
r.connection(DB_URL_NON_STANDARD),
8785
r.connection()
8886
.user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb")
89-
.authKey("mykey").timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).persistentThreads(true)
87+
.timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).persistentThreads(true)
9088
);
9189
assertEquals(
9290
r.connection(DB_URL_NON_STANDARD_ALTERNATE),
9391
r.connection()
9492
.user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb")
95-
.authKey("mykey").timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).persistentThreads(true)
93+
.timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).persistentThreads(true)
9694
);
9795
assertEquals(
9896
DB_URL_NON_STANDARD,
@@ -101,7 +99,6 @@ public void testNonStandardAlternateDbUrl() {
10199
.hostname("myhost")
102100
.port(1234)
103101
.db("mydb")
104-
.authKey("mykey")
105102
.timeout(30L)
106103
.defaultFetchMode(Result.FetchMode.LAZY)
107104
.unwrapLists(true)
@@ -117,7 +114,6 @@ public void testNonStandardAlternateDbUrl() {
117114
.hostname("myhost")
118115
.port(1234)
119116
.db("mydb")
120-
.authKey("mykey")
121117
.timeout(30L)
122118
.defaultFetchMode(Result.FetchMode.LAZY)
123119
.unwrapLists(true)

0 commit comments

Comments
 (0)