Skip to content

Commit 8ad3ad1

Browse files
betamooharleyk
authored andcommitted
Rate limited Page Iterator (pynamodb#481)
1 parent bb58f04 commit 8ad3ad1

File tree

4 files changed

+234
-7
lines changed

4 files changed

+234
-7
lines changed

docs/rate_limited_operations.rst

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
Rate-Limited Operation
2+
======================
3+
4+
`Scan`, `Query` and `Count` operations can be rate-limited based on the consumed capacities returned from DynamoDB.
5+
Simply specify the `rate_limit` argument when calling these methods.
6+
7+
.. note::
8+
9+
Rate-limiting is only meant to slow operations down to conform to capacity limitations.
10+
Rate-limiting can not be used to speed operations up. Specifying a higher rate-limit that exceeds the possible
11+
writing speed allowed by the environment, will not have any effect.
12+
13+
Example Usage
14+
^^^^^^^^^^^^^
15+
16+
Suppose that you have defined a `User` Model for the examples below.
17+
18+
.. code-block:: python
19+
20+
from pynamodb.models import Model
21+
from pynamodb.attributes import (
22+
UnicodeAttribute
23+
)
24+
25+
26+
class User(Model):
27+
class Meta:
28+
table_name = 'Users'
29+
30+
id = UnicodeAttribute(hash_key=True)
31+
name = UnicodeAttribute(range_key=True)
32+
33+
34+
Here is an example using `rate_limit` while scanning the `User` model:
35+
36+
.. code-block:: python
37+
38+
# Using only 5 RCU per second
39+
for user in User.scan(rate_limit = 5):
40+
print("User id: {}, name: {}".format(user.id, user.name))
41+
42+
43+
Query
44+
^^^^^^^^^^^^^
45+
46+
You can use `rate_limit` when querying items from your table:
47+
48+
.. code-block:: python
49+
50+
# Using only 15 RCU per second
51+
for user in User.query('id1', User.name.startswith('re'), rate_limit = 15):
52+
print("Query returned user {0}".format(user))
53+
54+
55+
Count
56+
^^^^^^^^^^^^^
57+
58+
You can use `rate_limit` when counting items in your table:
59+
60+
.. code-block:: python
61+
62+
# Using only 15 RCU per second
63+
count = User.count(rate_limit = 15)
64+
print("Count : {}".format(count))
65+

pynamodb/models.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,7 @@ def count(cls,
536536
consistent_read=False,
537537
index_name=None,
538538
limit=None,
539+
rate_limit=None,
539540
**filters):
540541
"""
541542
Provides a filtered count
@@ -589,7 +590,8 @@ def count(cls,
589590
cls._get_connection().query,
590591
query_args,
591592
query_kwargs,
592-
limit=limit
593+
limit=limit,
594+
rate_limit=rate_limit
593595
)
594596

595597
# iterate through results
@@ -610,6 +612,7 @@ def query(cls,
610612
last_evaluated_key=None,
611613
attributes_to_get=None,
612614
page_size=None,
615+
rate_limit=None,
613616
**filters):
614617
"""
615618
Provides a high level query API
@@ -674,7 +677,8 @@ def query(cls,
674677
query_args,
675678
query_kwargs,
676679
map_fn=cls.from_raw_data,
677-
limit=limit
680+
limit=limit,
681+
rate_limit=rate_limit
678682
)
679683

680684
@classmethod
@@ -763,6 +767,7 @@ def scan(cls,
763767
page_size=None,
764768
consistent_read=None,
765769
index_name=None,
770+
rate_limit=None,
766771
**filters):
767772
"""
768773
Iterates through all items in the table
@@ -807,7 +812,8 @@ def scan(cls,
807812
scan_args,
808813
scan_kwargs,
809814
map_fn=cls.from_raw_data,
810-
limit=limit
815+
limit=limit,
816+
rate_limit=rate_limit,
811817
)
812818

813819
@classmethod

pynamodb/pagination.py

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,70 @@
1-
from pynamodb.constants import CAMEL_COUNT, ITEMS, LAST_EVALUATED_KEY, SCANNED_COUNT
1+
import time
2+
from pynamodb.constants import (CAMEL_COUNT, ITEMS, LAST_EVALUATED_KEY, SCANNED_COUNT,
3+
CONSUMED_CAPACITY, TOTAL, CAPACITY_UNITS)
4+
5+
6+
class RateLimiter(object):
    """
    RateLimiter limits operations to a pre-set rate of units/seconds

    Example:
        Initialize a RateLimiter with the desired rate
            rate_limiter = RateLimiter(rate_limit)

        Now, every time before calling an operation, call acquire()
            rate_limiter.acquire()

        And after an operation, update the number of units consumed
            rate_limiter.consume(units)

    """
    def __init__(self, rate_limit, time_module=None):
        """
        Initializes a RateLimiter object

        :param rate_limit: The desired rate, in units per second. Must be greater than zero.
        :param time_module: Optional: the module responsible for calculating time. It must provide
            ``time()`` and ``sleep(seconds)``. Intended to be used for testing purposes.
            Falls back to the standard ``time`` module when omitted (or falsy).

        :raises ValueError: if ``rate_limit`` is not greater than zero
        """
        # Assign through the property so the "> 0" validation lives in one place.
        self.rate_limit = rate_limit
        self._consumed = 0
        self._time_of_last_acquire = 0.0
        self._time_module = time_module or time

    def consume(self, units):
        """
        Records the amount of units consumed.

        The units accumulate until the next call to acquire(), which resets the counter.

        :param units: Number of units consumed

        :return: None
        """
        self._consumed += units

    def acquire(self):
        """
        Sleeps the appropriate amount of time to follow the rate limit restriction

        The wait is the time the units consumed since the previous acquire() are worth at the
        configured rate, minus the time that has already elapsed since that acquire().

        :return: None
        """
        clock = self._time_module

        # Seconds the consumed units "cost" at the configured rate.
        required = self._consumed / float(self.rate_limit)
        # Time already spent since the previous acquire() counts toward that cost.
        elapsed = clock.time() - self._time_of_last_acquire

        # Never sleep a negative amount: a slow operation simply needs no extra delay.
        clock.sleep(max(0, required - elapsed))

        self._consumed = 0
        self._time_of_last_acquire = clock.time()

    @property
    def rate_limit(self):
        """
        A limit of units per seconds
        """
        return self._rate_limit

    @rate_limit.setter
    def rate_limit(self, rate_limit):
        # Rejected values leave any previously configured rate untouched.
        if rate_limit <= 0:
            raise ValueError("rate_limit must be greater than zero")
        self._rate_limit = rate_limit
268

369

470
class PageIterator(object):
@@ -8,13 +74,16 @@ class PageIterator(object):
874
http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Pagination
975
http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
1076
"""
11-
def __init__(self, operation, args, kwargs):
77+
def __init__(self, operation, args, kwargs, rate_limit = None):
1278
self._operation = operation
1379
self._args = args
1480
self._kwargs = kwargs
1581
self._first_iteration = True
1682
self._last_evaluated_key = kwargs.get('exclusive_start_key')
1783
self._total_scanned_count = 0
84+
self._rate_limiter = None
85+
if rate_limit:
86+
self._rate_limiter = RateLimiter(rate_limit)
1887

1988
def __iter__(self):
2089
return self
@@ -26,10 +95,18 @@ def __next__(self):
2695
self._first_iteration = False
2796

2897
self._kwargs['exclusive_start_key'] = self._last_evaluated_key
98+
99+
if self._rate_limiter:
100+
self._rate_limiter.acquire()
101+
self._kwargs['return_consumed_capacity'] = TOTAL
29102
page = self._operation(*self._args, **self._kwargs)
30103
self._last_evaluated_key = page.get(LAST_EVALUATED_KEY)
31104
self._total_scanned_count += page[SCANNED_COUNT]
32105

106+
if self._rate_limiter:
107+
consumed_capacity = page.get(CONSUMED_CAPACITY, {}).get(CAPACITY_UNITS, 0)
108+
self._rate_limiter.consume(consumed_capacity)
109+
33110
return page
34111

35112
def next(self):
@@ -69,8 +146,8 @@ class ResultIterator(object):
69146
http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Pagination
70147
http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
71148
"""
72-
def __init__(self, operation, args, kwargs, map_fn=None, limit=None):
73-
self.page_iter = PageIterator(operation, args, kwargs)
149+
def __init__(self, operation, args, kwargs, map_fn=None, limit=None, rate_limit = None):
150+
self.page_iter = PageIterator(operation, args, kwargs, rate_limit)
74151
self._first_iteration = True
75152
self._map_fn = map_fn
76153
self._limit = limit

pynamodb/tests/test_pagination.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import pytest
2+
from pynamodb.pagination import RateLimiter
3+
4+
5+
class MockTime(object):
    """
    Fake clock exposing the ``time()`` / ``sleep()`` calls that RateLimiter uses.

    Time only moves when a test asks it to, so timing assertions are deterministic.
    """
    def __init__(self):
        # The fake clock starts at zero.
        self.current_time = 0.0

    def sleep(self, amount):
        # Sleeping does not block; it just advances the fake clock.
        self.current_time += amount

    def time(self):
        return self.current_time

    def increment_time(self, amount):
        # Used by tests to simulate the duration of an operation.
        self.current_time += amount
17+
18+
19+
def test_rate_limiter_exceptions():
    """A non-positive rate is rejected both at construction and on reassignment."""
    # Constructor validation.
    with pytest.raises(ValueError):
        RateLimiter(0)

    with pytest.raises(ValueError):
        RateLimiter(-1)

    # Build the limiter outside the ``raises`` blocks so that only the property
    # setter, not the constructor, can satisfy the expectation.
    r = RateLimiter(10)

    with pytest.raises(ValueError):
        r.rate_limit = 0

    with pytest.raises(ValueError):
        r.rate_limit = -1
33+
34+
35+
def test_basic_rate_limiting():
    """Operations faster than the limit are stretched to one per 10 seconds."""
    mock_time = MockTime()
    # 0.1 units/second: each consumed unit is worth 10 seconds.
    r = RateLimiter(0.1, mock_time)

    # 100 operations
    for _ in range(100):
        r.acquire()
        # Simulates an operation that takes 1 second
        mock_time.increment_time(1)
        r.consume(1)

    # The first acquire() does not wait, so we expect (100 - 1) * 10 seconds = 990 of
    # rate-limited time, plus 1 for the last increment_time(1) operation.
    assert mock_time.time() == 991.0
49+
50+
51+
def test_basic_rate_limiting_small_increment():
    """A 2-second operation is still shorter than the 10-second budget per unit."""
    mock_time = MockTime()
    # 0.1 units/second: each consumed unit is worth 10 seconds.
    r = RateLimiter(0.1, mock_time)

    # 100 operations
    for _ in range(100):
        r.acquire()
        # Simulates an operation that takes 2 seconds
        mock_time.increment_time(2)
        r.consume(1)

    # The first acquire() does not wait; every later one tops the 2-second operation up
    # to 10 seconds, so we expect (100 - 1) * 10 seconds = 990, plus 2 for the last
    # increment_time(2) operation.
    assert mock_time.time() == 992.0
65+
66+
67+
def test_basic_rate_limiting_large_increment():
    """Operations slower than the limit are never delayed further."""
    mock_time = MockTime()
    # 0.1 units/second: each consumed unit is worth 10 seconds.
    r = RateLimiter(0.1, mock_time)

    # 100 operations
    for _ in range(100):
        r.acquire()
        # Simulates an operation that takes 11 seconds
        mock_time.increment_time(11)
        r.consume(1)

    # The operation takes longer than the minimum wait, so rate limiting should have no
    # effect: 100 operations * 11 seconds each.
    assert mock_time.time() == 1100.0

0 commit comments

Comments
 (0)