3434from feast .transformation .pandas_transformation import PandasTransformation
3535from feast .transformation .python_transformation import PythonTransformation
3636from feast .transformation .substrait_transformation import SubstraitTransformation
37- from feast .type_map import (
38- feast_value_type_to_pandas_type ,
39- python_type_to_feast_value_type ,
40- )
4137from feast .usage import log_exceptions
4238from feast .value_type import ValueType
4339
@@ -490,69 +486,15 @@ def get_transformed_features(
490486 )
491487
492488 def infer_features (self ) -> None :
493- if self .mode in {"pandas" , "substrait" }:
494- self ._infer_features_df ()
495- elif self .mode == "python" :
496- self ._infer_features_dict ()
497- else :
498- raise Exception (
499- f'Invalid OnDemandFeatureMode: { self .mode } . Expected one of "pandas" or "python".'
500- )
501-
502- def _infer_features_dict (self ):
503- """
504- Infers the set of features associated to this feature view from the input source.
505-
506- Raises:
507- RegistryInferenceFailure: The set of features could not be inferred.
508- """
509- rand_dict_value : Dict [str , Any ] = {
510- "float" : [1.0 ],
511- "int" : [1 ],
512- "str" : ["hello world" ],
513- "bytes" : [str .encode ("hello world" )],
514- "bool" : [True ],
515- "datetime64[ns]" : [datetime .utcnow ()],
516- }
517-
518- feature_dict = {}
519- for feature_view_projection in self .source_feature_view_projections .values ():
520- for feature in feature_view_projection .features :
521- dtype = feast_value_type_to_pandas_type (feature .dtype .to_value_type ())
522- feature_dict [f"{ feature_view_projection .name } __{ feature .name } " ] = (
523- rand_dict_value [dtype ] if dtype in rand_dict_value else [None ]
524- )
525- feature_dict [f"{ feature .name } " ] = (
526- rand_dict_value [dtype ] if dtype in rand_dict_value else [None ]
527- )
528- for request_data in self .source_request_sources .values ():
529- for field in request_data .schema :
530- dtype = feast_value_type_to_pandas_type (field .dtype .to_value_type ())
531- feature_dict [f"{ field .name } " ] = (
532- rand_dict_value [dtype ] if dtype in rand_dict_value else [None ]
533- )
534-
535- output_dict : Dict [str , List [Any ]] = self .feature_transformation .transform (
536- feature_dict
489+ inferred_features = self .feature_transformation .infer_features (
490+ self ._construct_random_input ()
537491 )
538- inferred_features = []
539- for f , dt in output_dict .items ():
540- inferred_features .append (
541- Field (
542- name = f ,
543- dtype = from_value_type (
544- python_type_to_feast_value_type (
545- f , type_name = type (dt [0 ]).__name__
546- )
547- ),
548- )
549- )
550492
551493 if self .features :
552494 missing_features = []
553- for specified_features in self .features :
554- if specified_features not in inferred_features :
555- missing_features .append (specified_features )
495+ for specified_feature in self .features :
496+ if specified_feature not in inferred_features :
497+ missing_features .append (specified_feature )
556498 if missing_features :
557499 raise SpecifiedFeaturesNotPresentError (
558500 missing_features , inferred_features , self .name
@@ -566,66 +508,42 @@ def _infer_features_dict(self):
566508 f"Could not infer Features for the feature view '{ self .name } '." ,
567509 )
568510
569- def _infer_features_df (self ) -> None :
570- """
571- Infers the set of features associated to this feature view from the input source.
572-
573- Raises:
574- RegistryInferenceFailure: The set of features could not be inferred.
575- """
576- rand_df_value : Dict [str , Any ] = {
577- "float" : 1.0 ,
578- "int" : 1 ,
579- "str" : "hello world" ,
580- "bytes" : str .encode ("hello world" ),
581- "bool" : True ,
582- "datetime64[ns]" : datetime .utcnow (),
511+ def _construct_random_input (self ) -> Dict [str , List [Any ]]:
512+ rand_dict_value : Dict [ValueType , List [Any ]] = {
513+ ValueType .BYTES : [str .encode ("hello world" )],
514+ ValueType .STRING : ["hello world" ],
515+ ValueType .INT32 : [1 ],
516+ ValueType .INT64 : [1 ],
517+ ValueType .DOUBLE : [1.0 ],
518+ ValueType .FLOAT : [1.0 ],
519+ ValueType .BOOL : [True ],
520+ ValueType .UNIX_TIMESTAMP : [datetime .utcnow ()],
521+ ValueType .BYTES_LIST : [[str .encode ("hello world" )]],
522+ ValueType .STRING_LIST : [["hello world" ]],
523+ ValueType .INT32_LIST : [[1 ]],
524+ ValueType .INT64_LIST : [[1 ]],
525+ ValueType .DOUBLE_LIST : [[1.0 ]],
526+ ValueType .FLOAT_LIST : [[1.0 ]],
527+ ValueType .BOOL_LIST : [[True ]],
528+ ValueType .UNIX_TIMESTAMP_LIST : [[datetime .utcnow ()]],
583529 }
584530
585- df = pd . DataFrame ()
531+ feature_dict = {}
586532 for feature_view_projection in self .source_feature_view_projections .values ():
587533 for feature in feature_view_projection .features :
588- dtype = feast_value_type_to_pandas_type (feature .dtype .to_value_type ())
589- df [f"{ feature_view_projection .name } __{ feature .name } " ] = pd .Series (
590- dtype = dtype
534+ feature_dict [f"{ feature_view_projection .name } __{ feature .name } " ] = (
535+ rand_dict_value .get (feature .dtype .to_value_type (), [None ])
536+ )
537+ feature_dict [f"{ feature .name } " ] = rand_dict_value .get (
538+ feature .dtype .to_value_type (), [None ]
591539 )
592- sample_val = rand_df_value [dtype ] if dtype in rand_df_value else None
593- df [f"{ feature .name } " ] = pd .Series (data = sample_val , dtype = dtype )
594540 for request_data in self .source_request_sources .values ():
595541 for field in request_data .schema :
596- dtype = feast_value_type_to_pandas_type (field .dtype .to_value_type ())
597- sample_val = rand_df_value [dtype ] if dtype in rand_df_value else None
598- df [f"{ field .name } " ] = pd .Series (sample_val , dtype = dtype )
599-
600- output_df : pd .DataFrame = self .feature_transformation .transform (df )
601- inferred_features = []
602- for f , dt in zip (output_df .columns , output_df .dtypes ):
603- inferred_features .append (
604- Field (
605- name = f ,
606- dtype = from_value_type (
607- python_type_to_feast_value_type (f , type_name = str (dt ))
608- ),
542+ feature_dict [f"{ field .name } " ] = rand_dict_value .get (
543+ field .dtype .to_value_type (), [None ]
609544 )
610- )
611545
612- if self .features :
613- missing_features = []
614- for specified_features in self .features :
615- if specified_features not in inferred_features :
616- missing_features .append (specified_features )
617- if missing_features :
618- raise SpecifiedFeaturesNotPresentError (
619- missing_features , inferred_features , self .name
620- )
621- else :
622- self .features = inferred_features
623-
624- if not self .features :
625- raise RegistryInferenceFailure (
626- "OnDemandFeatureView" ,
627- f"Could not infer Features for the feature view '{ self .name } '." ,
628- )
546+ return feature_dict
629547
630548 @staticmethod
631549 def get_requested_odfvs (
@@ -682,59 +600,28 @@ def mainify(obj) -> None:
682600
683601 def decorator (user_function ):
684602 return_annotation = inspect .signature (user_function ).return_annotation
685- if (
686- return_annotation
687- and return_annotation .__module__ == "ibis.expr.types.relations"
688- and return_annotation .__name__ == "Table"
689- ):
690- import ibis
691- import ibis .expr .datatypes as dt
692- from ibis_substrait .compiler .core import SubstraitCompiler
693-
694- compiler = SubstraitCompiler ()
695-
696- input_fields : Field = []
697-
698- for s in sources :
699- if isinstance (s , FeatureView ):
700- fields = s .projection .features
701- else :
702- fields = s .features
703-
704- input_fields .extend (
705- [
706- (
707- f .name ,
708- dt .dtype (
709- feast_value_type_to_pandas_type (f .dtype .to_value_type ())
710- ),
711- )
712- for f in fields
713- ]
603+ udf_string = dill .source .getsource (user_function )
604+ mainify (user_function )
605+ if mode == "pandas" :
606+ if return_annotation not in (inspect ._empty , pd .DataFrame ):
607+ raise TypeError (
608+ f"return signature for { user_function } is { return_annotation } but should be pd.DataFrame"
714609 )
610+ transformation = PandasTransformation (user_function , udf_string )
611+ elif mode == "python" :
612+ if return_annotation not in (inspect ._empty , Dict [str , Any ]):
613+ raise TypeError (
614+ f"return signature for { user_function } is { return_annotation } but should be Dict[str, Any]"
615+ )
616+ transformation = PythonTransformation (user_function , udf_string )
617+ elif mode == "substrait" :
618+ from ibis .expr .types .relations import Table
715619
716- expr = user_function (ibis .table (input_fields , "t" ))
717-
718- transformation = SubstraitTransformation (
719- substrait_plan = compiler .compile (expr ).SerializeToString ()
720- )
721- else :
722- udf_string = dill .source .getsource (user_function )
723- mainify (user_function )
724- if mode == "pandas" :
725- if return_annotation not in (inspect ._empty , pd .DataFrame ):
726- raise TypeError (
727- f"return signature for { user_function } is { return_annotation } but should be pd.DataFrame"
728- )
729- transformation = PandasTransformation (user_function , udf_string )
730- elif mode == "python" :
731- if return_annotation not in (inspect ._empty , Dict [str , Any ]):
732- raise TypeError (
733- f"return signature for { user_function } is { return_annotation } but should be Dict[str, Any]"
734- )
735- transformation = PythonTransformation (user_function , udf_string )
736- elif mode == "substrait" :
737- pass
620+ if return_annotation not in (inspect ._empty , Table ):
621+ raise TypeError (
622+ f"return signature for { user_function } is { return_annotation } but should be ibis.expr.types.relations.Table"
623+ )
624+ transformation = SubstraitTransformation .from_ibis (user_function , sources )
738625
739626 on_demand_feature_view_obj = OnDemandFeatureView (
740627 name = user_function .__name__ ,
0 commit comments