Skip to content

Commit 4e2302b

Browse files
committed
Add batch and stream source to SDK
Signed-off-by: Michal Deutch <[email protected]>
1 parent 7fc556a commit 4e2302b

File tree

2 files changed

+42
-10
lines changed

2 files changed

+42
-10
lines changed

sdk/java/src/main/java/com/gojek/feast/core/FeatureTable.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package com.gojek.feast.core;
1818

19+
import feast.proto.core.DataSourceProto;
1920
import feast.proto.core.FeatureTableProto;
2021
import java.time.Duration;
2122
import java.util.*;
@@ -31,6 +32,8 @@ public static class Spec {
3132
private final List<String> entities;
3233
private final List<Feature> features;
3334
private final Map<String, String> labels;
35+
private DataSourceProto.DataSource batchSource;
36+
private DataSourceProto.DataSource streamSource;
3437
private Duration maxAge;
3538

3639
protected Spec(
@@ -39,24 +42,19 @@ protected Spec(
3942
List<String> entities,
4043
List<Feature> features,
4144
Map<String, String> labels,
45+
DataSourceProto.DataSource batchSource,
46+
DataSourceProto.DataSource streamSource,
4247
Duration maxAge) {
4348
this.project = project;
4449
this.name = name;
4550
this.entities = entities;
4651
this.features = features;
4752
this.labels = labels;
53+
this.batchSource = batchSource;
54+
this.streamSource = streamSource;
4855
this.maxAge = maxAge;
4956
}
5057

51-
protected Spec(
52-
String project,
53-
String name,
54-
List<String> entities,
55-
List<Feature> features,
56-
Map<String, String> labels) {
57-
this(project, name, entities, features, labels, null);
58-
}
59-
6058
public static Builder getBuilder(String project, String name) {
6159
return new Builder(project, name);
6260
}
@@ -67,6 +65,8 @@ public static class Builder {
6765
private final List<String> entities = new LinkedList<>();
6866
private final List<Feature> features = new LinkedList<>();
6967
private final Map<String, String> labels = new HashMap<>();
68+
private DataSourceProto.DataSource batchSource;
69+
private DataSourceProto.DataSource streamSource;
7070
private Duration maxAge;
7171

7272
private Builder(String project, String name) {
@@ -75,7 +75,8 @@ private Builder(String project, String name) {
7575
}
7676

7777
public Spec build() {
78-
return new Spec(project, name, entities, features, labels, maxAge);
78+
return new Spec(
79+
project, name, entities, features, labels, batchSource, streamSource, maxAge);
7980
}
8081

8182
public Builder addEntity(String entity) {
@@ -108,6 +109,16 @@ public Builder addLabels(Map<String, String> labels) {
108109
return this;
109110
}
110111

112+
public Builder setBatchSource(DataSourceProto.DataSource batchSource) {
113+
this.batchSource = batchSource;
114+
return this;
115+
}
116+
117+
public Builder setStreamSource(DataSourceProto.DataSource streamSource) {
118+
this.streamSource = streamSource;
119+
return this;
120+
}
121+
111122
public Builder setMaxAge(Duration maxAge) {
112123
this.maxAge = maxAge;
113124
return this;
@@ -134,6 +145,14 @@ public Map<String, String> getLabels() {
134145
return labels;
135146
}
136147

148+
public DataSourceProto.DataSource getBatchSource() {
149+
return batchSource;
150+
}
151+
152+
public DataSourceProto.DataSource getStreamSource() {
153+
return streamSource;
154+
}
155+
137156
public Optional<Duration> getMaxAge() {
138157
return Optional.ofNullable(maxAge);
139158
}
@@ -145,6 +164,8 @@ protected Spec(String project, FeatureTableProto.FeatureTableSpec spec) {
145164
this.features =
146165
spec.getFeaturesList().stream().map(Feature::new).collect(Collectors.toList());
147166
this.labels = spec.getLabelsMap();
167+
if (spec.hasBatchSource()) this.batchSource = spec.getBatchSource();
168+
if (spec.hasStreamSource()) this.streamSource = spec.getStreamSource();
148169
if (spec.hasMaxAge())
149170
this.maxAge =
150171
Duration.ofSeconds(spec.getMaxAge().getSeconds(), spec.getMaxAge().getNanos());
@@ -157,6 +178,12 @@ protected FeatureTableProto.FeatureTableSpec toProto() {
157178
.addAllEntities(entities)
158179
.addAllFeatures(features.stream().map(Feature::toProto).collect(Collectors.toList()))
159180
.putAllLabels(labels);
181+
if (batchSource != null) {
182+
builder.setBatchSource(batchSource);
183+
}
184+
if (streamSource != null) {
185+
builder.setStreamSource(streamSource);
186+
}
160187
if (maxAge != null) {
161188
builder.setMaxAge(
162189
com.google.protobuf.Duration.newBuilder()

sdk/java/src/test/java/com/gojek/feast/core/CoreClientTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.gojek.feast.GrpcMock;
2323
import feast.proto.core.CoreServiceGrpc;
24+
import feast.proto.core.DataSourceProto;
2425
import java.util.*;
2526
import org.junit.Before;
2627
import org.junit.Test;
@@ -81,6 +82,10 @@ public void testFeatureTable() {
8182
FeatureTable.Spec.getBuilder(CoreServiceImplMock.PROJECT, name)
8283
.addEntity("entity")
8384
.addLabel("key", "value")
85+
.setBatchSource(
86+
DataSourceProto.DataSource.newBuilder()
87+
.setType(DataSourceProto.DataSource.SourceType.BATCH_FILE)
88+
.build())
8489
.build();
8590
client.apply(spec);
8691
Assertions.assertEquals(

0 commit comments

Comments
 (0)