|
1 | 1 | package com.kpelykh.docker.client; |
2 | 2 |
|
3 | | -import com.sun.jersey.api.client.ClientHandler; |
4 | | -import com.sun.jersey.api.client.ClientHandlerException; |
5 | | -import com.sun.jersey.api.client.ClientRequest; |
6 | | -import com.sun.jersey.api.client.ClientResponse; |
| 3 | +import com.sun.jersey.api.client.*; |
| 4 | +import com.sun.jersey.api.client.config.ClientConfig; |
| 5 | +import com.sun.jersey.core.header.InBoundHeaders; |
| 6 | +import com.sun.jersey.core.util.ReaderWriter; |
7 | 7 | import jnr.unixsocket.UnixSocketAddress; |
8 | 8 | import jnr.unixsocket.UnixSocketChannel; |
| 9 | +import org.apache.http.Header; |
| 10 | +import org.apache.http.HttpEntity; |
| 11 | +import org.apache.http.HttpException; |
| 12 | +import org.apache.http.HttpResponse; |
| 13 | +import org.apache.http.client.methods.*; |
| 14 | +import org.apache.http.entity.AbstractHttpEntity; |
| 15 | +import org.apache.http.entity.BufferedHttpEntity; |
| 16 | +import org.apache.http.impl.DefaultHttpResponseFactory; |
| 17 | +import org.apache.http.impl.io.DefaultHttpResponseParser; |
| 18 | +import org.apache.http.impl.io.HttpRequestWriter; |
| 19 | +import org.apache.http.message.BasicLineFormatter; |
| 20 | +import org.apache.http.message.BasicLineParser; |
| 21 | +import org.apache.http.params.BasicHttpParams; |
| 22 | +import org.slf4j.Logger; |
| 23 | +import org.slf4j.LoggerFactory; |
9 | 24 |
|
10 | | -import java.io.File; |
11 | | -import java.io.IOException; |
12 | | -import java.io.InputStreamReader; |
13 | | -import java.io.PrintWriter; |
14 | | -import java.nio.CharBuffer; |
| 25 | +import javax.annotation.concurrent.NotThreadSafe; |
| 26 | +import java.io.*; |
| 27 | +import java.net.URI; |
15 | 28 | import java.nio.channels.Channels; |
| 29 | +import java.util.ArrayList; |
| 30 | +import java.util.List; |
16 | 31 |
|
17 | | -public class UnixSocketClientHandler implements ClientHandler { |
| 32 | +/** |
| 33 | + * TODO: Make thread-safe. |
| 34 | + */ |
| 35 | +@NotThreadSafe |
| 36 | +public class UnixSocketClientHandler extends RequestWriter implements ClientHandler { |
| 37 | + |
| 38 | + private static final Logger LOGGER = LoggerFactory.getLogger(UnixSocketClientHandler.class); |
| 39 | + |
| 40 | + public static final int BUFFERSIZE = 1024; |
| 41 | + public static final String DOCKER_SOCKET_PATH = "/var/run/docker.sock"; |
18 | 42 |
|
19 | 43 | @Override |
20 | 44 | public ClientResponse handle(ClientRequest cr) throws ClientHandlerException { |
21 | | - System.out.println("UnixSocketClientHandler.handle " + cr); |
| 45 | + LOGGER.info("handle " + cr); |
| 46 | + |
| 47 | + try { |
| 48 | + File path = new File(DOCKER_SOCKET_PATH); |
| 49 | + UnixSocketAddress address = new UnixSocketAddress(path); |
| 50 | + UnixSocketChannel channel = UnixSocketChannel.open(address); |
| 51 | + OutputStream unixSocketChannelOutputStream = Channels.newOutputStream(channel); |
22 | 52 |
|
23 | | - File path = new File("/var/run/docker.sock"); |
| 53 | + final HttpUriRequest request = getUriHttpRequest(cr); |
| 54 | + BasicHttpParams params = new BasicHttpParams(); |
24 | 55 |
|
25 | | - String data = "GET /images/json?all=0 HTTP/1.1\n\n"; |
| 56 | + UnixSocketSessionOutputBuffer outputBuffer = new UnixSocketSessionOutputBuffer(); |
| 57 | + outputBuffer.init(unixSocketChannelOutputStream, BUFFERSIZE, params); |
| 58 | + HttpRequestWriter writer = new HttpRequestWriter(outputBuffer, new BasicLineFormatter(), params); |
| 59 | + writer.write(request); |
| 60 | + outputBuffer.flush(); |
| 61 | + |
| 62 | + UnixSocketSessionInputBuffer inputBuffer = new UnixSocketSessionInputBuffer(); |
| 63 | + inputBuffer.init(Channels.newInputStream(channel), BUFFERSIZE, params); |
| 64 | + |
| 65 | + HttpResponse response = new DefaultHttpResponseParser(inputBuffer, new BasicLineParser(), new DefaultHttpResponseFactory(), params).parse(); |
| 66 | + |
| 67 | + ClientResponse clientResponse = new ClientResponse(response.getStatusLine().getStatusCode(), |
| 68 | + getInBoundHeaders(response), |
| 69 | + new HttpClientResponseInputStream(response), |
| 70 | + getMessageBodyWorkers()); |
| 71 | + if (!clientResponse.hasEntity()) { |
| 72 | + clientResponse.bufferEntity(); |
| 73 | + clientResponse.close(); |
| 74 | + } |
| 75 | + |
| 76 | + System.out.println(clientResponse.getType()); |
| 77 | + System.out.println(clientResponse.getClientResponseStatus()); |
| 78 | + System.out.println(clientResponse.getStatus()); |
| 79 | + System.out.println(clientResponse.getHeaders()); |
| 80 | + |
| 81 | + return clientResponse; |
26 | 82 |
|
27 | | - UnixSocketAddress address = new UnixSocketAddress(path); |
28 | | - UnixSocketChannel channel = null; |
29 | | - try { |
30 | | - channel = UnixSocketChannel.open(address); |
31 | 83 | } catch (IOException e) { |
32 | 84 | e.printStackTrace(); |
| 85 | + } catch (HttpException e) { |
| 86 | + e.printStackTrace(); |
33 | 87 | } |
34 | | - System.out.println("connected to " + channel.getRemoteSocketAddress()); |
35 | | - PrintWriter w = new PrintWriter(Channels.newOutputStream(channel)); |
36 | | - w.print(data); |
37 | | - w.flush(); |
| 88 | + return null; |
| 89 | + } |
| 90 | + |
| 91 | + private HttpUriRequest getUriHttpRequest(final ClientRequest cr) { |
| 92 | + final String strMethod = cr.getMethod(); |
| 93 | + final URI uri = cr.getURI(); |
| 94 | + |
| 95 | + final HttpEntity entity = getHttpEntity(cr); |
| 96 | + final HttpUriRequest request; |
| 97 | + |
| 98 | + if (strMethod.equals("GET")) { |
| 99 | + request = new HttpGet(uri); |
| 100 | + } else if (strMethod.equals("POST")) { |
| 101 | + request = new HttpPost(uri); |
| 102 | + } else if (strMethod.equals("PUT")) { |
| 103 | + request = new HttpPut(uri); |
| 104 | + } else if (strMethod.equals("DELETE")) { |
| 105 | + request = new HttpDelete(uri); |
| 106 | + } else if (strMethod.equals("HEAD")) { |
| 107 | + request = new HttpHead(uri); |
| 108 | + } else if (strMethod.equals("OPTIONS")) { |
| 109 | + request = new HttpOptions(uri); |
| 110 | + } else { |
| 111 | + request = new HttpEntityEnclosingRequestBase() { |
| 112 | + @Override |
| 113 | + public String getMethod() { |
| 114 | + return strMethod; |
| 115 | + } |
| 116 | + |
| 117 | + @Override |
| 118 | + public URI getURI() { |
| 119 | + return uri; |
| 120 | + } |
| 121 | + }; |
| 122 | + } |
| 123 | + |
| 124 | + if (entity != null && request instanceof HttpEntityEnclosingRequestBase) { |
| 125 | + ((HttpEntityEnclosingRequestBase) request).setEntity(entity); |
| 126 | + } else if (entity != null) { |
| 127 | + throw new ClientHandlerException("Adding entity to http method " + cr.getMethod() + " is not supported."); |
| 128 | + } |
| 129 | + |
| 130 | + return request; |
| 131 | + } |
38 | 132 |
|
39 | | - InputStreamReader r = new InputStreamReader(Channels.newInputStream(channel)); |
| 133 | + private HttpEntity getHttpEntity(final ClientRequest cr) { |
| 134 | + final Object entity = cr.getEntity(); |
| 135 | + |
| 136 | + if (entity == null) |
| 137 | + return null; |
| 138 | + |
| 139 | + final RequestEntityWriter requestEntityWriter = getRequestEntityWriter(cr); |
40 | 140 |
|
41 | | - CharBuffer result = CharBuffer.allocate(1024); |
42 | 141 | try { |
43 | | - r.read(result); |
44 | | - } catch (IOException e) { |
45 | | - e.printStackTrace(); |
| 142 | + HttpEntity httpEntity = new AbstractHttpEntity() { |
| 143 | + @Override |
| 144 | + public boolean isRepeatable() { |
| 145 | + return false; |
| 146 | + } |
| 147 | + |
| 148 | + @Override |
| 149 | + public long getContentLength() { |
| 150 | + return requestEntityWriter.getSize(); |
| 151 | + } |
| 152 | + |
| 153 | + @Override |
| 154 | + public InputStream getContent() throws IOException, IllegalStateException { |
| 155 | + return null; |
| 156 | + } |
| 157 | + |
| 158 | + @Override |
| 159 | + public void writeTo(OutputStream outputStream) throws IOException { |
| 160 | + requestEntityWriter.writeRequestEntity(outputStream); |
| 161 | + } |
| 162 | + |
| 163 | + @Override |
| 164 | + public boolean isStreaming() { |
| 165 | + return false; |
| 166 | + } |
| 167 | + }; |
| 168 | + |
| 169 | + if (cr.getProperties().get(ClientConfig.PROPERTY_CHUNKED_ENCODING_SIZE) != null) { |
| 170 | + // TODO return InputStreamEntity |
| 171 | + return httpEntity; |
| 172 | + } else { |
| 173 | + return new BufferedHttpEntity(httpEntity); |
| 174 | + } |
| 175 | + } catch (Exception ex) { |
| 176 | + // TODO warning/error? |
46 | 177 | } |
47 | | - result.flip(); |
48 | | - System.out.println("read from server: " + result.toString()); |
49 | 178 |
|
50 | 179 | return null; |
51 | 180 | } |
| 181 | + |
| 182 | + private InBoundHeaders getInBoundHeaders(final HttpResponse response) { |
| 183 | + final InBoundHeaders headers = new InBoundHeaders(); |
| 184 | + final Header[] respHeaders = response.getAllHeaders(); |
| 185 | + for (Header header : respHeaders) { |
| 186 | + List<String> list = headers.get(header.getName()); |
| 187 | + if (list == null) { |
| 188 | + list = new ArrayList<String>(); |
| 189 | + } |
| 190 | + list.add(header.getValue()); |
| 191 | + headers.put(header.getName(), list); |
| 192 | + } |
| 193 | + return headers; |
| 194 | + } |
| 195 | + |
| 196 | + private static final class HttpClientResponseInputStream extends FilterInputStream { |
| 197 | + |
| 198 | + HttpClientResponseInputStream(final HttpResponse response) throws IOException { |
| 199 | + super(getInputStream(response)); |
| 200 | + } |
| 201 | + |
| 202 | + @Override |
| 203 | + public void close() |
| 204 | + throws IOException { |
| 205 | + super.close(); |
| 206 | + } |
| 207 | + } |
| 208 | + |
| 209 | + private static InputStream getInputStream(final HttpResponse response) throws IOException { |
| 210 | + |
| 211 | + if (response.getEntity() == null) { |
| 212 | + return new ByteArrayInputStream(new byte[0]); |
| 213 | + } else { |
| 214 | + final InputStream i = response.getEntity().getContent(); |
| 215 | + if (i.markSupported()) |
| 216 | + return i; |
| 217 | + return new BufferedInputStream(i, ReaderWriter.BUFFER_SIZE); |
| 218 | + } |
| 219 | + } |
52 | 220 | } |
0 commit comments