Skip to content

Commit 27bffd6

Browse files
committed
Fix for #670
LinkedTransferQueue replaced with SynchronousQueue to fix OutOfDirectMemoryError. InputStream.read() bytes method implemented to improve performance.
1 parent ea20be8 commit 27bffd6

2 files changed

Lines changed: 49 additions & 47 deletions

File tree

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
package com.github.dockerjava.netty.handler;
22

3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.util.concurrent.SynchronousQueue;
6+
37
import io.netty.buffer.ByteBuf;
48
import io.netty.channel.ChannelHandlerContext;
59
import io.netty.channel.SimpleChannelInboundHandler;
610

7-
import java.io.IOException;
8-
import java.io.InputStream;
9-
import java.util.concurrent.LinkedTransferQueue;
10-
import java.util.concurrent.TimeUnit;
11-
import java.util.concurrent.atomic.AtomicBoolean;
12-
1311
import com.github.dockerjava.api.async.ResultCallback;
12+
import com.google.common.base.Optional;
1413

1514
/**
1615
* Handler that converts an incoming byte stream to an {@link InputStream}.
@@ -20,46 +19,37 @@
2019
public class HttpResponseStreamHandler extends SimpleChannelInboundHandler<ByteBuf> {
2120

2221
private final HttpResponseInputStream stream = new HttpResponseInputStream();
23-
private ResultCallback<InputStream> resultCallback;
2422

2523
public HttpResponseStreamHandler(ResultCallback<InputStream> resultCallback) {
26-
this.resultCallback = resultCallback;
24+
resultCallback.onNext(stream);
2725
}
2826

2927
@Override
3028
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
31-
invokeCallbackOnFirstRead();
32-
3329
stream.write(msg.copy());
3430
}
3531

36-
private void invokeCallbackOnFirstRead() {
37-
if (resultCallback != null) {
38-
resultCallback.onNext(stream);
39-
resultCallback = null;
40-
}
41-
}
42-
4332
@Override
44-
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
33+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
4534
stream.writeComplete();
46-
super.channelReadComplete(ctx);
35+
36+
super.channelInactive(ctx);
4737
}
4838

4939
public static class HttpResponseInputStream extends InputStream {
5040

51-
private AtomicBoolean writeCompleted = new AtomicBoolean(false);
41+
private boolean writeCompleted = false;
5242

53-
private LinkedTransferQueue<ByteBuf> queue = new LinkedTransferQueue<>();
43+
private SynchronousQueue<Optional<ByteBuf>> queue = new SynchronousQueue<>();
5444

5545
private ByteBuf current = null;
5646

57-
public void write(ByteBuf byteBuf) {
58-
queue.put(byteBuf);
47+
public void write(ByteBuf byteBuf) throws InterruptedException {
48+
queue.put(Optional.of(byteBuf));
5949
}
6050

61-
public void writeComplete() {
62-
writeCompleted.set(true);
51+
public void writeComplete() throws InterruptedException {
52+
queue.put(Optional.<ByteBuf>absent());
6353
}
6454

6555
@Override
@@ -70,16 +60,16 @@ public void close() throws IOException {
7060
}
7161

7262
private void releaseQueued() {
73-
ByteBuf byteBuf = queue.poll();
74-
while (byteBuf != null) {
75-
byteBuf.release();
63+
Optional<ByteBuf> byteBuf = queue.poll();
64+
while (byteBuf != null && byteBuf.isPresent()) {
65+
byteBuf.get().release();
7666
byteBuf = queue.poll();
7767
}
7868
}
7969

8070
@Override
8171
public int available() throws IOException {
82-
poll();
72+
poll(0);
8373
return readableBytes();
8474
}
8575

@@ -94,38 +84,52 @@ private int readableBytes() {
9484

9585
@Override
9686
public int read() throws IOException {
87+
byte[] b = new byte[1];
88+
int n = read(b, 0, 1);
89+
return n != -1 ? b[0] : -1;
90+
}
9791

98-
poll();
99-
100-
if (readableBytes() == 0) {
101-
if (writeCompleted.get()) {
102-
return -1;
103-
}
104-
}
105-
106-
if (current != null && current.readableBytes() > 0) {
107-
return current.readByte() & 0xff;
92+
@Override
93+
public int read(byte[] b, int off, int len) throws IOException {
94+
off = poll(off);
95+
if (current == null) {
96+
return -1;
10897
} else {
109-
return read();
98+
int availableBytes = Math.min(len, current.readableBytes() - off);
99+
current.readBytes(b, off, availableBytes);
100+
return availableBytes;
110101
}
111102
}
112103

113-
private void poll() {
114-
if (readableBytes() == 0) {
104+
private int poll(int off) {
105+
if (writeCompleted) {
106+
return off;
107+
}
108+
while (readableBytes() <= off) {
115109
try {
116-
releaseCurrent();
117-
current = queue.poll(500, TimeUnit.MILLISECONDS);
110+
off -= releaseCurrent();
111+
Optional<ByteBuf> optional = queue.take();
112+
if (optional.isPresent()) {
113+
current = optional.get();
114+
} else {
115+
writeCompleted = true;
116+
return off;
117+
}
118118
} catch (InterruptedException e) {
119119
throw new RuntimeException(e);
120120
}
121121
}
122+
return off;
122123
}
123124

124-
private void releaseCurrent() {
125+
private int releaseCurrent() {
125126
if (current != null) {
127+
int n = current.readableBytes();
126128
current.release();
127129
current = null;
130+
return n;
128131
}
132+
return 0;
129133
}
130134
}
131135
}

src/test/java/com/github/dockerjava/netty/exec/CopyArchiveFromContainerCmdExecTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ public void copyFromContainerBinaryFile() throws Exception {
9393
}
9494

9595
InputStream response = dockerClient.copyArchiveFromContainerCmd(container.getId(), "/binary.dat").exec();
96-
Boolean bytesAvailable = response.available() > 0;
97-
assertTrue(bytesAvailable, "The file was not copied from the container.");
9896

9997
try (TarArchiveInputStream tarInputStream = new TarArchiveInputStream(response)) {
10098
TarArchiveEntry nextTarEntry = tarInputStream.getNextTarEntry();

0 commit comments

Comments
 (0)