Skip to content

Commit 5a4794f

Browse files
authored
core: add debug information in MessageDeframer. (grpc#2622)
So that MessageDeframer would include the class name of the enclosing stream when emitting errors. This should give us more information about grpc#2157
1 parent fa8f115 commit 5a4794f

File tree

4 files changed

+38
-28
lines changed

4 files changed

+38
-28
lines changed

core/src/main/java/io/grpc/internal/AbstractStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void endOfStream() {
135135
StatsTraceContext statsTraceCtx) {
136136
framer = new MessageFramer(new FramerSink(), bufferAllocator, statsTraceCtx);
137137
deframer = new MessageDeframer(new DeframerListener(), Codec.Identity.NONE, maxMessageSize,
138-
statsTraceCtx);
138+
statsTraceCtx, getClass().getName());
139139
}
140140

141141
protected final void setMaxInboundMessageSizeProtected(int maxSize) {

core/src/main/java/io/grpc/internal/AbstractStream2.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ public abstract static class TransportState implements MessageDeframer.Listener
147147
private boolean deallocated;
148148

149149
protected TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) {
150-
deframer = new MessageDeframer(this, Codec.Identity.NONE, maxMessageSize, statsTraceCtx);
150+
deframer = new MessageDeframer(
151+
this, Codec.Identity.NONE, maxMessageSize, statsTraceCtx, getClass().getName());
151152
}
152153

153154
@VisibleForTesting

