Closed

Changes from 1 commit

66 commits
d044588
fix: Fixing broken links to feast documentation on java readme and co…
breno-costa Jun 30, 2022
80ea7a9
fix: Change numpy version on setup.py and upgrade it to resolve depen…
breno-costa Jun 30, 2022
8e2a375
chore: Verification workflow in build wheels that ensures the wheels …
kevjumba Jun 30, 2022
86e9efd
fix: Fix grpc and update protobuf (#2894)
kevjumba Jun 30, 2022
056cfa1
feat: Support retrieval from multiple feature views with different jo…
yongheng Jun 30, 2022
51df8be
fix: Bump version of Guava to mitigate cve (#2896)
achals Jun 30, 2022
8abc2ef
feat: Add column reordering to `write_to_offline_store` (#2876)
felixwang9817 Jun 30, 2022
9b97fca
feat: Add pages for individual Features to the Feast UI (#2850)
kindalime Jul 1, 2022
0ceb39c
fix snowflake testing (#2903)
sfc-gh-madkins Jul 2, 2022
f758f9e
feat: Add snowflake online store (#2902)
sfc-gh-madkins Jul 3, 2022
8828240
chore: Change pytest fixtures to be function-scoped instead of sessio…
felixwang9817 Jul 5, 2022
0ec7d1a
fix: Change the feature store plan method to public modifier (#2904)
breno-costa Jul 5, 2022
0159f38
feat: Add custom JSON table tab w/ formatting (#2851)
kindalime Jul 5, 2022
eaf4022
docs: Grammatically updated the quickstart guide docs (#2913)
prasadzende Jul 6, 2022
38b28ca
feat: Add interfaces for batch materialization engine (#2901)
achals Jul 6, 2022
38fd001
fix: Revert "feat: Add snowflake online store (#2902)" (#2909)
kevjumba Jul 6, 2022
adf3212
chore: Refactor StreamFeatureViewMeta to FeatureViewMeta and dedupe (…
achals Jul 6, 2022
dcd8ec9
chore: Clean up push source tests (#2912)
felixwang9817 Jul 6, 2022
109ee9c
feat: Add to_remote_storage method to RetrievalJob (#2916)
achals Jul 6, 2022
66038c7
chore: Implement to_remote_storage for supported offline stores (#2918)
achals Jul 7, 2022
495f5f0
chore: Update Feast UI dependency to reflect recent changes (#2924)
adchia Jul 8, 2022
9ae22a1
chore: Bump moment from 2.29.2 to 2.29.4 in /ui (#2926)
dependabot[bot] Jul 8, 2022
130746e
fix: Fix the go build and use CgoArrowAllocator to prevent incorrect …
kevjumba Jul 8, 2022
292adc2
ci: Fixing local integration tests, defaulting to test containers (#2…
adchia Jul 11, 2022
bdeb4ae
fix: Fix build wheels workflow to install apache-arrow correctly (#2932)
kevjumba Jul 11, 2022
9fc81a2
chore: Update docs with new release workflow (#2898)
kevjumba Jul 12, 2022
4394696
ci: Add a nightly CI job for integration tests (#2652)
achals Jul 12, 2022
16ae902
fix: Resolve small typo in README file (#2930)
sudohainguyen Jul 12, 2022
040c910
fix: Deprecate 3.7 wheels and fix verification workflow (#2934)
kevjumba Jul 12, 2022
b917540
fix: Fix night ci syntax error and update readme (#2935)
kevjumba Jul 12, 2022
054446c
chore(deps): Bump moment from 2.29.3 to 2.29.4 in /sdk/python/feast/u…
dependabot[bot] Jul 12, 2022
6f79069
feat: Add an experimental lambda-based materialization engine (#2923)
achals Jul 14, 2022
1603c9e
fix: Fix nightly ci again (#2939)
kevjumba Jul 14, 2022
268f28d
chore: Fixes and Readme for python<>go interface (#2936)
achals Jul 15, 2022
ba2dcf1
fix: Update gopy to point to fork to resolve github annotation errors…
kevjumba Jul 15, 2022
d25df83
chore: More automated upgrades in repo definitions (#2941)
achals Jul 15, 2022
476fccd
chore(deps): Bump aws-java-sdk-s3 from 1.12.110 to 1.12.261 in /java/…
dependabot[bot] Jul 16, 2022
d593351
chore: Add project metadata to registry (#2938)
adchia Jul 18, 2022
d3868c5
chore: Upgrade GCP dependencies (#2945)
chhabrakadabra Jul 19, 2022
b69eadc
ci: Remove code coverage for now to keep from blocking other prs (#2950)
kevjumba Jul 19, 2022
3e3489c
chore: Remove UI from quickstart colab (#2951)
adchia Jul 19, 2022
92785b8
chore: Widen dependencies (#2928)
chhabrakadabra Jul 19, 2022
d0d27a3
fix: Version entity serialization mechanism and fix issue with int64 …
achals Jul 20, 2022
6d7b38a
docs: Include docs updates for release process. Clean up old docs (#2…
adchia Jul 20, 2022
a965af9
docs: Add docs for batch materialization engine (#2959)
achals Jul 20, 2022
46d11bc
chore(deps): Bump terser from 5.10.0 to 5.14.2 in /ui (#2961)
dependabot[bot] Jul 20, 2022
8534f69
fix: Fix typo in CONTRIBUTING.md (#2955)
jeongukjae Jul 20, 2022
23c09c8
feat: Add CustomSourceOptions to SavedDatasetStorage (#2958)
wdhorton Jul 20, 2022
5e45228
fix: Add dummy alias to pull_all_from_table_or_query (#2956)
ikrizanic Jul 21, 2022
661c053
fix: Do not allow same column to be reused in data sources (#2965)
felixwang9817 Jul 21, 2022
ffab04c
chore(deps): Bump terser from 5.13.1 to 5.14.2 in /sdk/python/feast/u…
dependabot[bot] Jul 21, 2022
a36a695
feat: Add Go option to `feast serve` command (#2966)
felixwang9817 Jul 21, 2022
a233d3f
chore: Fix test asserts for offline store write and improve some erro…
achals Jul 22, 2022
aa2a86a
chore: Add separate `go_feature_serving` flag (#2968)
felixwang9817 Jul 22, 2022
1479519
chore: Update docs for offline and online stores (#2946)
kevjumba Jul 22, 2022
a15fcb4
docs: Fix docs for Go feature retrieval (#2967)
felixwang9817 Jul 22, 2022
ac55ce2
fix: Fixing Spark min / max entity df event timestamps range return o…
levpickis Jul 25, 2022
5ae2a34
chore: Update helm chart name (#2969)
felixwang9817 Jul 25, 2022
e4507ac
fix: Remove hard-coded integration test setup for AWS & GCP (#2970)
kevjumba Jul 26, 2022
3ce5139
fix: Spark source support table with pattern "db.table" (#2606)
kfiring Jul 26, 2022
26f6b69
fix: Fix file offline store logic for feature views without ttl (#2971)
felixwang9817 Jul 26, 2022
5edf4b0
fix: Switch mysql log string to use regex (#2976)
felixwang9817 Jul 27, 2022
f2696e0
ci: Fix pip install issues from grpcio version mismatches (#2984)
adchia Jul 29, 2022
2680f7b
fix: Refactor testing and sort out unit and integration tests (#2975)
kevjumba Jul 29, 2022
61a194c
docs: Update intro documentation page (#2982)
adchia Jul 29, 2022
2ef71fc
feat: Add Snowflake online store (again) (#2922)
sfc-gh-madkins Jul 29, 2022
feat: Add column reordering to write_to_offline_store (#2876)
* Add feature extraction logic to batch writer

Signed-off-by: Felix Wang <[email protected]>

* Enable StreamProcessor to write to both online and offline stores

Signed-off-by: Felix Wang <[email protected]>

* Fix incorrect columns error message

Signed-off-by: Felix Wang <[email protected]>

* Reorder columns in _write_to_offline_store

Signed-off-by: Felix Wang <[email protected]>

* Make _write_to_offline_store a public method

Signed-off-by: Felix Wang <[email protected]>

* Import FeatureStore correctly

Signed-off-by: Felix Wang <[email protected]>

* Remove defaults for `processing_time` and `query_timeout`

Signed-off-by: Felix Wang <[email protected]>

* Clean up `test_offline_write.py`

Signed-off-by: Felix Wang <[email protected]>

* Do not do any custom logic for double underscore columns

Signed-off-by: Felix Wang <[email protected]>

* Lint

Signed-off-by: Felix Wang <[email protected]>

* Switch entity values for all tests using push sources to not affect other tests

Signed-off-by: Felix Wang <[email protected]>
felixwang9817 authored Jun 30, 2022
commit 8abc2ef76d461b6b4bbd97e2dfdf29c1c335cb80
26 changes: 22 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
@@ -1383,7 +1383,7 @@ def push(
fv.name, df, allow_registry_cache=allow_registry_cache
)
if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE:
self._write_to_offline_store(
self.write_to_offline_store(
fv.name, df, allow_registry_cache=allow_registry_cache
)

@@ -1415,14 +1415,18 @@ def write_to_online_store(
provider.ingest_df(feature_view, entities, df)

@log_exceptions_and_usage
def _write_to_offline_store(
def write_to_offline_store(
self,
feature_view_name: str,
df: pd.DataFrame,
allow_registry_cache: bool = True,
reorder_columns: bool = True,
):
"""
ingests data directly into the Online store
Persists the dataframe directly into the batch data source for the given feature view.

Fails if the dataframe columns do not match the columns of the batch data source. Optionally
reorders the columns of the dataframe to match.
"""
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
try:
@@ -1433,7 +1437,21 @@ def _write_to_offline_store(
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
df.reset_index(drop=True)

# Get columns of the batch source and the input dataframe.
column_names_and_types = feature_view.batch_source.get_table_column_names_and_types(
self.config
)
source_columns = [column for column, _ in column_names_and_types]
input_columns = df.columns.values.tolist()

if set(input_columns) != set(source_columns):
raise ValueError(
f"The input dataframe has columns {set(input_columns)} but the batch source has columns {set(source_columns)}."
)

if reorder_columns:
df = df.reindex(columns=source_columns)

table = pa.Table.from_pandas(df)
provider = self._get_provider()
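The `write_to_offline_store` change above validates that the input dataframe has exactly the batch source's columns and then reorders them to match. A minimal standalone sketch of that validate-and-reorder step (the helper name `reorder_to_source` and the sample columns are illustrative, not part of the Feast API):

```python
import pandas as pd


def reorder_to_source(df: pd.DataFrame, source_columns: list) -> pd.DataFrame:
    """Fail if df's columns differ from the source's; otherwise reorder to match."""
    input_columns = df.columns.values.tolist()
    if set(input_columns) != set(source_columns):
        raise ValueError(
            f"The input dataframe has columns {set(input_columns)} "
            f"but the batch source has columns {set(source_columns)}."
        )
    # reindex with an explicit column list permutes columns without copying rows
    return df.reindex(columns=source_columns)


df = pd.DataFrame({"temperature": [4], "location_id": [102]})
reordered = reorder_to_source(df, ["location_id", "temperature"])
print(reordered.columns.tolist())  # ['location_id', 'temperature']
```

Because the check compares column *sets* first, a dataframe with missing or extra columns fails fast, while one with the right columns in the wrong order is silently fixed when `reorder_columns=True`.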
57 changes: 41 additions & 16 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -1,12 +1,14 @@
from types import MethodType
from typing import List
from typing import List, Optional

import pandas as pd
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, from_json

from feast.data_format import AvroFormat, JsonFormat
from feast.data_source import KafkaSource
from feast.data_source import KafkaSource, PushMode
from feast.feature_store import FeatureStore
from feast.infra.contrib.stream_processor import (
ProcessorConfig,
StreamProcessor,
@@ -24,16 +26,16 @@ class SparkProcessorConfig(ProcessorConfig):
class SparkKafkaProcessor(StreamProcessor):
spark: SparkSession
format: str
write_function: MethodType
preprocess_fn: Optional[MethodType]
join_keys: List[str]

def __init__(
self,
*,
fs: FeatureStore,
sfv: StreamFeatureView,
config: ProcessorConfig,
write_function: MethodType,
processing_time: str = "30 seconds",
query_timeout: int = 15,
preprocess_fn: Optional[MethodType] = None,
):
if not isinstance(sfv.stream_source, KafkaSource):
raise ValueError("data source is not kafka source")
@@ -55,15 +57,16 @@ def __init__(
if not isinstance(config, SparkProcessorConfig):
raise ValueError("config is not spark processor config")
self.spark = config.spark_session
self.write_function = write_function
self.processing_time = processing_time
self.query_timeout = query_timeout
super().__init__(sfv=sfv, data_source=sfv.stream_source)
self.preprocess_fn = preprocess_fn
self.processing_time = config.processing_time
self.query_timeout = config.query_timeout
self.join_keys = [fs.get_entity(entity).join_key for entity in sfv.entities]
super().__init__(fs=fs, sfv=sfv, data_source=sfv.stream_source)

def ingest_stream_feature_view(self) -> None:
def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
ingested_stream_df = self._ingest_stream_data()
transformed_df = self._construct_transformation_plan(ingested_stream_df)
online_store_query = self._write_to_online_store(transformed_df)
online_store_query = self._write_stream_data(transformed_df, to)
return online_store_query

def _ingest_stream_data(self) -> StreamTable:
@@ -119,13 +122,35 @@ def _ingest_stream_data(self) -> StreamTable:
def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
return self.sfv.udf.__call__(df) if self.sfv.udf else df

def _write_to_online_store(self, df: StreamTable):
def _write_stream_data(self, df: StreamTable, to: PushMode):
# Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
def batch_write(row: DataFrame, batch_id: int):
pd_row = row.toPandas()
self.write_function(
pd_row, input_timestamp="event_timestamp", output_timestamp=""
rows: pd.DataFrame = row.toPandas()

# Extract the latest feature values for each unique entity row (i.e. the join keys).
# Also add a 'created' column.
rows = (
rows.sort_values(
by=self.join_keys + [self.sfv.timestamp_field], ascending=True
)
.groupby(self.join_keys)
.nth(0)
)
rows["created"] = pd.to_datetime("now", utc=True)

# Reset indices to ensure the dataframe has all the required columns.
rows = rows.reset_index()

# Optionally execute preprocessor before writing to the online store.
if self.preprocess_fn:
rows = self.preprocess_fn(rows)

# Finally persist the data to the online store and/or offline store.
if rows.size > 0:
if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
self.fs.write_to_online_store(self.sfv.name, rows)
if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE:
self.fs.write_to_offline_store(self.sfv.name, rows)

query = (
df.writeStream.outputMode("update")
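The `batch_write` body above collapses each micro-batch to one row per entity (the join keys) and stamps a `created` column before writing. A small sketch of that per-entity dedup in plain pandas, using made-up sample data (`location_id` as the join key and `event_timestamp` as the timestamp field are assumptions for illustration); here the latest row per entity is kept by sorting ascending and taking the tail of each group:

```python
import pandas as pd

join_keys = ["location_id"]
timestamp_field = "event_timestamp"

rows = pd.DataFrame(
    {
        "location_id": [1, 1, 2],
        "event_timestamp": pd.to_datetime(["2022-07-01", "2022-07-02", "2022-07-01"]),
        "temperature": [10, 12, 7],
    }
)

# Keep only the latest feature values for each unique entity row.
latest = (
    rows.sort_values(by=join_keys + [timestamp_field], ascending=True)
    .groupby(join_keys)
    .tail(1)  # last row per group == most recent timestamp after ascending sort
    .reset_index(drop=True)
)
# Add a 'created' column, as the processor does before persisting.
latest["created"] = pd.to_datetime("now", utc=True)

print(latest["temperature"].tolist())  # [12, 7]
```

Note that `groupby(...).nth(0)` after an ascending sort, as in the diff, selects the *earliest* row per group; the sketch uses `.tail(1)` to make "latest value per entity" explicit.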
38 changes: 26 additions & 12 deletions sdk/python/feast/infra/contrib/stream_processor.py
@@ -1,14 +1,17 @@
from abc import ABC
from typing import Callable
from types import MethodType
from typing import TYPE_CHECKING, Optional

import pandas as pd
from pyspark.sql import DataFrame

from feast.data_source import DataSource
from feast.data_source import DataSource, PushMode
from feast.importer import import_class
from feast.repo_config import FeastConfigBaseModel
from feast.stream_feature_view import StreamFeatureView

if TYPE_CHECKING:
from feast.feature_store import FeatureStore

STREAM_PROCESSOR_CLASS_FOR_TYPE = {
("spark", "kafka"): "feast.infra.contrib.spark_kafka_processor.SparkKafkaProcessor",
}
@@ -30,21 +33,26 @@ class StreamProcessor(ABC):
and persist that data to the online store.

Attributes:
fs: The feature store where data should be persisted.
sfv: The stream feature view on which the stream processor operates.
data_source: The stream data source from which data will be ingested.
"""

fs: "FeatureStore"
sfv: StreamFeatureView
data_source: DataSource

def __init__(self, sfv: StreamFeatureView, data_source: DataSource):
def __init__(
self, fs: "FeatureStore", sfv: StreamFeatureView, data_source: DataSource
):
self.fs = fs
self.sfv = sfv
self.data_source = data_source

def ingest_stream_feature_view(self) -> None:
def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
"""
Ingests data from the stream source attached to the stream feature view; transforms the data
and then persists it to the online store.
and then persists it to the online store and/or offline store, depending on the 'to' parameter.
"""
pass

@@ -62,26 +70,32 @@ def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
"""
pass

def _write_to_online_store(self, table: StreamTable) -> None:
def _write_stream_data(self, table: StreamTable, to: PushMode) -> None:
"""
Returns query for persisting data to the online store.
Launches a job to persist stream data to the online store and/or offline store, depending
on the 'to' parameter, and returns a handle for the job.
"""
pass


def get_stream_processor_object(
config: ProcessorConfig,
fs: "FeatureStore",
sfv: StreamFeatureView,
write_function: Callable[[pd.DataFrame, str, str], None],
preprocess_fn: Optional[MethodType] = None,
):
"""
Returns a stream processor object based on the config mode and stream source type. The write function is a
function that wraps the feature store "write_to_online_store" capability.
Returns a stream processor object based on the config.

The returned object will be capable of launching an ingestion job that reads data from the
given stream feature view's stream source, transforms it if the stream feature view has a
transformation, and then writes it to the online store. It will also preprocess the data
if a preprocessor method is defined.
"""
if config.mode == "spark" and config.source == "kafka":
stream_processor = STREAM_PROCESSOR_CLASS_FOR_TYPE[("spark", "kafka")]
module_name, class_name = stream_processor.rsplit(".", 1)
cls = import_class(module_name, class_name, "StreamProcessor")
return cls(sfv=sfv, config=config, write_function=write_function,)
return cls(fs=fs, sfv=sfv, config=config, preprocess_fn=preprocess_fn)
else:
raise ValueError("other processors besides spark-kafka not supported")
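`get_stream_processor_object` resolves a `(mode, source)` pair to a dotted class path and imports it dynamically. The dispatch pattern can be sketched with the standard library alone; `collections.OrderedDict` stands in for the real processor class path, since `feast.infra.contrib.spark_kafka_processor` is not importable outside a Feast installation:

```python
import importlib

# Mapping of (mode, source) -> dotted class path; the value here is a
# stand-in for "feast.infra.contrib.spark_kafka_processor.SparkKafkaProcessor".
STREAM_PROCESSOR_CLASS_FOR_TYPE = {
    ("spark", "kafka"): "collections.OrderedDict",
}


def import_class(module_name: str, class_name: str):
    """Import a module by name and fetch a class from it."""
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


path = STREAM_PROCESSOR_CLASS_FOR_TYPE[("spark", "kafka")]
module_name, class_name = path.rsplit(".", 1)  # split off the class name
cls = import_class(module_name, class_name)
print(cls.__name__)  # OrderedDict
```

Keeping the class path as a string defers the heavy Spark import until a Spark/Kafka processor is actually requested, which is why the registry stores dotted paths rather than classes.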
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -329,8 +329,8 @@ def offline_write_batch(
)
if column_names != table.column_names:
raise ValueError(
f"The input pyarrow table has schema {pa_schema} with the incorrect columns {column_names}. "
f"The columns are expected to be (in this order): {column_names}."
f"The input pyarrow table has schema {table.schema} with the incorrect columns {table.column_names}. "
f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}."
)

if table.schema != pa_schema:
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -430,8 +430,8 @@ def offline_write_batch(
)
if column_names != table.column_names:
raise ValueError(
f"The input pyarrow table has schema {pa_schema} with the incorrect columns {column_names}. "
f"The columns are expected to be (in this order): {column_names}."
f"The input pyarrow table has schema {table.schema} with the incorrect columns {table.column_names}. "
f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}."
)

file_options = feature_view.batch_source.file_options
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -323,8 +323,8 @@ def offline_write_batch(
)
if column_names != table.column_names:
raise ValueError(
f"The input pyarrow table has schema {pa_schema} with the incorrect columns {column_names}. "
f"The columns are expected to be (in this order): {column_names}."
f"The input pyarrow table has schema {table.schema} with the incorrect columns {table.column_names}. "
f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}."
)

if table.schema != pa_schema:
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/snowflake.py
@@ -332,8 +332,8 @@ def offline_write_batch(
)
if column_names != table.column_names:
raise ValueError(
f"The input pyarrow table has schema {pa_schema} with the incorrect columns {column_names}. "
f"The columns are expected to be (in this order): {column_names}."
f"The input pyarrow table has schema {table.schema} with the incorrect columns {table.column_names}. "
f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}."
)

if table.schema != pa_schema:
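The four `offline_write_batch` fixes above all correct the same error message: it previously printed the actual columns where the expected ones belonged. The underlying check is an exact, order-sensitive column comparison; a dependency-free sketch of it (`validate_columns` is an illustrative helper, not a Feast function):

```python
def validate_columns(expected_columns: list, table_columns: list) -> None:
    """Require the input table's columns to match the source exactly, in order."""
    if table_columns != expected_columns:
        raise ValueError(
            f"The input table has the incorrect columns {table_columns}. "
            f"The columns are expected to be (in this exact order): {expected_columns}."
        )


validate_columns(["location_id", "temperature"], ["location_id", "temperature"])  # ok
```

Because plain list equality is order-sensitive, even a table with the right column *names* in a different order is rejected here, which is exactly why `write_to_offline_store` gained its `reorder_columns` option upstream of these writers.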
11 changes: 7 additions & 4 deletions sdk/python/tests/integration/e2e/test_python_feature_server.py
@@ -63,13 +63,16 @@ def test_get_online_features(python_fs_client):
@pytest.mark.integration
@pytest.mark.universal_online_stores
def test_push(python_fs_client):
initial_temp = get_temperatures(python_fs_client, location_ids=[1])[0]
# TODO(felixwang9817): Note that we choose an entity value of 102 here since it is not included
# in the existing range of entity values (1-49). This allows us to push data for this test
# without affecting other tests. This decision is tech debt, and should be resolved by finding a
# better way to isolate data sources across tests.
json_data = json.dumps(
{
"push_source_name": "location_stats_push_source",
"df": {
"location_id": [1],
"temperature": [initial_temp * 100],
"location_id": [102],
"temperature": [4],
"event_timestamp": [str(datetime.utcnow())],
"created": [str(datetime.utcnow())],
},
@@ -79,7 +82,7 @@ def test_push(python_fs_client):

# Check new pushed temperature is fetched
assert response.status_code == 200
assert get_temperatures(python_fs_client, location_ids=[1]) == [initial_temp * 100]
assert get_temperatures(python_fs_client, location_ids=[102]) == [4]


def get_temperatures(client, location_ids: List[int]):