Skip to content

Commit 7b863d1

Browse files
authored
feat: Add Support for DynamodbOnlineStoreConfig endpoint_url parameter (#2485)
* feat: support dynamodb client and resource with endpoint_url
  Signed-off-by: Miguel Trejo <[email protected]>
* fix: overwrite by partitionkey batchwrite operation
  Signed-off-by: Miguel Trejo <[email protected]>
* docs: how to configure local dynamodb
  Signed-off-by: Miguel Trejo <[email protected]>
* fix: DynamoDBonlineStore endpoint_url defaults to None
  Signed-off-by: Miguel Trejo <[email protected]>
* docs: setup dummy aws credentials
  Signed-off-by: Miguel Trejo <[email protected]>
* test: DynamoDBOnlineStoreConfig endpoint_url configuration
  Signed-off-by: Miguel Trejo <[email protected]>
* feat: DynamoDBTable support endpoint_url
  Signed-off-by: Miguel Trejo <[email protected]>
1 parent df51b94 commit 7b863d1

File tree

2 files changed: 173 additions and 23 deletions.

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 57 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
1818

1919
from pydantic import StrictStr
20-
from pydantic.typing import Literal
20+
from pydantic.typing import Literal, Union
2121

2222
from feast import Entity, FeatureView, utils
2323
from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject
@@ -50,17 +50,20 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
5050
type: Literal["dynamodb"] = "dynamodb"
5151
"""Online store type selector"""
5252

53+
batch_size: int = 40
54+
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""
55+
56+
endpoint_url: Union[str, None] = None
57+
"""DynamoDB local development endpoint Url, i.e. http://localhost:8000"""
58+
5359
region: StrictStr
5460
"""AWS Region Name"""
5561

56-
table_name_template: StrictStr = "{project}.{table_name}"
57-
"""DynamoDB table name template"""
58-
5962
sort_response: bool = True
6063
"""Whether or not to sort BatchGetItem response."""
6164

62-
batch_size: int = 40
63-
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""
65+
table_name_template: StrictStr = "{project}.{table_name}"
66+
"""DynamoDB table name template"""
6467

6568

6669
class DynamoDBOnlineStore(OnlineStore):
@@ -95,8 +98,12 @@ def update(
9598
"""
9699
online_config = config.online_store
97100
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
98-
dynamodb_client = self._get_dynamodb_client(online_config.region)
99-
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
101+
dynamodb_client = self._get_dynamodb_client(
102+
online_config.region, online_config.endpoint_url
103+
)
104+
dynamodb_resource = self._get_dynamodb_resource(
105+
online_config.region, online_config.endpoint_url
106+
)
100107

101108
for table_instance in tables_to_keep:
102109
try:
@@ -141,7 +148,9 @@ def teardown(
141148
"""
142149
online_config = config.online_store
143150
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
144-
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
151+
dynamodb_resource = self._get_dynamodb_resource(
152+
online_config.region, online_config.endpoint_url
153+
)
145154

146155
for table in tables:
147156
_delete_table_idempotent(
@@ -175,7 +184,9 @@ def online_write_batch(
175184
"""
176185
online_config = config.online_store
177186
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
178-
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
187+
dynamodb_resource = self._get_dynamodb_resource(
188+
online_config.region, online_config.endpoint_url
189+
)
179190

180191
table_instance = dynamodb_resource.Table(
181192
_get_table_name(online_config, config, table)
@@ -217,7 +228,9 @@ def online_read(
217228
"""
218229
online_config = config.online_store
219230
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
220-
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
231+
dynamodb_resource = self._get_dynamodb_resource(
232+
online_config.region, online_config.endpoint_url
233+
)
221234
table_instance = dynamodb_resource.Table(
222235
_get_table_name(online_config, config, table)
223236
)
@@ -260,14 +273,16 @@ def online_read(
260273
result.extend(batch_size_nones)
261274
return result
262275

263-
def _get_dynamodb_client(self, region: str):
276+
def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
264277
if self._dynamodb_client is None:
265-
self._dynamodb_client = _initialize_dynamodb_client(region)
278+
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
266279
return self._dynamodb_client
267280

268-
def _get_dynamodb_resource(self, region: str):
281+
def _get_dynamodb_resource(self, region: str, endpoint_url: Optional[str] = None):
269282
if self._dynamodb_resource is None:
270-
self._dynamodb_resource = _initialize_dynamodb_resource(region)
283+
self._dynamodb_resource = _initialize_dynamodb_resource(
284+
region, endpoint_url
285+
)
271286
return self._dynamodb_resource
272287

273288
def _sort_dynamodb_response(self, responses: list, order: list):
@@ -285,12 +300,12 @@ def _sort_dynamodb_response(self, responses: list, order: list):
285300
return table_responses_ordered
286301

287302

288-
def _initialize_dynamodb_client(region: str):
289-
return boto3.client("dynamodb", region_name=region)
303+
def _initialize_dynamodb_client(region: str, endpoint_url: Optional[str] = None):
304+
return boto3.client("dynamodb", region_name=region, endpoint_url=endpoint_url)
290305

291306

292-
def _initialize_dynamodb_resource(region: str):
293-
return boto3.resource("dynamodb", region_name=region)
307+
def _initialize_dynamodb_resource(region: str, endpoint_url: Optional[str] = None):
308+
return boto3.resource("dynamodb", region_name=region, endpoint_url=endpoint_url)
294309

295310

296311
# TODO(achals): This form of user-facing templating is experimental.
@@ -327,13 +342,20 @@ class DynamoDBTable(InfraObject):
327342
Attributes:
328343
name: The name of the table.
329344
region: The region of the table.
345+
endpoint_url: Local DynamoDB Endpoint Url.
346+
_dynamodb_client: Boto3 DynamoDB client.
347+
_dynamodb_resource: Boto3 DynamoDB resource.
330348
"""
331349

332350
region: str
351+
endpoint_url = None
352+
_dynamodb_client = None
353+
_dynamodb_resource = None
333354

334-
def __init__(self, name: str, region: str):
355+
def __init__(self, name: str, region: str, endpoint_url: Optional[str] = None):
335356
super().__init__(name)
336357
self.region = region
358+
self.endpoint_url = endpoint_url
337359

338360
def to_infra_object_proto(self) -> InfraObjectProto:
339361
dynamodb_table_proto = self.to_proto()
@@ -362,8 +384,8 @@ def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any:
362384
)
363385

364386
def update(self):
365-
dynamodb_client = _initialize_dynamodb_client(region=self.region)
366-
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
387+
dynamodb_client = self._get_dynamodb_client(self.region, self.endpoint_url)
388+
dynamodb_resource = self._get_dynamodb_resource(self.region, self.endpoint_url)
367389

368390
try:
369391
dynamodb_resource.create_table(
@@ -384,5 +406,17 @@ def update(self):
384406
dynamodb_client.get_waiter("table_exists").wait(TableName=f"{self.name}")
385407

386408
def teardown(self):
387-
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
409+
dynamodb_resource = self._get_dynamodb_resource(self.region, self.endpoint_url)
388410
_delete_table_idempotent(dynamodb_resource, self.name)
411+
412+
def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
413+
if self._dynamodb_client is None:
414+
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
415+
return self._dynamodb_client
416+
417+
def _get_dynamodb_resource(self, region: str, endpoint_url: Optional[str] = None):
418+
if self._dynamodb_resource is None:
419+
self._dynamodb_resource = _initialize_dynamodb_resource(
420+
region, endpoint_url
421+
)
422+
return self._dynamodb_resource

sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py

Lines changed: 116 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
from feast.infra.online_stores.dynamodb import (
88
DynamoDBOnlineStore,
99
DynamoDBOnlineStoreConfig,
10+
DynamoDBTable,
1011
)
1112
from feast.repo_config import RepoConfig
1213
from tests.utils.online_store_utils import (
@@ -38,6 +39,121 @@ def repo_config():
3839
)
3940

4041

42+
def test_online_store_config_default():
43+
"""Test DynamoDBOnlineStoreConfig default parameters."""
44+
aws_region = "us-west-2"
45+
dynamodb_store_config = DynamoDBOnlineStoreConfig(region=aws_region)
46+
assert dynamodb_store_config.type == "dynamodb"
47+
assert dynamodb_store_config.batch_size == 40
48+
assert dynamodb_store_config.endpoint_url is None
49+
assert dynamodb_store_config.region == aws_region
50+
assert dynamodb_store_config.sort_response is True
51+
assert dynamodb_store_config.table_name_template == "{project}.{table_name}"
52+
53+
54+
def test_dynamodb_table_default_params():
55+
"""Test DynamoDBTable default parameters."""
56+
tbl_name = "dynamodb-test"
57+
aws_region = "us-west-2"
58+
dynamodb_table = DynamoDBTable(tbl_name, aws_region)
59+
assert dynamodb_table.name == tbl_name
60+
assert dynamodb_table.region == aws_region
61+
assert dynamodb_table.endpoint_url is None
62+
assert dynamodb_table._dynamodb_client is None
63+
assert dynamodb_table._dynamodb_resource is None
64+
65+
66+
def test_online_store_config_custom_params():
67+
"""Test DynamoDBOnlineStoreConfig custom parameters."""
68+
aws_region = "us-west-2"
69+
batch_size = 20
70+
endpoint_url = "http://localhost:8000"
71+
sort_response = False
72+
table_name_template = "feast_test.dynamodb_table"
73+
dynamodb_store_config = DynamoDBOnlineStoreConfig(
74+
region=aws_region,
75+
batch_size=batch_size,
76+
endpoint_url=endpoint_url,
77+
sort_response=sort_response,
78+
table_name_template=table_name_template,
79+
)
80+
assert dynamodb_store_config.type == "dynamodb"
81+
assert dynamodb_store_config.batch_size == batch_size
82+
assert dynamodb_store_config.endpoint_url == endpoint_url
83+
assert dynamodb_store_config.region == aws_region
84+
assert dynamodb_store_config.sort_response == sort_response
85+
assert dynamodb_store_config.table_name_template == table_name_template
86+
87+
88+
def test_dynamodb_table_custom_params():
89+
"""Test DynamoDBTable custom parameters."""
90+
tbl_name = "dynamodb-test"
91+
aws_region = "us-west-2"
92+
endpoint_url = "http://localhost:8000"
93+
dynamodb_table = DynamoDBTable(tbl_name, aws_region, endpoint_url)
94+
assert dynamodb_table.name == tbl_name
95+
assert dynamodb_table.region == aws_region
96+
assert dynamodb_table.endpoint_url == endpoint_url
97+
assert dynamodb_table._dynamodb_client is None
98+
assert dynamodb_table._dynamodb_resource is None
99+
100+
101+
def test_online_store_config_dynamodb_client():
102+
"""Test DynamoDBOnlineStoreConfig configure DynamoDB client with endpoint_url."""
103+
aws_region = "us-west-2"
104+
endpoint_url = "http://localhost:8000"
105+
dynamodb_store = DynamoDBOnlineStore()
106+
dynamodb_store_config = DynamoDBOnlineStoreConfig(
107+
region=aws_region, endpoint_url=endpoint_url
108+
)
109+
dynamodb_client = dynamodb_store._get_dynamodb_client(
110+
dynamodb_store_config.region, dynamodb_store_config.endpoint_url
111+
)
112+
assert dynamodb_client.meta.region_name == aws_region
113+
assert dynamodb_client.meta.endpoint_url == endpoint_url
114+
115+
116+
def test_dynamodb_table_dynamodb_client():
117+
"""Test DynamoDBTable configure DynamoDB client with endpoint_url."""
118+
tbl_name = "dynamodb-test"
119+
aws_region = "us-west-2"
120+
endpoint_url = "http://localhost:8000"
121+
dynamodb_table = DynamoDBTable(tbl_name, aws_region, endpoint_url)
122+
dynamodb_client = dynamodb_table._get_dynamodb_client(
123+
dynamodb_table.region, dynamodb_table.endpoint_url
124+
)
125+
assert dynamodb_client.meta.region_name == aws_region
126+
assert dynamodb_client.meta.endpoint_url == endpoint_url
127+
128+
129+
def test_online_store_config_dynamodb_resource():
130+
"""Test DynamoDBOnlineStoreConfig configure DynamoDB Resource with endpoint_url."""
131+
aws_region = "us-west-2"
132+
endpoint_url = "http://localhost:8000"
133+
dynamodb_store = DynamoDBOnlineStore()
134+
dynamodb_store_config = DynamoDBOnlineStoreConfig(
135+
region=aws_region, endpoint_url=endpoint_url
136+
)
137+
dynamodb_resource = dynamodb_store._get_dynamodb_resource(
138+
dynamodb_store_config.region, dynamodb_store_config.endpoint_url
139+
)
140+
assert dynamodb_resource.meta.client.meta.region_name == aws_region
141+
assert dynamodb_resource.meta.client.meta.endpoint_url == endpoint_url
142+
143+
144+
def test_dynamodb_table_dynamodb_resource():
145+
"""Test DynamoDBTable configure DynamoDB resource with endpoint_url."""
146+
tbl_name = "dynamodb-test"
147+
aws_region = "us-west-2"
148+
endpoint_url = "http://localhost:8000"
149+
dynamodb_table = DynamoDBTable(tbl_name, aws_region, endpoint_url)
150+
dynamodb_resource = dynamodb_table._get_dynamodb_resource(
151+
dynamodb_table.region, dynamodb_table.endpoint_url
152+
)
153+
assert dynamodb_resource.meta.client.meta.region_name == aws_region
154+
assert dynamodb_resource.meta.client.meta.endpoint_url == endpoint_url
155+
156+
41157
@mock_dynamodb2
42158
@pytest.mark.parametrize("n_samples", [5, 50, 100])
43159
def test_online_read(repo_config, n_samples):

0 commit comments

Comments
 (0)