File tree Expand file tree Collapse file tree 1 file changed +3
-2
lines changed
Expand file tree Collapse file tree 1 file changed +3
-2
lines changed Original file line number Diff line number Diff line change @@ -355,21 +355,22 @@ def _find_coordinator_ids(self, group_ids):
355355 }
356356 return groups_coordinators
357357
358- def _send_request_to_node (self , node_id , request ):
358+ def _send_request_to_node (self , node_id , request , wakeup = True ):
359359 """Send a Kafka protocol message to a specific broker.
360360
361361 Returns a future that may be polled for status and results.
362362
363363 :param node_id: The broker id to which to send the message.
364364 :param request: The message to send.
365+ :param wakeup: Optional flag to disable thread-wakeup.
365366 :return: A future object that may be polled for status and results.
366367 :exception: The exception if the message could not be sent.
367368 """
368369 while not self ._client .ready (node_id ):
369370 # poll until the connection to broker is ready, otherwise send()
370371 # will fail with NodeNotReadyError
371372 self ._client .poll ()
372- return self ._client .send (node_id , request )
373+ return self ._client .send (node_id , request , wakeup )
373374
374375 def _send_request_to_controller (self , request ):
375376 """Send a Kafka protocol message to the cluster controller.
You can’t perform that action at this time.
0 commit comments