8989from feast .saved_dataset import SavedDataset , SavedDatasetStorage , ValidationReference
9090from feast .ssl_ca_trust_store_setup import configure_ca_trust_store_env_variables
9191from feast .stream_feature_view import StreamFeatureView
92+ from feast .transformation .pandas_transformation import PandasTransformation
93+ from feast .transformation .python_transformation import PythonTransformation
9294from feast .utils import _utc_now
9395
9496warnings .simplefilter ("once" , DeprecationWarning )
@@ -1546,6 +1548,64 @@ def _get_feature_view_and_df_for_online_write(
15461548 df = pd .DataFrame (df )
15471549 except Exception as _ :
15481550 raise DataFrameSerializationError (df )
1551+
1552+ # # Apply transformations if this is an OnDemandFeatureView with write_to_online_store=True
1553+ if (
1554+ isinstance (feature_view , OnDemandFeatureView )
1555+ and feature_view .write_to_online_store
1556+ ):
1557+ if (
1558+ feature_view .mode == "python"
1559+ and isinstance (
1560+ feature_view .feature_transformation , PythonTransformation
1561+ )
1562+ and df is not None
1563+ ):
1564+ input_dict = (
1565+ df .to_dict (orient = "records" )[0 ]
1566+ if feature_view .singleton
1567+ else df .to_dict (orient = "list" )
1568+ )
1569+ transformed_data = feature_view .feature_transformation .udf (input_dict )
1570+ if feature_view .write_to_online_store :
1571+ entities = [
1572+ self .get_entity (entity )
1573+ for entity in (feature_view .entities or [])
1574+ ]
1575+ join_keys = [entity .join_key for entity in entities if entity ]
1576+ join_keys = [k for k in join_keys if k in input_dict .keys ()]
1577+ transformed_df = pd .DataFrame (transformed_data )
1578+ input_df = pd .DataFrame (input_dict )
1579+ if input_df .shape [0 ] == transformed_df .shape [0 ]:
1580+ for k in input_dict :
1581+ if k not in transformed_data :
1582+ transformed_data [k ] = input_dict [k ]
1583+ transformed_df = pd .DataFrame (transformed_data )
1584+ else :
1585+ transformed_df = pd .merge (
1586+ transformed_df ,
1587+ input_df ,
1588+ how = "left" ,
1589+ on = join_keys ,
1590+ )
1591+ else :
1592+ # overwrite any transformed features and update the dictionary
1593+ for k in input_dict :
1594+ if k not in transformed_data :
1595+ transformed_data [k ] = input_dict [k ]
1596+ df = pd .DataFrame (transformed_data )
1597+ elif feature_view .mode == "pandas" and isinstance (
1598+ feature_view .feature_transformation , PandasTransformation
1599+ ):
1600+ transformed_df = feature_view .feature_transformation .udf (df )
1601+ if df is not None :
1602+ for col in df .columns :
1603+ transformed_df [col ] = df [col ]
1604+ df = transformed_df
1605+
1606+ else :
1607+ raise Exception ("Unsupported OnDemandFeatureView mode" )
1608+
15491609 return feature_view , df
15501610
15511611 def write_to_online_store (
@@ -1887,7 +1947,7 @@ def retrieve_online_documents_v2(
18871947
18881948 (
18891949 available_feature_views ,
1890- _ ,
1950+ available_odfv_views ,
18911951 ) = utils ._get_feature_views_to_use (
18921952 registry = self ._registry ,
18931953 project = self .project ,
@@ -1898,13 +1958,20 @@ def retrieve_online_documents_v2(
18981958 feature_view_set = set ()
18991959 for feature in features :
19001960 feature_view_name = feature .split (":" )[0 ]
1901- feature_view = self .get_feature_view (feature_view_name )
1961+ if feature_view_name in [fv .name for fv in available_odfv_views ]:
1962+ feature_view : Union [OnDemandFeatureView , FeatureView ] = (
1963+ self .get_on_demand_feature_view (feature_view_name )
1964+ )
1965+ else :
1966+ feature_view = self .get_feature_view (feature_view_name )
19021967 feature_view_set .add (feature_view .name )
19031968 if len (feature_view_set ) > 1 :
19041969 raise ValueError ("Document retrieval only supports a single feature view." )
19051970 requested_features = [
19061971 f .split (":" )[1 ] for f in features if isinstance (f , str ) and ":" in f
19071972 ]
1973+ if len (available_feature_views ) == 0 :
1974+ available_feature_views .extend (available_odfv_views ) # type: ignore[arg-type]
19081975
19091976 requested_feature_view = available_feature_views [0 ]
19101977 if not requested_feature_view :
0 commit comments