Skip to content

Commit 577edca

Browse files
Oleksii Moskalenko and woop authored
Upgrade ingestion to allow for in-flight updates to feature sets for sinks (#757)
* In-Flight update of FeatureSetSpecs in Sinks (BQ & redis). BQ table schema update as part of data load * constant Co-authored-by: Willem Pienaar <[email protected]>
1 parent d23f027 commit 577edca

File tree

33 files changed

+1655
-509
lines changed

33 files changed

+1655
-509
lines changed

.prow/config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ presubmits:
255255
secret:
256256
secretName: feast-e2e-service-account
257257
- name: docker-socket
258-
hostPath:
259-
path: /var/run/docker.sock
258+
hostPath:
259+
path: /var/run/docker.sock
260260
containers:
261261
- image: google/cloud-sdk:273.0.0
262262
command: ["infra/scripts/test-end-to-end-batch-dataflow.sh"]

core/src/main/java/feast/core/job/JobUpdateTask.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.TimeoutException;
3939
import lombok.Getter;
4040
import lombok.extern.slf4j.Slf4j;
41+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
4142

4243
/**
4344
* JobUpdateTask is a callable that starts or updates a job given a set of featureSetSpecs, as well
@@ -172,8 +173,11 @@ private Job updateStatus(Job job) {
172173

173174
String createJobId(String sourceId, String storeName) {
174175
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
175-
String sourceIdTrunc = sourceId.split("/")[0].toLowerCase();
176-
String jobId = String.format("%s-to-%s", sourceIdTrunc, storeName) + dateSuffix;
176+
String[] sourceParts = sourceId.split("/", 2);
177+
String sourceType = sourceParts[0].toLowerCase();
178+
String sourceHash =
179+
Hashing.murmur3_128().hashUnencodedChars(sourceParts[1]).toString().substring(0, 10);
180+
String jobId = String.format("%s-%s-to-%s-%s", sourceType, sourceHash, storeName, dateSuffix);
177181
return jobId.replaceAll("_", "-");
178182
}
179183

core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -137,38 +137,15 @@ public Job startJob(Job job) {
137137
}
138138

139139
/**
140-
* Update an existing Dataflow job.
140+
* Drain the existing job. A replacement will be created on the next run (once the job has stopped gracefully).
141141
*
142142
* @param job the target job to change
143-
* @return Dataflow-specific job id
143+
* @return same job as input
144144
*/
145145
@Override
146146
public Job updateJob(Job job) {
147-
try {
148-
List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
149-
for (FeatureSet featureSet : job.getFeatureSets()) {
150-
featureSetProtos.add(featureSet.toProto());
151-
}
152-
153-
String extId =
154-
submitDataflowJob(
155-
job.getId(),
156-
featureSetProtos,
157-
job.getSource().toProto(),
158-
job.getStore().toProto(),
159-
true);
160-
161-
job.setExtId(extId);
162-
job.setStatus(JobStatus.PENDING);
163-
return job;
164-
} catch (InvalidProtocolBufferException e) {
165-
log.error(e.getMessage());
166-
throw new IllegalArgumentException(
167-
String.format(
168-
"DataflowJobManager failed to UPDATE job with id '%s' because the job"
169-
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
170-
job.getId(), e.getMessage()));
171-
}
147+
abortJob(job.getExtId());
148+
return job;
172149
}
173150

174151
/**
@@ -283,7 +260,6 @@ private ImportOptions getPipelineOptions(
283260
pipelineOptions.setJobName(jobName);
284261
pipelineOptions.setFilesToStage(
285262
detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader()));
286-
287263
if (metrics.isEnabled()) {
288264
pipelineOptions.setMetricsExporterType(metrics.getType());
289265
if (metrics.getType().equals("statsd")) {

core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@ public class DirectRunnerConfig extends RunnerConfig {
2929
/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.TABLE_ID */
3030
public String deadletterTableSpec;
3131

32+
public String tempLocation;
33+
3234
public DirectRunnerConfig(DirectRunnerConfigOptions runnerConfigOptions) {
3335
this.deadletterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
3436
this.targetParallelism = runnerConfigOptions.getTargetParallelism();
37+
this.tempLocation = runnerConfigOptions.getTempLocation();
3538
}
3639
}

core/src/main/resources/application-override.yaml

Whitespace-only changes.

core/src/main/resources/application.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ feast:
3838
runners:
3939
- name: direct
4040
type: DirectRunner
41-
options: {}
41+
options:
42+
tempLocation: gs://bucket/tempLocation
4243

4344
- name: dataflow
4445
type: DataflowRunner

infra/scripts/setup-common-functions.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ start_feast_core() {
8383

8484
if [ -n "$1" ]; then
8585
echo "Custom Spring application.yml location provided: $1"
86-
export CONFIG_ARG="--spring.config.location=file://$1"
86+
export CONFIG_ARG="--spring.config.location=classpath:/application.yml,file://$1"
8787
fi
8888

8989
nohup java -jar core/target/feast-core-$FEAST_BUILD_VERSION.jar $CONFIG_ARG &>/var/log/feast-core.log &

infra/scripts/test-end-to-end-batch-dataflow.sh

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ test -z ${GCLOUD_PROJECT} && GCLOUD_PROJECT="kf-feast"
99
test -z ${GCLOUD_REGION} && GCLOUD_REGION="us-central1"
1010
test -z ${GCLOUD_NETWORK} && GCLOUD_NETWORK="default"
1111
test -z ${GCLOUD_SUBNET} && GCLOUD_SUBNET="default"
12-
test -z ${TEMP_BUCKET} && TEMP_BUCKET="feast-templocation-kf-feast"
12+
test -z ${TEMP_BUCKET} && TEMP_BUCKET="kf-feast-dataflow-temp"
1313
test -z ${K8_CLUSTER_NAME} && K8_CLUSTER_NAME="feast-e2e-dataflow"
1414
test -z ${HELM_RELEASE_NAME} && HELM_RELEASE_NAME="pr-$PULL_NUMBER"
1515
test -z ${HELM_COMMON_NAME} && HELM_COMMON_NAME="deps"
@@ -124,6 +124,7 @@ Helm install common parts (kafka, redis, etc)
124124
--set "feast-core.enabled=false" \
125125
--set "feast-online-serving.enabled=false" \
126126
--set "feast-batch-serving.enabled=false" \
127+
--set "postgresql.enabled=false" \
127128
"$HELM_COMMON_NAME" .
128129

129130
}
@@ -149,7 +150,6 @@ Helm install feast
149150
cd $ORIGINAL_DIR/infra/charts/feast
150151

