Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<!-- test dependencies -->
<logback.version>1.1.0</logback.version>
<testng.version>6.1.1</testng.version>
<netty.version>4.1.0.CR2</netty.version>
<netty.version>4.1.0.CR3</netty.version>
<hamcrest.library.version>1.3</hamcrest.library.version>
<hamcrest.jpa-matchers>1.6</hamcrest.jpa-matchers>
<lambdaj.version>2.3.3</lambdaj.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ public RC_T awaitCompletion() throws InterruptedException {

/**
* Blocks until {@link ResultCallback#onComplete()} was called or the given timeout occurs
* @return {@code true} if completed and {@code false} if the waiting time elapsed
* before {@link ResultCallback#onComplete()} was called.
*/
@SuppressWarnings("unchecked")
public RC_T awaitCompletion(long timeout, TimeUnit timeUnit) throws InterruptedException {
completed.await(timeout, timeUnit);
return (RC_T) this;
public boolean awaitCompletion(long timeout, TimeUnit timeUnit) throws InterruptedException {
return completed.await(timeout, timeUnit);
}

/**
Expand All @@ -115,11 +115,11 @@ public RC_T awaitStarted() throws InterruptedException {
/**
* Blocks until {@link ResultCallback#onStart()} was called or the given timeout occurs. {@link ResultCallback#onStart()} is called when
* the request was processed on the server side and the response is incoming.
* @return {@code true} if started and {@code false} if the waiting time elapsed
* before {@link ResultCallback#onStart()} was called.
*/
@SuppressWarnings("unchecked")
public RC_T awaitStarted(long timeout, TimeUnit timeUnit) throws InterruptedException {
started.await(timeout, timeUnit);
return (RC_T) this;
public boolean awaitStarted(long timeout, TimeUnit timeUnit) throws InterruptedException {
return started.await(timeout, timeUnit);
}

@CheckForNull
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.github.dockerjava.netty;

import io.netty.channel.Channel;
import io.netty.channel.socket.DuplexChannel;

public interface ChannelProvider {
Channel getChannel();
DuplexChannel getChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@
import com.github.dockerjava.netty.exec.WaitContainerCmdExec;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
Expand Down Expand Up @@ -165,8 +165,8 @@ public class DockerCmdExecFactoryImpl implements DockerCmdExecFactory {

private ChannelProvider channelProvider = new ChannelProvider() {
@Override
public Channel getChannel() {
Channel channel = connect();
public DuplexChannel getChannel() {
DuplexChannel channel = connect();
channel.pipeline().addLast(new LoggingHandler(getClass()));
return channel;
}
Expand All @@ -190,22 +190,22 @@ public void init(DockerClientConfig dockerClientConfig) {
eventLoopGroup = nettyInitializer.init(bootstrap, dockerClientConfig);
}

private Channel connect() {
private DuplexChannel connect() {
try {
return connect(bootstrap);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private Channel connect(final Bootstrap bootstrap) throws InterruptedException {
private DuplexChannel connect(final Bootstrap bootstrap) throws InterruptedException {
return nettyInitializer.connect(bootstrap);
}

private interface NettyInitializer {
EventLoopGroup init(final Bootstrap bootstrap, DockerClientConfig dockerClientConfig);

Channel connect(final Bootstrap bootstrap) throws InterruptedException;
DuplexChannel connect(final Bootstrap bootstrap) throws InterruptedException;
}

private class UnixDomainSocketInitializer implements NettyInitializer {
Expand All @@ -223,8 +223,8 @@ protected void initChannel(final UnixChannel channel) throws Exception {
}

@Override
public Channel connect(Bootstrap bootstrap) throws InterruptedException {
return bootstrap.connect(new DomainSocketAddress("/var/run/docker.sock")).sync().channel();
public DuplexChannel connect(Bootstrap bootstrap) throws InterruptedException {
return (DuplexChannel) bootstrap.connect(new DomainSocketAddress("/var/run/docker.sock")).sync().channel();
}
}

Expand Down Expand Up @@ -253,15 +253,15 @@ protected void initChannel(final SocketChannel channel) throws Exception {
}

@Override
public Channel connect(Bootstrap bootstrap) throws InterruptedException {
public DuplexChannel connect(Bootstrap bootstrap) throws InterruptedException {
String host = dockerClientConfig.getDockerHost().getHost();
int port = dockerClientConfig.getDockerHost().getPort();

if (port == -1) {
throw new RuntimeException("no port configured for " + host);
}

Channel channel = bootstrap.connect(host, port).sync().channel();
DuplexChannel channel = (DuplexChannel) bootstrap.connect(host, port).sync().channel();

if (dockerClientConfig.getDockerTlsVerify()) {
final SslHandler ssl = initSsl(dockerClientConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.DuplexChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
Expand Down Expand Up @@ -150,7 +151,7 @@ public <T> void get(TypeReference<T> typeReference, ResultCallback<T> resultCall
return;
}

private Channel getChannel() {
private DuplexChannel getChannel() {
return channelProvider.getChannel();
}

Expand Down Expand Up @@ -215,7 +216,7 @@ public void post(final Object entity, final InputStream stdin, final ResultCallb

FramedResponseStreamHandler streamHandler = new FramedResponseStreamHandler(resultCallback);

final Channel channel = getChannel();
final DuplexChannel channel = getChannel();

// result callback's close() method must be called when the servers closes the connection
channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
Expand Down Expand Up @@ -262,6 +263,9 @@ public void run() {
channel.writeAndFlush(Unpooled.copiedBuffer(buffer, 0, read));
}

// we close the writing side of the socket, but keep the read side open to transfer stdout/stderr
channel.shutdownOutput();

}
}).start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void onNext(Frame frame) {
};

dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true)
.withLogs(true).exec(callback).awaitCompletion(30, TimeUnit.SECONDS).close();
.withLogs(true).exec(callback).awaitCompletion(30, TimeUnit.SECONDS);
callback.close();

assertThat(callback.toString(), containsString(snippet));
}
Expand Down Expand Up @@ -97,7 +98,8 @@ public void onNext(Frame frame) {
};

dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true)
.exec(callback).awaitCompletion(15, TimeUnit.SECONDS).close();
.exec(callback).awaitCompletion(15, TimeUnit.SECONDS);
callback.close();

System.out.println("log: " + callback.toString());

Expand Down Expand Up @@ -130,7 +132,8 @@ public void onNext(Frame frame) {
InputStream stdin = new ByteArrayInputStream("".getBytes());

dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true)
.withLogs(true).withStdIn(stdin).exec(callback).awaitCompletion(30, TimeUnit.SECONDS).close();
.withLogs(true).withStdIn(stdin).exec(callback).awaitCompletion(30, TimeUnit.SECONDS);
callback.close();
}

public static class AttachContainerTestCallback extends AttachContainerResultCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public void onNext(Frame frame) {
};

dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true)
.withLogs(true).exec(callback).awaitCompletion(10, TimeUnit.SECONDS).close();
.withLogs(true).exec(callback).awaitCompletion(10, TimeUnit.SECONDS);
callback.close();

assertThat(callback.toString(), containsString(snippet));
}
Expand Down Expand Up @@ -104,7 +105,8 @@ public void onNext(Frame frame) {
InputStream stdin = new ByteArrayInputStream((snippet + "\n").getBytes());

dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true)
.withStdIn(stdin).exec(callback).awaitCompletion(2, TimeUnit.SECONDS).close();
.withStdIn(stdin).exec(callback).awaitCompletion(2, TimeUnit.SECONDS);
callback.close();

assertThat(callback.toString(), containsString(snippet));
}
Expand Down Expand Up @@ -133,7 +135,8 @@ public void onNext(Frame frame) {
};

dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true)
.exec(callback).awaitCompletion(10, TimeUnit.SECONDS).close();
.exec(callback).awaitCompletion(10, TimeUnit.SECONDS);
callback.close();

// HexDump.dump(collectFramesCallback.toString().getBytes(), 0, System.out, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,10 @@ public void execStartAttachStdin() throws Exception {

ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(container.getId())
.withAttachStdout(true).withAttachStdin(true).withCmd("cat").exec();
dockerClient.execStartCmd(execCreateCmdResponse.getId()).withDetach(false).withTty(true).withStdIn(stdin)
boolean completed = dockerClient.execStartCmd(execCreateCmdResponse.getId()).withDetach(false).withTty(true).withStdIn(stdin)
.exec(new ExecStartResultCallback(stdout, System.err)).awaitCompletion(5, TimeUnit.SECONDS);

assertTrue(completed, "The process was not finished.");
assertEquals(stdout.toString("UTF-8"), "STDIN\n");
}

Expand All @@ -138,9 +139,10 @@ public void execStartNotAttachedStdin() throws Exception {

ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(container.getId())
.withAttachStdout(true).withAttachStdin(false).withCmd("/bin/sh").exec();
dockerClient.execStartCmd(execCreateCmdResponse.getId()).withDetach(false).withStdIn(stdin)
boolean completed = dockerClient.execStartCmd(execCreateCmdResponse.getId()).withDetach(false).withStdIn(stdin)
.exec(new ExecStartResultCallback(stdout, System.err)).awaitCompletion(5, TimeUnit.SECONDS);

assertTrue(completed, "The process was not finished.");
assertEquals(stdout.toString(), "");
}
}