1717from typing import Any , Callable , Dict , List , Optional , Sequence , Tuple
1818
1919from pydantic import StrictStr
20- from pydantic .typing import Literal
20+ from pydantic .typing import Literal , Union
2121
2222from feast import Entity , FeatureView , utils
2323from feast .infra .infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE , InfraObject
@@ -50,17 +50,20 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
5050 type : Literal ["dynamodb" ] = "dynamodb"
5151 """Online store type selector"""
5252
53+ batch_size : int = 40
54+ """Number of items to retrieve in a DynamoDB BatchGetItem call."""
55+
56+ endpoint_url : Union [str , None ] = None
57+ """DynamoDB local development endpoint Url, i.e. http://localhost:8000"""
58+
5359 region : StrictStr
5460 """AWS Region Name"""
5561
56- table_name_template : StrictStr = "{project}.{table_name}"
57- """DynamoDB table name template"""
58-
5962 sort_response : bool = True
6063 """Whether or not to sort BatchGetItem response."""
6164
62- batch_size : int = 40
63- """Number of items to retrieve in a DynamoDB BatchGetItem call. """
65+ table_name_template : StrictStr = "{project}.{table_name}"
66+ """DynamoDB table name template """
6467
6568
6669class DynamoDBOnlineStore (OnlineStore ):
@@ -95,8 +98,12 @@ def update(
9598 """
9699 online_config = config .online_store
97100 assert isinstance (online_config , DynamoDBOnlineStoreConfig )
98- dynamodb_client = self ._get_dynamodb_client (online_config .region )
99- dynamodb_resource = self ._get_dynamodb_resource (online_config .region )
101+ dynamodb_client = self ._get_dynamodb_client (
102+ online_config .region , online_config .endpoint_url
103+ )
104+ dynamodb_resource = self ._get_dynamodb_resource (
105+ online_config .region , online_config .endpoint_url
106+ )
100107
101108 for table_instance in tables_to_keep :
102109 try :
@@ -141,7 +148,9 @@ def teardown(
141148 """
142149 online_config = config .online_store
143150 assert isinstance (online_config , DynamoDBOnlineStoreConfig )
144- dynamodb_resource = self ._get_dynamodb_resource (online_config .region )
151+ dynamodb_resource = self ._get_dynamodb_resource (
152+ online_config .region , online_config .endpoint_url
153+ )
145154
146155 for table in tables :
147156 _delete_table_idempotent (
@@ -175,7 +184,9 @@ def online_write_batch(
175184 """
176185 online_config = config .online_store
177186 assert isinstance (online_config , DynamoDBOnlineStoreConfig )
178- dynamodb_resource = self ._get_dynamodb_resource (online_config .region )
187+ dynamodb_resource = self ._get_dynamodb_resource (
188+ online_config .region , online_config .endpoint_url
189+ )
179190
180191 table_instance = dynamodb_resource .Table (
181192 _get_table_name (online_config , config , table )
@@ -217,7 +228,9 @@ def online_read(
217228 """
218229 online_config = config .online_store
219230 assert isinstance (online_config , DynamoDBOnlineStoreConfig )
220- dynamodb_resource = self ._get_dynamodb_resource (online_config .region )
231+ dynamodb_resource = self ._get_dynamodb_resource (
232+ online_config .region , online_config .endpoint_url
233+ )
221234 table_instance = dynamodb_resource .Table (
222235 _get_table_name (online_config , config , table )
223236 )
@@ -260,14 +273,16 @@ def online_read(
260273 result .extend (batch_size_nones )
261274 return result
262275
263- def _get_dynamodb_client (self , region : str ):
276+ def _get_dynamodb_client (self , region : str , endpoint_url : Optional [ str ] = None ):
264277 if self ._dynamodb_client is None :
265- self ._dynamodb_client = _initialize_dynamodb_client (region )
278+ self ._dynamodb_client = _initialize_dynamodb_client (region , endpoint_url )
266279 return self ._dynamodb_client
267280
268- def _get_dynamodb_resource (self , region : str ):
281+ def _get_dynamodb_resource (self , region : str , endpoint_url : Optional [ str ] = None ):
269282 if self ._dynamodb_resource is None :
270- self ._dynamodb_resource = _initialize_dynamodb_resource (region )
283+ self ._dynamodb_resource = _initialize_dynamodb_resource (
284+ region , endpoint_url
285+ )
271286 return self ._dynamodb_resource
272287
273288 def _sort_dynamodb_response (self , responses : list , order : list ):
@@ -285,12 +300,12 @@ def _sort_dynamodb_response(self, responses: list, order: list):
285300 return table_responses_ordered
286301
287302
288- def _initialize_dynamodb_client (region : str ):
289- return boto3 .client ("dynamodb" , region_name = region )
303+ def _initialize_dynamodb_client (region : str , endpoint_url : Optional [ str ] = None ):
304+ return boto3 .client ("dynamodb" , region_name = region , endpoint_url = endpoint_url )
290305
291306
292- def _initialize_dynamodb_resource (region : str ):
293- return boto3 .resource ("dynamodb" , region_name = region )
307+ def _initialize_dynamodb_resource (region : str , endpoint_url : Optional [ str ] = None ):
308+ return boto3 .resource ("dynamodb" , region_name = region , endpoint_url = endpoint_url )
294309
295310
296311# TODO(achals): This form of user-facing templating is experimental.
@@ -327,13 +342,20 @@ class DynamoDBTable(InfraObject):
327342 Attributes:
328343 name: The name of the table.
329344 region: The region of the table.
345+ endpoint_url: Local DynamoDB Endpoint Url.
346+ _dynamodb_client: Boto3 DynamoDB client.
347+ _dynamodb_resource: Boto3 DynamoDB resource.
330348 """
331349
332350 region : str
351+ endpoint_url = None
352+ _dynamodb_client = None
353+ _dynamodb_resource = None
333354
334- def __init__ (self , name : str , region : str ):
355+ def __init__ (self , name : str , region : str , endpoint_url : Optional [ str ] = None ):
335356 super ().__init__ (name )
336357 self .region = region
358+ self .endpoint_url = endpoint_url
337359
338360 def to_infra_object_proto (self ) -> InfraObjectProto :
339361 dynamodb_table_proto = self .to_proto ()
@@ -362,8 +384,8 @@ def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any:
362384 )
363385
364386 def update (self ):
365- dynamodb_client = _initialize_dynamodb_client ( region = self .region )
366- dynamodb_resource = _initialize_dynamodb_resource ( region = self .region )
387+ dynamodb_client = self . _get_dynamodb_client ( self .region , self . endpoint_url )
388+ dynamodb_resource = self . _get_dynamodb_resource ( self .region , self . endpoint_url )
367389
368390 try :
369391 dynamodb_resource .create_table (
@@ -384,5 +406,17 @@ def update(self):
384406 dynamodb_client .get_waiter ("table_exists" ).wait (TableName = f"{ self .name } " )
385407
386408 def teardown (self ):
387- dynamodb_resource = _initialize_dynamodb_resource ( region = self .region )
409+ dynamodb_resource = self . _get_dynamodb_resource ( self .region , self . endpoint_url )
388410 _delete_table_idempotent (dynamodb_resource , self .name )
411+
412+ def _get_dynamodb_client (self , region : str , endpoint_url : Optional [str ] = None ):
413+ if self ._dynamodb_client is None :
414+ self ._dynamodb_client = _initialize_dynamodb_client (region , endpoint_url )
415+ return self ._dynamodb_client
416+
417+ def _get_dynamodb_resource (self , region : str , endpoint_url : Optional [str ] = None ):
418+ if self ._dynamodb_resource is None :
419+ self ._dynamodb_resource = _initialize_dynamodb_resource (
420+ region , endpoint_url
421+ )
422+ return self ._dynamodb_resource
0 commit comments