11package com .github .dockerjava .netty .handler ;
22
3+ import java .io .IOException ;
4+ import java .io .InputStream ;
5+ import java .util .concurrent .SynchronousQueue ;
6+
37import io .netty .buffer .ByteBuf ;
48import io .netty .channel .ChannelHandlerContext ;
59import io .netty .channel .SimpleChannelInboundHandler ;
610
7- import java .io .IOException ;
8- import java .io .InputStream ;
9- import java .util .concurrent .LinkedTransferQueue ;
10- import java .util .concurrent .TimeUnit ;
11- import java .util .concurrent .atomic .AtomicBoolean ;
12-
1311import com .github .dockerjava .api .async .ResultCallback ;
12+ import com .google .common .base .Optional ;
1413
1514/**
1615 * Handler that converts an incoming byte stream to an {@link InputStream}.
2019public class HttpResponseStreamHandler extends SimpleChannelInboundHandler <ByteBuf > {
2120
2221 private final HttpResponseInputStream stream = new HttpResponseInputStream ();
23- private ResultCallback <InputStream > resultCallback ;
2422
2523 public HttpResponseStreamHandler (ResultCallback <InputStream > resultCallback ) {
26- this . resultCallback = resultCallback ;
24+ resultCallback . onNext ( stream ) ;
2725 }
2826
2927 @ Override
3028 protected void channelRead0 (ChannelHandlerContext ctx , ByteBuf msg ) throws Exception {
31- invokeCallbackOnFirstRead ();
32-
3329 stream .write (msg .copy ());
3430 }
3531
36- private void invokeCallbackOnFirstRead () {
37- if (resultCallback != null ) {
38- resultCallback .onNext (stream );
39- resultCallback = null ;
40- }
41- }
42-
4332 @ Override
44- public void channelReadComplete (ChannelHandlerContext ctx ) throws Exception {
33+ public void channelInactive (ChannelHandlerContext ctx ) throws Exception {
4534 stream .writeComplete ();
46- super .channelReadComplete (ctx );
35+
36+ super .channelInactive (ctx );
4737 }
4838
4939 public static class HttpResponseInputStream extends InputStream {
5040
51- private AtomicBoolean writeCompleted = new AtomicBoolean ( false ) ;
41+ private boolean writeCompleted = false ;
5242
53- private LinkedTransferQueue < ByteBuf > queue = new LinkedTransferQueue <>();
43+ private SynchronousQueue < Optional < ByteBuf >> queue = new SynchronousQueue <>();
5444
5545 private ByteBuf current = null ;
5646
57- public void write (ByteBuf byteBuf ) {
58- queue .put (byteBuf );
47+ public void write (ByteBuf byteBuf ) throws InterruptedException {
48+ queue .put (Optional . of ( byteBuf ) );
5949 }
6050
61- public void writeComplete () {
62- writeCompleted . set ( true );
51+ public void writeComplete () throws InterruptedException {
52+ queue . put ( Optional .< ByteBuf > absent () );
6353 }
6454
6555 @ Override
@@ -70,16 +60,16 @@ public void close() throws IOException {
7060 }
7161
7262 private void releaseQueued () {
73- ByteBuf byteBuf = queue .poll ();
74- while (byteBuf != null ) {
75- byteBuf .release ();
63+ Optional < ByteBuf > byteBuf = queue .poll ();
64+ while (byteBuf != null && byteBuf . isPresent () ) {
65+ byteBuf .get (). release ();
7666 byteBuf = queue .poll ();
7767 }
7868 }
7969
8070 @ Override
8171 public int available () throws IOException {
82- poll ();
72+ poll (0 );
8373 return readableBytes ();
8474 }
8575
@@ -94,38 +84,52 @@ private int readableBytes() {
9484
9585 @ Override
9686 public int read () throws IOException {
87+ byte [] b = new byte [1 ];
88+ int n = read (b , 0 , 1 );
89+ return n != -1 ? b [0 ] : -1 ;
90+ }
9791
98- poll ();
99-
100- if (readableBytes () == 0 ) {
101- if (writeCompleted .get ()) {
102- return -1 ;
103- }
104- }
105-
106- if (current != null && current .readableBytes () > 0 ) {
107- return current .readByte () & 0xff ;
92+ @ Override
93+ public int read (byte [] b , int off , int len ) throws IOException {
94+ off = poll (off );
95+ if (current == null ) {
96+ return -1 ;
10897 } else {
109- return read ();
98+ int availableBytes = Math .min (len , current .readableBytes () - off );
99+ current .readBytes (b , off , availableBytes );
100+ return availableBytes ;
110101 }
111102 }
112103
113- private void poll () {
114- if (readableBytes () == 0 ) {
104+ private int poll (int off ) {
105+ if (writeCompleted ) {
106+ return off ;
107+ }
108+ while (readableBytes () <= off ) {
115109 try {
116- releaseCurrent ();
117- current = queue .poll (500 , TimeUnit .MILLISECONDS );
110+ off -= releaseCurrent ();
111+ Optional <ByteBuf > optional = queue .take ();
112+ if (optional .isPresent ()) {
113+ current = optional .get ();
114+ } else {
115+ writeCompleted = true ;
116+ return off ;
117+ }
118118 } catch (InterruptedException e ) {
119119 throw new RuntimeException (e );
120120 }
121121 }
122+ return off ;
122123 }
123124
124- private void releaseCurrent () {
125+ private int releaseCurrent () {
125126 if (current != null ) {
127+ int n = current .readableBytes ();
126128 current .release ();
127129 current = null ;
130+ return n ;
128131 }
132+ return 0 ;
129133 }
130134 }
131135}
0 commit comments