
Commit f5c5360

feat: Refactor materialization engine (#5354)

Squashed commit messages (deduplicated):

* move old materialization engines into compute engines
* fix the test
* fix localcompute
* format
* fix test
* fix lint

Signed-off-by: HaoXuAI <[email protected]>

1 parent 7a5b48f, commit f5c5360

39 files changed: +819 -886 lines changed
Lines changed: 4 additions & 2 deletions

```diff
@@ -1,9 +1,11 @@
 # Batch Materialization Engine
 
+Note: The materialization engine is not constructed via unified compute engine interface.
+
 A batch materialization engine is a component of Feast that's responsible for moving data from the offline store into the online store.
 
-A materialization engine abstracts over specific technologies or frameworks that are used to materialize data. It allows users to use a pure local serialized approach (which is the default LocalMaterializationEngine), or delegates the materialization to seperate components (e.g. AWS Lambda, as implemented by the the LambdaMaterializaionEngine).
+A materialization engine abstracts over specific technologies or frameworks that are used to materialize data. It allows users to use a pure local serialized approach (which is the default LocalComputeEngine), or delegates the materialization to seperate components (e.g. AWS Lambda, as implemented by the the LambdaComputeEngine).
 
-If the built-in engines are not sufficient, you can create your own custom materialization engine. Please see [this guide](../../how-to-guides/customizing-feast/creating-a-custom-materialization-engine.md) for more details.
+If the built-in engines are not sufficient, you can create your own custom materialization engine. Please see [this guide](../../how-to-guides/customizing-feast/creating-a-custom-compute-engine.md) for more details.
 
 Please see [feature\_store.yaml](../../reference/feature-repository/feature-store-yaml.md#overview) for configuring engines.
```
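The doc change above describes the engine's job: move rows from the offline store into the online store, keeping the latest value per entity key. A minimal conceptual sketch in plain Python (the function, data shapes, and field names here are illustrative, not the Feast API):

```python
# Toy illustration of what "materialization" means conceptually: replay
# offline rows in event-time order so the online store ends up holding the
# most recent value per entity key. Not the Feast implementation.

def materialize(offline_rows, online_store):
    """Write the most recent feature value per entity key into online_store."""
    for row in sorted(offline_rows, key=lambda r: r["event_ts"]):
        online_store[row["entity_key"]] = row["value"]
    return online_store

rows = [
    {"entity_key": "driver_1", "event_ts": 1, "value": 0.5},
    {"entity_key": "driver_1", "event_ts": 2, "value": 0.9},
    {"entity_key": "driver_2", "event_ts": 1, "value": 0.1},
]
store = materialize(rows, {})
```

Later events overwrite earlier ones, which is why the sort by `event_ts` matters.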

docs/how-to-guides/customizing-feast/creating-a-custom-materialization-engine.md renamed to docs/how-to-guides/customizing-feast/creating-a-custom-compute-engine.md

Lines changed: 16 additions & 10 deletions

````diff
@@ -1,24 +1,24 @@
-# Adding a custom batch materialization engine
+# Adding a custom compute engine
 
 ### Overview
 
-Feast batch materialization operations (`materialize` and `materialize-incremental`) execute through a `BatchMaterializationEngine`.
+Feast batch materialization operations (`materialize` and `materialize-incremental`), and get_historical_features are executed through a `ComputeEngine`.
 
-Custom batch materialization engines allow Feast users to extend Feast to customize the materialization process. Examples include:
+Custom batch compute engines allow Feast users to extend Feast to customize the materialization and get_historical_features process. Examples include:
 
 * Setting up custom materialization-specific infrastructure during `feast apply` (e.g. setting up Spark clusters or Lambda Functions)
 * Launching custom batch ingestion (materialization) jobs (Spark, Beam, AWS Lambda)
 * Tearing down custom materialization-specific infrastructure during `feast teardown` (e.g. tearing down Spark clusters, or deleting Lambda Functions)
 
-Feast comes with built-in materialization engines, e.g, `LocalMaterializationEngine`, and an experimental `LambdaMaterializationEngine`. However, users can develop their own materialization engines by creating a class that implements the contract in the [BatchMaterializationEngine class](https://github.com/feast-dev/feast/blob/6d7b38a39024b7301c499c20cf4e7aef6137c47c/sdk/python/feast/infra/materialization/batch\_materialization\_engine.py#L72).
+Feast comes with built-in materialization engines, e.g, `LocalComputeEngine`, and an experimental `LambdaComputeEngine`. However, users can develop their own compute engines by creating a class that implements the contract in the [ComputeEngine class](https://github.com/feast-dev/feast/blob/85514edbb181df083e6a0d24672c00f0624dcaa3/sdk/python/feast/infra/compute_engines/base.py#L19).
 
 ### Guide
 
-The fastest way to add custom logic to Feast is to extend an existing materialization engine. The most generic engine is the `LocalMaterializationEngine` which contains no cloud-specific logic. The guide that follows will extend the `LocalProvider` with operations that print text to the console. It is up to you as a developer to add your custom code to the engine methods, but the guide below will provide the necessary scaffolding to get you started.
+The fastest way to add custom logic to Feast is to implement the ComputeEngine. The guide that follows will extend the `LocalProvider` with operations that print text to the console. It is up to you as a developer to add your custom code to the engine methods, but the guide below will provide the necessary scaffolding to get you started.
 
 #### Step 1: Define an Engine class
 
-The first step is to define a custom materialization engine class. We've created the `MyCustomEngine` below. This python file can be placed in your `feature_repo` directory if you're following the Quickstart guide.
+The first step is to define a custom compute engine class. We've created the `MyCustomEngine` below. This python file can be placed in your `feature_repo` directory if you're following the Quickstart guide.
 
 ```python
 from typing import List, Sequence, Union
@@ -27,14 +27,16 @@ from feast.entity import Entity
 from feast.feature_view import FeatureView
 from feast.batch_feature_view import BatchFeatureView
 from feast.stream_feature_view import StreamFeatureView
-from feast.infra.materialization.local_engine import LocalMaterializationJob, LocalMaterializationEngine
+from feast.infra.common.retrieval_task import HistoricalRetrievalTask
+from feast.infra.compute_engines.local.job import LocalMaterializationJob
+from feast.infra.compute_engines.base import ComputeEngine
 from feast.infra.common.materialization_job import MaterializationTask
-from feast.infra.offline_stores.offline_store import OfflineStore
+from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.repo_config import RepoConfig
 
 
-class MyCustomEngine(LocalMaterializationEngine):
+class MyCustomEngine(ComputeEngine):
     def __init__(
         self,
         *,
@@ -80,9 +82,13 @@ class MyCustomEngine(LocalMaterializationEngine):
             )
             for task in tasks
         ]
+
+    def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
+        raise NotImplementedError
 ```
 
-Notice how in the above engine we have only overwritten two of the methods on the `LocalMaterializatinEngine`, namely `update` and `materialize`. These two methods are convenient to replace if you are planning to launch custom batch jobs.
+Notice how in the above engine we have only overwritten two of the methods on the `LocalComputeEngine`, namely `update` and `materialize`. These two methods are convenient to replace if you are planning to launch custom batch jobs.
+If you want to use the compute to execute the get_historical_features method, you will need to implement the `get_historical_features` method as well.
 
 #### Step 2: Configuring Feast to use the engine
 
````
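The guide above subclasses the `ComputeEngine` contract: subclasses supply `_materialize_one`, while the base class fans a task (or list of tasks) out to it, and `get_historical_features` raises until implemented. A Feast-free toy mirror of that pattern (all class and method names here are illustrative stand-ins, not the real feast classes):

```python
from abc import ABC, abstractmethod

# Toy mirror of the ComputeEngine contract sketched in the guide above.
class ToyComputeEngine(ABC):
    @abstractmethod
    def _materialize_one(self, task):
        """Subclasses implement per-task materialization."""

    def materialize(self, tasks):
        # Accept a single task or a list, mirroring the refactored base class.
        if not isinstance(tasks, list):
            tasks = [tasks]
        return [self._materialize_one(t) for t in tasks]

    def get_historical_features(self, task):
        # Optional capability: engines that don't support it just raise.
        raise NotImplementedError

class MyToyEngine(ToyComputeEngine):
    def _materialize_one(self, task):
        return f"job-{task}"

jobs = MyToyEngine().materialize(["fv_a", "fv_b"])
```

Only the abstract hook must be written; the dispatch loop comes for free from the base class.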

sdk/python/feast/batch_feature_view.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -79,7 +79,7 @@ def __init__(
         ttl: Optional[timedelta] = None,
         tags: Optional[Dict[str, str]] = None,
         online: bool = False,
-        offline: bool = True,
+        offline: bool = False,
         description: str = "",
         owner: str = "",
         schema: Optional[List[Field]] = None,
```

sdk/python/feast/infra/common/materialization_job.py

Lines changed: 2 additions & 1 deletion

```diff
@@ -20,7 +20,8 @@ class MaterializationTask:
     feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
     start_time: datetime
     end_time: datetime
-    tqdm_builder: Callable[[int], tqdm]
+    only_latest: bool = True
+    tqdm_builder: Union[None, Callable[[int], tqdm]] = None
 
 
 class MaterializationJobStatus(enum.Enum):
```
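The change above makes the progress-bar builder optional and adds an `only_latest` flag defaulting to `True`, so tasks can now be built from just a feature view and a time range. A small stand-in dataclass showing the resulting ergonomics (`Task` is a hypothetical mirror of `MaterializationTask`, not the real class):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Optional

# Hypothetical mirror of the MaterializationTask change: tqdm_builder is now
# optional and only_latest defaults to True.
@dataclass
class Task:
    project: str
    start_time: datetime
    end_time: datetime
    only_latest: bool = True
    tqdm_builder: Optional[Callable[[int], object]] = None

# A task can now be constructed without supplying a progress-bar builder.
t = Task("demo", datetime(2024, 1, 1), datetime(2024, 1, 2))
```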

sdk/python/feast/infra/materialization/aws_lambda/Dockerfile renamed to sdk/python/feast/infra/compute_engines/aws_lambda/Dockerfile

File renamed without changes.

sdk/python/feast/infra/materialization/aws_lambda/app.py renamed to sdk/python/feast/infra/compute_engines/aws_lambda/app.py

File renamed without changes.

sdk/python/feast/infra/materialization/aws_lambda/lambda_engine.py renamed to sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py

Lines changed: 20 additions & 31 deletions

```diff
@@ -3,13 +3,12 @@
 import logging
 from concurrent.futures import ThreadPoolExecutor, wait
 from dataclasses import dataclass
-from datetime import datetime
-from typing import Callable, List, Literal, Optional, Sequence, Union
+from typing import Literal, Optional, Sequence, Union
 
 import boto3
+import pyarrow as pa
 from botocore.config import Config
 from pydantic import StrictStr
-from tqdm import tqdm
 
 from feast import utils
 from feast.batch_feature_view import BatchFeatureView
@@ -21,9 +20,8 @@
     MaterializationJobStatus,
     MaterializationTask,
 )
-from feast.infra.materialization.batch_materialization_engine import (
-    BatchMaterializationEngine,
-)
+from feast.infra.common.retrieval_task import HistoricalRetrievalTask
+from feast.infra.compute_engines.base import ComputeEngine
 from feast.infra.offline_stores.offline_store import OfflineStore
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.infra.registry.base_registry import BaseRegistry
@@ -40,8 +38,8 @@
 logger = logging.getLogger(__name__)
 
 
-class LambdaMaterializationEngineConfig(FeastConfigBaseModel):
-    """Batch Materialization Engine config for lambda based engine"""
+class LambdaComputeEngineConfig(FeastConfigBaseModel):
+    """Batch Compute Engine config for lambda based engine"""
 
     type: Literal["lambda"] = "lambda"
     """ Type selector"""
@@ -82,11 +80,18 @@ def url(self) -> Optional[str]:
         return None
 
 
-class LambdaMaterializationEngine(BatchMaterializationEngine):
+class LambdaComputeEngine(ComputeEngine):
     """
     WARNING: This engine should be considered "Alpha" functionality.
     """
 
+    def get_historical_features(
+        self, registry: BaseRegistry, task: HistoricalRetrievalTask
+    ) -> pa.Table:
+        raise NotImplementedError(
+            "Lambda Compute Engine does not support get_historical_features"
+        )
+
     def update(
         self,
         project: str,
@@ -160,30 +165,14 @@ def __init__(
         config = Config(read_timeout=DEFAULT_TIMEOUT + 10)
         self.lambda_client = boto3.client("lambda", config=config)
 
-    def materialize(
-        self, registry, tasks: List[MaterializationTask]
-    ) -> List[MaterializationJob]:
-        return [
-            self._materialize_one(
-                registry,
-                task.feature_view,
-                task.start_time,
-                task.end_time,
-                task.project,
-                task.tqdm_builder,
-            )
-            for task in tasks
-        ]
-
     def _materialize_one(
-        self,
-        registry: BaseRegistry,
-        feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
-        start_date: datetime,
-        end_date: datetime,
-        project: str,
-        tqdm_builder: Callable[[int], tqdm],
+        self, registry: BaseRegistry, task: MaterializationTask, **kwargs
     ):
+        feature_view = task.feature_view
+        start_date = task.start_time
+        end_date = task.end_time
+        project = task.project
+
         entities = []
         for entity_name in feature_view.entities:
             entities.append(registry.get_entity(entity_name, project))
```
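The refactor above collapses `_materialize_one`'s six-argument signature into a single task object, letting the base class own the fan-out loop. The pattern, sketched with stand-in types (none of these names are the real Feast classes):

```python
# Stand-in illustration of the signature change: instead of receiving
# feature_view, start/end dates and project as separate arguments, the
# method now unpacks them from one task object.
class Task:
    def __init__(self, feature_view, start_time, end_time, project):
        self.feature_view = feature_view
        self.start_time = start_time
        self.end_time = end_time
        self.project = project

def materialize_one(task, **kwargs):
    # Unpack the fields the old signature used to take individually.
    feature_view = task.feature_view
    start_date = task.start_time
    end_date = task.end_time
    project = task.project
    return (project, feature_view, start_date, end_date)

result = materialize_one(Task("driver_stats", "2024-01-01", "2024-01-02", "demo"))
```

Bundling the arguments means new task fields (like `only_latest`) can be added without touching every engine's method signature.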
sdk/python/feast/infra/compute_engines/base.py

Lines changed: 106 additions & 14 deletions

```diff
@@ -1,63 +1,130 @@
-from abc import ABC
-from typing import Union
+from abc import ABC, abstractmethod
+from typing import List, Optional, Sequence, Union
 
 import pyarrow as pa
 
 from feast import RepoConfig
+from feast.batch_feature_view import BatchFeatureView
+from feast.entity import Entity
+from feast.feature_view import FeatureView
 from feast.infra.common.materialization_job import (
     MaterializationJob,
     MaterializationTask,
 )
 from feast.infra.common.retrieval_task import HistoricalRetrievalTask
 from feast.infra.compute_engines.dag.context import ColumnInfo, ExecutionContext
-from feast.infra.offline_stores.offline_store import OfflineStore
+from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
 from feast.infra.online_stores.online_store import OnlineStore
-from feast.infra.registry.registry import Registry
+from feast.infra.registry.base_registry import BaseRegistry
+from feast.on_demand_feature_view import OnDemandFeatureView
+from feast.stream_feature_view import StreamFeatureView
 from feast.utils import _get_column_names
 
 
 class ComputeEngine(ABC):
     """
-    The interface that Feast uses to control the compute system that handles materialization and get_historical_features.
+    The interface that Feast uses to control to compute system that handles materialization and get_historical_features.
     Each engine must implement:
     - materialize(): to generate and persist features
-    - get_historical_features(): to perform point-in-time correct joins
+    - get_historical_features(): to perform historical retrieval of features
    Engines should use FeatureBuilder and DAGNode abstractions to build modular, pluggable workflows.
     """
 
     def __init__(
         self,
         *,
-        registry: Registry,
         repo_config: RepoConfig,
         offline_store: OfflineStore,
         online_store: OnlineStore,
         **kwargs,
     ):
-        self.registry = registry
         self.repo_config = repo_config
         self.offline_store = offline_store
         self.online_store = online_store
 
-    def materialize(self, task: MaterializationTask) -> MaterializationJob:
-        raise NotImplementedError
+    @abstractmethod
+    def update(
+        self,
+        project: str,
+        views_to_delete: Sequence[
+            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+        ],
+        views_to_keep: Sequence[
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
+        ],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+    ):
+        """
+        Prepares cloud resources required for batch materialization for the specified set of Feast objects.
+
+        Args:
+            project: Feast project to which the objects belong.
+            views_to_delete: Feature views whose corresponding infrastructure should be deleted.
+            views_to_keep: Feature views whose corresponding infrastructure should not be deleted, and
+                may need to be updated.
+            entities_to_delete: Entities whose corresponding infrastructure should be deleted.
+            entities_to_keep: Entities whose corresponding infrastructure should not be deleted, and
+                may need to be updated.
+        """
+        pass
+
+    @abstractmethod
+    def teardown_infra(
+        self,
+        project: str,
+        fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
+        entities: Sequence[Entity],
+    ):
+        """
+        Tears down all cloud resources used by the materialization engine for the specified set of Feast objects.
+
+        Args:
+            project: Feast project to which the objects belong.
+            fvs: Feature views whose corresponding infrastructure should be deleted.
+            entities: Entities whose corresponding infrastructure should be deleted.
+        """
+        pass
 
-    def get_historical_features(self, task: HistoricalRetrievalTask) -> pa.Table:
+    def materialize(
+        self,
+        registry: BaseRegistry,
+        tasks: Union[MaterializationTask, List[MaterializationTask]],
+        **kwargs,
+    ) -> List[MaterializationJob]:
+        if isinstance(tasks, MaterializationTask):
+            tasks = [tasks]
+        return [self._materialize_one(registry, task, **kwargs) for task in tasks]
+
+    def _materialize_one(
+        self,
+        registry: BaseRegistry,
+        task: MaterializationTask,
+        **kwargs,
+    ) -> MaterializationJob:
+        raise NotImplementedError(
+            "Materialization is not implemented for this compute engine."
+        )
+
+    def get_historical_features(
+        self, registry: BaseRegistry, task: HistoricalRetrievalTask
+    ) -> Union[RetrievalJob, pa.Table]:
         raise NotImplementedError
 
     def get_execution_context(
         self,
+        registry: BaseRegistry,
         task: Union[MaterializationTask, HistoricalRetrievalTask],
     ) -> ExecutionContext:
         entity_defs = [
-            self.registry.get_entity(name, task.project)
+            registry.get_entity(name, task.project)
             for name in task.feature_view.entities
         ]
         entity_df = None
         if hasattr(task, "entity_df") and task.entity_df is not None:
             entity_df = task.entity_df
 
-        column_info = self.get_column_info(task)
+        column_info = self.get_column_info(registry, task)
         return ExecutionContext(
             project=task.project,
             repo_config=self.repo_config,
@@ -70,14 +137,39 @@ def get_execution_context(
 
     def get_column_info(
         self,
+        registry: BaseRegistry,
         task: Union[MaterializationTask, HistoricalRetrievalTask],
     ) -> ColumnInfo:
+        entities = []
+        for entity_name in task.feature_view.entities:
+            entities.append(registry.get_entity(entity_name, task.project))
+
         join_keys, feature_cols, ts_col, created_ts_col = _get_column_names(
-            task.feature_view, self.registry.list_entities(task.project)
+            task.feature_view, entities
         )
+        field_mapping = self.get_field_mapping(task.feature_view)
+
         return ColumnInfo(
             join_keys=join_keys,
             feature_cols=feature_cols,
             ts_col=ts_col,
             created_ts_col=created_ts_col,
+            field_mapping=field_mapping,
         )
+
+    def get_field_mapping(
+        self, feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
+    ) -> Optional[dict]:
+        """
+        Get the field mapping for a feature view.
+        Args:
+            feature_view: The feature view to get the field mapping for.
+
+        Returns:
+            A dictionary mapping field names to column names.
+        """
+        if feature_view.stream_source:
+            return feature_view.stream_source.field_mapping
+        if feature_view.batch_source:
+            return feature_view.batch_source.field_mapping
+        return None
```
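The new `get_field_mapping` helper above gives the stream source's field mapping precedence over the batch source's, falling back to `None` when the view has neither. A self-contained sketch of that precedence (`Source` and `View` are simplified stand-ins for the Feast data source and feature view types):

```python
# Simplified stand-ins illustrating the field-mapping precedence added in
# the diff above: stream source wins over batch source; None when neither.
class Source:
    def __init__(self, field_mapping):
        self.field_mapping = field_mapping

class View:
    def __init__(self, stream_source=None, batch_source=None):
        self.stream_source = stream_source
        self.batch_source = batch_source

def get_field_mapping(view):
    if view.stream_source:
        return view.stream_source.field_mapping
    if view.batch_source:
        return view.batch_source.field_mapping
    return None

fv = View(
    stream_source=Source({"ts": "event_ts"}),
    batch_source=Source({"ts": "created"}),
)
```

When both sources define a mapping, only the stream source's is used, so renames applied at ingestion time stay consistent.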
