Skip to content

Commit 6bda4d2

Browse files
authored
fix: Simplify DataSource.from_proto logic (#2424)
* fix: Simplify DataSource.from_proto logic Signed-off-by: Achal Shah <[email protected]> * simpler Signed-off-by: Achal Shah <[email protected]> * use enum instead Signed-off-by: Achal Shah <[email protected]> * rebase and add push Signed-off-by: Achal Shah <[email protected]> * imports Signed-off-by: Achal Shah <[email protected]> * imports Signed-off-by: Achal Shah <[email protected]> * imports Signed-off-by: Achal Shah <[email protected]> * CR Signed-off-by: Achal Shah <[email protected]> * Revert "CR" This reverts commit 989d6c55752b54d2dce8a65271d6963e13b99d1b. Signed-off-by: Achal Shah <[email protected]> * CR Signed-off-by: Achal Shah <[email protected]> * cr Signed-off-by: Achal Shah <[email protected]>
1 parent a734c45 commit 6bda4d2

3 files changed

Lines changed: 28 additions & 39 deletions

File tree

protos/feast/core/DataSource.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ message DataSource {
7171

7272
// This is an internal field that represents the Python class for the data source object which the proto object represents.
7373
// This should be set by feast, and not by users.
74+
// The field is used primarily by custom data sources and is mandatory for them to set. Feast may set it for
75+
// first party sources as well.
7476
string data_source_class_type = 17;
7577

7678
// Defines options for DataSource that sources features from a file

sdk/python/feast/data_source.py

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,18 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
134134
return kinesis_options_proto
135135

136136

137+
# Maps each first-party DataSource proto source type to the fully qualified
# path of the Python class that knows how to parse it. CUSTOM_SOURCE is
# deliberately absent: custom sources carry their class path in the proto's
# data_source_class_type field instead.
_DATA_SOURCE_OPTIONS = {
    getattr(DataSourceProto.SourceType, type_name): class_path
    for type_name, class_path in [
        ("BATCH_FILE", "feast.infra.offline_stores.file_source.FileSource"),
        ("BATCH_BIGQUERY", "feast.infra.offline_stores.bigquery_source.BigQuerySource"),
        ("BATCH_REDSHIFT", "feast.infra.offline_stores.redshift_source.RedshiftSource"),
        ("BATCH_SNOWFLAKE", "feast.infra.offline_stores.snowflake_source.SnowflakeSource"),
        ("STREAM_KAFKA", "feast.data_source.KafkaSource"),
        ("STREAM_KINESIS", "feast.data_source.KinesisSource"),
        ("REQUEST_SOURCE", "feast.data_source.RequestDataSource"),
        ("PUSH_SOURCE", "feast.data_source.PushSource"),
    ]
}
147+
148+
137149
class DataSource(ABC):
138150
"""
139151
DataSource that can be used to source features.
@@ -210,48 +222,20 @@ def from_proto(data_source: DataSourceProto) -> Any:
210222
Raises:
211223
ValueError: The type of DataSource could not be identified.
212224
"""
213-
if data_source.data_source_class_type:
214-
cls = get_data_source_class_from_type(data_source.data_source_class_type)
215-
return cls.from_proto(data_source)
216-
217-
if data_source.request_data_options and data_source.request_data_options.schema:
218-
data_source_obj = RequestDataSource.from_proto(data_source)
219-
elif data_source.file_options.file_format and data_source.file_options.file_url:
220-
from feast.infra.offline_stores.file_source import FileSource
221-
222-
data_source_obj = FileSource.from_proto(data_source)
223-
elif (
224-
data_source.bigquery_options.table_ref or data_source.bigquery_options.query
225-
):
226-
from feast.infra.offline_stores.bigquery_source import BigQuerySource
227-
228-
data_source_obj = BigQuerySource.from_proto(data_source)
229-
elif data_source.redshift_options.table or data_source.redshift_options.query:
230-
from feast.infra.offline_stores.redshift_source import RedshiftSource
231-
232-
data_source_obj = RedshiftSource.from_proto(data_source)
233-
234-
elif data_source.snowflake_options.table or data_source.snowflake_options.query:
235-
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
236-
237-
data_source_obj = SnowflakeSource.from_proto(data_source)
238-
239-
elif (
240-
data_source.kafka_options.bootstrap_servers
241-
and data_source.kafka_options.topic
242-
and data_source.kafka_options.message_format
225+
data_source_type = data_source.type
226+
if not data_source_type or (
227+
data_source_type
228+
not in list(_DATA_SOURCE_OPTIONS.keys())
229+
+ [DataSourceProto.SourceType.CUSTOM_SOURCE]
243230
):
244-
data_source_obj = KafkaSource.from_proto(data_source)
245-
elif (
246-
data_source.kinesis_options.record_format
247-
and data_source.kinesis_options.region
248-
and data_source.kinesis_options.stream_name
249-
):
250-
data_source_obj = KinesisSource.from_proto(data_source)
251-
else:
252231
raise ValueError("Could not identify the source type being added.")
253232

254-
return data_source_obj
233+
if data_source_type == DataSourceProto.SourceType.CUSTOM_SOURCE:
234+
cls = get_data_source_class_from_type(data_source.data_source_class_type)
235+
return cls.from_proto(data_source)
236+
237+
cls = get_data_source_class_from_type(_DATA_SOURCE_OPTIONS[data_source_type])
238+
return cls.from_proto(data_source)
255239

256240
@abstractmethod
257241
def to_proto(self) -> DataSourceProto:

sdk/python/feast/registry.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,9 @@ def apply_data_source(
320320
del registry.data_sources[idx]
321321
data_source_proto = data_source.to_proto()
322322
data_source_proto.project = project
323+
data_source_proto.data_source_class_type = (
324+
f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
325+
)
323326
registry.data_sources.append(data_source_proto)
324327
if commit:
325328
self.commit()

0 commit comments

Comments
 (0)