Skip to content

Commit

Permalink
feat: Add Custom Part Metadata Decorator to ParallelCompositeUploadCo…
Browse files Browse the repository at this point in the history
…nfig (#2434)
  • Loading branch information
sydney-munro authored Mar 15, 2024
1 parent 12c9db8 commit 43b8006
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.time.Clock;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
import javax.annotation.concurrent.Immutable;
import org.checkerframework.checker.nullness.qual.NonNull;

Expand Down Expand Up @@ -125,18 +128,21 @@ public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWri
private final BufferAllocationStrategy bufferAllocationStrategy;
private final PartNamingStrategy partNamingStrategy;
private final PartCleanupStrategy partCleanupStrategy;
private final PartMetadataFieldDecorator partMetadataFieldDecorator;

private ParallelCompositeUploadBlobWriteSessionConfig(
int maxPartsPerCompose,
ExecutorSupplier executorSupplier,
BufferAllocationStrategy bufferAllocationStrategy,
PartNamingStrategy partNamingStrategy,
PartCleanupStrategy partCleanupStrategy) {
PartCleanupStrategy partCleanupStrategy,
PartMetadataFieldDecorator partMetadataFieldDecorator) {
this.maxPartsPerCompose = maxPartsPerCompose;
this.executorSupplier = executorSupplier;
this.bufferAllocationStrategy = bufferAllocationStrategy;
this.partNamingStrategy = partNamingStrategy;
this.partCleanupStrategy = partCleanupStrategy;
this.partMetadataFieldDecorator = partMetadataFieldDecorator;
}

@InternalApi
Expand All @@ -150,7 +156,8 @@ ParallelCompositeUploadBlobWriteSessionConfig withMaxPartsPerCompose(int maxPart
executorSupplier,
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy);
partCleanupStrategy,
partMetadataFieldDecorator);
}

/**
Expand All @@ -170,7 +177,8 @@ public ParallelCompositeUploadBlobWriteSessionConfig withExecutorSupplier(
executorSupplier,
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy);
partCleanupStrategy,
partMetadataFieldDecorator);
}

/**
Expand All @@ -191,7 +199,8 @@ public ParallelCompositeUploadBlobWriteSessionConfig withBufferAllocationStrateg
executorSupplier,
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy);
partCleanupStrategy,
partMetadataFieldDecorator);
}

/**
Expand All @@ -211,7 +220,8 @@ public ParallelCompositeUploadBlobWriteSessionConfig withPartNamingStrategy(
executorSupplier,
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy);
partCleanupStrategy,
partMetadataFieldDecorator);
}

/**
Expand All @@ -231,7 +241,29 @@ public ParallelCompositeUploadBlobWriteSessionConfig withPartCleanupStrategy(
executorSupplier,
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy);
partCleanupStrategy,
partMetadataFieldDecorator);
}

/**
* Specify a Part Metadata Field decorator, this will manipulate the metadata associated with part
* objects, the ultimate object metadata will remain unchanged.
*
* <p><i>Default: </i> {@link PartMetadataFieldDecorator#noOp()}
*
* @since 2.36.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public ParallelCompositeUploadBlobWriteSessionConfig withPartMetadataFieldDecorator(
PartMetadataFieldDecorator partMetadataFieldDecorator) {
checkNotNull(partMetadataFieldDecorator, "partMetadataFieldDecorator must be non null");
return new ParallelCompositeUploadBlobWriteSessionConfig(
maxPartsPerCompose,
executorSupplier,
bufferAllocationStrategy,
partNamingStrategy,
partCleanupStrategy,
partMetadataFieldDecorator);
}

@BetaApi
Expand All @@ -241,15 +273,19 @@ static ParallelCompositeUploadBlobWriteSessionConfig withDefaults() {
ExecutorSupplier.cachedPool(),
BufferAllocationStrategy.simple(ByteSizeConstants._16MiB),
PartNamingStrategy.noPrefix(),
PartCleanupStrategy.always());
PartCleanupStrategy.always(),
PartMetadataFieldDecorator.noOp());
}

@InternalApi
@Override
WriterFactory createFactory(Clock clock) throws IOException {
Executor executor = executorSupplier.get();
BufferHandlePool bufferHandlePool = bufferAllocationStrategy.get();
return new ParallelCompositeUploadWriterFactory(clock, executor, bufferHandlePool);
PartMetadataFieldDecoratorInstance partMetadataFieldDecoratorInstance =
partMetadataFieldDecorator.newInstance(clock);
return new ParallelCompositeUploadWriterFactory(
clock, executor, bufferHandlePool, partMetadataFieldDecoratorInstance);
}

/**
Expand Down Expand Up @@ -277,6 +313,7 @@ private BufferAllocationStrategy() {}
*/
@BetaApi
public static BufferAllocationStrategy simple(int capacity) {
checkArgument(capacity > 0, "bufferCapacity must be > 0");
return new SimpleBufferAllocationStrategy(capacity);
}