core/src/main/java/io/grpc/internal/MessageDeframer.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ private enum State {
9999
private final Listener listener;
100100
private int maxInboundMessageSize;
101101
private final StatsTraceContext statsTraceCtx;
102+
private final String debugString;
102103
private Decompressor decompressor;
103104
private State state = State.HEADER;
104105
private int requiredLength = HEADER_LENGTH;
@@ -117,13 +118,15 @@ private enum State {
117118
* @param decompressor the compression used if a compressed frame is encountered, with
118119
* {@code NONE} meaning unsupported
119120
* @param maxMessageSize the maximum allowed size for received messages.
121+
* @param debugString a string that will appear on errors statuses
120122
*/
121123
public MessageDeframer(Listener listener, Decompressor decompressor, int maxMessageSize,
122-
StatsTraceContext statsTraceCtx) {
124+
StatsTraceContext statsTraceCtx, String debugString) {
123125
this.listener = Preconditions.checkNotNull(listener, "sink");
124126
this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
125127
this.maxInboundMessageSize = maxMessageSize;
126128
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
129+
this.debugString = debugString;
127130
}
128131

129132
void setMaxInboundMessageSize(int messageSize) {
@@ -278,8 +281,8 @@ private void deliver() {
278281
} else {
279282
// We've received the entire stream and have data available but we don't have
280283
// enough to read the next frame ... this is bad.
281-
throw Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame")
282-
.asRuntimeException();
284+
throw Status.INTERNAL.withDescription(
285+
debugString + ": Encountered end-of-stream mid-frame").asRuntimeException();
283286
}
284287
}
285288

@@ -335,16 +338,17 @@ private boolean readRequiredBytes() {
335338
private void processHeader() {
336339
int type = nextFrame.readUnsignedByte();
337340
if ((type & RESERVED_MASK) != 0) {
338-
throw Status.INTERNAL.withDescription("Frame header malformed: reserved bits not zero")
341+
throw Status.INTERNAL.withDescription(
342+
debugString + ": Frame header malformed: reserved bits not zero")
339343
.asRuntimeException();
340344
}
341345
compressedFlag = (type & COMPRESSED_FLAG_MASK) != 0;
342346

343347
// Update the required length to include the length of the frame.
344348
requiredLength = nextFrame.readInt();
345349
if (requiredLength < 0 || requiredLength > maxInboundMessageSize) {
346-
throw Status.INTERNAL.withDescription(String.format("Frame size %d exceeds maximum: %d. ",
347-
requiredLength, maxInboundMessageSize)).asRuntimeException();
350+
throw Status.INTERNAL.withDescription(String.format("%s: Frame size %d exceeds maximum: %d. ",
351+
debugString, requiredLength, maxInboundMessageSize)).asRuntimeException();
348352
}
349353

350354
// Continue reading the frame body.
@@ -373,14 +377,16 @@ private InputStream getUncompressedBody() {
373377
private InputStream getCompressedBody() {
374378
if (decompressor == Codec.Identity.NONE) {
375379
throw Status.INTERNAL.withDescription(
376-
"Can't decode compressed frame as compression not configured.").asRuntimeException();
380+
debugString + ": Can't decode compressed frame as compression not configured.")
381+
.asRuntimeException();
377382
}
378383

379384
try {
380385
// Enforce the maxMessageSize limit on the returned stream.
381386
InputStream unlimitedStream =
382387
decompressor.decompress(ReadableBuffers.openStream(nextFrame, true));
383-
return new SizeEnforcingInputStream(unlimitedStream, maxInboundMessageSize, statsTraceCtx);
388+
return new SizeEnforcingInputStream(
389+
unlimitedStream, maxInboundMessageSize, statsTraceCtx, debugString);
384390
} catch (IOException e) {
385391
throw new RuntimeException(e);
386392
}
@@ -393,14 +399,17 @@ private InputStream getCompressedBody() {
393399
static final class SizeEnforcingInputStream extends FilterInputStream {
394400
private final int maxMessageSize;
395401
private final StatsTraceContext statsTraceCtx;
402+
private final String debugString;
396403
private long maxCount;
397404
private long count;
398405
private long mark = -1;
399406

400-
SizeEnforcingInputStream(InputStream in, int maxMessageSize, StatsTraceContext statsTraceCtx) {
407+
SizeEnforcingInputStream(InputStream in, int maxMessageSize, StatsTraceContext statsTraceCtx,
408+
String debugString) {
401409
super(in);
402410
this.maxMessageSize = maxMessageSize;
403411
this.statsTraceCtx = statsTraceCtx;
412+
this.debugString = debugString;
404413
}
405414

406415
@Override
@@ -464,8 +473,8 @@ private void reportCount() {
464473
private void verifySize() {
465474
if (count > maxMessageSize) {
466475
throw Status.INTERNAL.withDescription(String.format(
467-
"Compressed frame exceeds maximum frame size: %d. Bytes read: %d. ",
468-
maxMessageSize, count)).asRuntimeException();
476+
"%s: Compressed frame exceeds maximum frame size: %d. Bytes read: %d. ",
477+
debugString, maxMessageSize, count)).asRuntimeException();
469478
}
470479
}
471480
}

core/src/test/java/io/grpc/internal/MessageDeframerTest.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public class MessageDeframerTest {
8888
"service/method", statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
8989

9090
private MessageDeframer deframer = new MessageDeframer(listener, Codec.Identity.NONE,
91-
DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx);
91+
DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, "test");
9292

9393
private ArgumentCaptor<InputStream> messages = ArgumentCaptor.forClass(InputStream.class);
9494

@@ -208,7 +208,7 @@ public void endOfStreamCallbackShouldWaitForMessageDelivery() {
208208
@Test
209209
public void compressed() {
210210
deframer = new MessageDeframer(listener, new Codec.Gzip(), DEFAULT_MAX_MESSAGE_SIZE,
211-
statsTraceCtx);
211+
statsTraceCtx, "test");
212212
deframer.request(1);
213213

214214
byte[] payload = compress(new byte[1000]);
@@ -246,7 +246,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
246246
public void sizeEnforcingInputStream_readByteBelowLimit() throws IOException {
247247
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
248248
SizeEnforcingInputStream stream =
249-
new MessageDeframer.SizeEnforcingInputStream(in, 4, statsTraceCtx);
249+
new MessageDeframer.SizeEnforcingInputStream(in, 4, statsTraceCtx, "test");
250250

251251
while (stream.read() != -1) {}
252252

@@ -259,7 +259,7 @@ public void sizeEnforcingInputStream_readByteBelowLimit() throws IOException {
259259
public void sizeEnforcingInputStream_readByteAtLimit() throws IOException {
260260
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
261261
SizeEnforcingInputStream stream =
262-
new MessageDeframer.SizeEnforcingInputStream(in, 3, statsTraceCtx);
262+
new MessageDeframer.SizeEnforcingInputStream(in, 3, statsTraceCtx, "test");
263263

264264
while (stream.read() != -1) {}
265265

@@ -272,10 +272,10 @@ public void sizeEnforcingInputStream_readByteAtLimit() throws IOException {
272272
public void sizeEnforcingInputStream_readByteAboveLimit() throws IOException {
273273
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
274274
SizeEnforcingInputStream stream =
275-
new MessageDeframer.SizeEnforcingInputStream(in, 2, statsTraceCtx);
275+
new MessageDeframer.SizeEnforcingInputStream(in, 2, statsTraceCtx, "test");
276276

277277
thrown.expect(StatusRuntimeException.class);
278-
thrown.expectMessage("INTERNAL: Compressed frame exceeds");
278+
thrown.expectMessage("INTERNAL: test: Compressed frame exceeds");
279279

280280
while (stream.read() != -1) {}
281281

@@ -287,7 +287,7 @@ public void sizeEnforcingInputStream_readByteAboveLimit() throws IOException {
287287
public void sizeEnforcingInputStream_readBelowLimit() throws IOException {
288288
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
289289
SizeEnforcingInputStream stream =
290-
new MessageDeframer.SizeEnforcingInputStream(in, 4, statsTraceCtx);
290+
new MessageDeframer.SizeEnforcingInputStream(in, 4, statsTraceCtx, "test");
291291
byte[] buf = new byte[10];
292292

293293
int read = stream.read(buf, 0, buf.length);
@@ -302,7 +302,7 @@ public void sizeEnforcingInputStream_readBelowLimit() throws IOException {
302302
public void sizeEnforcingInputStream_readAtLimit() throws IOException {
303303
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
304304
SizeEnforcingInputStream stream =
305-
new MessageDeframer.SizeEnforcingInputStream(in, 3, statsTraceCtx);
305+
new MessageDeframer.SizeEnforcingInputStream(in, 3, statsTraceCtx, "test");
306306
byte[] buf = new byte[10];
307307

308308
int read = stream.read(buf, 0, buf.length);
@@ -317,11 +317,11 @@ public void sizeEnforcingInputStream_readAtLimit() throws IOException {
317317
public void sizeEnforcingInputStream_readAboveLimit() throws IOException {
318318
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
319319
SizeEnforcingInputStream stream =
320-
new MessageDeframer.SizeEnforcingInputStream(in, 2, statsTraceCtx);
320+
new MessageDeframer.SizeEnforcingInputStream(in, 2, statsTraceCtx, "test");
321321
byte[] buf = new byte[10];
322322

323323
thrown.expect(StatusRuntimeException.class);
324-
thrown.expectMessage("INTERNAL: Compressed frame exceeds");
324+
thrown.expectMessage("INTERNAL: test: Compressed frame exceeds");
325325

326326
stream.read(buf, 0, buf.length);
327327

@@ -333,7 +333,7 @@ public void sizeEnforcingInputStream_readAboveLimit() throws IOException {
333333
public void sizeEnforcingInputStream_skipBelowLimit() throws IOException {
334334
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
335335
SizeEnforcingInputStream stream =
336-
new MessageDeframer.SizeEnforcingInputStream(in, 4, statsTraceCtx);
336+
new MessageDeframer.SizeEnforcingInputStream(in, 4, statsTraceCtx, "test");
337337

338338
long skipped = stream.skip(4);
339339

@@ -348,7 +348,7 @@ public void sizeEnforcingInputStream_skipBelowLimit() throws IOException {
348348
public void sizeEnforcingInputStream_skipAtLimit() throws IOException {
349349
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
350350
SizeEnforcingInputStream stream =
351-
new MessageDeframer.SizeEnforcingInputStream(in, 3, statsTraceCtx);
351+
new MessageDeframer.SizeEnforcingInputStream(in, 3, statsTraceCtx, "test");
352352

353353
long skipped = stream.skip(4);
354354

@@ -362,10 +362,10 @@ public void sizeEnforcingInputStream_skipAtLimit() throws IOException {
362362
public void sizeEnforcingInputStream_skipAboveLimit() throws IOException {
363363
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
364364
SizeEnforcingInputStream stream =
365-
new MessageDeframer.SizeEnforcingInputStream(in, 2, statsTraceCtx);
365+
new MessageDeframer.SizeEnforcingInputStream(in, 2, statsTraceCtx, "test");
366366

367367
thrown.expect(StatusRuntimeException.class);
368-
thrown.expectMessage("INTERNAL: Compressed frame exceeds");
368+
thrown.expectMessage("INTERNAL: test: Compressed frame exceeds");
369369

370370
stream.skip(4);
371371

@@ -377,7 +377,7 @@ public void sizeEnforcingInputStream_skipAboveLimit() throws IOException {
377377
public void sizeEnforcingInputStream_markReset() throws IOException {
378378
ByteArrayInputStream in = new ByteArrayInputStream("foo".getBytes(Charsets.UTF_8));
379379
SizeEnforcingInputStream stream =
380-
new MessageDeframer.SizeEnforcingInputStream(in, 3, statsTraceCtx);
380+
new MessageDeframer.SizeEnforcingInputStream(in, 3, statsTraceCtx, "test");
381381
// stream currently looks like: |foo
382382
stream.skip(1); // f|oo
383383
stream.mark(10); // any large number will work.

0 commit comments

Comments
 (0)