Skip to content

Commit

Permalink
feat: Add timeout parameters to transaction requests. (#172)
Browse files Browse the repository at this point in the history
Problem:

Currently, the only way to set timeouts for a request is to either:

1. Set a timeout at the stub level with stub.withDeadlineAfter (i.e.,
   create a new stub and DgraphClient)
2. Set a timeout for every request of the stub using a
   ClientInterceptor.

The downside for (1) is the need to create a new stub and client in
order to set a timeout.

The downside for (2) is the inability to set a timeout only for a
certain request (i.e, some requests should timeout after 500
milliseconds and others 10 seconds).

Solution:

(1) and (2) can be resolved for use cases where the timeout is set only
for a single request in a transaction by passing in a specified timeout
when sending the request.

This change overloads the query, mutate, and doRequest methods to allow
setting the timeout duration and unit. They're set like
stub.withDeadlineAfter but on the request itself. Including the existing
methods, here are the overloaded methods with the timeout parameters:

* AsyncTransaction
  * queryWithVars(final string query, final Map<string, String> vars);
  * queryWithVars(final string query, final Map<string, String> vars, long duration, TimeUnit units);
  * query(final string query);
  * query(final string query, long duration, TimeUnit units);
  * queryRDFWithVars(final string query, final Map<String, String> vars);
  * queryRDFWithVars(final string query, final Map<String, String> vars, long duration, TimeUnit units);
  * queryRDF(final string query, final Map<String, String> vars);
  * queryRDF(final string query, final Map<String, String> vars, long duration, TimeUnit units);
  * mutate(Mutation mutation);
  * mutate(Mutation mutation, long duration, TimeUnit units);
  * doRequest(Request request);
  * doRequest(Request request, long duration, TimeUnit units);
* Transaction
  * queryWithVars(final string query, final Map<string, String> vars);
  * queryWithVars(final string query, final Map<string, String> vars, long duration, TimeUnit units);
  * query(final string query);
  * query(final string query, long duration, TimeUnit units);
  * queryRDFWithVars(final string query, final Map<String, String> vars);
  * queryRDFWithVars(final string query, final Map<String, String> vars, long duration, TimeUnit units);
  * queryRDF(final string query, final Map<String, String> vars);
  * queryRDF(final string query, final Map<String, String> vars, long duration, TimeUnit units);
  * mutate(Mutation mutation);
  * mutate(Mutation mutation, long duration, TimeUnit units);
  * doRequest(Request request);
  * doRequest(Request request, long duration, TimeUnit units);

Misc:

* tests: Add docker-compose.test.yml and docker-test-secret.yml
* chore: Fix javadoc typos.
  • Loading branch information
danielmai authored Dec 23, 2021
1 parent 0f3f5f5 commit 55a0da6
Show file tree
Hide file tree
Showing 11 changed files with 385 additions and 24 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,8 @@ It is recommended that you always set a deadline for each client call, after
which the client terminates. This is in line with the recommendation for any gRPC client.
Read [this forum post][deadline-post] for more details.

#### Setting deadlines for all requests

```java
channel = ManagedChannelBuilder.forAddress("localhost", 9080).usePlaintext(true).build();
DgraphGrpc.DgraphStub stub = DgraphGrpc.newStub(channel);
Expand All @@ -538,6 +540,13 @@ stub = stub.withInterceptors(timeoutInterceptor);
DgraphClient dgraphClient = new DgraphClient(stub);
```

#### Setting deadlines for a single request

```
dgraphClient.newTransaction().query(query, 500, TimeUnit.MILLISECONDS);
```


[deadline-post]: https://discuss.dgraph.io/t/dgraph-java-client-setting-deadlines-per-call/3056

### Setting Metadata Headers
Expand Down
83 changes: 83 additions & 0 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Auto-generated with: [./compose -a 3 -z 1 --acl --port_offset=100]
#
version: "3.5"
services:
alpha1:
image: dgraph/dgraph:v21.12.0
container_name: alpha1
working_dir: /data/alpha1
labels:
cluster: test
ports:
- 8180:8180
- 9180:9180
volumes:
- type: bind
source: ./docker-test-secret.txt
target: /secret/hmac
read_only: true
command: dgraph alpha -o 100 --my=alpha1:7180 --zero=zero1:5180 --logtostderr
-v=2 --raft "idx=1; group=1" --security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
--acl "secret-file=/secret/hmac; access-ttl=3s"
deploy:
resources:
limits:
memory: 32G
alpha2:
image: dgraph/dgraph:v21.12.0
container_name: alpha2
working_dir: /data/alpha2
labels:
cluster: test
ports:
- 8182:8182
- 9182:9182
volumes:
- type: bind
source: ./docker-test-secret.txt
target: /secret/hmac
read_only: true
command: dgraph alpha -o 102 --my=alpha2:7182 --zero=zero1:5180 --logtostderr
-v=2 --raft "idx=2; group=1" --security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
--acl "secret-file=/secret/hmac; access-ttl=3s"
deploy:
resources:
limits:
memory: 32G
alpha3:
image: dgraph/dgraph:v21.12.0
container_name: alpha3
working_dir: /data/alpha3
labels:
cluster: test
ports:
- 8183:8183
- 9183:9183
volumes:
- type: bind
source: ./docker-test-secret.txt
target: /secret/hmac
read_only: true
command: dgraph alpha -o 103 --my=alpha3:7183 --zero=zero1:5180 --logtostderr
-v=2 --raft "idx=3; group=1" --security "whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16;"
--acl "secret-file=/secret/hmac; access-ttl=3s"
deploy:
resources:
limits:
memory: 32G
zero1:
image: dgraph/dgraph:v21.12.0
container_name: zero1
working_dir: /data/zero1
labels:
cluster: test
ports:
- 5180:5180
- 6180:6180
command: dgraph zero -o 100 --raft='idx=1' --my=zero1:5180 --replicas=3
--logtostderr -v=2 --bindall
deploy:
resources:
limits:
memory: 32G
volumes: {}
1 change: 1 addition & 0 deletions docker-test-secret.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
12345678901234567890123456789012
2 changes: 1 addition & 1 deletion samples/DgraphJavaSampleDeadlineInterceptors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ For more configuration options, and other details, refer to [docs.dgraph.io](htt
**Warning**: The sample code, when run, will remove all data from your locally running Dgraph instance.
So make sure that you don't have any important data on your Dgraph instance.

This example in [App.java:42](./src/main/java/App.java#L42-L50) creates the
This example in [App.java:39](./src/main/java/App.java#L39-L47) creates the
DgraphStub with a deadline using a call interceptor to set timeouts **per
request**. This is most likely what you want to do. For more info, see [Setting
Deadlines](https://github.com/dgraph-io/dgraph4j/#setting-deadlines).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;

import java.net.MalformedURLException;
import java.util.Collections;
import java.util.List;
Expand All @@ -33,10 +32,8 @@ private static DgraphClient createDgraphClient() {
if (TEST_CLOUD_ENDPOINT != null && TEST_CLOUD_ENDPOINT.length() > 0) {
stub = DgraphClient.clientStubFromCloudEndpoint(TEST_CLOUD_ENDPOINT, TEST_CLOUD_API_KEY);
} else {
ManagedChannel chan = ManagedChannelBuilder
.forAddress(TEST_HOSTNAME, TEST_PORT)
.usePlaintext()
.build();
ManagedChannel chan =
ManagedChannelBuilder.forAddress(TEST_HOSTNAME, TEST_PORT).usePlaintext().build();
stub = DgraphGrpc.newStub(chan);
}
stub =
Expand Down
2 changes: 1 addition & 1 deletion samples/DgraphJavaSampleWithDeadlineAfter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ For more configuration options, and other details, refer to [docs.dgraph.io](htt
**Warning**: The sample code, when run, will remove all data from your locally running Dgraph instance.
So make sure that you don't have any important data on your Dgraph instance.

This example in [App.java:37](./src/main/java/App.java#L37) creates the
This example in [App.java:34](./src/main/java/App.java#L34) creates the
DgraphStub with a deadline set for the **entire life of the stub**. This is most
likely what you do NOT want to do. For more info, see [Setting
Deadlines](https://github.com/dgraph-io/dgraph4j/#setting-deadlines).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.dgraph.Transaction;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.net.MalformedURLException;
import java.util.Collections;
import java.util.List;
Expand All @@ -28,10 +27,8 @@ private static DgraphClient createDgraphClient() {
if (TEST_CLOUD_ENDPOINT != null && TEST_CLOUD_ENDPOINT.length() > 0) {
stub = DgraphClient.clientStubFromCloudEndpoint(TEST_CLOUD_ENDPOINT, TEST_CLOUD_API_KEY);
} else {
ManagedChannel chan = ManagedChannelBuilder
.forAddress(TEST_HOSTNAME, TEST_PORT)
.usePlaintext()
.build();
ManagedChannel chan =
ManagedChannelBuilder.forAddress(TEST_HOSTNAME, TEST_PORT).usePlaintext().build();
stub = DgraphGrpc.newStub(chan);
}
stub = stub.withDeadlineAfter(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -78,7 +75,7 @@ public static void main(final String[] args) {
for (int i = 1; i <= 10; i++) {
System.out.printf("Loop iteration: %d\n", i);
String query =
"query all($a: string){\n" + "all(func: eq(name, $a)) {\n" + " name\n" + " }\n" + "}";
"query all($a: string){\n" + "all(func: eq(name, $a)) {\n" + " name\n" + " }\n" + "}";
Map<String, String> vars = Collections.singletonMap("$a", "Alice");
Response res = dgraphClient.newTransaction().queryWithVars(query, vars);

Expand Down
103 changes: 95 additions & 8 deletions src/main/java/io/dgraph/AsyncTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* This is the implementation of asynchronous Dgraph transaction. The asynchrony is backed-up by
Expand Down Expand Up @@ -72,14 +73,31 @@ public class AsyncTransaction implements AutoCloseable {
/**
* Sends a query to one of the connected dgraph instances. If no mutations need to be made in the
* same transaction, it's convenient to chain the method: <code>
* client.NewTransaction().queryWithVars(...)</code>.
* client.newTransaction().queryWithVars(...)</code>.
*
* @param query query in DQL
* @param vars DQL variables used in query
* @return a Response protocol buffer object.
*/
public CompletableFuture<Response> queryWithVars(
final String query, final Map<String, String> vars) {
return this.queryWithVars(query, vars, 0, null);
}

/**
* Sends a query to one of the connected dgraph instances. If no mutations need to be made in the
* same transaction, it's convenient to chain the method: <code>
* client.newTransaction().queryWithVars(...)</code>.
*
* @param query query in DQL
* @param vars DQL variables used in query
* @param duration A non-negative timeout duration for the request. If duration is 0, then no
* timeout is set.
* @param units the time unit for the duration
* @return a Response protocol buffer object.
*/
public CompletableFuture<Response> queryWithVars(
final String query, final Map<String, String> vars, long duration, TimeUnit units) {

final Request request =
Request.newBuilder()
Expand All @@ -90,11 +108,11 @@ public CompletableFuture<Response> queryWithVars(
.setBestEffort(bestEffort)
.build();

return this.doRequest(request);
return this.doRequest(request, duration, units);
}

/**
* Calls {@code Transcation#queryWithVars} with an empty vars map.
* Calls {@code AsyncTransaction#queryWithVars} with an empty vars map.
*
* @param query query in DQL
* @return a Response protocol buffer object
Expand All @@ -103,17 +121,47 @@ public CompletableFuture<Response> query(final String query) {
return queryWithVars(query, Collections.emptyMap());
}

/**
* Calls {@code AsyncTranscation#queryWithVars} with an empty vars map.
*
* @param query query in DQL
* @param duration A non-negative timeout duration for the request. If duration is 0, then no
* timeout is set.
* @param units the time unit for the duration
* @return a Response protocol buffer object
*/
public CompletableFuture<Response> query(final String query, long duration, TimeUnit units) {
return queryWithVars(query, Collections.emptyMap(), duration, units);
}

/**
* Sends a query to one of the connected dgraph instances and returns RDF response. If no
* mutations need to be made in the same transaction, it's convenient to chain the method: <code>
* client.NewTransaction().queryRDFWithVars(...)</code>.
* client.newTransaction().queryRDFWithVars(...)</code>.
*
* @param query query in DQL
* @param vars DQL variables used in query
* @return a Response protocol buffer object.
*/
public CompletableFuture<Response> queryRDFWithVars(
final String query, final Map<String, String> vars) {
return this.queryRDFWithVars(query, vars, 0, null);
}

/**
* Sends a query to one of the connected dgraph instances and returns RDF response. If no
* mutations need to be made in the same transaction, it's convenient to chain the method: <code>
* client.newTransaction().queryRDFWithVars(...)</code>.
*
* @param query query in DQL
* @param vars DQL variables used in query
* @param duration A non-negative timeout duration for the request. If duration is 0, then no
* timeout is set.
* @param units the time unit for the duration
* @return a Response protocol buffer object.
*/
public CompletableFuture<Response> queryRDFWithVars(
final String query, final Map<String, String> vars, long duration, TimeUnit units) {

final Request request =
Request.newBuilder()
Expand All @@ -125,11 +173,11 @@ public CompletableFuture<Response> queryRDFWithVars(
.setRespFormat(Request.RespFormat.RDF)
.build();

return this.doRequest(request);
return this.doRequest(request, duration, units);
}

/**
* Calls {@code Transcation#queryRDFWithVars} with an empty vars map.
* Calls {@code AsyncTransaction#queryRDFWithVars} with an empty vars map.
*
* @param query query in DQL
* @return a Response protocol buffer object
Expand All @@ -138,6 +186,19 @@ public CompletableFuture<Response> queryRDF(final String query) {
return queryRDFWithVars(query, Collections.emptyMap());
}

/**
* Calls {@code AsyncTransaction#queryRDFWithVars} with an empty vars map.
*
* @param query query in DQL
* @param duration A non-negative timeout duration for the request. If duration is 0, then no
* timeout is set.
* @param units the time unit for the duration
* @return a Response protocol buffer object
*/
public CompletableFuture<Response> queryRDF(final String query, long duration, TimeUnit units) {
return queryRDFWithVars(query, Collections.emptyMap(), duration, units);
}

/**
* Sets the best effort flag for this transaction. The Best effort flag can only be set for
* read-only transactions, and setting the best effort flag will enable a read-only transaction to
Expand All @@ -164,24 +225,47 @@ public void setBestEffort(boolean bestEffort) {
* @return a Response protocol buffer object.
*/
public CompletableFuture<Response> mutate(Mutation mutation) {
return mutate(mutation, 0, null);
}

/**
* Allows data stored on dgraph instances to be modified. The fields in Mutation come in pairs,
* set and delete. Mutations can either be encoded as JSON or as RDFs. If the `commitNow` property
* on the Mutation object is set, this call will result in the transaction being committed. In
* this case, there is no need to subsequently call AsyncTransaction#commit.
*
* @param mutation a Mutation protocol buffer object representing the mutation.
* @param duration A non-negative timeout duration for the request. If duration is 0, then no
* timeout is set.
* @param units the time unit for the duration
* @return a Response protocol buffer object.
*/
public CompletableFuture<Response> mutate(Mutation mutation, long duration, TimeUnit units) {
Request request =
Request.newBuilder()
.addMutations(mutation)
.setCommitNow(mutation.getCommitNow())
.setStartTs(context.getStartTs())
.build();

return this.doRequest(request);
return this.doRequest(request, duration, units);
}

public CompletableFuture<Response> doRequest(Request request) {
return this.doRequest(request, 0, null);
}

/**
* Allows performing a query on dgraph instances. It could perform just query or a mutation or an
* upsert involving a query and a mutation.
*
* @param request a Request protocol buffer object.
* @param duration A non-negative timeout duration for the request. If duration is 0, then no
* timeout is set.
* @param units the time unit for the duration
* @return a Response protocol buffer object.
*/
public CompletableFuture<Response> doRequest(Request request) {
public CompletableFuture<Response> doRequest(Request request, long duration, TimeUnit units) {
if (finished) {
throw new TxnFinishedException();
}
Expand All @@ -206,6 +290,9 @@ public CompletableFuture<Response> doRequest(Request request) {
() -> {
StreamObserverBridge<Response> bridge = new StreamObserverBridge<>();
DgraphStub localStub = client.getStubWithJwt(stub);
if (duration > 0) {
localStub = localStub.withDeadlineAfter(duration, units);
}
localStub.query(requestStartTs, bridge);

return bridge
Expand Down
Loading

0 comments on commit 55a0da6

Please sign in to comment.