Skip to content

Commit 5601a09

Browse files
voonhous authored and davidheryanto committed
Added support to accept local avro files, GCS avro files and GCS wildcard paths (#375)
1 parent 77229eb commit 5601a09

6 files changed

Lines changed: 518 additions & 90 deletions

File tree

sdk/python/feast/client.py

Lines changed: 157 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
15+
import json
1616
import logging
1717
import os
1818
import time
1919
from collections import OrderedDict
2020
from typing import Dict, Union
2121
from typing import List
22+
from urllib.parse import urlparse
2223

24+
import fastavro
2325
import grpc
2426
import pandas as pd
2527
import pyarrow as pa
@@ -38,7 +40,7 @@
3840
from feast.feature_set import FeatureSet, Entity
3941
from feast.job import Job
4042
from feast.loaders.abstract_producer import get_producer
41-
from feast.loaders.file import export_dataframe_to_staging_location
43+
from feast.loaders.file import export_source_to_staging_location
4244
from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT
4345
from feast.loaders.ingest import get_feature_row_chunks
4446
from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse
@@ -322,22 +324,28 @@ def list_entities(self) -> Dict[str, Entity]:
322324
return entities_dict
323325

324326
def get_batch_features(
325-
self, feature_ids: List[str], entity_rows: pd.DataFrame
327+
self, feature_ids: List[str], entity_rows: Union[pd.DataFrame, str]
326328
) -> Job:
327329
"""
328330
Retrieves historical features from a Feast Serving deployment.
329331
330332
Args:
331-
feature_ids: List of feature ids that will be returned for each
332-
entity. Each feature id should have the following format
333+
feature_ids (List[str]):
334+
List of feature ids that will be returned for each entity.
335+
Each feature id should have the following format
333336
"feature_set_name:version:feature_name".
334-
entity_rows: Pandas dataframe containing entities and a 'datetime'
335-
column. Each entity in a feature set must be present as a column
336-
in this dataframe. The datetime column must
337+
338+
entity_rows (Union[pd.DataFrame, str]):
339+
Pandas dataframe containing entities and a 'datetime' column.
340+
Each entity in a feature set must be present as a column in this
341+
dataframe. The datetime column must contain timestamps in
342+
datetime64 format.
337343
338344
Returns:
339-
Returns a job object that can be used to monitor retrieval progress
340-
asynchronously, and can be used to materialize the results
345+
feast.job.Job:
346+
Returns a job object that can be used to monitor retrieval
347+
progress asynchronously, and can be used to materialize the
348+
results.
341349
342350
Examples:
343351
>>> from feast import Client
@@ -360,21 +368,11 @@ def get_batch_features(
360368

361369
fs_request = _build_feature_set_request(feature_ids)
362370

363-
# Validate entity rows based on entities in Feast Core
364-
self._validate_entity_rows_for_batch_retrieval(entity_rows, fs_request)
365-
366-
# Remove timezone from datetime column
367-
if isinstance(
368-
entity_rows["datetime"].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
369-
):
370-
entity_rows["datetime"] = pd.DatetimeIndex(
371-
entity_rows["datetime"]
372-
).tz_localize(None)
373-
374371
# Retrieve serving information to determine store type and
375372
# staging location
376373
serving_info = self._serving_service_stub.GetFeastServingInfo(
377-
GetFeastServingInfoRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT
374+
GetFeastServingInfoRequest(),
375+
timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT
378376
) # type: GetFeastServingInfoResponse
379377

380378
if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH:
@@ -383,17 +381,50 @@ def get_batch_features(
383381
f"does not support batch retrieval "
384382
)
385383

386-
# Export and upload entity row dataframe to staging location
384+
if isinstance(entity_rows, pd.DataFrame):
385+
# Pandas DataFrame detected
386+
# Validate entity rows to based on entities in Feast Core
387+
self._validate_dataframe_for_batch_retrieval(
388+
entity_rows=entity_rows,
389+
feature_sets_request=fs_request
390+
)
391+
392+
# Remove timezone from datetime column
393+
if isinstance(
394+
entity_rows["datetime"].dtype,
395+
pd.core.dtypes.dtypes.DatetimeTZDtype
396+
):
397+
entity_rows["datetime"] = pd.DatetimeIndex(
398+
entity_rows["datetime"]
399+
).tz_localize(None)
400+
elif isinstance(entity_rows, str):
401+
# String based source
402+
if entity_rows.endswith((".avro", "*")):
403+
# Validate Avro entity rows to based on entities in Feast Core
404+
self._validate_avro_for_batch_retrieval(
405+
source=entity_rows,
406+
feature_sets_request=fs_request
407+
)
408+
else:
409+
raise Exception(
410+
f"Only .avro and wildcard paths are accepted as entity_rows"
411+
)
412+
else:
413+
raise Exception(f"Only pandas.DataFrame and str types are allowed"
414+
f" as entity_rows, but got {type(entity_rows)}.")
415+
416+
# Export and upload entity row DataFrame to staging location
387417
# provided by Feast
388-
staged_file = export_dataframe_to_staging_location(
418+
staged_files = export_source_to_staging_location(
389419
entity_rows, serving_info.job_staging_location
390-
) # type: str
420+
) # type: List[str]
391421

392422
request = GetBatchFeaturesRequest(
393423
feature_sets=fs_request,
394424
dataset_source=DatasetSource(
395425
file_source=DatasetSource.FileSource(
396-
file_uris=[staged_file], data_format=DataFormat.DATA_FORMAT_AVRO
426+
file_uris=staged_files,
427+
data_format=DataFormat.DATA_FORMAT_AVRO
397428
)
398429
),
399430
)
@@ -402,28 +433,107 @@ def get_batch_features(
402433
response = self._serving_service_stub.GetBatchFeatures(request)
403434
return Job(response.job, self._serving_service_stub)
404435

405-
def _validate_entity_rows_for_batch_retrieval(
406-
self, entity_rows, feature_sets_request
436+
def _validate_dataframe_for_batch_retrieval(
437+
self, entity_rows: pd.DataFrame, feature_sets_request
438+
):
439+
"""
440+
Validate whether an the entity rows in a DataFrame contains the correct
441+
information for batch retrieval.
442+
443+
Datetime column must be present in the DataFrame.
444+
445+
Args:
446+
entity_rows (pd.DataFrame):
447+
Pandas DataFrame containing entities and datetime column. Each
448+
entity in a feature set must be present as a column in this
449+
DataFrame.
450+
451+
feature_sets_request:
452+
Feature sets that will be requested.
453+
"""
454+
455+
self._validate_columns(
456+
columns=entity_rows.columns,
457+
feature_sets_request=feature_sets_request,
458+
datetime_field="datetime"
459+
)
460+
461+
def _validate_avro_for_batch_retrieval(
462+
self, source: str, feature_sets_request
407463
):
408464
"""
409-
Validate whether an entity_row dataframe contains the correct
410-
information for batch retrieval
465+
Validate whether the entity rows in an Avro source file contains the
466+
correct information for batch retrieval.
467+
468+
Only gs:// and local files (file://) uri schemes are allowed.
469+
470+
Avro file must have a column named "event_timestamp".
471+
472+
No checks will be done if a GCS path is provided.
411473
412474
Args:
413-
entity_rows: Pandas dataframe containing entities and datetime
414-
column. Each entity in a feature set must be present as a
415-
column in this dataframe.
416-
feature_sets_request: Feature sets that will be requested
475+
source (str):
476+
File path to Avro.
477+
478+
feature_sets_request:
479+
Feature sets that will be requested.
417480
"""
481+
p = urlparse(source)
418482

483+
if p.scheme == "gs":
484+
# GCS path provided (Risk is delegated to user)
485+
# No validation if GCS path is provided
486+
return
487+
elif p.scheme == "file" or not p.scheme:
488+
# Local file (file://) provided
489+
file_path = os.path.abspath(os.path.join(p.netloc, p.path))
490+
else:
491+
raise Exception(f"Unsupported uri scheme provided {p.scheme}, only "
492+
f"local files (file://), and gs:// schemes are "
493+
f"allowed")
494+
495+
with open(file_path, "rb") as f:
496+
reader = fastavro.reader(f)
497+
schema = json.loads(reader.metadata["avro.schema"])
498+
columns = [x["name"] for x in schema["fields"]]
499+
self._validate_columns(
500+
columns=columns,
501+
feature_sets_request=feature_sets_request,
502+
datetime_field="event_timestamp"
503+
)
504+
505+
def _validate_columns(
506+
self, columns: List[str],
507+
feature_sets_request,
508+
datetime_field: str
509+
) -> None:
510+
"""
511+
Check if the required column contains the correct values for batch
512+
retrieval.
513+
514+
Args:
515+
columns (List[str]):
516+
List of columns to validate against feature_sets_request.
517+
518+
feature_sets_request ():
519+
Feature sets that will be requested.
520+
521+
datetime_field (str):
522+
Name of the datetime field that must be enforced and present as
523+
a column in the data source.
524+
525+
Returns:
526+
None:
527+
None
528+
"""
419529
# Ensure datetime column exists
420-
if "datetime" not in entity_rows.columns:
530+
if datetime_field not in columns:
421531
raise ValueError(
422-
f'Entity rows does not contain "datetime" column in columns '
423-
f"{entity_rows.columns}"
532+
f'Entity rows does not contain "{datetime_field}" column in '
533+
f'columns {columns}'
424534
)
425535

426-
# Validate dataframe columns based on feature set entities
536+
# Validate Avro columns based on feature set entities
427537
for feature_set in feature_sets_request:
428538
fs = self.get_feature_set(
429539
name=feature_set.name, version=feature_set.version
@@ -434,10 +544,10 @@ def _validate_entity_rows_for_batch_retrieval(
434544
f"could not be found"
435545
)
436546
for entity_type in fs.entities:
437-
if entity_type.name not in entity_rows.columns:
547+
if entity_type.name not in columns:
438548
raise ValueError(
439-
f'Dataframe does not contain entity "{entity_type.name}"'
440-
f' column in columns "{entity_rows.columns}"'
549+
f'Input does not contain entity'
550+
f' "{entity_type.name}" column in columns "{columns}"'
441551
)
442552

443553
def get_online_features(
@@ -596,7 +706,9 @@ def ingest(
596706
return None
597707

598708

599-
def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest]:
709+
def _build_feature_set_request(
710+
feature_ids: List[str]
711+
) -> List[FeatureSetRequest]:
600712
"""
601713
Builds a list of FeatureSet objects from feature set ids in order to
602714
retrieve feature data from Feast Serving
@@ -629,7 +741,7 @@ def _read_table_from_source(
629741
max_workers: int
630742
) -> str:
631743
"""
632-
Infers a data source type (path or Pandas Dataframe) and reads it in as
744+
Infers a data source type (path or Pandas DataFrame) and reads it in as
633745
a PyArrow Table.
634746
635747
The PyArrow Table that is read will be written to a parquet file with row
@@ -674,7 +786,8 @@ def _read_table_from_source(
674786
else:
675787
table = pq.read_table(file_path)
676788
else:
677-
raise ValueError(f"Unknown data source provided for ingestion: {source}")
789+
raise ValueError(
790+
f"Unknown data source provided for ingestion: {source}")
678791

679792
# Ensure that PyArrow table is initialised
680793
assert isinstance(table, pa.lib.Table)

0 commit comments

Comments (0)