Skip to content

Commit 835eef2

Browse files
authored
fix!: GetOnlineFeatureResponse in Python feature server should be consisten… (#2418)
* GetOnlineFeatureResponse in Python feature server should be consistent with Java implementation Signed-off-by: pyalex <[email protected]> * fix local e2e test Signed-off-by: pyalex <[email protected]> * reuse timestamp in response generation Signed-off-by: pyalex <[email protected]>
1 parent 068c765 commit 835eef2

File tree

4 files changed

+62
-56
lines changed

4 files changed

+62
-56
lines changed

sdk/python/feast/feature_store.py

Lines changed: 44 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1359,9 +1359,7 @@ def _get_online_features(
13591359
)
13601360

13611361
# Populate online features response proto with join keys and request data features
1362-
online_features_response = GetOnlineFeaturesResponse(
1363-
results=[GetOnlineFeaturesResponse.FeatureVector() for _ in range(num_rows)]
1364-
)
1362+
online_features_response = GetOnlineFeaturesResponse(results=[])
13651363
self._populate_result_rows_from_columnar(
13661364
online_features_response=online_features_response,
13671365
data=dict(**join_key_values, **request_data_features),
@@ -1495,14 +1493,14 @@ def _populate_result_rows_from_columnar(
14951493
timestamp = Timestamp() # Only initialize this timestamp once.
14961494
# Add more values to the existing result rows
14971495
for feature_name, feature_values in data.items():
1498-
14991496
online_features_response.metadata.feature_names.val.append(feature_name)
1500-
1501-
for row_idx, proto_value in enumerate(feature_values):
1502-
result_row = online_features_response.results[row_idx]
1503-
result_row.values.append(proto_value)
1504-
result_row.statuses.append(FieldStatus.PRESENT)
1505-
result_row.event_timestamps.append(timestamp)
1497+
online_features_response.results.append(
1498+
GetOnlineFeaturesResponse.FeatureVector(
1499+
values=feature_values,
1500+
statuses=[FieldStatus.PRESENT] * len(feature_values),
1501+
event_timestamps=[timestamp] * len(feature_values),
1502+
)
1503+
)
15061504

15071505
@staticmethod
15081506
def get_needed_request_data(
@@ -1643,7 +1641,7 @@ def _populate_response_from_feature_data(
16431641
Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value]
16441642
]
16451643
],
1646-
indexes: Iterable[Iterable[int]],
1644+
indexes: Iterable[List[int]],
16471645
online_features_response: GetOnlineFeaturesResponse,
16481646
full_feature_names: bool,
16491647
requested_features: Iterable[str],
@@ -1678,15 +1676,21 @@ def _populate_response_from_feature_data(
16781676
requested_feature_refs
16791677
)
16801678

1679+
timestamps, statuses, values = zip(*feature_data)
1680+
16811681
# Populate the result with data fetched from the OnlineStore
1682-
# which is guarenteed to be aligned with `requested_features`.
1683-
for feature_row, dest_idxs in zip(feature_data, indexes):
1684-
event_timestamps, statuses, values = feature_row
1685-
for dest_idx in dest_idxs:
1686-
result_row = online_features_response.results[dest_idx]
1687-
result_row.event_timestamps.extend(event_timestamps)
1688-
result_row.statuses.extend(statuses)
1689-
result_row.values.extend(values)
1682+
# which is guaranteed to be aligned with `requested_features`.
1683+
for (
1684+
feature_idx,
1685+
(timestamp_vector, statuses_vector, values_vector),
1686+
) in enumerate(zip(zip(*timestamps), zip(*statuses), zip(*values))):
1687+
online_features_response.results.append(
1688+
GetOnlineFeaturesResponse.FeatureVector(
1689+
values=apply_list_mapping(values_vector, indexes),
1690+
statuses=apply_list_mapping(statuses_vector, indexes),
1691+
event_timestamps=apply_list_mapping(timestamp_vector, indexes),
1692+
)
1693+
)
16901694

16911695
@staticmethod
16921696
def _augment_response_with_on_demand_transforms(
@@ -1749,13 +1753,14 @@ def _augment_response_with_on_demand_transforms(
17491753
odfv_result_names |= set(selected_subset)
17501754

17511755
online_features_response.metadata.feature_names.val.extend(selected_subset)
1752-
1753-
for row_idx in range(len(online_features_response.results)):
1754-
result_row = online_features_response.results[row_idx]
1755-
for feature_idx, transformed_feature in enumerate(selected_subset):
1756-
result_row.values.append(proto_values[feature_idx][row_idx])
1757-
result_row.statuses.append(FieldStatus.PRESENT)
1758-
result_row.event_timestamps.append(Timestamp())
1756+
for feature_idx in range(len(selected_subset)):
1757+
online_features_response.results.append(
1758+
GetOnlineFeaturesResponse.FeatureVector(
1759+
values=proto_values[feature_idx],
1760+
statuses=[FieldStatus.PRESENT] * len(proto_values[feature_idx]),
1761+
event_timestamps=[Timestamp()] * len(proto_values[feature_idx]),
1762+
)
1763+
)
17591764

17601765
@staticmethod
17611766
def _drop_unneeded_columns(
@@ -1782,13 +1787,7 @@ def _drop_unneeded_columns(
17821787

17831788
for idx in reversed(unneeded_feature_indices):
17841789
del online_features_response.metadata.feature_names.val[idx]
1785-
1786-
for row_idx in range(len(online_features_response.results)):
1787-
result_row = online_features_response.results[row_idx]
1788-
for idx in reversed(unneeded_feature_indices):
1789-
del result_row.values[idx]
1790-
del result_row.statuses[idx]
1791-
del result_row.event_timestamps[idx]
1790+
del online_features_response.results[idx]
17921791

17931792
def _get_feature_views_to_use(
17941793
self,
@@ -2034,3 +2033,15 @@ def _validate_data_sources(data_sources: List[DataSource]):
20342033
)
20352034
else:
20362035
ds_names.add(case_insensitive_ds_name)
2036+
2037+
2038+
def apply_list_mapping(
2039+
lst: Iterable[Any], mapping_indexes: Iterable[List[int]]
2040+
) -> Iterable[Any]:
2041+
output_len = sum(len(item) for item in mapping_indexes)
2042+
output = [None] * output_len
2043+
for elem, destinations in zip(lst, mapping_indexes):
2044+
for idx in destinations:
2045+
output[idx] = elem
2046+
2047+
return output

sdk/python/feast/online_response.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,8 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
4040
for idx, val in enumerate(self.proto.metadata.feature_names.val):
4141
if val == DUMMY_ENTITY_ID:
4242
del self.proto.metadata.feature_names.val[idx]
43-
for result in self.proto.results:
44-
del result.values[idx]
45-
del result.statuses[idx]
46-
del result.event_timestamps[idx]
43+
del self.proto.results[idx]
44+
4745
break
4846

4947
def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
@@ -55,21 +53,18 @@ def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
5553
"""
5654
response: Dict[str, List[Any]] = {}
5755

58-
for result in self.proto.results:
59-
for idx, feature_ref in enumerate(self.proto.metadata.feature_names.val):
60-
native_type_value = feast_value_type_to_python_type(result.values[idx])
61-
if feature_ref not in response:
62-
response[feature_ref] = [native_type_value]
63-
else:
64-
response[feature_ref].append(native_type_value)
65-
66-
if include_event_timestamps:
67-
event_ts = result.event_timestamps[idx].seconds
68-
timestamp_ref = feature_ref + TIMESTAMP_POSTFIX
69-
if timestamp_ref not in response:
70-
response[timestamp_ref] = [event_ts]
71-
else:
72-
response[timestamp_ref].append(event_ts)
56+
for feature_ref, feature_vector in zip(
57+
self.proto.metadata.feature_names.val, self.proto.results
58+
):
59+
response[feature_ref] = [
60+
feast_value_type_to_python_type(v) for v in feature_vector.values
61+
]
62+
63+
if include_event_timestamps:
64+
timestamp_ref = feature_ref + TIMESTAMP_POSTFIX
65+
response[timestamp_ref] = [
66+
ts.seconds for ts in feature_vector.event_timestamps
67+
]
7368

7469
return response
7570

sdk/python/tests/integration/online_store/test_e2e_local.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ def _assert_online_features(
4040

4141
# Float features should still be floats from the online store...
4242
assert (
43-
response.proto.results[0]
44-
.values[
43+
response.proto.results[
4544
list(response.proto.metadata.feature_names.val).index(
4645
"driver_hourly_stats__conv_rate"
4746
)
4847
]
48+
.values[0]
4949
.float_val
5050
> 0
5151
)

sdk/python/tests/integration/online_store/test_universal_online.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,9 @@ def _get_online_features_dict_remotely(
281281
)
282282
keys = response["metadata"]["feature_names"]
283283
# Get rid of unnecessary structure in the response, leaving list of dicts
284-
response = [row["values"] for row in response["results"]]
284+
values = [row["values"] for row in response["results"]]
285285
# Convert list of dicts (response) into dict of lists which is the format of the return value
286-
return {key: [row[idx] for row in response] for idx, key in enumerate(keys)}
286+
return {key: feature_vector for key, feature_vector in zip(keys, values)}
287287

288288

289289
def get_online_features_dict(

0 commit comments

Comments
 (0)