|
9 | 9 | * Licensed under the Apache License, Version 2.0 (the "License"); |
10 | 10 | * you may not use this file except in compliance with the License. |
11 | 11 | * You may obtain a copy of the License at |
12 | | - * |
| 12 | + * |
13 | 13 | * http://www.apache.org/licenses/LICENSE-2.0 |
14 | | - * |
| 14 | + * |
15 | 15 | * Unless required by applicable law or agreed to in writing, software |
16 | 16 | * distributed under the License is distributed on an "AS IS" BASIS, |
17 | 17 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
33 | 33 | import org.osgl.$; |
34 | 34 | import org.osgl.exception.UnexpectedIOException; |
35 | 35 | import org.osgl.http.H; |
| 36 | +import org.osgl.logging.LogManager; |
| 37 | +import org.osgl.logging.Logger; |
36 | 38 | import org.osgl.storage.ISObject; |
37 | 39 | import org.osgl.util.E; |
38 | 40 | import org.osgl.util.IO; |
|
43 | 45 | import java.io.OutputStream; |
44 | 46 | import java.nio.ByteBuffer; |
45 | 47 | import java.nio.channels.FileChannel; |
| 48 | +import java.nio.charset.StandardCharsets; |
46 | 49 | import java.util.Locale; |
| 50 | +import java.util.concurrent.locks.Condition; |
| 51 | +import java.util.concurrent.locks.ReentrantLock; |
47 | 52 |
|
48 | 53 | public class UndertowResponse extends ActResponse<UndertowResponse> { |
49 | 54 |
|
| 55 | + protected static Logger LOGGER = LogManager.get(UndertowResponse.class); |
| 56 | + |
| 57 | + private static class Buffer { |
| 58 | + boolean isSending; |
| 59 | + ByteBuffer buffer; |
| 60 | + IoCallback callback; |
| 61 | + ReentrantLock lock; |
| 62 | + Condition finalPartToGo; |
| 63 | + |
| 64 | + Buffer(IoCallback callback, ReentrantLock lock) { |
| 65 | + this.callback = $.notNull(callback); |
| 66 | + this.isSending = true; |
| 67 | + this.lock = lock; |
| 68 | + } |
| 69 | + |
| 70 | + void sendOrBuf(String content, Sender sender) { |
| 71 | + sendOrBuf(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)), sender); |
| 72 | + } |
| 73 | + |
| 74 | + void sendOrBuf(ByteBuffer content, Sender sender) { |
| 75 | + lock.lock(); |
| 76 | + try { |
| 77 | + if (null == buffer) { |
| 78 | + buffer = content; |
| 79 | + } else { |
| 80 | + ByteBuffer merged = ByteBuffer.allocate(buffer.limit() + content.limit()); |
| 81 | + merged.put(buffer).put(content).flip(); |
| 82 | + buffer = merged; |
| 83 | + } |
| 84 | + if (!isSending) { |
| 85 | + isSending = true; |
| 86 | + ByteBuffer buffer = this.buffer; |
| 87 | + this.buffer = null; |
| 88 | + sender.send(buffer, callback); |
| 89 | + } |
| 90 | + } finally { |
| 91 | + lock.unlock(); |
| 92 | + } |
| 93 | + } |
| 94 | + |
| 95 | + void sendThroughFinalPart(Sender sender) { |
| 96 | + if (null == this.buffer) { |
| 97 | + return; |
| 98 | + } |
| 99 | + lock.lock(); |
| 100 | + try { |
| 101 | + if (!isSending) { |
| 102 | + isSending = true; |
| 103 | + sender.send(this.buffer); |
| 104 | + } else { |
| 105 | + finalPartToGo = lock.newCondition(); |
| 106 | + while (isSending) { |
| 107 | + try { |
| 108 | + finalPartToGo.await(); |
| 109 | + } catch (InterruptedException e) { |
| 110 | + Thread.currentThread().interrupt(); |
| 111 | + throw E.unexpected(e); |
| 112 | + } |
| 113 | + } |
| 114 | + ByteBuffer buffer = this.buffer; |
| 115 | + this.buffer = null; |
| 116 | + sender.send(buffer, callback); |
| 117 | + } |
| 118 | + } finally { |
| 119 | + lock.unlock(); |
| 120 | + } |
| 121 | + } |
| 122 | + |
| 123 | + void partSent() { |
| 124 | + isSending = false; |
| 125 | + if (null != finalPartToGo) { |
| 126 | + finalPartToGo.signal(); |
| 127 | + } |
| 128 | + } |
| 129 | + |
| 130 | + private void clear() { |
| 131 | + isSending = false; |
| 132 | + buffer = null; |
| 133 | + } |
| 134 | + } |
| 135 | + |
50 | 136 | private static final HttpString _SERVER = new HttpString(H.Header.Names.SERVER); |
51 | 137 | private static final HttpStringCache HEADER_NAMES = HttpStringCache.HEADER; |
52 | 138 |
|
53 | 139 | private HttpServerExchange hse; |
54 | 140 |
|
55 | 141 | private boolean endAsync; |
56 | 142 | private Sender sender; |
| 143 | + private ReentrantLock lock; |
| 144 | + private IoCallback ioCallback = new DefaultIoCallback() { |
| 145 | + @Override |
| 146 | + public void onComplete(HttpServerExchange exchange, Sender sender) { |
| 147 | + if (null != lock) { |
| 148 | + lock.lock(); |
| 149 | + buffer.partSent(); |
| 150 | + lock.unlock(); |
| 151 | + } |
| 152 | + } |
| 153 | + |
| 154 | + @Override |
| 155 | + public void onException(HttpServerExchange exchange, Sender sender, IOException exception) { |
| 156 | + if (null != buffer) { |
| 157 | + buffer.clear(); |
| 158 | + buffer = null; |
| 159 | + } |
| 160 | + super.onException(exchange, sender, exception); |
| 161 | + } |
| 162 | + }; |
| 163 | + private Buffer buffer; |
57 | 164 |
|
58 | 165 | public UndertowResponse(HttpServerExchange exchange, AppConfig config) { |
59 | 166 | super(config); |
@@ -106,17 +213,21 @@ public UndertowResponse writeContent(String s) { |
106 | 213 | } |
107 | 214 |
|
108 | 215 | public void writeContentPart(String s) { |
109 | | - try { |
110 | | - sender().send(s, WRITE_PART_CALLBACK); |
111 | | - } catch (RuntimeException e) { |
112 | | - endAsync = false; |
113 | | - throw e; |
114 | | - } |
| 216 | + ByteBuffer buffer = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); |
| 217 | + writeContentPart(buffer); |
115 | 218 | } |
116 | 219 |
|
117 | 220 | public void writeContentPart(ByteBuffer buffer) { |
118 | 221 | try { |
119 | | - sender().send(buffer, WRITE_PART_CALLBACK); |
| 222 | + if (null != buffer) { |
| 223 | + sender().send(buffer, ioCallback); |
| 224 | + } else { |
| 225 | + this.buffer.sendOrBuf(buffer, sender()); |
| 226 | + } |
| 227 | + } catch (IllegalStateException e) { |
| 228 | + lock = new ReentrantLock(); |
| 229 | + this.buffer = new Buffer(ioCallback, lock); |
| 230 | + this.buffer.sendOrBuf(buffer, sender()); |
120 | 231 | } catch (RuntimeException e) { |
121 | 232 | endAsync = false; |
122 | 233 | throw e; |
@@ -177,7 +288,11 @@ public void commit() { |
177 | 288 | if (!endAsync) { |
178 | 289 | hse.endExchange(); |
179 | 290 | } else { |
180 | | - sender.close(IoCallback.END_EXCHANGE); |
| 291 | + if (null != buffer) { |
| 292 | + buffer.sendThroughFinalPart(sender()); |
| 293 | + } else { |
| 294 | + sender().close(IoCallback.END_EXCHANGE); |
| 295 | + } |
181 | 296 | } |
182 | 297 | markClosed(); |
183 | 298 | } |
@@ -246,10 +361,4 @@ private boolean responseStarted() { |
246 | 361 | return hse.isResponseStarted(); |
247 | 362 | } |
248 | 363 |
|
249 | | - private static final IoCallback WRITE_PART_CALLBACK = new DefaultIoCallback() { |
250 | | - @Override |
251 | | - public void onComplete(HttpServerExchange exchange, Sender sender) { |
252 | | - // do not close the exchange |
253 | | - } |
254 | | - }; |
255 | 364 | } |
0 commit comments