1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15-
15+ import json
1616import logging
1717import os
1818import time
1919from collections import OrderedDict
2020from typing import Dict , Union
2121from typing import List
22+ from urllib .parse import urlparse
2223
24+ import fastavro
2325import grpc
2426import pandas as pd
2527import pyarrow as pa
3840from feast .feature_set import FeatureSet , Entity
3941from feast .job import Job
4042from feast .loaders .abstract_producer import get_producer
41- from feast .loaders .file import export_dataframe_to_staging_location
43+ from feast .loaders .file import export_source_to_staging_location
4244from feast .loaders .ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT
4345from feast .loaders .ingest import get_feature_row_chunks
4446from feast .serving .ServingService_pb2 import GetFeastServingInfoResponse
@@ -322,22 +324,28 @@ def list_entities(self) -> Dict[str, Entity]:
322324 return entities_dict
323325
324326 def get_batch_features (
325- self , feature_ids : List [str ], entity_rows : pd .DataFrame
327+ self , feature_ids : List [str ], entity_rows : Union [ pd .DataFrame , str ]
326328 ) -> Job :
327329 """
328330 Retrieves historical features from a Feast Serving deployment.
329331
330332 Args:
331- feature_ids: List of feature ids that will be returned for each
332- entity. Each feature id should have the following format
333+ feature_ids (List[str]):
334+ List of feature ids that will be returned for each entity.
335+ Each feature id should have the following format
333336 "feature_set_name:version:feature_name".
334- entity_rows: Pandas dataframe containing entities and a 'datetime'
335- column. Each entity in a feature set must be present as a column
336- in this dataframe. The datetime column must
337+
338+ entity_rows (Union[pd.DataFrame, str]):
339+ Pandas dataframe containing entities and a 'datetime' column.
340+ Each entity in a feature set must be present as a column in this
341+ dataframe. The datetime column must contain timestamps in
342+ datetime64 format.
337343
338344 Returns:
339- Returns a job object that can be used to monitor retrieval progress
340- asynchronously, and can be used to materialize the results
345+ feast.job.Job:
346+ Returns a job object that can be used to monitor retrieval
347+ progress asynchronously, and can be used to materialize the
348+ results.
341349
342350 Examples:
343351 >>> from feast import Client
@@ -360,21 +368,11 @@ def get_batch_features(
360368
361369 fs_request = _build_feature_set_request (feature_ids )
362370
363- # Validate entity rows based on entities in Feast Core
364- self ._validate_entity_rows_for_batch_retrieval (entity_rows , fs_request )
365-
366- # Remove timezone from datetime column
367- if isinstance (
368- entity_rows ["datetime" ].dtype , pd .core .dtypes .dtypes .DatetimeTZDtype
369- ):
370- entity_rows ["datetime" ] = pd .DatetimeIndex (
371- entity_rows ["datetime" ]
372- ).tz_localize (None )
373-
374371 # Retrieve serving information to determine store type and
375372 # staging location
376373 serving_info = self ._serving_service_stub .GetFeastServingInfo (
377- GetFeastServingInfoRequest (), timeout = GRPC_CONNECTION_TIMEOUT_DEFAULT
374+ GetFeastServingInfoRequest (),
375+ timeout = GRPC_CONNECTION_TIMEOUT_DEFAULT
378376 ) # type: GetFeastServingInfoResponse
379377
380378 if serving_info .type != FeastServingType .FEAST_SERVING_TYPE_BATCH :
@@ -383,17 +381,50 @@ def get_batch_features(
383381 f"does not support batch retrieval "
384382 )
385383
386- # Export and upload entity row dataframe to staging location
384+ if isinstance (entity_rows , pd .DataFrame ):
385+ # Pandas DataFrame detected
386+ # Validate entity rows to based on entities in Feast Core
387+ self ._validate_dataframe_for_batch_retrieval (
388+ entity_rows = entity_rows ,
389+ feature_sets_request = fs_request
390+ )
391+
392+ # Remove timezone from datetime column
393+ if isinstance (
394+ entity_rows ["datetime" ].dtype ,
395+ pd .core .dtypes .dtypes .DatetimeTZDtype
396+ ):
397+ entity_rows ["datetime" ] = pd .DatetimeIndex (
398+ entity_rows ["datetime" ]
399+ ).tz_localize (None )
400+ elif isinstance (entity_rows , str ):
401+ # String based source
402+ if entity_rows .endswith ((".avro" , "*" )):
403+ # Validate Avro entity rows to based on entities in Feast Core
404+ self ._validate_avro_for_batch_retrieval (
405+ source = entity_rows ,
406+ feature_sets_request = fs_request
407+ )
408+ else :
409+ raise Exception (
410+ f"Only .avro and wildcard paths are accepted as entity_rows"
411+ )
412+ else :
413+ raise Exception (f"Only pandas.DataFrame and str types are allowed"
414+ f" as entity_rows, but got { type (entity_rows )} ." )
415+
416+ # Export and upload entity row DataFrame to staging location
387417 # provided by Feast
388- staged_file = export_dataframe_to_staging_location (
418+ staged_files = export_source_to_staging_location (
389419 entity_rows , serving_info .job_staging_location
390- ) # type: str
420+ ) # type: List[ str]
391421
392422 request = GetBatchFeaturesRequest (
393423 feature_sets = fs_request ,
394424 dataset_source = DatasetSource (
395425 file_source = DatasetSource .FileSource (
396- file_uris = [staged_file ], data_format = DataFormat .DATA_FORMAT_AVRO
426+ file_uris = staged_files ,
427+ data_format = DataFormat .DATA_FORMAT_AVRO
397428 )
398429 ),
399430 )
@@ -402,28 +433,107 @@ def get_batch_features(
402433 response = self ._serving_service_stub .GetBatchFeatures (request )
403434 return Job (response .job , self ._serving_service_stub )
404435
405- def _validate_entity_rows_for_batch_retrieval (
406- self , entity_rows , feature_sets_request
436+ def _validate_dataframe_for_batch_retrieval (
437+ self , entity_rows : pd .DataFrame , feature_sets_request
438+ ):
439+ """
440+ Validate whether an the entity rows in a DataFrame contains the correct
441+ information for batch retrieval.
442+
443+ Datetime column must be present in the DataFrame.
444+
445+ Args:
446+ entity_rows (pd.DataFrame):
447+ Pandas DataFrame containing entities and datetime column. Each
448+ entity in a feature set must be present as a column in this
449+ DataFrame.
450+
451+ feature_sets_request:
452+ Feature sets that will be requested.
453+ """
454+
455+ self ._validate_columns (
456+ columns = entity_rows .columns ,
457+ feature_sets_request = feature_sets_request ,
458+ datetime_field = "datetime"
459+ )
460+
461+ def _validate_avro_for_batch_retrieval (
462+ self , source : str , feature_sets_request
407463 ):
408464 """
409- Validate whether an entity_row dataframe contains the correct
410- information for batch retrieval
465+ Validate whether the entity rows in an Avro source file contains the
466+ correct information for batch retrieval.
467+
468+ Only gs:// and local files (file://) uri schemes are allowed.
469+
470+ Avro file must have a column named "event_timestamp".
471+
472+ No checks will be done if a GCS path is provided.
411473
412474 Args:
413- entity_rows: Pandas dataframe containing entities and datetime
414- column. Each entity in a feature set must be present as a
415- column in this dataframe.
416- feature_sets_request: Feature sets that will be requested
475+ source (str):
476+ File path to Avro.
477+
478+ feature_sets_request:
479+ Feature sets that will be requested.
417480 """
481+ p = urlparse (source )
418482
483+ if p .scheme == "gs" :
484+ # GCS path provided (Risk is delegated to user)
485+ # No validation if GCS path is provided
486+ return
487+ elif p .scheme == "file" or not p .scheme :
488+ # Local file (file://) provided
489+ file_path = os .path .abspath (os .path .join (p .netloc , p .path ))
490+ else :
491+ raise Exception (f"Unsupported uri scheme provided { p .scheme } , only "
492+ f"local files (file://), and gs:// schemes are "
493+ f"allowed" )
494+
495+ with open (file_path , "rb" ) as f :
496+ reader = fastavro .reader (f )
497+ schema = json .loads (reader .metadata ["avro.schema" ])
498+ columns = [x ["name" ] for x in schema ["fields" ]]
499+ self ._validate_columns (
500+ columns = columns ,
501+ feature_sets_request = feature_sets_request ,
502+ datetime_field = "event_timestamp"
503+ )
504+
505+ def _validate_columns (
506+ self , columns : List [str ],
507+ feature_sets_request ,
508+ datetime_field : str
509+ ) -> None :
510+ """
511+ Check if the required column contains the correct values for batch
512+ retrieval.
513+
514+ Args:
515+ columns (List[str]):
516+ List of columns to validate against feature_sets_request.
517+
518+ feature_sets_request ():
519+ Feature sets that will be requested.
520+
521+ datetime_field (str):
522+ Name of the datetime field that must be enforced and present as
523+ a column in the data source.
524+
525+ Returns:
526+ None:
527+ None
528+ """
419529 # Ensure datetime column exists
420- if "datetime" not in entity_rows . columns :
530+ if datetime_field not in columns :
421531 raise ValueError (
422- f'Entity rows does not contain "datetime " column in columns '
423- f" { entity_rows . columns } "
532+ f'Entity rows does not contain "{ datetime_field } " column in '
533+ f'columns { columns } '
424534 )
425535
426- # Validate dataframe columns based on feature set entities
536+ # Validate Avro columns based on feature set entities
427537 for feature_set in feature_sets_request :
428538 fs = self .get_feature_set (
429539 name = feature_set .name , version = feature_set .version
@@ -434,10 +544,10 @@ def _validate_entity_rows_for_batch_retrieval(
434544 f"could not be found"
435545 )
436546 for entity_type in fs .entities :
437- if entity_type .name not in entity_rows . columns :
547+ if entity_type .name not in columns :
438548 raise ValueError (
439- f'Dataframe does not contain entity " { entity_type . name } " '
440- f' column in columns "{ entity_rows . columns } "'
549+ f'Input does not contain entity'
550+ f' " { entity_type . name } " column in columns "{ columns } "'
441551 )
442552
443553 def get_online_features (
@@ -596,7 +706,9 @@ def ingest(
596706 return None
597707
598708
599- def _build_feature_set_request (feature_ids : List [str ]) -> List [FeatureSetRequest ]:
709+ def _build_feature_set_request (
710+ feature_ids : List [str ]
711+ ) -> List [FeatureSetRequest ]:
600712 """
601713 Builds a list of FeatureSet objects from feature set ids in order to
602714 retrieve feature data from Feast Serving
@@ -629,7 +741,7 @@ def _read_table_from_source(
629741 max_workers : int
630742) -> str :
631743 """
632- Infers a data source type (path or Pandas Dataframe ) and reads it in as
744+ Infers a data source type (path or Pandas DataFrame ) and reads it in as
633745 a PyArrow Table.
634746
635747 The PyArrow Table that is read will be written to a parquet file with row
@@ -674,7 +786,8 @@ def _read_table_from_source(
674786 else :
675787 table = pq .read_table (file_path )
676788 else :
677- raise ValueError (f"Unknown data source provided for ingestion: { source } " )
789+ raise ValueError (
790+ f"Unknown data source provided for ingestion: { source } " )
678791
679792 # Ensure that PyArrow table is initialised
680793 assert isinstance (table , pa .lib .Table )
0 commit comments