@@ -603,13 +603,28 @@ public void streamIdExhausted() throws Exception {
603603 int startId = Integer .MAX_VALUE - 2 ;
604604 initTransport (startId , new ConnectedCallback (false ));
605605
606- MockStreamListener listener1 = new MockStreamListener ();
607- clientTransport .newStream (method , new Metadata .Headers (), listener1 );
606+ MockStreamListener listener = new MockStreamListener ();
607+ clientTransport .newStream (method , new Metadata .Headers (), listener ). request ( 1 );
608608
609+ // New stream should be failed.
609610 assertNewStreamFail ();
610611
612+ // The alive stream should be functional, receives a message.
613+ frameHandler ().headers (
614+ false , false , startId , 0 , grpcResponseHeaders (), HeadersMode .HTTP_20_HEADERS );
615+ assertNotNull (listener .headers );
616+ String message = "hello" ;
617+ Buffer buffer = createMessageFrame (message );
618+ frameHandler ().data (false , startId , buffer , (int ) buffer .size ());
619+
611620 getStream (startId ).cancel (Status .CANCELLED );
612- listener1 .waitUntilStreamClosed ();
621+ // Receives the second message after be cancelled.
622+ buffer = createMessageFrame (message );
623+ frameHandler ().data (false , startId , buffer , (int ) buffer .size ());
624+
625+ listener .waitUntilStreamClosed ();
626+ // Should only have the first message delivered.
627+ assertEquals (message , listener .messages .get (0 ));
613628 verify (frameWriter , timeout (TIME_OUT_MS )).rstStream (eq (startId ), eq (ErrorCode .CANCEL ));
614629 verify (transportListener ).transportShutdown (isA (Status .class ));
615630 verify (transportListener , timeout (TIME_OUT_MS )).transportTerminated ();
@@ -627,6 +642,12 @@ public void pendingStreamSucceed() throws Exception {
627642 // The second stream should be pending.
628643 OkHttpClientStream stream2 =
629644 clientTransport .newStream (method , new Metadata .Headers (), listener2 );
645+ String sentMessage = "hello" ;
646+ InputStream input = new ByteArrayInputStream (sentMessage .getBytes (UTF_8 ));
647+ assertEquals (5 , input .available ());
648+ stream2 .writeMessage (input );
649+ stream2 .flush ();
650+ stream2 .halfClose ();
630651
631652 waitForStreamPending (1 );
632653 assertEquals (1 , activeStreamCount ());
@@ -635,10 +656,16 @@ public void pendingStreamSucceed() throws Exception {
635656 stream1 .cancel (Status .CANCELLED );
636657 listener1 .waitUntilStreamClosed ();
637658
638- // The second stream should be active now.
659+ // The second stream should be active now, and the pending data should be sent out .
639660 assertEquals (1 , activeStreamCount ());
640661 assertEquals (0 , clientTransport .getPendingStreamSize ());
641- stream2 .cancel (Status .CANCELLED );
662+ ArgumentCaptor <Buffer > captor = ArgumentCaptor .forClass (Buffer .class );
663+ verify (frameWriter , timeout (TIME_OUT_MS ))
664+ .data (eq (false ), eq (5 ), captor .capture (), eq (5 + HEADER_LENGTH ));
665+ Buffer sentFrame = captor .getValue ();
666+ assertEquals (createMessageFrame (sentMessage ), sentFrame );
667+ verify (frameWriter , timeout (TIME_OUT_MS )).data (eq (true ), eq (5 ), any (Buffer .class ), eq (0 ));
668+ stream2 .sendCancel (Status .CANCELLED );
642669 }
643670
644671 @ Test
0 commit comments