Skip to content

Commit 5778ad0

Browse files
author
Shu Heng
committed
Fix null pointer exception in Dataflow Runner due to unserializable backoff
1 parent ec6b26b commit 5778ad0

File tree

9 files changed

+228
-166
lines changed

9 files changed

+228
-166
lines changed

core/src/main/java/feast/core/service/SpecService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) {
143143
* possible if a project name is not set explicitly
144144
*
145145
* <p>The version field can be one of - '*' - This will match all versions - 'latest' - This will
146-
* match the latest feature set version - '&lt;number&gt;' - This will match a specific feature set
147-
* version. This property can only be set if both the feature set name and project name are
146+
* match the latest feature set version - '&lt;number&gt;' - This will match a specific feature
147+
* set version. This property can only be set if both the feature set name and project name are
148148
* explicitly set.
149149
*
150150
* @param filter filter containing the desired featureSet name and version filter

core/src/main/java/feast/core/util/PackageUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ public class PackageUtil {
4444
* points to the resource location. Note that the extraction process can take several minutes to
4545
* complete.
4646
*
47-
* <p>One use case of this function is to detect the class path of resources to stage when
48-
* using Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be
49-
* handled by default in Apache Beam.
47+
* <p>One use case of this function is to detect the class path of resources to stage when using
48+
* Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be handled by
49+
* default in Apache Beam.
5050
*
5151
* <pre>
5252
* <code>

ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,14 @@ public PDone expand(PCollection<FeatureRow> input) {
8989
switch (storeType) {
9090
case REDIS:
9191
RedisConfig redisConfig = getStore().getRedisConfig();
92-
PCollection<FailedElement> redisWriteResult = input
93-
.apply(
94-
"FeatureRowToRedisMutation",
95-
ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
96-
.apply(
97-
"WriteRedisMutationToRedis",
98-
RedisCustomIO.write(redisConfig));
92+
PCollection<FailedElement> redisWriteResult =
93+
input
94+
.apply(
95+
"FeatureRowToRedisMutation",
96+
ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
97+
.apply("WriteRedisMutationToRedis", RedisCustomIO.write(redisConfig));
9998
if (options.getDeadLetterTableSpec() != null) {
100-
redisWriteResult.apply(
99+
redisWriteResult.apply(
101100
WriteFailedElementToBigQuery.newBuilder()
102101
.setTableSpec(options.getDeadLetterTableSpec())
103102
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())

ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@
2424
import feast.types.FieldProto;
2525
import feast.types.ValueProto.Value.ValCase;
2626
import java.util.ArrayList;
27-
import java.util.HashSet;
2827
import java.util.List;
2928
import java.util.Map;
30-
import java.util.Set;
3129
import org.apache.beam.sdk.transforms.DoFn;
3230
import org.apache.beam.sdk.values.TupleTag;
3331

@@ -111,10 +109,7 @@ public void processElement(ProcessContext context) {
111109
}
112110
context.output(getFailureTag(), failedElement.build());
113111
} else {
114-
featureRow = featureRow.toBuilder()
115-
.clearFields()
116-
.addAllFields(fields)
117-
.build();
112+
featureRow = featureRow.toBuilder().clearFields().addAllFields(fields).build();
118113
context.output(getSuccessTag(), featureRow);
119114
}
120115
}
Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,58 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 * Copyright 2018-2020 The Feast Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package feast.retry;

import java.io.Serializable;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

/**
 * Executes a {@link Retriable} action, retrying with exponential backoff when the action reports a
 * retriable failure.
 *
 * <p>Only the (serializable) retry parameters are stored as fields; the non-serializable
 * {@link FluentBackoff} is rebuilt on every {@link #execute(Retriable)} call. This keeps the
 * executor safe to ship inside a Beam DoFn (the commit this class comes from fixes an NPE on the
 * Dataflow runner caused by holding an unserializable backoff in a static field).
 */
public class BackOffExecutor implements Serializable {

  // Retry configuration; both types are Serializable so the executor itself serializes cleanly.
  private final Integer maxRetries;
  private final Duration initialBackOff;

  /**
   * @param maxRetries maximum number of retries before the last exception is rethrown
   * @param initialBackOff delay before the first retry; subsequent delays grow per
   *     {@link FluentBackoff}'s default exponential policy
   */
  public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
    this.maxRetries = maxRetries;
    this.initialBackOff = initialBackOff;
  }

  /**
   * Runs {@code retriable}, retrying on retriable exceptions until the backoff budget is exhausted.
   *
   * @throws Exception the last failure, when it is not retriable or retries are exhausted
   */
  public void execute(Retriable retriable) throws Exception {
    // Build the backoff lazily per call: FluentBackoff is not Serializable and must not be a field.
    FluentBackoff backoff =
        FluentBackoff.DEFAULT.withMaxRetries(maxRetries).withInitialBackoff(initialBackOff);
    execute(retriable, backoff);
  }

  private void execute(Retriable retriable, FluentBackoff backoff) throws Exception {
    Sleeper sleeper = Sleeper.DEFAULT;
    BackOff backOff = backoff.backoff();
    while (true) {
      try {
        retriable.execute();
        break; // success
      } catch (Exception e) {
        // BackOffUtils.next sleeps for the next interval and returns false once retries are spent.
        if (retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
          retriable.cleanUpAfterFailure();
        } else {
          throw e;
        }
      }
    }
  }
}
Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
117
package feast.retry;
218

319
public interface Retriable {
4-
void execute();
5-
Boolean isExceptionRetriable(Exception e);
6-
void cleanUpAfterFailure();
20+
void execute();
21+
22+
Boolean isExceptionRetriable(Exception e);
23+
24+
void cleanUpAfterFailure();
725
}

ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java

Lines changed: 62 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import feast.ingestion.values.FailedElement;
2121
import feast.retry.BackOffExecutor;
2222
import feast.retry.Retriable;
23+
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.List;
2326
import org.apache.avro.reflect.Nullable;
2427
import org.apache.beam.sdk.coders.AvroCoder;
2528
import org.apache.beam.sdk.coders.DefaultCoder;
@@ -38,10 +41,6 @@
3841
import redis.clients.jedis.Response;
3942
import redis.clients.jedis.exceptions.JedisConnectionException;
4043

41-
import java.io.IOException;
42-
import java.util.ArrayList;
43-
import java.util.List;
44-
4544
public class RedisCustomIO {
4645

4746
private static final int DEFAULT_BATCH_SIZE = 1000;
@@ -164,7 +163,8 @@ public void setScore(@Nullable Long score) {
164163
}
165164

166165
/** ServingStoreWrite data to a Redis server. */
167-
public static class Write extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {
166+
public static class Write
167+
extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {
168168

169169
private WriteDoFn dofn;
170170

@@ -202,9 +202,10 @@ public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {
202202
WriteDoFn(StoreProto.Store.RedisConfig redisConfig) {
203203
this.host = redisConfig.getHost();
204204
this.port = redisConfig.getPort();
205-
long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
206-
this.backOffExecutor = new BackOffExecutor(redisConfig.getMaxRetries(),
207-
Duration.millis(backoffMs));
205+
long backoffMs =
206+
redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
207+
this.backOffExecutor =
208+
new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs));
208209
}
209210

210211
public WriteDoFn withBatchSize(int batchSize) {
@@ -233,47 +234,50 @@ public void startBundle() {
233234
}
234235

235236
private void executeBatch() throws Exception {
236-
backOffExecutor.execute(new Retriable() {
237-
@Override
238-
public void execute() {
239-
pipeline.multi();
240-
mutations.forEach(mutation -> {
241-
writeRecord(mutation);
242-
if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
243-
pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
237+
backOffExecutor.execute(
238+
new Retriable() {
239+
@Override
240+
public void execute() {
241+
pipeline.multi();
242+
mutations.forEach(
243+
mutation -> {
244+
writeRecord(mutation);
245+
if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
246+
pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
247+
}
248+
});
249+
pipeline.exec();
250+
pipeline.sync();
251+
mutations.clear();
244252
}
245-
});
246-
pipeline.exec();
247-
pipeline.sync();
248-
mutations.clear();
249-
}
250253

251-
@Override
252-
public Boolean isExceptionRetriable(Exception e) {
253-
return e instanceof JedisConnectionException;
254-
}
254+
@Override
255+
public Boolean isExceptionRetriable(Exception e) {
256+
return e instanceof JedisConnectionException;
257+
}
255258

256-
@Override
257-
public void cleanUpAfterFailure() {
258-
try {
259-
pipeline.close();
260-
} catch (IOException e) {
261-
log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
262-
}
263-
jedis = new Jedis(host, port, timeout);
264-
pipeline = jedis.pipelined();
265-
}
266-
});
259+
@Override
260+
public void cleanUpAfterFailure() {
261+
try {
262+
pipeline.close();
263+
} catch (IOException e) {
264+
log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
265+
}
266+
jedis = new Jedis(host, port, timeout);
267+
pipeline = jedis.pipelined();
268+
}
269+
});
267270
}
268271

269-
private FailedElement toFailedElement(RedisMutation mutation, Exception exception, String jobName) {
272+
private FailedElement toFailedElement(
273+
RedisMutation mutation, Exception exception, String jobName) {
270274
return FailedElement.newBuilder()
271-
.setJobName(jobName)
272-
.setTransformName("RedisCustomIO")
273-
.setPayload(mutation.getValue().toString())
274-
.setErrorMessage(exception.getMessage())
275-
.setStackTrace(ExceptionUtils.getStackTrace(exception))
276-
.build();
275+
.setJobName(jobName)
276+
.setTransformName("RedisCustomIO")
277+
.setPayload(mutation.getValue().toString())
278+
.setErrorMessage(exception.getMessage())
279+
.setStackTrace(ExceptionUtils.getStackTrace(exception))
280+
.build();
277281
}
278282

279283
@ProcessElement
@@ -284,11 +288,12 @@ public void processElement(ProcessContext context) {
284288
try {
285289
executeBatch();
286290
} catch (Exception e) {
287-
mutations.forEach(failedMutation -> {
288-
FailedElement failedElement = toFailedElement(
289-
failedMutation, e, context.getPipelineOptions().getJobName());
290-
context.output(failedElement);
291-
});
291+
mutations.forEach(
292+
failedMutation -> {
293+
FailedElement failedElement =
294+
toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
295+
context.output(failedElement);
296+
});
292297
mutations.clear();
293298
}
294299
}
@@ -315,16 +320,18 @@ private Response<?> writeRecord(RedisMutation mutation) {
315320
}
316321

317322
@FinishBundle
318-
public void finishBundle(FinishBundleContext context) throws IOException, InterruptedException {
319-
if(mutations.size() > 0) {
323+
public void finishBundle(FinishBundleContext context)
324+
throws IOException, InterruptedException {
325+
if (mutations.size() > 0) {
320326
try {
321327
executeBatch();
322328
} catch (Exception e) {
323-
mutations.forEach(failedMutation -> {
324-
FailedElement failedElement = toFailedElement(
325-
failedMutation, e, context.getPipelineOptions().getJobName());
326-
context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
327-
});
329+
mutations.forEach(
330+
failedMutation -> {
331+
FailedElement failedElement =
332+
toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
333+
context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
334+
});
328335
mutations.clear();
329336
}
330337
}

ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,14 @@ public void shouldExcludeUnregisteredFields() {
180180

181181
FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1);
182182
expected.add(randomRow);
183-
input.add(randomRow.toBuilder()
184-
.addFields(Field.newBuilder()
185-
.setName("extra")
186-
.setValue(Value.newBuilder().setStringVal("hello")))
187-
.build()
188-
);
183+
input.add(
184+
randomRow
185+
.toBuilder()
186+
.addFields(
187+
Field.newBuilder()
188+
.setName("extra")
189+
.setValue(Value.newBuilder().setStringVal("hello")))
190+
.build());
189191

190192
PCollectionTuple output =
191193
p.apply(Create.of(input))

0 commit comments

Comments (0)