@@ -475,40 +475,44 @@ protected void throwOnCompleted() {
475475 * This function is called on next()
476476 */
477477 protected void onStateUpdate () {
478- final Response lastRes = currentResponse .get ();
479- if (shouldContinue (lastRes ) && requesting .tryAcquire ()) {
480- // great, we should make a CONTINUE request.
481- connection .sendContinue (lastRes .token ).whenComplete ((nextRes , t ) -> {
482- if (t != null ) { // It errored. This means it's over.
483- completed .completeExceptionally (t );
484- } else { // Okay, let's process this response.
485- currentResponse .set (nextRes );
486- if (nextRes .type .equals (ResponseType .SUCCESS_SEQUENCE )) {
487- try {
488- emitting .acquire ();
489- emitData (nextRes );
490- emitting .release ();
491- completed .complete (true ); // Completed. This means it's over.
492- } catch (Exception e ) {
493- completed .completeExceptionally (e ); // It errored. This means it's over.
478+ if (requesting .tryAcquire ()) {
479+ final Response lastRes = currentResponse .get ();
480+ if (shouldContinue (lastRes )) {
481+ // great, we should make a CONTINUE request.
482+ connection .sendContinue (lastRes .token ).whenComplete ((response , t ) -> {
483+ if (t == null ) { // Okay, let's process this response.
484+ currentResponse .set (response );
485+ if (response .type .equals (ResponseType .SUCCESS_PARTIAL )) {
486+ // Okay, we got another partial response, so there's more.
487+ requesting .release ();
488+
489+ try {
490+ emitting .acquire ();
491+ emitData (response );
492+ emitting .release ();
493+ onStateUpdate (); //Recursion!
494+ } catch (Exception e ) {
495+ completed .completeExceptionally (e ); // It errored. This means it's over.
496+ }
497+ } else if (response .type .equals (ResponseType .SUCCESS_SEQUENCE )) {
498+ try {
499+ emitting .acquire ();
500+ emitData (response );
501+ emitting .release ();
502+ completed .complete (true ); // Completed. This means it's over.
503+ } catch (Exception e ) {
504+ completed .completeExceptionally (e ); // It errored. This means it's over.
505+ }
506+ } else {
507+ completed .completeExceptionally (response .makeError (query )); // It errored. This means it's over.
494508 }
495- } else if (nextRes .type .equals (ResponseType .SUCCESS_PARTIAL )) {
496- // Okay, we got another partial response, so there's more.
497-
498- requesting .release (); // Request's over, release this for later.
499- try {
500- emitting .acquire ();
501- emitData (nextRes );
502- emitting .release ();
503- onStateUpdate (); //Recursion!
504- } catch (Exception e ) {
505- completed .completeExceptionally (e ); // It errored. This means it's over.
506- }
507- } else {
508- completed .completeExceptionally (firstRes .makeError (query )); // It errored. This means it's over.
509+ } else { // It errored. This means it's over.
510+ completed .completeExceptionally (t );
509511 }
510- }
511- });
512+ });
513+ } else { // Just release for re-checking later on
514+ requesting .release ();
515+ }
512516 }
513517 }
514518
0 commit comments