Skip to content

Commit bb7e1b4

Browse files
authored
chore: cleanup zero-copy handle to no longer require a function to locate the ByteString instead leaving that to borrow and child ref
1 parent e89ae27 commit bb7e1b4

File tree

8 files changed

+81
-122
lines changed

8 files changed

+81
-122
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,31 @@
2929
import com.google.cloud.storage.Conversions.Decoder;
3030
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
3131
import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable;
32+
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
3233
import com.google.cloud.storage.Retrying.Retrier;
3334
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
35+
import com.google.common.base.Suppliers;
3436
import com.google.protobuf.ByteString;
3537
import com.google.storage.v2.ChecksummedData;
3638
import com.google.storage.v2.Object;
3739
import com.google.storage.v2.ReadObjectRequest;
3840
import com.google.storage.v2.ReadObjectResponse;
3941
import io.grpc.Status.Code;
42+
import java.io.Closeable;
4043
import java.io.IOException;
4144
import java.io.InterruptedIOException;
4245
import java.nio.ByteBuffer;
4346
import java.nio.channels.ClosedChannelException;
4447
import java.nio.channels.ScatteringByteChannel;
48+
import java.util.List;
4549
import java.util.Locale;
4650
import java.util.concurrent.ArrayBlockingQueue;
4751
import java.util.concurrent.CancellationException;
4852
import java.util.concurrent.ExecutionException;
4953
import java.util.concurrent.TimeUnit;
5054
import java.util.concurrent.TimeoutException;
5155
import java.util.concurrent.atomic.AtomicLong;
56+
import java.util.function.Supplier;
5257
import org.checkerframework.checker.nullness.qual.NonNull;
5358
import org.checkerframework.checker.nullness.qual.Nullable;
5459

@@ -71,7 +76,8 @@ final class GapicUnbufferedReadableByteChannel
7176

7277
private long blobOffset;
7378
private Object metadata;
74-
private ResponseContentLifecycleHandle leftovers;
79+
80+
private ReadObjectResponseChildRef leftovers;
7581

7682
GapicUnbufferedReadableByteChannel(
7783
SettableApiFuture<Object> result,
@@ -158,6 +164,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
158164
ReadObjectResponse resp = (ReadObjectResponse) take;
159165
ResponseContentLifecycleHandle<ReadObjectResponse> handle =
160166
read.getResponseContentLifecycleManager().get(resp);
167+
ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle);
161168
if (resp.hasMetadata()) {
162169
Object respMetadata = resp.getMetadata();
163170
if (metadata == null) {
@@ -185,11 +192,11 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
185192
throw e;
186193
}
187194
}
188-
handle.copy(c, dsts, offset, length);
189-
if (handle.hasRemaining()) {
190-
leftovers = handle;
195+
ref.copy(c, dsts, offset, length);
196+
if (ref.hasRemaining()) {
197+
leftovers = ref;
191198
} else {
192-
handle.close();
199+
ref.close();
193200
}
194201
}
195202
long read = c.read();
@@ -414,4 +421,41 @@ public void offer(@NonNull T element) throws InterruptedException {
414421
queue.put(element);
415422
}
416423
}
424+
425+
private static final class ReadObjectResponseChildRef implements Closeable {
426+
private final ChildRef ref;
427+
private final Supplier<List<ByteBuffer>> lazyBuffers;
428+
429+
ReadObjectResponseChildRef(ChildRef ref) {
430+
this.ref = ref;
431+
this.lazyBuffers = Suppliers.memoize(() -> ref.byteString().asReadOnlyByteBufferList());
432+
}
433+
434+
static ReadObjectResponseChildRef from(
435+
ResponseContentLifecycleHandle<ReadObjectResponse> handle) {
436+
return new ReadObjectResponseChildRef(
437+
handle.borrow(response -> response.getChecksummedData().getContent()));
438+
}
439+
440+
void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) {
441+
List<ByteBuffer> buffers = lazyBuffers.get();
442+
for (ByteBuffer b : buffers) {
443+
long copiedBytes = Buffers.copy(b, dsts, offset, length);
444+
c.advance(copiedBytes);
445+
if (b.hasRemaining()) break;
446+
}
447+
}
448+
449+
boolean hasRemaining() {
450+
List<ByteBuffer> buffers = lazyBuffers.get();
451+
for (ByteBuffer b : buffers) {
452+
if (b.hasRemaining()) return true;
453+
}
454+
return false;
455+
}
456+
457+
public void close() throws IOException {
458+
ref.close();
459+
}
460+
}
417461
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@
119119
import java.util.Objects;
120120
import java.util.Set;
121121
import java.util.concurrent.ScheduledExecutorService;
122-
import java.util.function.Function;
123122
import java.util.logging.Logger;
124123
import org.checkerframework.checker.nullness.qual.NonNull;
125124

