@@ -1359,9 +1359,7 @@ def _get_online_features(
13591359 )
13601360
13611361 # Populate online features response proto with join keys and request data features
1362- online_features_response = GetOnlineFeaturesResponse (
1363- results = [GetOnlineFeaturesResponse .FeatureVector () for _ in range (num_rows )]
1364- )
1362+ online_features_response = GetOnlineFeaturesResponse (results = [])
13651363 self ._populate_result_rows_from_columnar (
13661364 online_features_response = online_features_response ,
13671365 data = dict (** join_key_values , ** request_data_features ),
@@ -1495,14 +1493,14 @@ def _populate_result_rows_from_columnar(
14951493 timestamp = Timestamp () # Only initialize this timestamp once.
14961494 # Add more values to the existing result rows
14971495 for feature_name , feature_values in data .items ():
1498-
14991496 online_features_response .metadata .feature_names .val .append (feature_name )
1500-
1501- for row_idx , proto_value in enumerate (feature_values ):
1502- result_row = online_features_response .results [row_idx ]
1503- result_row .values .append (proto_value )
1504- result_row .statuses .append (FieldStatus .PRESENT )
1505- result_row .event_timestamps .append (timestamp )
1497+ online_features_response .results .append (
1498+ GetOnlineFeaturesResponse .FeatureVector (
1499+ values = feature_values ,
1500+ statuses = [FieldStatus .PRESENT ] * len (feature_values ),
1501+ event_timestamps = [timestamp ] * len (feature_values ),
1502+ )
1503+ )
15061504
15071505 @staticmethod
15081506 def get_needed_request_data (
@@ -1643,7 +1641,7 @@ def _populate_response_from_feature_data(
16431641 Iterable [Timestamp ], Iterable ["FieldStatus.ValueType" ], Iterable [Value ]
16441642 ]
16451643 ],
1646- indexes : Iterable [Iterable [int ]],
1644+ indexes : Iterable [List [int ]],
16471645 online_features_response : GetOnlineFeaturesResponse ,
16481646 full_feature_names : bool ,
16491647 requested_features : Iterable [str ],
@@ -1678,15 +1676,21 @@ def _populate_response_from_feature_data(
16781676 requested_feature_refs
16791677 )
16801678
1679+ timestamps , statuses , values = zip (* feature_data )
1680+
16811681 # Populate the result with data fetched from the OnlineStore
1682- # which is guarenteed to be aligned with `requested_features`.
1683- for feature_row , dest_idxs in zip (feature_data , indexes ):
1684- event_timestamps , statuses , values = feature_row
1685- for dest_idx in dest_idxs :
1686- result_row = online_features_response .results [dest_idx ]
1687- result_row .event_timestamps .extend (event_timestamps )
1688- result_row .statuses .extend (statuses )
1689- result_row .values .extend (values )
1682+ # which is guaranteed to be aligned with `requested_features`.
1683+ for (
1684+ feature_idx ,
1685+ (timestamp_vector , statuses_vector , values_vector ),
1686+ ) in enumerate (zip (zip (* timestamps ), zip (* statuses ), zip (* values ))):
1687+ online_features_response .results .append (
1688+ GetOnlineFeaturesResponse .FeatureVector (
1689+ values = apply_list_mapping (values_vector , indexes ),
1690+ statuses = apply_list_mapping (statuses_vector , indexes ),
1691+ event_timestamps = apply_list_mapping (timestamp_vector , indexes ),
1692+ )
1693+ )
16901694
16911695 @staticmethod
16921696 def _augment_response_with_on_demand_transforms (
@@ -1749,13 +1753,14 @@ def _augment_response_with_on_demand_transforms(
17491753 odfv_result_names |= set (selected_subset )
17501754
17511755 online_features_response .metadata .feature_names .val .extend (selected_subset )
1752-
1753- for row_idx in range (len (online_features_response .results )):
1754- result_row = online_features_response .results [row_idx ]
1755- for feature_idx , transformed_feature in enumerate (selected_subset ):
1756- result_row .values .append (proto_values [feature_idx ][row_idx ])
1757- result_row .statuses .append (FieldStatus .PRESENT )
1758- result_row .event_timestamps .append (Timestamp ())
1756+ for feature_idx in range (len (selected_subset )):
1757+ online_features_response .results .append (
1758+ GetOnlineFeaturesResponse .FeatureVector (
1759+ values = proto_values [feature_idx ],
1760+ statuses = [FieldStatus .PRESENT ] * len (proto_values [feature_idx ]),
1761+ event_timestamps = [Timestamp ()] * len (proto_values [feature_idx ]),
1762+ )
1763+ )
17591764
17601765 @staticmethod
17611766 def _drop_unneeded_columns (
@@ -1782,13 +1787,7 @@ def _drop_unneeded_columns(
17821787
17831788 for idx in reversed (unneeded_feature_indices ):
17841789 del online_features_response .metadata .feature_names .val [idx ]
1785-
1786- for row_idx in range (len (online_features_response .results )):
1787- result_row = online_features_response .results [row_idx ]
1788- for idx in reversed (unneeded_feature_indices ):
1789- del result_row .values [idx ]
1790- del result_row .statuses [idx ]
1791- del result_row .event_timestamps [idx ]
1790+ del online_features_response .results [idx ]
17921791
17931792 def _get_feature_views_to_use (
17941793 self ,
@@ -2034,3 +2033,15 @@ def _validate_data_sources(data_sources: List[DataSource]):
20342033 )
20352034 else :
20362035 ds_names .add (case_insensitive_ds_name )
2036+
2037+
2038+ def apply_list_mapping (
2039+ lst : Iterable [Any ], mapping_indexes : Iterable [List [int ]]
2040+ ) -> Iterable [Any ]:
2041+ output_len = sum (len (item ) for item in mapping_indexes )
2042+ output = [None ] * output_len
2043+ for elem , destinations in zip (lst , mapping_indexes ):
2044+ for idx in destinations :
2045+ output [idx ] = elem
2046+
2047+ return output
0 commit comments