Skip to content

Commit 4105b78

Browse files
committed
Merge pull request #2 from marcuslinke/events
Modified Event Stream API
2 parents 5ad41ae + f79e511 commit 4105b78

4 files changed

Lines changed: 80 additions & 34 deletions

File tree

src/main/java/com/github/dockerjava/api/command/EventsCmd.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
import java.util.concurrent.ExecutorService;
44

5+
56
/**
67
* Get events
78
*
89
* @param since - Show all events created since timestamp
910
* @param until - Stream events until this timestamp
1011
*/
11-
public interface EventsCmd extends DockerCmd<Void> {
12+
public interface EventsCmd extends DockerCmd<ExecutorService> {
1213
public EventsCmd withSince(String since);
1314

1415
public EventsCmd withUntil(String until);
@@ -18,11 +19,9 @@ public interface EventsCmd extends DockerCmd<Void> {
1819
public String getUntil();
1920

2021
public EventCallback getEventCallback();
22+
23+
public EventsCmd withEventCallback(EventCallback eventCallback);
2124

22-
public ExecutorService getExecutorService();
23-
24-
public void stop();
25-
26-
public static interface Exec extends DockerCmdExec<EventsCmd, Void> {
25+
public static interface Exec extends DockerCmdExec<EventsCmd, ExecutorService> {
2726
}
2827
}

src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,22 @@
11
package com.github.dockerjava.core.command;
22

3+
import java.util.concurrent.ExecutorService;
4+
35
import com.github.dockerjava.api.command.EventCallback;
46
import com.github.dockerjava.api.command.EventsCmd;
57

6-
import java.util.concurrent.ExecutorService;
7-
import java.util.concurrent.Executors;
8-
98
/**
109
* Stream docker events
1110
*/
12-
public class EventsCmdImpl extends AbstrDockerCmd<EventsCmd, Void> implements EventsCmd {
11+
public class EventsCmdImpl extends AbstrDockerCmd<EventsCmd, ExecutorService> implements EventsCmd {
1312

14-
private final EventCallback eventCallback;
15-
16-
private ExecutorService executorService = Executors.newSingleThreadExecutor();
1713
private String since;
1814
private String until;
15+
private EventCallback eventCallback;
1916

2017
public EventsCmdImpl(EventsCmd.Exec exec, EventCallback eventCallback) {
2118
super(exec);
22-
this.eventCallback = eventCallback;
19+
withEventCallback(eventCallback);
2320
}
2421

2522
@Override
@@ -33,6 +30,12 @@ public EventsCmd withUntil(String until) {
3330
this.until = until;
3431
return this;
3532
}
33+
34+
@Override
35+
public EventsCmd withEventCallback(EventCallback eventCallback) {
36+
this.eventCallback = eventCallback;
37+
return this;
38+
}
3639

3740
@Override
3841
public String getSince() {
@@ -50,17 +53,7 @@ public EventCallback getEventCallback() {
5053
}
5154

5255
@Override
53-
public ExecutorService getExecutorService() {
54-
return executorService;
55-
}
56-
57-
@Override
58-
public void stop() {
59-
executorService.shutdown();
60-
}
61-
62-
@Override
63-
public Void exec() {
56+
public ExecutorService exec() {
6457
return super.exec();
6558
}
6659

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,80 @@
11
package com.github.dockerjava.jaxrs;
22

3+
import java.io.InputStream;
4+
import java.util.concurrent.Callable;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
8+
import com.fasterxml.jackson.core.JsonFactory;
9+
import com.fasterxml.jackson.core.JsonParser;
10+
import com.fasterxml.jackson.core.JsonToken;
11+
import com.fasterxml.jackson.databind.ObjectMapper;
12+
import com.github.dockerjava.api.command.EventCallback;
313
import com.github.dockerjava.api.command.EventsCmd;
14+
import com.github.dockerjava.api.model.Event;
415
import com.github.dockerjava.api.model.EventNotifier;
16+
import com.google.common.base.Preconditions;
17+
518
import org.slf4j.Logger;
619
import org.slf4j.LoggerFactory;
720

821
import javax.ws.rs.client.WebTarget;
22+
import javax.ws.rs.core.Response;
923

10-
public class EventsCmdExec extends AbstrDockerCmdExec<EventsCmd, Void> implements EventsCmd.Exec {
24+
public class EventsCmdExec extends AbstrDockerCmdExec<EventsCmd, ExecutorService> implements EventsCmd.Exec {
1125
private static final Logger LOGGER = LoggerFactory.getLogger(EventsCmdExec.class);
12-
26+
1327
public EventsCmdExec(WebTarget baseResource) {
1428
super(baseResource);
1529
}
1630

1731
@Override
18-
protected Void execute(EventsCmd command) {
32+
protected ExecutorService execute(EventsCmd command) {
33+
ExecutorService executorService = Executors.newSingleThreadExecutor();
34+
1935
WebTarget webResource = getBaseResource().path("/events")
2036
.queryParam("since", command.getSince())
2137
.queryParam("until", command.getUntil());
2238

2339
LOGGER.trace("GET: {}", webResource);
2440
EventNotifier eventNotifier = EventNotifier.create(command.getEventCallback(), webResource);
25-
command.getExecutorService().submit(eventNotifier);
26-
return null;
41+
executorService.submit(eventNotifier);
42+
return executorService;
43+
}
44+
45+
private static class EventNotifier implements Callable<Void> {
46+
private static final JsonFactory JSON_FACTORY = new JsonFactory();
47+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
48+
49+
private final EventCallback eventCallback;
50+
private final WebTarget webTarget;
51+
52+
private EventNotifier(EventCallback eventCallback, WebTarget webTarget) {
53+
this.eventCallback = eventCallback;
54+
this.webTarget = webTarget;
55+
}
56+
57+
public static EventNotifier create(EventCallback eventCallback, WebTarget webTarget) {
58+
Preconditions.checkNotNull(eventCallback, "An EventCallback must be provided");
59+
Preconditions.checkNotNull(webTarget, "An WebTarget must be provided");
60+
return new EventNotifier(eventCallback, webTarget);
61+
}
62+
63+
@Override
64+
public Void call() throws Exception {
65+
Response response = webTarget.request().get(Response.class);
66+
InputStream inputStream = response.readEntity(InputStream.class);
67+
try {
68+
JsonParser jp = JSON_FACTORY.createParser(inputStream);
69+
while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) {
70+
eventCallback.onEvent(OBJECT_MAPPER.readValue(jp, Event.class));
71+
}
72+
} finally {
73+
if (response != null) {
74+
response.close();
75+
}
76+
}
77+
return null;
78+
}
2779
}
2880
}

src/test/java/com/github/dockerjava/core/command/EventsCmdImplTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.github.dockerjava.api.command.EventsCmd;
77
import com.github.dockerjava.api.model.Event;
88
import com.github.dockerjava.client.AbstractDockerClientTest;
9+
910
import org.testng.ITestResult;
1011
import org.testng.annotations.AfterMethod;
1112
import org.testng.annotations.AfterTest;
@@ -16,6 +17,7 @@
1617
import java.io.IOException;
1718
import java.lang.reflect.Method;
1819
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.ExecutorService;
1921
import java.util.concurrent.TimeUnit;
2022

2123
public class EventsCmdImplTest extends AbstractDockerClientTest {
@@ -59,11 +61,11 @@ public void testEventStreamTimeBound() throws InterruptedException, IOException
5961
EventCallback eventCallback = new EventCallbackTest(countDownLatch);
6062

6163
EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(startTime).withUntil(endTime);
62-
eventsCmd.exec();
64+
ExecutorService executorService = eventsCmd.exec();
6365

6466
boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS);
6567

66-
eventsCmd.stop();
68+
executorService.shutdown();
6769
assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]");
6870
}
6971

@@ -76,12 +78,12 @@ public void testEventStreaming() throws InterruptedException, IOException {
7678
EventCallback eventCallback = new EventCallbackTest(countDownLatch);
7779

7880
EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(getEpochTime());
79-
eventsCmd.exec();
81+
ExecutorService executorService = eventsCmd.exec();
8082

8183
generateEvents();
8284

8385
boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS);
84-
eventsCmd.stop();
86+
executorService.shutdown();
8587
assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]");
8688
}
8789

0 commit comments

Comments
 (0)