Skip to content

Commit b604eb0

Browse files
author
zhilingc
committed
Merge feast-dev#12: prefetch specs and validate on job expansion
2 parents 73a0290 + 18cbad5 commit b604eb0

15 files changed

Lines changed: 471 additions & 408 deletions

File tree

ingestion/src/main/java/feast/ingestion/ImportJob.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,21 @@
3030
import feast.ingestion.config.ImportSpecSupplier;
3131
import feast.ingestion.model.Specs;
3232
import feast.ingestion.options.ImportJobOptions;
33-
import feast.ingestion.transform.*;
33+
import feast.ingestion.transform.ErrorsStoreTransform;
34+
import feast.ingestion.transform.ReadFeaturesTransform;
35+
import feast.ingestion.transform.ServingStoreTransform;
36+
import feast.ingestion.transform.ToFeatureRowExtended;
37+
import feast.ingestion.transform.ValidateTransform;
38+
import feast.ingestion.transform.WarehouseStoreTransform;
3439
import feast.ingestion.transform.fn.ConvertTypesDoFn;
3540
import feast.ingestion.transform.fn.LoggerDoFn;
3641
import feast.ingestion.transform.fn.RoundEventTimestampsDoFn;
3742
import feast.ingestion.values.PFeatureRows;
3843
import feast.specs.ImportSpecProto.ImportSpec;
3944
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
4045
import feast.types.FeatureRowProto.FeatureRow;
46+
import java.util.Arrays;
47+
import java.util.Random;
4148
import lombok.extern.slf4j.Slf4j;
4249
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
4350
import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -61,10 +68,6 @@
6168
import org.joda.time.Duration;
6269
import org.slf4j.event.Level;
6370

