Skip to content

Commit 6f21e8f

Browse files
Ly CaoLy Cao
authored andcommitted
fixed GetOnlineFeatures bug
1 parent 556ccc6 commit 6f21e8f

File tree

3 files changed

+47
-19
lines changed

3 files changed

+47
-19
lines changed

go/feast/featurestore.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,23 @@ func (fs *FeatureStore) GetOnlineFeatures(request *serving.GetOnlineFeaturesRequ
4949
featureList := request.GetFeatures()
5050
featureListVal := featureList.GetVal()
5151
allFeaturesPerFeatureView := make(map[string][]string)
52+
53+
requestEntities := request.GetEntities() // map[string]*types.RepeatedValue
54+
var requestEntitiesRowLength int
55+
for _, values := range requestEntities {
56+
requestEntitiesRowLength = len(values.GetVal())
57+
}
58+
59+
response := serving.GetOnlineFeaturesResponse{Metadata: &serving.GetOnlineFeaturesResponseMetadata{FeatureNames : &serving.FeatureList{Val : make([]string, 0) }},
60+
Results: make([]*serving.GetOnlineFeaturesResponse_FeatureVector, requestEntitiesRowLength)}
61+
for i := 0; i < requestEntitiesRowLength; i++ {
62+
featureVector := serving.GetOnlineFeaturesResponse_FeatureVector{Values: make([]*types.Value, 0),
63+
Statuses: make([]serving.FieldStatus, 0),
64+
EventTimestamps: make([]*timestamppb.Timestamp, 0)}
65+
response.Results[i] = &featureVector
66+
}
67+
populateResultRowsFromColumnar(&response, requestEntities)
68+
5269
for _, featureName := range featureListVal {
5370
parsedFeatureName := strings.Split(featureName, ":")
5471
if len(parsedFeatureName) < 2 {
@@ -61,20 +78,20 @@ func (fs *FeatureStore) GetOnlineFeatures(request *serving.GetOnlineFeaturesRequ
6178
}
6279
features := allFeaturesPerFeatureView[featureViewName]
6380
allFeaturesPerFeatureView[featureViewName] = append(features, featureName)
81+
featureMetaData := featureName
82+
if request.FullFeatureNames {
83+
featureMetaData = fmt.Sprintf("%s__%s", featureViewName, featureName)
84+
}
85+
response.Metadata.FeatureNames.Val = append(response.Metadata.FeatureNames.Val, featureMetaData)
6486
}
6587

66-
requestEntities := request.GetEntities() // map[string]*types.RepeatedValue
67-
6888
// Construct a map of all feature_views to validate later
6989
registryFeatureViews := fs.registry.GetFeatureViews()
7090
featureViewsInRegistry := make(map[string]*core.FeatureView)
7191
for _, registryFeatureView := range registryFeatureViews {
7292
featureViewsInRegistry[registryFeatureView.Spec.Name] = registryFeatureView
7393
}
7494

75-
response := serving.GetOnlineFeaturesResponse{Metadata: &serving.GetOnlineFeaturesResponseMetadata{FeatureNames: featureList},
76-
Results: make([]*serving.GetOnlineFeaturesResponse_FeatureVector, 0)}
77-
7895
for featureViewName, allFeatures := range allFeaturesPerFeatureView {
7996
// Validate that all requested feature view exists inside registry
8097
if _, ok := featureViewsInRegistry[featureViewName]; !ok {
@@ -108,14 +125,12 @@ func (fs *FeatureStore) GetOnlineFeatures(request *serving.GetOnlineFeaturesRequ
108125
if _, ok := requestEntities[entitiesInFeatureView[0]]; !ok {
109126
return nil, errors.New(fmt.Sprintf("EntityKey: %s is required for feature view: %s\n", entitiesInFeatureView[0], featureViewName))
110127
}
111-
requestEntitiesRowLength := len(requestEntities[entitiesInFeatureView[0]].GetVal())
112128

113129
numJoinKeysInFeatureView := len(entitiesInFeatureView)
114130
entityKeys = make([]types.EntityKey, requestEntitiesRowLength)
115131
for index, _ := range entityKeys {
116-
entityKey := types.EntityKey{ JoinKeys: make([]string, numJoinKeysInFeatureView),
117-
EntityValues: make([]*types.Value, numJoinKeysInFeatureView)}
118-
entityKeys[index] = entityKey
132+
entityKeys[index] = types.EntityKey{ JoinKeys: make([]string, numJoinKeysInFeatureView),
133+
EntityValues: make([]*types.Value, numJoinKeysInFeatureView)}
119134
}
120135
// Building entity keys for required for each Feature View from the Feature View's Spec
121136
for joinKeyIndex, joinKey := range entitiesInFeatureView {
@@ -134,19 +149,15 @@ func (fs *FeatureStore) GetOnlineFeatures(request *serving.GetOnlineFeaturesRequ
134149
}
135150

136151
}
137-
138-
152+
139153
features, err := fs.onlineStore.OnlineRead(entityKeys, featureViewName, allFeatures)
140154

141155
if err != nil {
142156
return nil, err
143157
}
144158

145-
featureVector := serving.GetOnlineFeaturesResponse_FeatureVector{Values: make([]*types.Value, 0),
146-
Statuses: make([]serving.FieldStatus, 0),
147-
EventTimestamps: make([]*timestamppb.Timestamp, 0)}
148-
for _, featureList := range features {
149-
159+
for rowIndex, featureList := range features {
160+
featureVector := response.Results[rowIndex]
150161
for _, feature := range featureList {
151162
status := serving.FieldStatus_PRESENT
152163

@@ -162,13 +173,29 @@ func (fs *FeatureStore) GetOnlineFeatures(request *serving.GetOnlineFeaturesRequ
162173
featureVector.EventTimestamps = append(featureVector.EventTimestamps, &timeStamp)
163174

164175
}
165-
response.Results = append(response.Results, &featureVector)
176+
166177
}
167178
}
168-
169179
return &response, nil
170180
}
171181

172182
func checkOutsideMaxAge(featureTimestamp *timestamppb.Timestamp, currentTimestamp *timestamppb.Timestamp, ttl *durationpb.Duration) bool {
173183
return currentTimestamp.GetSeconds()-featureTimestamp.GetSeconds() > ttl.Seconds
174184
}
185+
186+
func populateResultRowsFromColumnar(response *serving.GetOnlineFeaturesResponse, data map[string]*types.RepeatedValue) {
187+
timeStamp := timestamppb.Now()
188+
for entityName, values := range data {
189+
response.Metadata.FeatureNames.Val = append(response.Metadata.FeatureNames.Val, entityName)
190+
191+
for rowIndex, value := range values.GetVal() {
192+
featureVector := response.Results[rowIndex]
193+
featureTimeStamp := *timeStamp
194+
featureValue := *value
195+
featureVector.Values = append(featureVector.Values, &featureValue)
196+
featureVector.Statuses = append(featureVector.Statuses, serving.FieldStatus_PRESENT)
197+
featureVector.EventTimestamps = append(featureVector.EventTimestamps, &featureTimeStamp)
198+
}
199+
}
200+
fmt.Println(response.Metadata.FeatureNames.Val)
201+
}

go/feast/featurestore_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func TestGetOnlineFeatures1(t *testing.T) {
6666

6767
fs, err := NewFeatureStore(&config)
6868
assert.Nil(t, err)
69+
// _, err = fs.GetOnlineFeatures(&request)
6970
response, err := fs.GetOnlineFeatures(&request)
7071
assert.Nil(t, err)
7172
for _, featureVector := range response.Results {

go/feast/redisonlinestore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (r *RedisOnlineStore) OnlineRead(entityKeys []types.EntityKey, view string,
147147
results := make([][]Feature, len(entityKeys))
148148

149149
for entityIndex, redisKey := range redisKeys {
150-
results[entityIndex] = make([]Feature, len(features))
150+
results[entityIndex] = make([]Feature, len(features)-1)
151151
keyString := string(*redisKey)
152152
// TODO: Add pipelining (without transactions)
153153
res, err := r.client.HMGet(ctx, keyString, hsetKeys...).Result()

0 commit comments

Comments
 (0)