Skip to content

Commit

Permalink
feat: port BufferToDiskThenUpload to work with HttpStorageOptions (#2473
Browse files Browse the repository at this point in the history
)
  • Loading branch information
BenWhitehead authored Mar 28, 2024
1 parent e5772a4 commit d84e255
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
* Buffer bytes to a temporary file on disk. On {@link WritableByteChannel#close() close()}
* upload the entire files contents to Cloud Storage. Delete the temporary file.
* </td>
* <td>gRPC</td>
* <td>gRPC, HTTP</td>
* <td>
* <ol>
* <li>A Resumable Upload Session will be used to upload the file on disk.</li>
Expand Down Expand Up @@ -272,7 +272,7 @@ public static BidiBlobWriteSessionConfig bidiWrite() {
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException {
return bufferToDiskThenUpload(
Paths.get(System.getProperty("java.io.tmpdir"), "google-cloud-storage"));
Expand All @@ -289,7 +289,7 @@ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOExcept
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException {
return bufferToDiskThenUpload(ImmutableList.of(path));
}
Expand All @@ -308,7 +308,7 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IO
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection<Path> paths)
throws IOException {
return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@
*/
@Immutable
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public final class BufferToDiskThenUpload extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {
private static final long serialVersionUID = 9059242302276891867L;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

final class StorageImpl extends BaseService<StorageOptions> implements Storage, StorageInternal {
Expand Down Expand Up @@ -147,7 +148,8 @@ public Blob create(BlobInfo blobInfo, BlobTargetOption... options) {
.setMd5(EMPTY_BYTE_ARRAY_MD5)
.setCrc32c(EMPTY_BYTE_ARRAY_CRC32C)
.build();
return internalCreate(updatedInfo, EMPTY_BYTE_ARRAY, 0, 0, options);
final Opts<ObjectTargetOpt> objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo);
return internalCreate(updatedInfo, EMPTY_BYTE_ARRAY, 0, 0, objectTargetOptOpts);
}

@Override
Expand All @@ -161,7 +163,8 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option
BaseEncoding.base64()
.encode(Ints.toByteArray(Hashing.crc32c().hashBytes(content).asInt())))
.build();
return internalCreate(updatedInfo, content, 0, content.length, options);
final Opts<ObjectTargetOpt> objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo);
return internalCreate(updatedInfo, content, 0, content.length, objectTargetOptOpts);
}

@Override
Expand All @@ -180,7 +183,8 @@ public Blob create(
Ints.toByteArray(
Hashing.crc32c().hashBytes(content, offset, length).asInt())))
.build();
return internalCreate(updatedInfo, content, offset, length, options);
final Opts<ObjectTargetOpt> objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo);
return internalCreate(updatedInfo, content, offset, length, objectTargetOptOpts);
}

@Override
Expand All @@ -203,12 +207,11 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op

private Blob internalCreate(
BlobInfo info,
final byte[] content,
final byte @NonNull [] content,
final int offset,
final int length,
BlobTargetOption... options) {
Opts<ObjectTargetOpt> opts) {
Preconditions.checkNotNull(content);
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(info);
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();

BlobInfo updated = opts.blobInfoMapper().apply(info.toBuilder()).build();
Expand Down Expand Up @@ -1647,4 +1650,48 @@ public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... o
writerFactory.writeSession(this, blobInfo, opts);
return BlobWriteSessions.of(writableByteChannelSession);
}

@Override
public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> opts)
throws IOException {
if (Files.isDirectory(path)) {
throw new StorageException(0, path + " is a directory");
}
long size = Files.size(path);
if (size == 0L) {
return internalCreate(info, EMPTY_BYTE_ARRAY, 0, 0, opts);
}
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
BlobInfo.Builder builder = info.toBuilder().setMd5(null).setCrc32c(null);
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
StorageObject encode = codecs.blobInfo().encode(updated);

Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForBlobInfo(
getOptions(),
updated,
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);

JsonResumableSession session =
ResumableSession.json(
HttpClientContext.from(storageRpc),
getOptions().asRetryDependencies(),
retryAlgorithmManager.idempotent(),
jsonResumableWrite);
HttpContentRange contentRange =
HttpContentRange.of(ByteRangeSpec.relativeLength(0L, size), size);
ResumableOperationResult<StorageObject> put =
session.put(RewindableContent.of(path), contentRange);
// all exception translation is taken care of down in the JsonResumableSession
StorageObject object = put.getObject();
if (object == null) {
// if by some odd chance the put didn't get the StorageObject, query for it
ResumableOperationResult<StorageObject> query = session.query();
object = query.getObject();
}
return codecs.blobInfo().decode(object);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public void allDefaults() throws Exception {
}

@Test
@CrossRun.Exclude(transports = Transport.HTTP)
public void bufferToTempDirThenUpload() throws Exception {
StorageOptions options = null;
if (transport == Transport.GRPC) {
Expand All @@ -78,6 +77,12 @@ public void bufferToTempDirThenUpload() throws Exception {
.toBuilder()
.setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload())
.build();
} else if (transport == Transport.HTTP) {
options =
((HttpStorageOptions) storage.getOptions())
.toBuilder()
.setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload())
.build();
}
assertWithMessage("unable to resolve options").that(options).isNotNull();
//noinspection DataFlowIssue
Expand Down

0 comments on commit d84e255

Please sign in to comment.