Skip to content

Commit

Permalink
Allow creation of Transaction/AsyncTransaction from TxnContext (#149)
Browse files Browse the repository at this point in the history
This PR adds support for creating a new transaction from `DgraphClient` and `DgraphAsyncClient` using an existing `TxnContext`.
  • Loading branch information
EnricoMi authored Jul 7, 2020
1 parent 2bc855f commit 00de540
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 0 deletions.
12 changes: 12 additions & 0 deletions src/main/java/io/dgraph/AsyncTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ public class AsyncTransaction implements AutoCloseable {
this.readOnly = readOnly;
}

AsyncTransaction(DgraphAsyncClient client, DgraphStub stub, TxnContext context) {
this(client, stub);
this.context = context;
}

AsyncTransaction(
DgraphAsyncClient client, DgraphStub stub, TxnContext context, final boolean readOnly) {
this(client, stub, context);
this.context = context;
this.readOnly = readOnly;
}

/**
* Sends a query to one of the connected dgraph instances. If no mutations need to be made in the
* same transaction, it's convenient to chain the method: <code>
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/io/dgraph/DgraphAsyncClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.protobuf.InvalidProtocolBufferException;
import io.dgraph.DgraphProto.Payload;
import io.dgraph.DgraphProto.TxnContext;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
Expand Down Expand Up @@ -270,6 +271,27 @@ public AsyncTransaction newTransaction() {
return new AsyncTransaction(this, this.anyClient());
}

/**
* Creates a new AsyncTransaction object from a TxnContext. All operations performed by this
* transaction are asynchronous.
*
* <p>A transaction lifecycle is as follows:
*
* <p>- Created using AsyncTransaction#newTransaction()
*
* <p>- Various AsyncTransaction#query() and AsyncTransaction#mutate() calls made.
*
* <p>- Commit using AsyncTransacation#commit() or Discard using AsyncTransaction#discard(). If
* any mutations have been made, It's important that at least one of these methods is called to
* clean up resources. Discard is a no-op if Commit has already been called, so it's safe to call
* it after Commit.
*
* @return a new AsyncTransaction object.
*/
public AsyncTransaction newTransaction(TxnContext context) {
return new AsyncTransaction(this, this.anyClient(), context);
}

/**
* Creates a new AsyncTransaction object that only allows queries. Any AsyncTransaction#mutate()
* or AsyncTransaction#commit() call made to the read only transaction will result in
Expand All @@ -280,4 +302,16 @@ public AsyncTransaction newTransaction() {
public AsyncTransaction newReadOnlyTransaction() {
return new AsyncTransaction(this, this.anyClient(), true);
}

/**
* Creates a new AsyncTransaction object from a TnxContext that only allows queries. Any
* AsyncTransaction#mutate() or AsyncTransaction#commit() call made to the read only transaction
* will result in TxnReadOnlyException. All operations performed by this transaction are
* asynchronous.
*
* @return a new AsyncTransaction object
*/
public AsyncTransaction newReadOnlyTransaction(TxnContext context) {
return new AsyncTransaction(this, this.anyClient(), context, true);
}
}
33 changes: 33 additions & 0 deletions src/main/java/io/dgraph/DgraphClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.dgraph;

import io.dgraph.DgraphProto.Operation;
import io.dgraph.DgraphProto.TxnContext;

/**
* Implementation of a DgraphClient using grpc.
Expand Down Expand Up @@ -62,6 +63,27 @@ public Transaction newTransaction() {
return new Transaction(asyncClient.newTransaction());
}

/**
* Creates a new Transaction object from a TxnContext. All operations performed by this
* transaction are synchronous.
*
* <p>A transaction lifecycle is as follows:
*
* <p>- Created using AsyncTransaction#newTransaction()
*
* <p>- Various AsyncTransaction#query() and AsyncTransaction#mutate() calls made.
*
* <p>- Commit using Transacation#commit() or Discard using AsyncTransaction#discard(). If any
* mutations have been made, It's important that at least one of these methods is called to clean
* up resources. Discard is a no-op if Commit has already been called, so it's safe to call it
* after Commit.
*
* @return a new Transaction object.
*/
public Transaction newTransaction(TxnContext context) {
return new Transaction(asyncClient.newTransaction(context));
}

