2424from google .cloud .aiplatform import base
2525from google .cloud .aiplatform .compat .types import (
2626 entity_type as gca_entity_type ,
27+ feature_selector as gca_feature_selector ,
2728 featurestore_service as gca_featurestore_service ,
29+ featurestore_online_service as gca_featurestore_online_service ,
2830 io as gca_io ,
2931)
3032from google .cloud .aiplatform import featurestore
33+ from google .cloud .aiplatform import initializer
3134from google .cloud .aiplatform import utils
3235from google .cloud .aiplatform .utils import featurestore_utils
3336
37+
3438_LOGGER = base .Logger (__name__ )
3539_ALL_FEATURE_IDS = "*"
3640
@@ -40,7 +44,6 @@ class EntityType(base.VertexAiResourceNounWithFutureManager):
4044
4145 client_class = utils .FeaturestoreClientWithOverride
4246
43- _is_client_prediction_client = False
4447 _resource_noun = "entityTypes"
4548 _getter_method = "get_entity_type"
4649 _list_method = "list_entity_types"
@@ -114,6 +117,10 @@ def __init__(
114117 else featurestore_id ,
115118 )
116119
120+ self ._featurestore_online_client = self ._instantiate_featurestore_online_client (
121+ location = self .location , credentials = credentials ,
122+ )
123+
117124 @property
118125 def featurestore_name (self ) -> str :
119126 """Full qualified resource name of the managed featurestore in which this EntityType is."""
@@ -157,7 +164,7 @@ def update(
157164 self ,
158165 description : Optional [str ] = None ,
159166 labels : Optional [Dict [str , str ]] = None ,
160- request_metadata : Optional [ Sequence [Tuple [str , str ] ]] = (),
167+ request_metadata : Sequence [Tuple [str , str ]] = (),
161168 ) -> "EntityType" :
162169 """Updates an existing managed entityType resource.
163170
@@ -189,7 +196,7 @@ def update(
189196 System reserved label keys are prefixed with
190197 "aiplatform.googleapis.com/" and are immutable.
191198 request_metadata (Sequence[Tuple[str, str]]):
192- Optional . Strings which should be sent along with the request as metadata.
199+ Required . Strings which should be sent along with the request as metadata.
193200 Returns:
194201 EntityType - The updated entityType resource object.
195202 """
@@ -1138,3 +1145,144 @@ def ingest_from_gcs(
11381145 import_feature_values_request = import_feature_values_request ,
11391146 request_metadata = request_metadata ,
11401147 )
1148+
@staticmethod
def _instantiate_featurestore_online_client(
    location: Optional[str] = None,
    credentials: Optional[auth_credentials.Credentials] = None,
) -> utils.FeaturestoreOnlineServingClientWithOverride:
    """Builds the client used for featurestore online serving requests.

    Args:
        location (str):
            Optional. Location forwarded to the client factory as
            ``location_override``.
        credentials (google.auth.credentials.Credentials):
            Optional. Custom credentials forwarded to the client factory.

    Returns:
        utils.FeaturestoreOnlineServingClientWithOverride:
            The client returned by
            ``initializer.global_config.create_client``.
    """
    # Collect the factory arguments first so the call itself stays short.
    client_kwargs = {
        "client_class": utils.FeaturestoreOnlineServingClientWithOverride,
        "credentials": credentials,
        "location_override": location,
    }
    online_client = initializer.global_config.create_client(**client_kwargs)
    return online_client
