Skip to content

Commit d100b65

Browse files
betamoojpinner-lyft
authored andcommitted
Add rate limiting parameters to migrate_boolean_attributes migration. (pynamodb#423)
1 parent 2a10609 commit d100b65

File tree

1 file changed

+19
-2
lines changed

1 file changed

+19
-2
lines changed

pynamodb/migration.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""
55

66
import logging
7+
import time
78

89
from botocore.exceptions import ClientError
910
from pynamodb.exceptions import UpdateError
@@ -32,7 +33,10 @@ def migrate_boolean_attributes(model_class,
3233
attribute_names,
3334
read_capacity_to_consume_per_second=10,
3435
allow_rate_limited_scan_without_consumed_capacity=False,
35-
mock_conditional_update_failure=False):
36+
mock_conditional_update_failure=False,
37+
page_size=None,
38+
limit=None,
39+
number_of_secs_to_back_off=1):
3640
"""
3741
Migrates boolean attributes per GitHub `issue 404 <https://github.com/pynamodb/PynamoDB/issues/404>`_.
3842
@@ -79,15 +83,21 @@ def migrate_boolean_attributes(model_class,
7983
meant to trigger the code path in boto, to allow us to unit test that
8084
we are jumping through appropriate hoops handling the resulting
8185
failure and distinguishing it from other failures.
86+
:param page_size: Passed along to the underlying 'page_size'. Page size of the scan to DynamoDB.
87+
:param limit: Passed along to the underlying 'limit'. Used to limit the number of results returned.
88+
:param number_of_secs_to_back_off: Number of seconds to sleep when exceeding capacity.
8289
8390
:return: (number_of_items_in_need_of_update, number_of_them_that_failed_due_to_conditional_update)
8491
"""
8592
log.info('migrating items; no progress will be reported until completed; this may take a while')
8693
num_items_with_actions = 0
8794
num_update_failures = 0
95+
items_processed = 0
8896

8997
for item in model_class.rate_limited_scan(_build_lba_filter_condition(attribute_names),
9098
read_capacity_to_consume_per_second=read_capacity_to_consume_per_second,
99+
page_size=page_size,
100+
limit=limit,
91101
allow_rate_limited_scan_without_consumed_capacity=allow_rate_limited_scan_without_consumed_capacity):
92102
actions = []
93103
condition = None
@@ -119,10 +129,17 @@ def migrate_boolean_attributes(model_class,
119129
if code == 'ConditionalCheckFailedException':
120130
log.warn('conditional update failed (concurrent writes?) for object: %s (you will need to re-run migration)', item)
121131
num_update_failures += 1
132+
elif code == 'ProvisionedThroughputExceededException':
133+
log.warn('provisioned write capacity exceeded at object: %s backing off (you will need to re-run migration)', item)
134+
num_update_failures += 1
135+
time.sleep(number_of_secs_to_back_off)
122136
else:
123137
raise
124138
else:
125139
raise
126-
log.info('finished migrating; %s items required updates, %s failed due to racing writes and require re-running migration',
140+
items_processed += 1
141+
if items_processed % 1000 == 0:
142+
log.info('processed items: %s Thousand',items_processed/1000)
143+
log.info('finished migrating; %s items required updates, %s failed due to racing writes and/or exceeding capacity and require re-running migration',
127144
num_items_with_actions, num_update_failures)
128145
return num_items_with_actions, num_update_failures

0 commit comments

Comments
 (0)