@@ -1089,14 +1088,10 @@ private InternalZeroCopyGrpcStorageStub(
10891088
super(settings, clientContext, callableFactory);
10901089

10911090
this.readObjectResponseMarshaller =
1092-
new ZeroCopyResponseMarshaller<>(
1093-
ReadObjectResponse.getDefaultInstance(),
1094-
StorageV2ProtoUtils.READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION);
1091+
new ZeroCopyResponseMarshaller<>(ReadObjectResponse.getDefaultInstance());
10951092

10961093
this.bidiReadObjectResponseMarshaller =
1097-
new ZeroCopyResponseMarshaller<>(
1098-
BidiReadObjectResponse.getDefaultInstance(),
1099-
StorageV2ProtoUtils.BIDI_READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION);
1094+
new ZeroCopyResponseMarshaller<>(BidiReadObjectResponse.getDefaultInstance());
11001095

11011096
/** @see GrpcStorageStub#readObjectMethodDescriptor */
11021097
MethodDescriptor<ReadObjectRequest, ReadObjectResponse> readObjectMethodDescriptor =
@@ -1164,14 +1159,11 @@ static class ZeroCopyResponseMarshaller<Response extends Message>
11641159
private final Map<Response, InputStream> unclosedStreams;
11651160
private final Parser<Response> parser;
11661161
private final MethodDescriptor.PrototypeMarshaller<Response> baseMarshaller;
1167-
private final Function<Response, List<ByteBuffer>> toByteBuffersFunction;
11681162

1169-
ZeroCopyResponseMarshaller(
1170-
Response defaultInstance, Function<Response, List<ByteBuffer>> toByteBuffersFunction) {
1163+
ZeroCopyResponseMarshaller(Response defaultInstance) {
11711164
parser = (Parser<Response>) defaultInstance.getParserForType();
11721165
baseMarshaller =
11731166
(MethodDescriptor.PrototypeMarshaller<Response>) ProtoUtils.marshaller(defaultInstance);
1174-
this.toByteBuffersFunction = toByteBuffersFunction;
11751167
unclosedStreams = Collections.synchronizedMap(new IdentityHashMap<>());
11761168
}
11771169

@@ -1258,8 +1250,14 @@ private Response parseFrom(CodedInputStream stream) throws InvalidProtocolBuffer
12581250

12591251
@Override
12601252
public ResponseContentLifecycleHandle<Response> get(Response response) {
1261-
InputStream stream = unclosedStreams.remove(response);
1262-
return ResponseContentLifecycleHandle.create(response, toByteBuffersFunction, stream);
1253+
return ResponseContentLifecycleHandle.create(
1254+
response,
1255+
() -> {
1256+
InputStream stream = unclosedStreams.remove(response);
1257+
if (stream != null) {
1258+
stream.close();
1259+
}
1260+
});
12631261
}
12641262

12651263
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,43 +17,32 @@
1717

1818
import com.google.cloud.storage.ZeroCopySupport.DisposableByteString;
1919
import com.google.common.base.Preconditions;
20-
import com.google.common.base.Suppliers;
2120
import com.google.protobuf.ByteString;
2221
import java.io.Closeable;
2322
import java.io.IOException;
24-
import java.nio.ByteBuffer;
25-
import java.util.List;
2623
import java.util.concurrent.atomic.AtomicBoolean;
2724
import java.util.concurrent.atomic.AtomicInteger;
2825
import java.util.function.Function;
29-
import java.util.function.Supplier;
3026
import org.checkerframework.checker.nullness.qual.Nullable;
3127

3228
final class ResponseContentLifecycleHandle<Response> implements Closeable {
3329

3430
private final Response response;
3531
@Nullable private final Closeable dispose;
3632

37-
private final Supplier<List<ByteBuffer>> lazyBuffers;
3833
private final AtomicBoolean open;
3934
private final AtomicInteger refs;
4035

41-
private ResponseContentLifecycleHandle(
42-
Response response,
43-
Function<Response, List<ByteBuffer>> toBuffersFunction,
44-
@Nullable Closeable dispose) {
36+
private ResponseContentLifecycleHandle(Response response, @Nullable Closeable dispose) {
4537
this.response = response;
4638
this.dispose = dispose;
47-
this.lazyBuffers = Suppliers.memoize(() -> toBuffersFunction.apply(response));
4839
this.open = new AtomicBoolean(true);
4940
this.refs = new AtomicInteger(1);
5041
}
5142

5243
static <Response> ResponseContentLifecycleHandle<Response> create(
53-
Response response,
54-
Function<Response, List<ByteBuffer>> toBuffersFunction,
55-
@Nullable Closeable dispose) {
56-
return new ResponseContentLifecycleHandle<>(response, toBuffersFunction, dispose);
44+
Response response, @Nullable Closeable dispose) {
45+
return new ResponseContentLifecycleHandle<>(response, dispose);
5746
}
5847

5948
ChildRef borrow(Function<Response, ByteString> toByteStringFunction) {
@@ -64,23 +53,6 @@ ChildRef borrow(Function<Response, ByteString> toByteStringFunction) {
6453
return childRef;
6554
}
6655

67-
void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) {
68-
List<ByteBuffer> buffers = lazyBuffers.get();
69-
for (ByteBuffer b : buffers) {
70-
long copiedBytes = Buffers.copy(b, dsts, offset, length);
71-
c.advance(copiedBytes);
72-
if (b.hasRemaining()) break;
73-
}
74-
}
75-
76-
boolean hasRemaining() {
77-
List<ByteBuffer> buffers = lazyBuffers.get();
78-
for (ByteBuffer b : buffers) {
79-
if (b.hasRemaining()) return true;
80-
}
81-
return false;
82-
}
83-
8456
@Override
8557
public void close() throws IOException {
8658
if (open.getAndSet(false)) {

google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ static ResponseContentLifecycleManager<ReadObjectResponse> noop() {
3030
return response ->
3131
ResponseContentLifecycleHandle.create(
3232
response,
33-
StorageV2ProtoUtils.READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION,
3433
() -> {
3534
// no-op
3635
});
@@ -40,7 +39,6 @@ static ResponseContentLifecycleManager<BidiReadObjectResponse> noopBidiReadObjec
4039
return response ->
4140
ResponseContentLifecycleHandle.create(
4241
response,
43-
StorageV2ProtoUtils.BIDI_READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION,
4442
() -> {
4543
// no-op
4644
});

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageV2ProtoUtils.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,42 +16,18 @@
1616

1717
package com.google.cloud.storage;
1818

19-
import com.google.common.collect.ImmutableList;
2019
import com.google.protobuf.InvalidProtocolBufferException;
2120
import com.google.protobuf.MessageOrBuilder;
2221
import com.google.protobuf.util.JsonFormat;
2322
import com.google.protobuf.util.JsonFormat.Printer;
24-
import com.google.storage.v2.BidiReadObjectResponse;
2523
import com.google.storage.v2.BucketAccessControl;
26-
import com.google.storage.v2.ChecksummedData;
2724
import com.google.storage.v2.ObjectAccessControl;
2825
import com.google.storage.v2.ReadObjectRequest;
29-
import com.google.storage.v2.ReadObjectResponse;
30-
import java.nio.ByteBuffer;
31-
import java.util.List;
32-
import java.util.function.Function;
3326
import java.util.function.Predicate;
3427
import org.checkerframework.checker.nullness.qual.NonNull;
3528

3629
final class StorageV2ProtoUtils {
3730

38-
static final Function<ReadObjectResponse, List<ByteBuffer>>
39-
READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION =
40-
response -> {
41-
if (response.hasChecksummedData()) {
42-
ChecksummedData checksummedData = response.getChecksummedData();
43-
if (!checksummedData.getContent().isEmpty()) {
44-
return response.getChecksummedData().getContent().asReadOnlyByteBufferList();
45-
}
46-
}
47-
return ImmutableList.of();
48-
};
49-
static final Function<BidiReadObjectResponse, List<ByteBuffer>>
50-
BIDI_READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION =
51-
response -> {
52-
throw new StorageException(0, "unsupported", "unsupported", null);
53-
};
54-
5531
private static final String VALIDATION_TEMPLATE =
5632
"offset >= 0 && limit >= 0 (%s >= 0 && %s >= 0)";
5733

google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamReadTest.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ public void byteArrayAccumulatingRead_happyPath()
5959
AtomicBoolean closed = new AtomicBoolean(false);
6060
Closeable close = () -> closed.set(true);
6161
ResponseContentLifecycleHandle<ByteString> handle =
62-
ResponseContentLifecycleHandle.create(
63-
byteString, ByteString::asReadOnlyByteBufferList, close);
62+
ResponseContentLifecycleHandle.create(byteString, close);
6463
ResponseContentLifecycleHandle<ByteString>.ChildRef childRef =
6564
handle.borrow(Function.identity());
6665
handle.close();
@@ -88,8 +87,7 @@ public void byteArrayAccumulatingRead_childRef_close_ioException_propagated() th
8887
throw new IOException(new Kaboom());
8988
};
9089
ResponseContentLifecycleHandle<ByteString> handle =
91-
ResponseContentLifecycleHandle.create(
92-
byteString, ByteString::asReadOnlyByteBufferList, throwOnClose);
90+
ResponseContentLifecycleHandle.create(byteString, throwOnClose);
9391
ResponseContentLifecycleHandle<ByteString>.ChildRef childRef =
9492
handle.borrow(Function.identity());
9593
handle.close();
@@ -327,9 +325,7 @@ public void streamingRead_leftoversAreOnlyClearedWhenFullyConsumed() throws Exce
327325
AtomicBoolean bytes1Close = new AtomicBoolean(false);
328326
try (ResponseContentLifecycleHandle<ByteString> handle =
329327
ResponseContentLifecycleHandle.create(
330-
bytes1,
331-
ByteString::asReadOnlyByteBufferList,
332-
() -> bytes1Close.compareAndSet(false, true))) {
328+
bytes1, () -> bytes1Close.compareAndSet(false, true))) {
333329
read.accept(handle.borrow(Function.identity()));
334330
}
335331

@@ -397,9 +393,7 @@ public void streamingRead_leftoversAreClosedIfNonNullAndStreamClosed() throws Ex
397393
AtomicBoolean bytes1Close = new AtomicBoolean(false);
398394
try (ResponseContentLifecycleHandle<ByteString> handle =
399395
ResponseContentLifecycleHandle.create(
400-
bytes1,
401-
ByteString::asReadOnlyByteBufferList,
402-
() -> bytes1Close.compareAndSet(false, true))) {
396+
bytes1, () -> bytes1Close.compareAndSet(false, true))) {
403397
read.accept(handle.borrow(Function.identity()));
404398
}
405399

@@ -420,9 +414,7 @@ public void streamingRead_withNewReadIdDoesNotOrphanAnyData() throws Exception {
420414
AtomicBoolean bytes1Close = new AtomicBoolean(false);
421415
try (ResponseContentLifecycleHandle<ByteString> handle =
422416
ResponseContentLifecycleHandle.create(
423-
bytes1,
424-
ByteString::asReadOnlyByteBufferList,
425-
() -> bytes1Close.compareAndSet(false, true))) {
417+
bytes1, () -> bytes1Close.compareAndSet(false, true))) {
426418
read1.accept(handle.borrow(Function.identity()));
427419
}
428420

@@ -541,8 +533,7 @@ public void accumulating_futureCancel_disposes() throws IOException {
541533
AtomicBoolean closed = new AtomicBoolean(false);
542534
Closeable close = () -> closed.set(true);
543535
ResponseContentLifecycleHandle<ByteString> handle =
544-
ResponseContentLifecycleHandle.create(
545-
byteString, ByteString::asReadOnlyByteBufferList, close);
536+
ResponseContentLifecycleHandle.create(byteString, close);
546537
ResponseContentLifecycleHandle<ByteString>.ChildRef childRef =
547538
handle.borrow(Function.identity());
548539
handle.close();
@@ -586,8 +577,7 @@ public void projections() throws Exception {
586577

587578
private static ResponseContentLifecycleHandle<ByteString> noopContentHandle(
588579
ByteString byteString) {
589-
return ResponseContentLifecycleHandle.create(
590-
byteString, ByteString::asReadOnlyByteBufferList, () -> {});
580+
return ResponseContentLifecycleHandle.create(byteString, () -> {});
591581
}
592582

593583
private static final class Kaboom extends RuntimeException {

0 commit comments

Comments
 (0)