Skip to content

Commit 5ad41ae

Browse files
committed
EventsAPI: Complete callback based EventNotifier
1 parent 3d2a3f0 commit 5ad41ae

5 files changed

Lines changed: 61 additions & 40 deletions

File tree

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package com.github.dockerjava.api.command;
22

3-
import com.github.dockerjava.api.model.EventNotifier;
3+
import java.util.concurrent.ExecutorService;
44

55
/**
66
* Get events
77
*
88
* @param since - Show all events created since timestamp
99
* @param until - Stream events until this timestamp
1010
*/
11-
public interface EventsCmd extends DockerCmd<EventNotifier> {
11+
public interface EventsCmd extends DockerCmd<Void> {
1212
public EventsCmd withSince(String since);
1313

1414
public EventsCmd withUntil(String until);
@@ -19,6 +19,10 @@ public interface EventsCmd extends DockerCmd<EventNotifier> {
1919

2020
public EventCallback getEventCallback();
2121

22-
public static interface Exec extends DockerCmdExec<EventsCmd, EventNotifier> {
22+
public ExecutorService getExecutorService();
23+
24+
public void stop();
25+
26+
public static interface Exec extends DockerCmdExec<EventsCmd, Void> {
2327
}
2428
}

src/main/java/com/github/dockerjava/api/model/EventNotifier.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,44 +7,45 @@
77
import com.github.dockerjava.api.command.EventCallback;
88
import com.google.common.base.Preconditions;
99

10-
import java.io.Closeable;
11-
import java.io.IOException;
10+
import javax.ws.rs.client.WebTarget;
11+
import javax.ws.rs.core.Response;
1212
import java.io.InputStream;
1313
import java.util.concurrent.Callable;
1414

1515
/**
16-
* EventStream API
17-
* <p/>
18-
* Spawns a thread to poll for events to fill a BlockingQueue
16+
* EventNotifier API
1917
*/
20-
public class EventNotifier implements Closeable, Callable<Void> {
18+
public class EventNotifier implements Callable<Void> {
2119
private static final JsonFactory JSON_FACTORY = new JsonFactory();
2220
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
2321

2422
private final EventCallback eventCallback;
25-
private final InputStream inputStream;
23+
private final WebTarget webTarget;
2624

27-
private EventNotifier(EventCallback eventCallback, InputStream inputStream) {
25+
private EventNotifier(EventCallback eventCallback, WebTarget webTarget) {
2826
this.eventCallback = eventCallback;
29-
this.inputStream = inputStream;
27+
this.webTarget = webTarget;
3028
}
3129

32-
public static EventNotifier create(EventCallback eventCallback, InputStream inputStream) {
30+
public static EventNotifier create(EventCallback eventCallback, WebTarget webTarget) {
3331
Preconditions.checkNotNull(eventCallback, "An EventCallback must be provided");
34-
Preconditions.checkNotNull(inputStream, "An InputStream must be provided");
35-
return new EventNotifier(eventCallback, inputStream);
36-
}
37-
38-
@Override
39-
public void close() throws IOException {
40-
inputStream.close();
32+
Preconditions.checkNotNull(webTarget, "An WebTarget must be provided");
33+
return new EventNotifier(eventCallback, webTarget);
4134
}
4235

4336
@Override
4437
public Void call() throws Exception {
45-
JsonParser jp = JSON_FACTORY.createParser(inputStream);
46-
while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) {
47-
eventCallback.onEvent(OBJECT_MAPPER.readValue(jp, Event.class));
38+
Response response = webTarget.request().get(Response.class);
39+
InputStream inputStream = response.readEntity(InputStream.class);
40+
try {
41+
JsonParser jp = JSON_FACTORY.createParser(inputStream);
42+
while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) {
43+
eventCallback.onEvent(OBJECT_MAPPER.readValue(jp, Event.class));
44+
}
45+
} finally {
46+
if (response != null) {
47+
response.close();
48+
}
4849
}
4950
return null;
5051
}

src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
import com.github.dockerjava.api.command.EventCallback;
44
import com.github.dockerjava.api.command.EventsCmd;
5-
import com.github.dockerjava.api.model.EventNotifier;
5+
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
68

79
/**
810
* Stream docker events
911
*/
10-
public class EventsCmdImpl extends AbstrDockerCmd<EventsCmd, EventNotifier> implements EventsCmd {
12+
public class EventsCmdImpl extends AbstrDockerCmd<EventsCmd, Void> implements EventsCmd {
1113

1214
private final EventCallback eventCallback;
1315

16+
private ExecutorService executorService = Executors.newSingleThreadExecutor();
1417
private String since;
1518
private String until;
1619

@@ -47,7 +50,17 @@ public EventCallback getEventCallback() {
4750
}
4851

4952
@Override
50-
public EventNotifier exec() {
53+
public ExecutorService getExecutorService() {
54+
return executorService;
55+
}
56+
57+
@Override
58+
public void stop() {
59+
executorService.shutdown();
60+
}
61+
62+
@Override
63+
public Void exec() {
5164
return super.exec();
5265
}
5366

src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,23 @@
66
import org.slf4j.LoggerFactory;
77

88
import javax.ws.rs.client.WebTarget;
9-
import javax.ws.rs.core.Response;
10-
import java.io.InputStream;
119

12-
public class EventsCmdExec extends AbstrDockerCmdExec<EventsCmd, EventNotifier> implements EventsCmd.Exec {
10+
public class EventsCmdExec extends AbstrDockerCmdExec<EventsCmd, Void> implements EventsCmd.Exec {
1311
private static final Logger LOGGER = LoggerFactory.getLogger(EventsCmdExec.class);
1412

1513
public EventsCmdExec(WebTarget baseResource) {
1614
super(baseResource);
1715
}
1816

1917
@Override
20-
protected EventNotifier execute(EventsCmd command) {
18+
protected Void execute(EventsCmd command) {
2119
WebTarget webResource = getBaseResource().path("/events")
2220
.queryParam("since", command.getSince())
2321
.queryParam("until", command.getUntil());
2422

2523
LOGGER.trace("GET: {}", webResource);
26-
InputStream inputStream = webResource.request().get(Response.class).readEntity(InputStream.class);
27-
return EventNotifier.create(command.getEventCallback(), inputStream);
24+
EventNotifier eventNotifier = EventNotifier.create(command.getEventCallback(), webResource);
25+
command.getExecutorService().submit(eventNotifier);
26+
return null;
2827
}
2928
}

src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
import com.github.dockerjava.api.DockerException;
44
import com.github.dockerjava.api.command.CreateContainerResponse;
55
import com.github.dockerjava.api.command.EventCallback;
6+
import com.github.dockerjava.api.command.EventsCmd;
67
import com.github.dockerjava.api.model.Event;
7-
import com.github.dockerjava.api.model.EventNotifier;
88
import com.github.dockerjava.client.AbstractDockerClientTest;
99
import org.testng.ITestResult;
1010
import org.testng.annotations.AfterMethod;
@@ -15,7 +15,6 @@
1515

1616
import java.io.IOException;
1717
import java.lang.reflect.Method;
18-
import java.util.concurrent.Callable;
1918
import java.util.concurrent.CountDownLatch;
2019
import java.util.concurrent.TimeUnit;
2120

@@ -59,9 +58,12 @@ public void testEventStreamTimeBound() throws InterruptedException, IOException
5958
CountDownLatch countDownLatch = new CountDownLatch(expectedEvents);
6059
EventCallback eventCallback = new EventCallbackTest(countDownLatch);
6160

62-
EventNotifier eventNotifier = dockerClient.eventsCmd(eventCallback).withSince(startTime).withUntil(endTime).exec();
63-
boolean zeroCount = countDownLatch.await(30, TimeUnit.SECONDS);
64-
eventNotifier.close();
61+
EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(startTime).withUntil(endTime);
62+
eventsCmd.exec();
63+
64+
boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS);
65+
66+
eventsCmd.stop();
6567
assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]");
6668
}
6769

@@ -72,12 +74,14 @@ public void testEventStreaming() throws InterruptedException, IOException {
7274

7375
CountDownLatch countDownLatch = new CountDownLatch(KNOWN_NUM_EVENTS);
7476
EventCallback eventCallback = new EventCallbackTest(countDownLatch);
75-
EventNotifier eventNotifier = dockerClient.eventsCmd(eventCallback).withSince(getEpochTime()).exec();
77+
78+
EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(getEpochTime());
79+
eventsCmd.exec();
7680

7781
generateEvents();
7882

79-
boolean zeroCount = countDownLatch.await(30, TimeUnit.SECONDS);
80-
eventNotifier.close();
83+
boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS);
84+
eventsCmd.stop();
8185
assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]");
8286
}
8387

0 commit comments

Comments
 (0)