@@ -134,6 +134,18 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
134134 return kinesis_options_proto
135135
136136
# Maps each built-in DataSourceProto source type to the fully qualified class
# path of the DataSource subclass that deserializes it. CUSTOM_SOURCE is
# intentionally absent: custom sources carry their own class path in
# `data_source_class_type` on the proto.
_DATA_SOURCE_OPTIONS = {
    DataSourceProto.SourceType.BATCH_FILE: "feast.infra.offline_stores.file_source.FileSource",
    DataSourceProto.SourceType.BATCH_BIGQUERY: "feast.infra.offline_stores.bigquery_source.BigQuerySource",
    DataSourceProto.SourceType.BATCH_REDSHIFT: "feast.infra.offline_stores.redshift_source.RedshiftSource",
    DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
    DataSourceProto.SourceType.STREAM_KAFKA: "feast.data_source.KafkaSource",
    DataSourceProto.SourceType.STREAM_KINESIS: "feast.data_source.KinesisSource",
    DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestDataSource",
    DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
}
147+
148+
137149class DataSource (ABC ):
138150 """
139151 DataSource that can be used to source features.
@@ -210,48 +222,20 @@ def from_proto(data_source: DataSourceProto) -> Any:
210222 Raises:
211223 ValueError: The type of DataSource could not be identified.
212224 """
213- if data_source .data_source_class_type :
214- cls = get_data_source_class_from_type (data_source .data_source_class_type )
215- return cls .from_proto (data_source )
216-
217- if data_source .request_data_options and data_source .request_data_options .schema :
218- data_source_obj = RequestDataSource .from_proto (data_source )
219- elif data_source .file_options .file_format and data_source .file_options .file_url :
220- from feast .infra .offline_stores .file_source import FileSource
221-
222- data_source_obj = FileSource .from_proto (data_source )
223- elif (
224- data_source .bigquery_options .table_ref or data_source .bigquery_options .query
225- ):
226- from feast .infra .offline_stores .bigquery_source import BigQuerySource
227-
228- data_source_obj = BigQuerySource .from_proto (data_source )
229- elif data_source .redshift_options .table or data_source .redshift_options .query :
230- from feast .infra .offline_stores .redshift_source import RedshiftSource
231-
232- data_source_obj = RedshiftSource .from_proto (data_source )
233-
234- elif data_source .snowflake_options .table or data_source .snowflake_options .query :
235- from feast .infra .offline_stores .snowflake_source import SnowflakeSource
236-
237- data_source_obj = SnowflakeSource .from_proto (data_source )
238-
239- elif (
240- data_source .kafka_options .bootstrap_servers
241- and data_source .kafka_options .topic
242- and data_source .kafka_options .message_format
225+ data_source_type = data_source .type
226+ if not data_source_type or (
227+ data_source_type
228+ not in list (_DATA_SOURCE_OPTIONS .keys ())
229+ + [DataSourceProto .SourceType .CUSTOM_SOURCE ]
243230 ):
244- data_source_obj = KafkaSource .from_proto (data_source )
245- elif (
246- data_source .kinesis_options .record_format
247- and data_source .kinesis_options .region
248- and data_source .kinesis_options .stream_name
249- ):
250- data_source_obj = KinesisSource .from_proto (data_source )
251- else :
252231 raise ValueError ("Could not identify the source type being added." )
253232
254- return data_source_obj
233+ if data_source_type == DataSourceProto .SourceType .CUSTOM_SOURCE :
234+ cls = get_data_source_class_from_type (data_source .data_source_class_type )
235+ return cls .from_proto (data_source )
236+
237+ cls = get_data_source_class_from_type (_DATA_SOURCE_OPTIONS [data_source_type ])
238+ return cls .from_proto (data_source )
255239
256240 @abstractmethod
257241 def to_proto (self ) -> DataSourceProto :
0 commit comments