Skip to content

Commit 08768b5

Browse files
author
Adrian Todt
committed
Refactored onStateUpdate, will this fix a race condition?
1 parent 628dffa commit 08768b5

1 file changed

Lines changed: 36 additions & 32 deletions

File tree

src/main/java/com/rethinkdb/net/Result.java

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -475,40 +475,44 @@ protected void throwOnCompleted() {
475475
* This function is called on next()
476476
*/
477477
protected void onStateUpdate() {
478-
final Response lastRes = currentResponse.get();
479-
if (shouldContinue(lastRes) && requesting.tryAcquire()) {
480-
// great, we should make a CONTINUE request.
481-
connection.sendContinue(lastRes.token).whenComplete((nextRes, t) -> {
482-
if (t != null) { // It errored. This means it's over.
483-
completed.completeExceptionally(t);
484-
} else { // Okay, let's process this response.
485-
currentResponse.set(nextRes);
486-
if (nextRes.type.equals(ResponseType.SUCCESS_SEQUENCE)) {
487-
try {
488-
emitting.acquire();
489-
emitData(nextRes);
490-
emitting.release();
491-
completed.complete(true); // Completed. This means it's over.
492-
} catch (Exception e) {
493-
completed.completeExceptionally(e); // It errored. This means it's over.
478+
if (requesting.tryAcquire()) {
479+
final Response lastRes = currentResponse.get();
480+
if (shouldContinue(lastRes)) {
481+
// great, we should make a CONTINUE request.
482+
connection.sendContinue(lastRes.token).whenComplete((response, t) -> {
483+
if (t == null) { // Okay, let's process this response.
484+
currentResponse.set(response);
485+
if (response.type.equals(ResponseType.SUCCESS_PARTIAL)) {
486+
// Okay, we got another partial response, so there's more.
487+
requesting.release();
488+
489+
try {
490+
emitting.acquire();
491+
emitData(response);
492+
emitting.release();
493+
onStateUpdate(); //Recursion!
494+
} catch (Exception e) {
495+
completed.completeExceptionally(e); // It errored. This means it's over.
496+
}
497+
} else if (response.type.equals(ResponseType.SUCCESS_SEQUENCE)) {
498+
try {
499+
emitting.acquire();
500+
emitData(response);
501+
emitting.release();
502+
completed.complete(true); // Completed. This means it's over.
503+
} catch (Exception e) {
504+
completed.completeExceptionally(e); // It errored. This means it's over.
505+
}
506+
} else {
507+
completed.completeExceptionally(response.makeError(query)); // It errored. This means it's over.
494508
}
495-
} else if (nextRes.type.equals(ResponseType.SUCCESS_PARTIAL)) {
496-
// Okay, we got another partial response, so there's more.
497-
498-
requesting.release(); // Request's over, release this for later.
499-
try {
500-
emitting.acquire();
501-
emitData(nextRes);
502-
emitting.release();
503-
onStateUpdate(); //Recursion!
504-
} catch (Exception e) {
505-
completed.completeExceptionally(e); // It errored. This means it's over.
506-
}
507-
} else {
508-
completed.completeExceptionally(firstRes.makeError(query)); // It errored. This means it's over.
509+
} else { // It errored. This means it's over.
510+
completed.completeExceptionally(t);
509511
}
510-
}
511-
});
512+
});
513+
} else { // Just release for re-checking later on
514+
requesting.release();
515+
}
512516
}
513517
}
514518

0 commit comments

Comments
 (0)