Skip to content

Commit f72cc03

Browse files
author
Oleksii Moskalenko
authored
redis-cluster is being merged into redis (#752)
styling handle null case styling
1 parent e8e6bdd commit f72cc03

File tree

18 files changed

+70
-637
lines changed

18 files changed

+70
-637
lines changed

docs/coverage/java/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,6 @@
5959
<version>${project.version}</version>
6060
</dependency>
6161

62-
<dependency>
63-
<groupId>dev.feast</groupId>
64-
<artifactId>feast-storage-connector-redis-cluster</artifactId>
65-
<version>${project.version}</version>
66-
</dependency>
67-
6862
<dependency>
6963
<groupId>dev.feast</groupId>
7064
<artifactId>feast-ingestion</artifactId>

ingestion/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,6 @@
126126
<version>${project.version}</version>
127127
</dependency>
128128

129-
<dependency>
130-
<groupId>dev.feast</groupId>
131-
<artifactId>feast-storage-connector-redis-cluster</artifactId>
132-
<version>${project.version}</version>
133-
</dependency>
134-
135129
<dependency>
136130
<groupId>dev.feast</groupId>
137131
<artifactId>feast-storage-connector-bigquery</artifactId>

ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import feast.storage.api.writer.FeatureSink;
2727
import feast.storage.connectors.bigquery.writer.BigQueryFeatureSink;
2828
import feast.storage.connectors.redis.writer.RedisFeatureSink;
29-
import feast.storage.connectors.rediscluster.writer.RedisClusterFeatureSink;
3029
import java.util.HashMap;
3130
import java.util.Map;
3231
import org.slf4j.Logger;
@@ -84,7 +83,7 @@ public static FeatureSink getFeatureSink(
8483
StoreType storeType = store.getType();
8584
switch (storeType) {
8685
case REDIS_CLUSTER:
87-
return RedisClusterFeatureSink.fromConfig(store.getRedisClusterConfig(), featureSetSpecs);
86+
return RedisFeatureSink.fromConfig(store.getRedisClusterConfig(), featureSetSpecs);
8887
case REDIS:
8988
return RedisFeatureSink.fromConfig(store.getRedisConfig(), featureSetSpecs);
9089
case BIGQUERY:

serving/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,6 @@
110110
<version>${project.version}</version>
111111
</dependency>
112112

113-
<dependency>
114-
<groupId>dev.feast</groupId>
115-
<artifactId>feast-storage-connector-redis-cluster</artifactId>
116-
<version>${project.version}</version>
117-
</dependency>
118-
119113
<dependency>
120114
<groupId>dev.feast</groupId>
121115
<artifactId>feast-storage-connector-bigquery</artifactId>

serving/src/main/java/feast/serving/config/ServingServiceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import feast.storage.api.retriever.HistoricalRetriever;
2929
import feast.storage.api.retriever.OnlineRetriever;
3030
import feast.storage.connectors.bigquery.retriever.BigQueryHistoricalRetriever;
31+
import feast.storage.connectors.redis.retriever.RedisClusterOnlineRetriever;
3132
import feast.storage.connectors.redis.retriever.RedisOnlineRetriever;
32-
import feast.storage.connectors.rediscluster.retriever.RedisClusterOnlineRetriever;
3333
import io.opentracing.Tracer;
3434
import java.util.Map;
3535
import org.slf4j.Logger;

storage/connectors/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
<modules>
1818
<module>redis</module>
19-
<module>rediscluster</module>
2019
<module>bigquery</module>
2120
</modules>
2221

storage/connectors/redis/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@
7070
<scope>test</scope>
7171
</dependency>
7272

73+
<dependency>
74+
<groupId>net.ishiis.redis</groupId>
75+
<artifactId>redis-unit</artifactId>
76+
<version>1.0.3</version>
77+
<scope>test</scope>
78+
</dependency>
79+
7380

7481
<dependency>
7582
<groupId>junit</groupId>

storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/retriever/RedisClusterOnlineRetriever.java renamed to storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/RedisClusterOnlineRetriever.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package feast.storage.connectors.rediscluster.retriever;
17+
package feast.storage.connectors.redis.retriever;
1818

1919
import com.google.protobuf.AbstractMessageLite;
2020
import com.google.protobuf.InvalidProtocolBufferException;

storage/connectors/rediscluster/src/main/java/feast/storage/connectors/rediscluster/writer/RedisClusterIngestionClient.java renamed to storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisClusterIngestionClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package feast.storage.connectors.rediscluster.writer;
17+
package feast.storage.connectors.redis.writer;
1818

1919
import com.google.common.collect.Lists;
2020
import feast.proto.core.StoreProto;

storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import feast.proto.core.FeatureSetProto.EntitySpec;
2020
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
2121
import feast.proto.core.FeatureSetProto.FeatureSpec;
22-
import feast.proto.core.StoreProto.Store.RedisConfig;
22+
import feast.proto.core.StoreProto.Store.*;
2323
import feast.proto.storage.RedisProto.RedisKey;
2424
import feast.proto.storage.RedisProto.RedisKey.Builder;
2525
import feast.proto.types.FeatureRowProto.FeatureRow;
@@ -28,7 +28,7 @@
2828
import feast.storage.api.writer.FailedElement;
2929
import feast.storage.api.writer.WriteResult;
3030
import feast.storage.common.retry.Retriable;
31-
import io.lettuce.core.RedisConnectionException;
31+
import io.lettuce.core.RedisException;
3232
import java.io.IOException;
3333
import java.util.ArrayList;
3434
import java.util.HashMap;
@@ -63,21 +63,22 @@ public class RedisCustomIO {
6363

6464
private RedisCustomIO() {}
6565

66-
public static Write write(RedisConfig redisConfig, Map<String, FeatureSetSpec> featureSetSpecs) {
67-
return new Write(redisConfig, featureSetSpecs);
66+
public static Write write(
67+
RedisIngestionClient redisIngestionClient, Map<String, FeatureSetSpec> featureSetSpecs) {
68+
return new Write(redisIngestionClient, featureSetSpecs);
6869
}
6970

7071
/** ServingStoreWrite data to a Redis server. */
7172
public static class Write extends PTransform<PCollection<FeatureRow>, WriteResult> {
7273

7374
private Map<String, FeatureSetSpec> featureSetSpecs;
74-
private RedisConfig redisConfig;
75+
private RedisIngestionClient redisIngestionClient;
7576
private int batchSize;
7677
private int timeout;
7778

78-
public Write(RedisConfig redisConfig, Map<String, FeatureSetSpec> featureSetSpecs) {
79-
80-
this.redisConfig = redisConfig;
79+
public Write(
80+
RedisIngestionClient redisIngestionClient, Map<String, FeatureSetSpec> featureSetSpecs) {
81+
this.redisIngestionClient = redisIngestionClient;
8182
this.featureSetSpecs = featureSetSpecs;
8283
}
8384

@@ -95,7 +96,7 @@ public Write withTimeout(int timeout) {
9596
public WriteResult expand(PCollection<FeatureRow> input) {
9697
PCollectionTuple redisWrite =
9798
input.apply(
98-
ParDo.of(new WriteDoFn(redisConfig, featureSetSpecs))
99+
ParDo.of(new WriteDoFn(redisIngestionClient, featureSetSpecs))
99100
.withOutputTags(successfulInsertsTag, TupleTagList.of(failedInsertsTupleTag)));
100101
return WriteResult.in(
101102
input.getPipeline(),
@@ -111,9 +112,10 @@ public static class WriteDoFn extends DoFn<FeatureRow, FeatureRow> {
111112
private int timeout = DEFAULT_TIMEOUT;
112113
private RedisIngestionClient redisIngestionClient;
113114

114-
WriteDoFn(RedisConfig config, Map<String, FeatureSetSpec> featureSetSpecs) {
115+
WriteDoFn(
116+
RedisIngestionClient redisIngestionClient, Map<String, FeatureSetSpec> featureSetSpecs) {
115117

116-
this.redisIngestionClient = new RedisStandaloneIngestionClient(config);
118+
this.redisIngestionClient = redisIngestionClient;
117119
this.featureSetSpecs = featureSetSpecs;
118120
}
119121

@@ -140,7 +142,7 @@ public void setup() {
140142
public void startBundle() {
141143
try {
142144
redisIngestionClient.connect();
143-
} catch (RedisConnectionException e) {
145+
} catch (RedisException e) {
144146
log.error("Connection to redis cannot be established ", e);
145147
}
146148
featureRows.clear();
@@ -165,7 +167,7 @@ public void execute() throws ExecutionException, InterruptedException {
165167

166168
@Override
167169
public Boolean isExceptionRetriable(Exception e) {
168-
return e instanceof RedisConnectionException;
170+
return e instanceof RedisException;
169171
}
170172

171173
@Override

0 commit comments

Comments
 (0)