/**
* Creates a new AsyncTransaction object that only allows queries. Any Transaction#mutate() or
* Transaction#commit() call made to the read only transaction will result in
Expand All @@ -73,6 +95,17 @@ public Transaction newReadOnlyTransaction() {
return new Transaction(asyncClient.newReadOnlyTransaction());
}

/**
* Creates a new AsyncTransaction object from a TnxContext that only allows queries. Any
* Transaction#mutate() or Transaction#commit() call made to the read only transaction will result
* in TxnReadOnlyException. All operations performed by this transaction are synchronous.
*
* @return a new AsyncTransaction object
*/
public Transaction newReadOnlyTransaction(TxnContext context) {
return new Transaction(asyncClient.newReadOnlyTransaction(context));
}

/**
* Alter can be used to perform the following operations, by setting the right fields in the
* protocol buffer Operation object.
Expand Down
18 changes: 18 additions & 0 deletions src/test/java/io/dgraph/DgraphAsyncClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ public void testDelete() throws Exception {
}
}

@Test
public void testNewTransactionFromContext() {
DgraphProto.TxnContext ctx = DgraphProto.TxnContext.newBuilder().setStartTs(1234L).build();
try (AsyncTransaction txn = dgraphAsyncClient.newTransaction(ctx)) {
Response response = txn.query("{ result(func: uid(0x1)) { } }").join();
assertEquals(response.getTxn().getStartTs(), 1234L);
}
}

@Test
public void testNewReadOnlyTransactionFromContext() {
DgraphProto.TxnContext ctx = DgraphProto.TxnContext.newBuilder().setStartTs(1234L).build();
try (AsyncTransaction txn = dgraphAsyncClient.newReadOnlyTransaction(ctx)) {
Response response = txn.query("{ result(func: uid(0x1)) { } }").join();
assertEquals(response.getTxn().getStartTs(), 1234L);
}
}

@Test(expectedExceptions = TxnReadOnlyException.class)
public void testMutationsInReadOnlyTransactions() {
try (AsyncTransaction txn = dgraphAsyncClient.newReadOnlyTransaction()) {
Expand Down
19 changes: 19 additions & 0 deletions src/test/java/io/dgraph/DgraphClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.dgraph.DgraphProto.Mutation;
import io.dgraph.DgraphProto.Operation;
import io.dgraph.DgraphProto.Response;
import io.dgraph.DgraphProto.TxnContext;
import java.util.Collections;
import java.util.Map;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -99,6 +100,24 @@ public void testDelete() {
}
}

@Test
public void testNewTransactionFromContext() {
TxnContext ctx = TxnContext.newBuilder().setStartTs(1234L).build();
try (Transaction txn = dgraphClient.newTransaction(ctx)) {
Response response = txn.query("{ result(func: uid(0x1)) { } }");
assertEquals(response.getTxn().getStartTs(), 1234L);
}
}

@Test
public void testNewReadOnlyTransactionFromContext() {
TxnContext ctx = TxnContext.newBuilder().setStartTs(1234L).build();
try (Transaction txn = dgraphClient.newReadOnlyTransaction(ctx)) {
Response response = txn.query("{ result(func: uid(0x1)) { } }");
assertEquals(response.getTxn().getStartTs(), 1234L);
}
}

@Test(expectedExceptions = TxnFinishedException.class)
public void testCommitAfterCommitNow() {
try (Transaction txn = dgraphClient.newTransaction()) {
Expand Down

0 comments on commit 00de540

Please sign in to comment.