Skip to content

Commit c4a3429

Browse files
committed
Merge pull request #363 from msgpack/fix-number-buffer
Fix MessageUnapcker.prepareNumberBuffer
2 parents 57ea2a0 + 919c71e commit c4a3429

2 files changed

Lines changed: 85 additions & 49 deletions

File tree

msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -208,36 +208,44 @@ private MessageBuffer prepareNumberBuffer(int readLength)
208208
position += readLength; // here assumes following buffer.getXxx never throws exception
209209
return buffer; // Return the default buffer
210210
}
211-
else if (remaining == 0) {
212-
buffer = getNextBuffer();
213-
position = readLength;
214-
nextReadPosition = 0;
215-
return buffer;
216-
}
217211
else {
218-
// When the default buffer doesn't contain the whole length
219-
220-
// TODO This doesn't work if MessageBuffer is allocated by newDirectBuffer.
221-
// Add copy method to MessageBuffer to solve this issue.
222-
223-
// Copy the data fragment from the current buffer
224-
225-
numberBuffer.putBytes(0,
226-
buffer.array(), buffer.arrayOffset() + position,
227-
remaining);
228-
229-
// TODO loop this method until castBuffer is filled
230-
MessageBuffer next = getNextBuffer();
212+
// When the default buffer doesn't contain the whole length,
213+
// fill the temporary buffer from the current data fragment and
214+
// next fragment(s).
215+
216+
// TODO buffer.array() doesn't work if MessageBuffer is allocated by
217+
// newDirectBuffer. dd copy method to MessageBuffer to solve this issue.
218+
219+
int off = 0;
220+
if (remaining > 0) {
221+
numberBuffer.putBytes(0,
222+
buffer.array(), buffer.arrayOffset() + position,
223+
remaining);
224+
readLength -= remaining;
225+
off += remaining;
226+
}
231227

232-
numberBuffer.putBytes(remaining,
233-
next.array(), next.arrayOffset(),
234-
readLength - remaining);
228+
while (true) {
229+
nextBuffer();
230+
int nextSize = buffer.size();
231+
if (nextSize >= readLength) {
232+
numberBuffer.putBytes(off,
233+
buffer.array(), buffer.arrayOffset(),
234+
readLength);
235+
position = readLength;
236+
break;
237+
}
238+
else {
239+
numberBuffer.putBytes(off,
240+
buffer.array(), buffer.arrayOffset(),
241+
nextSize);
242+
readLength -= nextSize;
243+
off += nextSize;
244+
}
245+
}
235246

236-
buffer = next;
237-
position = readLength - remaining;
238247
nextReadPosition = 0;
239-
240-
return numberBuffer; // Return the numberBuffer
248+
return numberBuffer;
241249
}
242250
}
243251

msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,33 @@
1616
package org.msgpack.core
1717

1818
import java.io._
19-
import java.nio.ByteBuffer
2019

2120
import org.msgpack.core.buffer._
2221
import org.msgpack.value.ValueType
23-
import xerial.core.io.IOUtil
22+
import xerial.core.io.IOUtil._
2423

2524
import scala.util.Random
2625

27-
/**
28-
* Created on 2014/05/07.
29-
*/
26+
object MessageUnpackerTest {
27+
class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput {
28+
var cursor = 0
29+
override def next(): MessageBuffer = {
30+
if (cursor < array.length) {
31+
val a = array(cursor)
32+
cursor += 1
33+
MessageBuffer.wrap(a)
34+
}
35+
else {
36+
null
37+
}
38+
}
39+
40+
override def close(): Unit = {}
41+
}
42+
}
43+
44+
import MessageUnpackerTest._
45+
3046
class MessageUnpackerTest extends MessagePackSpec {
3147

3248
def testData: Array[Byte] = {
@@ -246,21 +262,6 @@ class MessageUnpackerTest extends MessagePackSpec {
246262

247263
}
248264

249-
class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput {
250-
var cursor = 0
251-
override def next(): MessageBuffer = {
252-
if (cursor < array.length) {
253-
val a = array(cursor)
254-
cursor += 1
255-
MessageBuffer.wrap(a)
256-
}
257-
else {
258-
null
259-
}
260-
}
261-
262-
override def close(): Unit = {}
263-
}
264265

265266
"read data at the buffer boundary" taggedAs ("boundary") in {
266267

@@ -587,7 +588,7 @@ class MessageUnpackerTest extends MessagePackSpec {
587588
val N = 1000
588589
val t = time("unpacker", repeat = 10) {
589590
block("no-buffer-reset") {
590-
IOUtil.withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker =>
591+
withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker =>
591592
for (i <- 0 until N) {
592593
val buf = new ArrayBufferInput(arr)
593594
unpacker.reset(buf)
@@ -598,7 +599,7 @@ class MessageUnpackerTest extends MessagePackSpec {
598599
}
599600

600601
block("reuse-array-input") {
601-
IOUtil.withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker =>
602+
withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker =>
602603
val buf = new ArrayBufferInput(arr)
603604
for (i <- 0 until N) {
604605
buf.reset(arr)
@@ -610,7 +611,7 @@ class MessageUnpackerTest extends MessagePackSpec {
610611
}
611612

612613
block("reuse-message-buffer") {
613-
IOUtil.withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker =>
614+
withResource(MessagePack.newDefaultUnpacker(arr)) { unpacker =>
614615
val buf = new ArrayBufferInput(arr)
615616
for (i <- 0 until N) {
616617
buf.reset(mb)
@@ -703,5 +704,32 @@ class MessageUnpackerTest extends MessagePackSpec {
703704
Seq(8185, 8186, 8187, 8188, 16377, 16378, 16379, 16380).foreach { n => check(s, n)}
704705
}
705706
}
707+
708+
def readTest(input:MessageBufferInput): Unit = {
709+
withResource(MessagePack.newDefaultUnpacker(input)) { unpacker =>
710+
while (unpacker.hasNext) {
711+
unpacker.unpackValue()
712+
}
713+
}
714+
}
715+
716+
"read value length at buffer boundary" taggedAs("number-boundary") in {
717+
val input = new SplitMessageBufferInput(Array(
718+
Array[Byte](MessagePack.Code.STR16),
719+
Array[Byte](0x00),
720+
Array[Byte](0x05), // STR16 length at the boundary
721+
"hello".getBytes(MessagePack.UTF8))
722+
)
723+
readTest(input)
724+
725+
val input2 = new SplitMessageBufferInput(Array(
726+
Array[Byte](MessagePack.Code.STR32),
727+
Array[Byte](0x00),
728+
Array[Byte](0x00, 0x00),
729+
Array[Byte](0x05), // STR32 length at the boundary
730+
"hello".getBytes(MessagePack.UTF8))
731+
)
732+
readTest(input2)
733+
}
706734
}
707735
}

0 commit comments

Comments
 (0)