1170+
def read(
    self,
    entity_ids: Union[str, List[str]],
    feature_ids: Union[str, List[str]] = "*",
    request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
) -> "pd.DataFrame":  # noqa: F821 - skip check for undefined name 'pd'
    """Reads feature values for given feature IDs of given entity IDs in this EntityType.

    A single entity ID (str) is served with a unary ``read_feature_values``
    call; a list of entity IDs is served with
    ``streaming_read_feature_values``.

    Args:
        entity_ids (Union[str, List[str]]):
            Required. ID for a specific entity, or a list of IDs of entities
            to read Feature values of. The maximum number of IDs is 100 if a list.
        feature_ids (Union[str, List[str]]):
            Optional. ID for a specific feature, or a list of IDs of Features in the EntityType
            for reading feature values. Defaults to "*", where values of all features will be read.
        request_metadata (Sequence[Tuple[str, str]]):
            Optional. Strings which should be sent along with the request as metadata.

    Returns:
        pd.DataFrame: entities' feature values in DataFrame, one row per
            returned entity view, with an "entity_id" column followed by one
            column per feature ID listed in the response header.

    Raises:
        TypeError: If entity_ids is neither a str nor a list.
    """
    # Fail fast on unsupported input. Previously neither branch below ran for
    # e.g. a tuple, and the method died later with an UnboundLocalError.
    if not isinstance(entity_ids, (str, list)):
        raise TypeError(
            "entity_ids must be a str or a list of str, "
            f"got {type(entity_ids).__name__}"
        )

    if isinstance(feature_ids, str):
        feature_ids = [feature_ids]

    feature_selector = gca_feature_selector.FeatureSelector(
        id_matcher=gca_feature_selector.IdMatcher(ids=feature_ids)
    )

    if isinstance(entity_ids, str):
        read_feature_values_request = gca_featurestore_online_service.ReadFeatureValuesRequest(
            entity_type=self.resource_name,
            entity_id=entity_ids,
            feature_selector=feature_selector,
        )
        read_feature_values_response = self._featurestore_online_client.read_feature_values(
            request=read_feature_values_request, metadata=request_metadata
        )
        header = read_feature_values_response.header
        entity_views = [read_feature_values_response.entity_view]
    else:
        streaming_read_feature_values_request = gca_featurestore_online_service.StreamingReadFeatureValuesRequest(
            entity_type=self.resource_name,
            entity_ids=entity_ids,
            feature_selector=feature_selector,
        )
        # Drain the stream so the responses can be indexed below.
        streaming_read_feature_values_responses = list(
            self._featurestore_online_client.streaming_read_feature_values(
                request=streaming_read_feature_values_request,
                metadata=request_metadata,
            )
        )
        # The first streamed response is used only for its header; every
        # following response contributes one entity view.
        header = streaming_read_feature_values_responses[0].header
        entity_views = [
            response.entity_view
            for response in streaming_read_feature_values_responses[1:]
        ]

    # Use the feature IDs reported by the service rather than the requested
    # ones, so a "*" selector resolves to the actual column names.
    feature_ids = [
        feature_descriptor.id for feature_descriptor in header.feature_descriptors
    ]

    return EntityType._construct_dataframe(
        feature_ids=feature_ids, entity_views=entity_views,
    )
1237+
@staticmethod
def _construct_dataframe(
    feature_ids: List[str],
    entity_views: List[
        gca_featurestore_online_service.ReadFeatureValuesResponse.EntityView
    ],
) -> "pd.DataFrame":  # noqa: F821 - skip check for undefined name 'pd'
    """Builds a DataFrame with one row per entity view.

    Args:
        feature_ids (List[str]):
            Required. Feature IDs, paired positionally with the entries of
            each entity view's ``data``.
        entity_views (List[gca_featurestore_online_service.ReadFeatureValuesResponse.EntityView]):
            Required. Entity views holding the feature values. Each view
            may be the full entity (when all Features were requested) or a
            projection of it (when only some Features were requested).

    Raises:
        ImportError: If pandas is not installed when using this method.

    Returns:
        pd.DataFrame - entities' feature values, with an "entity_id" column
            followed by one column per feature ID.
    """
    try:
        import pandas as pd
    except ImportError:
        raise ImportError(
            f"Pandas is not installed. Please install pandas to use "
            f"{EntityType._construct_dataframe.__name__}"
        )

    def _unwrap(feature_data):
        """Returns the plain value held by one feature entry, or None if unset."""
        if not feature_data._pb.HasField("value"):
            return None
        populated_field = feature_data.value._pb.WhichOneof("value")
        raw_value = getattr(feature_data.value, populated_field)
        # Values exposing a `values` attribute are replaced by that attribute.
        return raw_value.values if hasattr(raw_value, "values") else raw_value

    rows = []
    for view in entity_views:
        row = {"entity_id": view.entity_id}
        # zip() stops at the shorter sequence; absent features stay missing
        # in the row and are filled in by pandas.
        for column_name, feature_data in zip(feature_ids, view.data):
            row[column_name] = _unwrap(feature_data)
        rows.append(row)

    column_order = ["entity_id"] + feature_ids
    return pd.DataFrame(data=rows, columns=column_order)
0 commit comments