64-
import java.io.IOException;
65-
import java.util.Arrays;
66-
import java.util.Random;
67-
6871
@Slf4j
6972
public class ImportJob {
7073
private static Random random = new Random(System.currentTimeMillis());
@@ -125,6 +128,13 @@ public static PipelineResult mainWithResult(String[] args) {
125128
return job.run();
126129
}
127130

131+
private static String generateName() {
132+
byte[] bytes = new byte[7];
133+
random.nextBytes(bytes);
134+
String randomHex = DigestUtils.sha1Hex(bytes).substring(0, 7);
135+
return String.format("feast-importjob-%s-%s", DateTime.now().getMillis(), randomHex);
136+
}
137+
128138
public void expand() {
129139
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
130140
coderRegistry.registerCoderForType(
@@ -139,6 +149,8 @@ public void expand() {
139149
// pass
140150
}
141151

152+
specs.validate();
153+
142154
PCollection<FeatureRow> features = pipeline.apply("Read", readFeaturesTransform);
143155
if (options.getLimit() != null && options.getLimit() > 0) {
144156
features = features.apply(Sample.any(options.getLimit()));
@@ -200,13 +212,6 @@ public void logNRows(PFeatureRows pFeatureRows, String name, int limit) {
200212
.apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS ")));
201213
}
202214

203-
private static String generateName() {
204-
byte[] bytes = new byte[7];
205-
random.nextBytes(bytes);
206-
String randomHex = DigestUtils.sha1Hex(bytes).substring(0, 7);
207-
return String.format("feast-importjob-%s-%s", DateTime.now().getMillis(), randomHex);
208-
}
209-
210215
private String retrieveId(PipelineResult result) {
211216
Class<? extends PipelineRunner<?>> runner = options.getRunner();
212217
if (runner.isAssignableFrom(DataflowRunner.class)) {

ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@
2020
import com.google.inject.AbstractModule;
2121
import com.google.inject.Provides;
2222
import com.google.inject.Singleton;
23-
import java.util.List;
24-
import org.apache.beam.sdk.options.PipelineOptions;
2523
import feast.ingestion.model.Specs;
26-
import feast.ingestion.model.SpecsImpl;
2724
import feast.ingestion.options.ImportJobOptions;
28-
import feast.ingestion.service.CachedSpecService;
2925
import feast.ingestion.service.CoreSpecService;
3026
import feast.ingestion.service.FileSpecService;
27+
import feast.ingestion.service.SpecService;
3128
import feast.ingestion.service.SpecService.Builder;
3229
import feast.ingestion.service.SpecService.UnsupportedBuilder;
3330
import feast.specs.ImportSpecProto.ImportSpec;
@@ -37,6 +34,8 @@
3734
import feast.storage.service.ErrorsStoreService;
3835
import feast.storage.service.ServingStoreService;
3936
import feast.storage.service.WarehouseStoreService;
37+
import java.util.List;
38+
import org.apache.beam.sdk.options.PipelineOptions;
4039

4140
/** An ImportJobModule is a Guice module for creating dependency injection bindings. */
4241
public class ImportJobModule extends AbstractModule {
@@ -54,23 +53,27 @@ protected void configure() {
5453
bind(ImportJobOptions.class).toInstance(options);
5554
bind(PipelineOptions.class).toInstance(options);
5655
bind(ImportSpec.class).toInstance(importSpec);
57-
bind(Specs.class).to(SpecsImpl.class);
5856
}
5957

6058
@Provides
6159
@Singleton
6260
Builder provideSpecService(ImportJobOptions options) {
6361
if (options.getCoreApiUri() != null) {
64-
return new CachedSpecService.Builder(new CoreSpecService.Builder(options.getCoreApiUri()));
62+
return new CoreSpecService.Builder(options.getCoreApiUri());
6563
} else if (options.getCoreApiSpecPath() != null) {
66-
return new CachedSpecService.Builder(
67-
new FileSpecService.Builder(options.getCoreApiSpecPath()));
64+
return new FileSpecService.Builder(options.getCoreApiSpecPath());
6865
} else {
6966
return new UnsupportedBuilder(
7067
"Cannot initialise spec service as coreApiHost or specPath was not set.");
7168
}
7269
}
7370

71+
@Provides
72+
@Singleton
73+
Specs provideSpecs(SpecService.Builder specService) {
74+
return Specs.of(options.getJobName(), importSpec, specService.build());
75+
}
76+
7477
@Provides
7578
@Singleton
7679
List<WarehouseStore> provideWarehouseStores() {

ingestion/src/main/java/feast/ingestion/model/Specs.java

Lines changed: 112 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,127 @@
1717

1818
package feast.ingestion.model;
1919

20-
import java.io.Serializable;
21-
import java.util.List;
22-
import java.util.Map;
23-
import feast.ingestion.service.SpecRetrievalException;
20+
import com.google.common.base.Preconditions;
21+
import feast.ingestion.service.SpecService;
2422
import feast.specs.EntitySpecProto.EntitySpec;
2523
import feast.specs.FeatureSpecProto.FeatureSpec;
24+
import feast.specs.ImportSpecProto.Field;
2625
import feast.specs.ImportSpecProto.ImportSpec;
2726
import feast.specs.StorageSpecProto.StorageSpec;
27+
import java.io.Serializable;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Map.Entry;
32+
import lombok.Builder;
33+
import lombok.Getter;
34+
35+
@Builder
36+
@Getter
37+
public class Specs implements Serializable {
38+
private String jobName;
39+
private ImportSpec importSpec;
40+
private Map<String, EntitySpec> entitySpecs;
41+
private Map<String, FeatureSpec> featureSpecs;
42+
private Map<String, StorageSpec> storageSpecs;
43+
private transient SpecService specService;
44+
private RuntimeException error;
45+
46+
public static Specs of(String jobName, ImportSpec importSpec, SpecService specService) {
47+
try {
48+
Specs.SpecsBuilder specsBuilder = Specs.builder().jobName(jobName).importSpec(importSpec);
49+
50+
List<Field> fields = importSpec.getSchema().getFieldsList();
51+
List<String> featureIds = new ArrayList<>();
52+
for (Field field : fields) {
53+
if (!field.getFeatureId().isEmpty()) {
54+
featureIds.add(field.getFeatureId());
55+
}
56+
}
57+
specsBuilder.featureSpecs(specService.getFeatureSpecs(featureIds));
58+
59+
List<String> entityNames = importSpec.getEntitiesList();
60+
for (FeatureSpec featureSpec : specsBuilder.featureSpecs.values()) {
61+
Preconditions.checkArgument(
62+
entityNames.contains(featureSpec.getEntity()),
63+
"Feature has entity not listed in import spec featureSpec=" + featureSpec.toString());
64+
}
65+
specsBuilder.entitySpecs(specService.getEntitySpecs(entityNames));
66+
67+
specsBuilder.storageSpecs(specService.getAllStorageSpecs());
68+
69+
return specsBuilder.build();
70+
} catch (RuntimeException e) {
71+
return Specs.builder().error(e).build();
72+
}
73+
}
2874

29-
public interface Specs extends Serializable {
30-
FeatureSpec getFeatureSpec(String featureId);
75+
public void validate() {
76+
if (error != null) {
77+
throw error;
78+
}
3179

32-
List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) throws SpecRetrievalException;
80+
// Sanity checks that our maps are built correctly
81+
for (Entry<String, FeatureSpec> entry : featureSpecs.entrySet()) {
82+
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
83+
}
84+
for (Entry<String, EntitySpec> entry : entitySpecs.entrySet()) {
85+
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getName()));
86+
}
87+
for (Entry<String, StorageSpec> entry : storageSpecs.entrySet()) {
88+
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
89+
}
3390

34-
EntitySpec getEntitySpec(String entityName) throws SpecRetrievalException;
91+
for (FeatureSpec featureSpec : featureSpecs.values()) {
92+
// Check that feature has a matching entity
93+
Preconditions.checkArgument(
94+
entitySpecs.containsKey(featureSpec.getEntity()),
95+
String.format(
96+
"Feature %s references unknown entity %s",
97+
featureSpec.getId(), featureSpec.getEntity()));
98+
// Check that feature has a matching serving store
99+
Preconditions.checkArgument(
100+
storageSpecs.containsKey(featureSpec.getDataStores().getServing().getId()),
101+
String.format(
102+
"Feature %s references unknown serving store %s",
103+
featureSpec.getId(), featureSpec.getDataStores().getServing().getId()));
104+
// Check that feature has a matching warehouse store
105+
Preconditions.checkArgument(
106+
storageSpecs.containsKey(featureSpec.getDataStores().getWarehouse().getId()),
107+
String.format(
108+
"Feature %s references unknown warehouse store %s",
109+
featureSpec.getId(), featureSpec.getDataStores().getWarehouse().getId()));
110+
}
111+
}
35112

36-
ImportSpec getImportSpec() throws SpecRetrievalException;
113+
public EntitySpec getEntitySpec(String entityName) {
114+
Preconditions.checkArgument(
115+
entitySpecs.containsKey(entityName),
116+
String.format("Unknown entity %s, spec was not initialized", entityName));
117+
return entitySpecs.get(entityName);
118+
}
37119

38-
Map<String, StorageSpec> getStorageSpecs() throws SpecRetrievalException;
120+
public FeatureSpec getFeatureSpec(String featureId) {
121+
Preconditions.checkArgument(
122+
featureSpecs.containsKey(featureId),
123+
String.format("Unknown feature %s, spec was not initialized", featureId));
124+
return featureSpecs.get(featureId);
125+
}
39126

40-
StorageSpec getStorageSpec(String storeId);
127+
public List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) {
128+
List<FeatureSpec> out = new ArrayList<>();
129+
for (FeatureSpec featureSpec : featureSpecs.values()) {
130+
if (featureSpec.getDataStores().getServing().getId().equals(storeId)) {
131+
out.add(featureSpec);
132+
}
133+
}
134+
return out;
135+
}
41136

42-
String getJobName();
137+
public StorageSpec getStorageSpec(String storeId) {
138+
Preconditions.checkArgument(
139+
storageSpecs.containsKey(storeId),
140+
String.format("Unknown store %s, spec was not initialized", storeId));
141+
return storageSpecs.get(storeId);
142+
}
43143
}

ingestion/src/main/java/feast/ingestion/model/SpecsImpl.java

Lines changed: 0 additions & 103 deletions
This file was deleted.

0 commit comments

Comments
 (0)