1111# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212# See the License for the specific language governing permissions and
1313# limitations under the License.
14+ import asyncio
1415import itertools
1516import logging
1617from datetime import datetime
@@ -297,7 +298,6 @@ async def online_read_async(
297298 batch_size = online_config .batch_size
298299 entity_ids = self ._to_entity_ids (config , entity_keys )
299300 entity_ids_iter = iter (entity_ids )
300- result : List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]] = []
301301 table_name = _get_table_name (online_config , config , table )
302302
303303 deserialize = TypeDeserializer ().deserialize
@@ -309,24 +309,40 @@ def to_tbl_resp(raw_client_response):
309309 "values" : deserialize (raw_client_response ["values" ]),
310310 }
311311
312+ batches = []
313+ entity_id_batches = []
314+ while True :
315+ batch = list (itertools .islice (entity_ids_iter , batch_size ))
316+ if not batch :
317+ break
318+ entity_id_batch = self ._to_client_batch_get_payload (
319+ online_config , table_name , batch
320+ )
321+ batches .append (batch )
322+ entity_id_batches .append (entity_id_batch )
323+
312324 async with self ._get_aiodynamodb_client (online_config .region ) as client :
313- while True :
314- batch = list (itertools .islice (entity_ids_iter , batch_size ))
315-
316- # No more items to insert
317- if len (batch ) == 0 :
318- break
319- batch_entity_ids = self ._to_client_batch_get_payload (
320- online_config , table_name , batch
321- )
322- response = await client .batch_get_item (
323- RequestItems = batch_entity_ids ,
324- )
325- batch_result = self ._process_batch_get_response (
326- table_name , response , entity_ids , batch , to_tbl_response = to_tbl_resp
327- )
328- result .extend (batch_result )
329- return result
325+ response_batches = await asyncio .gather (
326+ * [
327+ client .batch_get_item (
328+ RequestItems = entity_id_batch ,
329+ )
330+ for entity_id_batch in entity_id_batches
331+ ]
332+ )
333+
334+ result_batches = []
335+ for batch , response in zip (batches , response_batches ):
336+ result_batch = self ._process_batch_get_response (
337+ table_name ,
338+ response ,
339+ entity_ids ,
340+ batch ,
341+ to_tbl_response = to_tbl_resp ,
342+ )
343+ result_batches .append (result_batch )
344+
345+ return list (itertools .chain (* result_batches ))
330346
331347 def _get_aioboto_session (self ):
332348 if self ._aioboto_session is None :
0 commit comments