1818import os
1919import socket
2020import sys
21- from threading import Lock , Thread
21+ from threading import Lock , Thread , Event
2222import time
2323import weakref
24+ import sys
2425
2526from six .moves import range
2627
@@ -51,6 +52,33 @@ def _cleanup(loop_weakref):
5152 loop ._cleanup ()
5253
5354
55+ class WaitableTimer (Timer ):
56+ def __init__ (self , timeout , callback ):
57+ Timer .__init__ (self , timeout , callback )
58+ self .callback = callback
59+ self .event = Event ()
60+
61+ self .final_exception = None
62+
63+ def finish (self , time_now ):
64+ try :
65+ finished = Timer .finish (self , time_now )
66+ if finished :
67+ self .event .set ()
68+ return True
69+ return False
70+
71+ except Exception as e :
72+ self .final_exception = e
73+ self .event .set ()
74+ return True
75+
76+ def wait (self , timeout = None ):
77+ self .event .wait (timeout )
78+ if self .final_exception :
79+ raise self .final_exception
80+
81+
5482class _PipeWrapper (object ):
5583
5684 def __init__ (self , fd ):
@@ -239,6 +267,11 @@ def _run_loop(self):
239267 def add_timer (self , timer ):
240268 self ._timers .add_timer (timer )
241269
270+ # This function is called from a different thread than the event loop
271+ # thread, so for this call to be thread safe, we must wake up the loop
272+ # in case it's stuck at a select
273+ self .wake_loop ()
274+
242275 def _cleanup (self ):
243276 global _dispatcher_map
244277
@@ -305,16 +338,23 @@ def __init__(self, *args, **kwargs):
305338 self .deque_lock = Lock ()
306339
307340 self ._connect_socket ()
308- asyncore .dispatcher .__init__ (self , self ._socket , _dispatcher_map )
341+
342+ # start the event loop if needed
343+ self ._loop .maybe_start ()
344+
345+ init_handler = WaitableTimer (
346+ timeout = 0 ,
347+ callback = partial (asyncore .dispatcher .__init__ ,
348+ self , self ._socket , _dispatcher_map )
349+ )
350+ AsyncoreConnection ._loop .add_timer (init_handler )
351+ init_handler .wait (kwargs ["connect_timeout" ])
309352
310353 self ._writable = True
311354 self ._readable = True
312355
313356 self ._send_options_message ()
314357
315- # start the event loop if needed
316- self ._loop .maybe_start ()
317-
318358 def close (self ):
319359 with self .lock :
320360 if self .is_closed :
@@ -324,7 +364,10 @@ def close(self):
324364 log .debug ("Closing connection (%s) to %s" , id (self ), self .host )
325365 self ._writable = False
326366 self ._readable = False
327- asyncore .dispatcher .close (self )
367+
368+ # We don't have to wait for this to be closed, we can just schedule it
369+ AsyncoreConnection .create_timer (0 , partial (asyncore .dispatcher .close , self ))
370+
328371 log .debug ("Closed socket to %s" , self .host )
329372
330373 if not self .is_defunct :
0 commit comments