|
4 | 4 | """ |
5 | 5 |
|
6 | 6 | import logging |
| 7 | +import time |
7 | 8 |
|
8 | 9 | from botocore.exceptions import ClientError |
9 | 10 | from pynamodb.exceptions import UpdateError |
@@ -32,7 +33,10 @@ def migrate_boolean_attributes(model_class, |
32 | 33 | attribute_names, |
33 | 34 | read_capacity_to_consume_per_second=10, |
34 | 35 | allow_rate_limited_scan_without_consumed_capacity=False, |
35 | | - mock_conditional_update_failure=False): |
| 36 | + mock_conditional_update_failure=False, |
| 37 | + page_size=None, |
| 38 | + limit=None, |
| 39 | + number_of_secs_to_back_off=1): |
36 | 40 | """ |
37 | 41 | Migrates boolean attributes per GitHub `issue 404 <https://github.com/pynamodb/PynamoDB/issues/404>`_. |
38 | 42 |
|
@@ -79,15 +83,21 @@ def migrate_boolean_attributes(model_class, |
79 | 83 | meant to trigger the code path in boto, to allow us to unit test that |
80 | 84 | we are jumping through appropriate hoops handling the resulting |
81 | 85 | failure and distinguishing it from other failures. |
| 86 | + :param page_size: Passed along to the underlying 'page_size'. Page size of the scan to DynamoDB. |
| 87 | + :param limit: Passed along to the underlying 'limit'. Used to limit the number of results returned. |
| 88 | + :param number_of_secs_to_back_off: Number of seconds to sleep when exceeding capacity. |
82 | 89 |
|
83 | 90 | :return: (number_of_items_in_need_of_update, number_of_them_that_failed_due_to_conditional_update) |
84 | 91 | """ |
85 | 92 | log.info('migrating items; no progress will be reported until completed; this may take a while') |
86 | 93 | num_items_with_actions = 0 |
87 | 94 | num_update_failures = 0 |
| 95 | + items_processed = 0 |
88 | 96 |
|
89 | 97 | for item in model_class.rate_limited_scan(_build_lba_filter_condition(attribute_names), |
90 | 98 | read_capacity_to_consume_per_second=read_capacity_to_consume_per_second, |
| 99 | + page_size=page_size, |
| 100 | + limit=limit, |
91 | 101 | allow_rate_limited_scan_without_consumed_capacity=allow_rate_limited_scan_without_consumed_capacity): |
92 | 102 | actions = [] |
93 | 103 | condition = None |
@@ -119,10 +129,17 @@ def migrate_boolean_attributes(model_class, |
119 | 129 | if code == 'ConditionalCheckFailedException': |
120 | 130 | log.warn('conditional update failed (concurrent writes?) for object: %s (you will need to re-run migration)', item) |
121 | 131 | num_update_failures += 1 |
| 132 | + elif code == 'ProvisionedThroughputExceededException': |
| 133 | + log.warn('provisioned write capacity exceeded at object: %s backing off (you will need to re-run migration)', item) |
| 134 | + num_update_failures += 1 |
| 135 | + time.sleep(number_of_secs_to_back_off) |
122 | 136 | else: |
123 | 137 | raise |
124 | 138 | else: |
125 | 139 | raise |
126 | | - log.info('finished migrating; %s items required updates, %s failed due to racing writes and require re-running migration', |
| 140 | + items_processed += 1 |
| 141 | + if items_processed % 1000 == 0: |
| 142 | + log.info('processed items: %s Thousand',items_processed/1000) |
| 143 | + log.info('finished migrating; %s items required updates, %s failed due to racing writes and/or exceeding capacity and require re-running migration', |
127 | 144 | num_items_with_actions, num_update_failures) |
128 | 145 | return num_items_with_actions, num_update_failures |
0 commit comments