Skip to content

Commit c50a9ff

Browse files
authored
feat: Refactor ODFV schema inference (feast-dev#4076)
* refactor odfv scheam inference Signed-off-by: tokoko <[email protected]> * bugfix odfv schema inference Signed-off-by: tokoko <[email protected]> * remove print statement Signed-off-by: tokoko <[email protected]> --------- Signed-off-by: tokoko <[email protected]>
1 parent f3a9c64 commit c50a9ff

5 files changed

Lines changed: 144 additions & 167 deletions

File tree

sdk/python/feast/on_demand_feature_view.py

Lines changed: 52 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,6 @@
3434
from feast.transformation.pandas_transformation import PandasTransformation
3535
from feast.transformation.python_transformation import PythonTransformation
3636
from feast.transformation.substrait_transformation import SubstraitTransformation
37-
from feast.type_map import (
38-
feast_value_type_to_pandas_type,
39-
python_type_to_feast_value_type,
40-
)
4137
from feast.usage import log_exceptions
4238
from feast.value_type import ValueType
4339

@@ -490,69 +486,15 @@ def get_transformed_features(
490486
)
491487

492488
def infer_features(self) -> None:
493-
if self.mode in {"pandas", "substrait"}:
494-
self._infer_features_df()
495-
elif self.mode == "python":
496-
self._infer_features_dict()
497-
else:
498-
raise Exception(
499-
f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".'
500-
)
501-
502-
def _infer_features_dict(self):
503-
"""
504-
Infers the set of features associated to this feature view from the input source.
505-
506-
Raises:
507-
RegistryInferenceFailure: The set of features could not be inferred.
508-
"""
509-
rand_dict_value: Dict[str, Any] = {
510-
"float": [1.0],
511-
"int": [1],
512-
"str": ["hello world"],
513-
"bytes": [str.encode("hello world")],
514-
"bool": [True],
515-
"datetime64[ns]": [datetime.utcnow()],
516-
}
517-
518-
feature_dict = {}
519-
for feature_view_projection in self.source_feature_view_projections.values():
520-
for feature in feature_view_projection.features:
521-
dtype = feast_value_type_to_pandas_type(feature.dtype.to_value_type())
522-
feature_dict[f"{feature_view_projection.name}__{feature.name}"] = (
523-
rand_dict_value[dtype] if dtype in rand_dict_value else [None]
524-
)
525-
feature_dict[f"{feature.name}"] = (
526-
rand_dict_value[dtype] if dtype in rand_dict_value else [None]
527-
)
528-
for request_data in self.source_request_sources.values():
529-
for field in request_data.schema:
530-
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
531-
feature_dict[f"{field.name}"] = (
532-
rand_dict_value[dtype] if dtype in rand_dict_value else [None]
533-
)
534-
535-
output_dict: Dict[str, List[Any]] = self.feature_transformation.transform(
536-
feature_dict
489+
inferred_features = self.feature_transformation.infer_features(
490+
self._construct_random_input()
537491
)
538-
inferred_features = []
539-
for f, dt in output_dict.items():
540-
inferred_features.append(
541-
Field(
542-
name=f,
543-
dtype=from_value_type(
544-
python_type_to_feast_value_type(
545-
f, type_name=type(dt[0]).__name__
546-
)
547-
),
548-
)
549-
)
550492

551493
if self.features:
552494
missing_features = []
553-
for specified_features in self.features:
554-
if specified_features not in inferred_features:
555-
missing_features.append(specified_features)
495+
for specified_feature in self.features:
496+
if specified_feature not in inferred_features:
497+
missing_features.append(specified_feature)
556498
if missing_features:
557499
raise SpecifiedFeaturesNotPresentError(
558500
missing_features, inferred_features, self.name
@@ -566,66 +508,42 @@ def _infer_features_dict(self):
566508
f"Could not infer Features for the feature view '{self.name}'.",
567509
)
568510

569-
def _infer_features_df(self) -> None:
570-
"""
571-
Infers the set of features associated to this feature view from the input source.
572-
573-
Raises:
574-
RegistryInferenceFailure: The set of features could not be inferred.
575-
"""
576-
rand_df_value: Dict[str, Any] = {
577-
"float": 1.0,
578-
"int": 1,
579-
"str": "hello world",
580-
"bytes": str.encode("hello world"),
581-
"bool": True,
582-
"datetime64[ns]": datetime.utcnow(),
511+
def _construct_random_input(self) -> Dict[str, List[Any]]:
512+
rand_dict_value: Dict[ValueType, List[Any]] = {
513+
ValueType.BYTES: [str.encode("hello world")],
514+
ValueType.STRING: ["hello world"],
515+
ValueType.INT32: [1],
516+
ValueType.INT64: [1],
517+
ValueType.DOUBLE: [1.0],
518+
ValueType.FLOAT: [1.0],
519+
ValueType.BOOL: [True],
520+
ValueType.UNIX_TIMESTAMP: [datetime.utcnow()],
521+
ValueType.BYTES_LIST: [[str.encode("hello world")]],
522+
ValueType.STRING_LIST: [["hello world"]],
523+
ValueType.INT32_LIST: [[1]],
524+
ValueType.INT64_LIST: [[1]],
525+
ValueType.DOUBLE_LIST: [[1.0]],
526+
ValueType.FLOAT_LIST: [[1.0]],
527+
ValueType.BOOL_LIST: [[True]],
528+
ValueType.UNIX_TIMESTAMP_LIST: [[datetime.utcnow()]],
583529
}
584530

585-
df = pd.DataFrame()
531+
feature_dict = {}
586532
for feature_view_projection in self.source_feature_view_projections.values():
587533
for feature in feature_view_projection.features:
588-
dtype = feast_value_type_to_pandas_type(feature.dtype.to_value_type())
589-
df[f"{feature_view_projection.name}__{feature.name}"] = pd.Series(
590-
dtype=dtype
534+
feature_dict[f"{feature_view_projection.name}__{feature.name}"] = (
535+
rand_dict_value.get(feature.dtype.to_value_type(), [None])
536+
)
537+
feature_dict[f"{feature.name}"] = rand_dict_value.get(
538+
feature.dtype.to_value_type(), [None]
591539
)
592-
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None
593-
df[f"{feature.name}"] = pd.Series(data=sample_val, dtype=dtype)
594540
for request_data in self.source_request_sources.values():
595541
for field in request_data.schema:
596-
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
597-
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None
598-
df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype)
599-
600-
output_df: pd.DataFrame = self.feature_transformation.transform(df)
601-
inferred_features = []
602-
for f, dt in zip(output_df.columns, output_df.dtypes):
603-
inferred_features.append(
604-
Field(
605-
name=f,
606-
dtype=from_value_type(
607-
python_type_to_feast_value_type(f, type_name=str(dt))
608-
),
542+
feature_dict[f"{field.name}"] = rand_dict_value.get(
543+
field.dtype.to_value_type(), [None]
609544
)
610-
)
611545

612-
if self.features:
613-
missing_features = []
614-
for specified_features in self.features:
615-
if specified_features not in inferred_features:
616-
missing_features.append(specified_features)
617-
if missing_features:
618-
raise SpecifiedFeaturesNotPresentError(
619-
missing_features, inferred_features, self.name
620-
)
621-
else:
622-
self.features = inferred_features
623-
624-
if not self.features:
625-
raise RegistryInferenceFailure(
626-
"OnDemandFeatureView",
627-
f"Could not infer Features for the feature view '{self.name}'.",
628-
)
546+
return feature_dict
629547

630548
@staticmethod
631549
def get_requested_odfvs(
@@ -682,59 +600,28 @@ def mainify(obj) -> None:
682600

683601
def decorator(user_function):
684602
return_annotation = inspect.signature(user_function).return_annotation
685-
if (
686-
return_annotation
687-
and return_annotation.__module__ == "ibis.expr.types.relations"
688-
and return_annotation.__name__ == "Table"
689-
):
690-
import ibis
691-
import ibis.expr.datatypes as dt
692-
from ibis_substrait.compiler.core import SubstraitCompiler
693-
694-
compiler = SubstraitCompiler()
695-
696-
input_fields: Field = []
697-
698-
for s in sources:
699-
if isinstance(s, FeatureView):
700-
fields = s.projection.features
701-
else:
702-
fields = s.features
703-
704-
input_fields.extend(
705-
[
706-
(
707-
f.name,
708-
dt.dtype(
709-
feast_value_type_to_pandas_type(f.dtype.to_value_type())
710-
),
711-
)
712-
for f in fields
713-
]
603+
udf_string = dill.source.getsource(user_function)
604+
mainify(user_function)
605+
if mode == "pandas":
606+
if return_annotation not in (inspect._empty, pd.DataFrame):
607+
raise TypeError(
608+
f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame"
714609
)
610+
transformation = PandasTransformation(user_function, udf_string)
611+
elif mode == "python":
612+
if return_annotation not in (inspect._empty, Dict[str, Any]):
613+
raise TypeError(
614+
f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]"
615+
)
616+
transformation = PythonTransformation(user_function, udf_string)
617+
elif mode == "substrait":
618+
from ibis.expr.types.relations import Table
715619

716-
expr = user_function(ibis.table(input_fields, "t"))
717-
718-
transformation = SubstraitTransformation(
719-
substrait_plan=compiler.compile(expr).SerializeToString()
720-
)
721-
else:
722-
udf_string = dill.source.getsource(user_function)
723-
mainify(user_function)
724-
if mode == "pandas":
725-
if return_annotation not in (inspect._empty, pd.DataFrame):
726-
raise TypeError(
727-
f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame"
728-
)
729-
transformation = PandasTransformation(user_function, udf_string)
730-
elif mode == "python":
731-
if return_annotation not in (inspect._empty, Dict[str, Any]):
732-
raise TypeError(
733-
f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]"
734-
)
735-
transformation = PythonTransformation(user_function, udf_string)
736-
elif mode == "substrait":
737-
pass
620+
if return_annotation not in (inspect._empty, Table):
621+
raise TypeError(
622+
f"return signature for {user_function} is {return_annotation} but should be ibis.expr.types.relations.Table"
623+
)
624+
transformation = SubstraitTransformation.from_ibis(user_function, sources)
738625

739626
on_demand_feature_view_obj = OnDemandFeatureView(
740627
name=user_function.__name__,

sdk/python/feast/transformation/pandas_transformation.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
from types import FunctionType
2+
from typing import Any, Dict, List
23

34
import dill
45
import pandas as pd
56

7+
from feast.field import Field, from_value_type
68
from feast.protos.feast.core.Transformation_pb2 import (
79
UserDefinedFunctionV2 as UserDefinedFunctionProto,
810
)
11+
from feast.type_map import (
12+
python_type_to_feast_value_type,
13+
)
914

1015

1116
class PandasTransformation:
@@ -33,6 +38,21 @@ def transform(self, input_df: pd.DataFrame) -> pd.DataFrame:
3338
)
3439
return output_df
3540

41+
def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
42+
df = pd.DataFrame.from_dict(random_input)
43+
44+
output_df: pd.DataFrame = self.transform(df)
45+
46+
return [
47+
Field(
48+
name=f,
49+
dtype=from_value_type(
50+
python_type_to_feast_value_type(f, type_name=str(dt))
51+
),
52+
)
53+
for f, dt in zip(output_df.columns, output_df.dtypes)
54+
]
55+
3656
def __eq__(self, other):
3757
if not isinstance(other, PandasTransformation):
3858
raise TypeError(

sdk/python/feast/transformation/python_transformation.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
from types import FunctionType
2-
from typing import Dict
2+
from typing import Any, Dict, List
33

44
import dill
55

6+
from feast.field import Field, from_value_type
67
from feast.protos.feast.core.Transformation_pb2 import (
78
UserDefinedFunctionV2 as UserDefinedFunctionProto,
89
)
10+
from feast.type_map import (
11+
python_type_to_feast_value_type,
12+
)
913

1014

1115
class PythonTransformation:
@@ -33,6 +37,19 @@ def transform(self, input_dict: Dict) -> Dict:
3337
)
3438
return {**input_dict, **output_dict}
3539

40+
def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
41+
output_dict: Dict[str, List[Any]] = self.transform(random_input)
42+
43+
return [
44+
Field(
45+
name=f,
46+
dtype=from_value_type(
47+
python_type_to_feast_value_type(f, type_name=type(dt[0]).__name__)
48+
),
49+
)
50+
for f, dt in output_dict.items()
51+
]
52+
3653
def __eq__(self, other):
3754
if not isinstance(other, PythonTransformation):
3855
raise TypeError(

0 commit comments

Comments
 (0)