Expand All @@ -291,6 +328,8 @@ public static BufferAllocationStrategy simple(int capacity) {
*/
@BetaApi
public static BufferAllocationStrategy fixedPool(int bufferCount, int bufferCapacity) {
checkArgument(bufferCount > 0, "bufferCount must be > 0");
checkArgument(bufferCapacity > 0, "bufferCapacity must be > 0");
return new FixedPoolBufferAllocationStrategy(bufferCount, bufferCapacity);
}

Expand Down Expand Up @@ -361,6 +400,7 @@ public static ExecutorSupplier cachedPool() {
*/
@BetaApi
public static ExecutorSupplier fixedPool(int poolSize) {
checkArgument(poolSize > 0, "poolSize must be > 0");
return new FixedSupplier(poolSize);
}

Expand Down Expand Up @@ -631,6 +671,79 @@ protected String fmtFields(String randomKey, String ultimateObjectName, String p
}
}

/**
* A Decorator which is used to manipulate metadata fields, specifically on the part objects
* created in a Parallel Composite Upload
*
* @see #withPartMetadataFieldDecorator(PartMetadataFieldDecorator)
* @since 2.36.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@Immutable
public abstract static class PartMetadataFieldDecorator implements Serializable {

abstract PartMetadataFieldDecoratorInstance newInstance(Clock clock);

/**
* A decorator that is used to manipulate the Custom Time Metadata field of part files. {@link
* BlobInfo#getCustomTimeOffsetDateTime()}
*
* <p>When provided with a duration, a time in the future will be calculated for each part
* object upon upload, this new value can be used in OLM rules to cleanup abandoned part files.
*
* <p>See [CustomTime OLM
* documentation](https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime)
*
* @see #withPartMetadataFieldDecorator(PartMetadataFieldDecorator)
* @since 2.36.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public static PartMetadataFieldDecorator setCustomTimeInFuture(Duration timeInFuture) {
checkNotNull(timeInFuture, "timeInFuture must not be null");
return new CustomTimeInFuture(timeInFuture);
}

@BetaApi
public static PartMetadataFieldDecorator noOp() {
return NoOp.INSTANCE;
}

@BetaApi
private static final class CustomTimeInFuture extends PartMetadataFieldDecorator {
private static final long serialVersionUID = -6213742554954751892L;
private final Duration duration;

CustomTimeInFuture(Duration duration) {
this.duration = duration;
}

@Override
PartMetadataFieldDecoratorInstance newInstance(Clock clock) {
return builder -> {
OffsetDateTime futureTime =
OffsetDateTime.from(
clock.instant().plus(duration).atZone(clock.getZone()).toOffsetDateTime());
return builder.setCustomTimeOffsetDateTime(futureTime);
};
}
}

private static final class NoOp extends PartMetadataFieldDecorator {
private static final long serialVersionUID = -4569486383992999205L;
private static final NoOp INSTANCE = new NoOp();

@Override
PartMetadataFieldDecoratorInstance newInstance(Clock clock) {
return builder -> builder;
}

/** prevent java serialization from using a new instance */
private Object readResolve() {
return INSTANCE;
}
}
}