151152
helm install --wait --timeout 300s --debug --values="values-end-to-end-batch-dataflow-updated.yaml" \
152-
--set "postgresql.enabled=false" \
153153
--set "kafka.enabled=false" \
154154
--set "redis.enabled=false" \
155155
--set "prometheus-statsd-exporter.enabled=false" \
@@ -172,6 +172,8 @@ function clean() {
172172
# Uninstall helm release before clearing PVCs
173173
helm uninstall ${HELM_RELEASE_NAME}
174174

175+
kubectl delete pvc data-${HELM_RELEASE_NAME}-postgresql-0
176+
175177
# Stop Dataflow jobs from retrieved Dataflow job ids in ingesting_jobs.txt
176178
if [ -f ingesting_jobs.txt ]; then
177179
while read line
@@ -270,11 +272,11 @@ LOGS_ARTIFACT_PATH=/logs/artifacts
270272

271273
cd $ORIGINAL_DIR/tests/e2e
272274

273-
core_ip=$(kubectl get -o jsonpath="{.spec.clusterIP}" service ${HELM_RELEASE_NAME}-feast-core)
274-
serving_ip=$(kubectl get -o jsonpath="{.spec.clusterIP}" service ${HELM_RELEASE_NAME}-feast-batch-serving)
275+
core_ip=$(kubectl get -o jsonpath="{.status.loadBalancer.ingress[0].ip}" service ${HELM_RELEASE_NAME}-feast-core)
276+
serving_ip=$(kubectl get -o jsonpath="{.status.loadBalancer.ingress[0].ip}" service ${HELM_RELEASE_NAME}-feast-batch-serving)
275277

276278
set +e
277-
pytest -v bq/bq-batch-retrieval.py -m dataflow_runner --core_url "$core_ip:6565" --serving_url "$serving_ip:6566" --gcs_path "gs://${TEMP_BUCKET}/" --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml
279+
pytest -s -v bq/bq-batch-retrieval.py -m dataflow_runner --core_url "$core_ip:6565" --serving_url "$serving_ip:6566" --gcs_path "gs://${TEMP_BUCKET}/" --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml
278280
TEST_EXIT_CODE=$?
279281

280282
if [[ ${TEST_EXIT_CODE} != 0 ]]; then

infra/scripts/test-end-to-end-batch.sh

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,20 @@ else
5656
echo "[DEBUG] Skipping building jars"
5757
fi
5858

59-
export FEAST_JOBS_POLLING_INTERVAL_MILLISECONDS=10000
60-
start_feast_core
59+
cat <<EOF > /tmp/core.warehouse.application.yml
60+
feast:
61+
jobs:
62+
polling_interval_milliseconds: 10000
63+
active_runner: direct
64+
runners:
65+
- name: direct
66+
type: DirectRunner
67+
options:
68+
tempLocation: gs://${TEMP_BUCKET}/tempLocation
69+
70+
EOF
71+
72+
start_feast_core /tmp/core.warehouse.application.yml
6173

6274
DATASET_NAME=feast_$(date +%s)
6375
bq --location=US --project_id=${GOOGLE_CLOUD_PROJECT} mk \
@@ -87,6 +99,7 @@ feast:
8799
staging_location: ${JOBS_STAGING_LOCATION}
88100
initial_retry_delay_seconds: 1
89101
total_timeout_seconds: 21600
102+
write_triggering_frequency_seconds: 1
90103
subscriptions:
91104
- name: "*"
92105
project: "*"

infra/scripts/test-templates/values-end-to-end-batch-dataflow.yaml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ feast-core:
55
enabled: true
66
postgresql:
77
existingSecret: feast-postgresql
8+
service:
9+
type: LoadBalancer
810
image:
911
tag: $IMAGE_TAG
12+
logLevel: INFO
1013
application-override.yaml:
11-
spring:
12-
datasource:
13-
url: jdbc:postgresql://$HELM_COMMON_NAME-postgresql:5432/postgres
1414
feast:
1515
stream:
1616
options:
@@ -43,6 +43,8 @@ feast-online-serving:
4343
enabled: true
4444
image:
4545
tag: $IMAGE_TAG
46+
service:
47+
type: LoadBalancer
4648
application-override.yaml:
4749
feast:
4850
active_store: online
@@ -66,6 +68,8 @@ feast-batch-serving:
6668
tag: $IMAGE_TAG
6769
gcpServiceAccount:
6870
enabled: true
71+
service:
72+
type: LoadBalancer
6973
application-override.yaml:
7074
feast:
7175
active_store: historical
@@ -80,6 +84,7 @@ feast-batch-serving:
8084
staging_location: gs://$TEMP_BUCKET/stagingLocation
8185
initial_retry_delay_seconds: 3
8286
total_timeout_seconds: 21600
87+
write_triggering_frequency_seconds: 1
8388
subscriptions:
8489
- name: "*"
8590
project: "*"

0 commit comments

Comments
 (0)