/**
* A cleanup strategy which will dictate what cleanup operations are performed automatically when
* performing a parallel composite upload.
Expand Down Expand Up @@ -708,6 +821,8 @@ public static PartCleanupStrategy never() {
}
}

interface PartMetadataFieldDecoratorInstance extends UnaryOperator<BlobInfo.Builder> {}

private abstract static class Factory<T> implements Serializable {
private static final long serialVersionUID = 271806144227661056L;

Expand All @@ -721,12 +836,17 @@ private class ParallelCompositeUploadWriterFactory implements WriterFactory {
private final Clock clock;
private final Executor executor;
private final BufferHandlePool bufferHandlePool;
private final PartMetadataFieldDecoratorInstance partMetadataFieldDecoratorInstance;

private ParallelCompositeUploadWriterFactory(
Clock clock, Executor executor, BufferHandlePool bufferHandlePool) {
Clock clock,
Executor executor,
BufferHandlePool bufferHandlePool,
PartMetadataFieldDecoratorInstance partMetadataFieldDecoratorInstance) {
this.clock = clock;
this.executor = executor;
this.bufferHandlePool = bufferHandlePool;
this.partMetadataFieldDecoratorInstance = partMetadataFieldDecoratorInstance;
}

@Override
Expand Down Expand Up @@ -760,6 +880,7 @@ public ApiFuture<BufferedWritableByteChannel> openAsync() {
partNamingStrategy,
partCleanupStrategy,
maxPartsPerCompose,
partMetadataFieldDecoratorInstance,
result,
storageInternal,
info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.MetadataField.PartRange;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartMetadataFieldDecoratorInstance;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.Storage.ComposeRequest;
import com.google.cloud.storage.UnifiedOpts.Crc32cMatch;
Expand Down Expand Up @@ -111,6 +112,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
private final PartNamingStrategy partNamingStrategy;
private final PartCleanupStrategy partCleanupStrategy;
private final int maxElementsPerCompact;
private final PartMetadataFieldDecoratorInstance partMetadataFieldDecorator;
private final SettableApiFuture<BlobInfo> finalObject;
private final StorageInternal storage;
private final BlobInfo ultimateObject;
Expand All @@ -135,6 +137,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
PartNamingStrategy partNamingStrategy,
PartCleanupStrategy partCleanupStrategy,
int maxElementsPerCompact,
PartMetadataFieldDecoratorInstance partMetadataFieldDecorator,
SettableApiFuture<BlobInfo> finalObject,
StorageInternal storage,
BlobInfo ultimateObject,
Expand All @@ -144,6 +147,7 @@ final class ParallelCompositeUploadWritableByteChannel implements BufferedWritab
this.partNamingStrategy = partNamingStrategy;
this.partCleanupStrategy = partCleanupStrategy;
this.maxElementsPerCompact = maxElementsPerCompact;
this.partMetadataFieldDecorator = partMetadataFieldDecorator;
this.finalObject = finalObject;
this.storage = storage;
this.ultimateObject = ultimateObject;
Expand Down Expand Up @@ -427,6 +431,7 @@ private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long o
PART_INDEX.appendTo(partRange, builder);
OBJECT_OFFSET.appendTo(offset, builder);
b.setMetadata(builder.build());
b = partMetadataFieldDecorator.apply(b);
return b.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
import static com.google.common.truth.Truth.assertWithMessage;

import com.google.cloud.storage.MetadataField.PartRange;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartMetadataFieldDecorator;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.common.truth.StringSubject;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import org.junit.Test;

public final class ParallelCompositeUploadBlobWriteSessionConfigTest {
Expand Down Expand Up @@ -87,6 +92,18 @@ public void partNameStrategy_objectNamePrefix() throws Exception {
() -> assertThat(fmt).startsWith("a/b/obj"));
}

@Test
public void partMetadataFieldDecorator_customTime() {
BlobInfo.Builder testBlob = BlobInfo.newBuilder("testBlob", "testBucket");
Duration duration = Duration.ofSeconds(30);
TestClock clock = TestClock.tickBy(Instant.EPOCH, Duration.ofSeconds(1));
OffsetDateTime expected =
OffsetDateTime.from(Instant.EPOCH.plus(duration).atZone(ZoneId.of("Z")));
PartMetadataFieldDecorator.setCustomTimeInFuture(duration).newInstance(clock).apply(testBlob);

assertThat(expected).isEqualTo(testBlob.build().getCustomTimeOffsetDateTime());
}

private static StringSubject assertField(String fmt, int idx) {
String[] split = fmt.split(";");
String s = split[idx];
Expand Down
Loading

0 comments on commit 43b8006

Please sign in to comment.