@@ -150,7 +150,7 @@ static rb_internal_thread_specific_key_t specific_key_count;
 
 struct waiting_fd {
     struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
-    rb_thread_t *th;
+    rb_execution_context_t *ec;
     int fd;
     struct rb_io_close_wait_list *busy;
 };
@@ -1691,12 +1691,14 @@ waitfd_to_waiting_flag(int wfd_event)
 }
 
 static void
-thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
+thread_io_setup_wfd(rb_execution_context_t *ec, int fd, struct waiting_fd *wfd)
 {
     wfd->fd = fd;
-    wfd->th = th;
+    wfd->ec = ec;
     wfd->busy = NULL;
 
+    rb_thread_t *th = rb_ec_thread_ptr(ec);
+
     RB_VM_LOCK_ENTER();
     {
         ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
@@ -1730,6 +1732,29 @@ thread_io_wake_pending_closer(struct waiting_fd *wfd)
     }
 }
 
+
+static VALUE
+rb_thread_io_interruptible_operation_ensure(VALUE _argument)
+{
+    struct waiting_fd *wfd = (struct waiting_fd *)_argument;
+    thread_io_wake_pending_closer(wfd);
+    return Qnil;
+}
+
+VALUE
+rb_thread_io_interruptible_operation(VALUE self, VALUE (*function)(VALUE), VALUE argument)
+{
+    struct rb_io *io;
+    RB_IO_POINTER(self, io);
+
+    rb_execution_context_t *ec = GET_EC();
+
+    struct waiting_fd waiting_fd;
+    thread_io_setup_wfd(ec, io->fd, &waiting_fd);
+
+    return rb_ensure(function, argument, rb_thread_io_interruptible_operation_ensure, (VALUE)&waiting_fd);
+}
+
 static bool
 thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
 {
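Note: a minimal usage sketch, not part of this commit. It assumes rb_thread_io_interruptible_operation is declared somewhere the caller can see it, and the example_* names are hypothetical. The idea it illustrates: the callback runs while io->fd is registered in vm->waiting_fds (via thread_io_setup_wfd), and rb_ensure guarantees thread_io_wake_pending_closer runs afterwards, so rb_notify_fd_close can find and interrupt this operation if another thread closes the IO.

#include "ruby/ruby.h"
#include "ruby/io.h"

// Waits until the IO is readable; rb_io_wait is scheduler-aware, so under a
// fiber scheduler this yields the fiber instead of blocking the thread.
static VALUE
example_wait_readable(VALUE io)
{
    return rb_io_wait(io, RB_INT2NUM(RUBY_IO_READABLE), Qnil);
}

// Wraps the wait so that IO#close on another thread can interrupt it: while
// the callback runs, the fd is registered as a waiting_fd, and the close path
// delivers the stream-closed exception to this thread or fiber.
static VALUE
example_wait_readable_interruptibly(VALUE io)
{
    return rb_thread_io_interruptible_operation(io, example_wait_readable, io);
}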
@@ -1815,18 +1840,18 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
     // `func` or not (as opposed to some previously set value).
     errno = 0;
 
-    thread_io_setup_wfd(th, fd, &waiting_fd);
+    thread_io_setup_wfd(ec, fd, &waiting_fd);
     {
         EC_PUSH_TAG(ec);
         if ((state = EC_EXEC_TAG()) == TAG_NONE) {
             volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */
           retry:
-            BLOCKING_REGION(waiting_fd.th, {
+            BLOCKING_REGION(th, {
                 val = func(data1);
                 saved_errno = errno;
-            }, ubf_select, waiting_fd.th, FALSE);
+            }, ubf_select, th, FALSE);
 
-            th = rb_ec_thread_ptr(ec);
+            RUBY_ASSERT(th == rb_ec_thread_ptr(ec));
             if (events &&
                 blocking_call_retryable_p((int)val, saved_errno) &&
                 thread_io_wait_events(th, fd, events, NULL)) {
@@ -2640,6 +2665,7 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
 int
 rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
 {
+    // fprintf(stderr, "rb_notify_fd_close(%d) pid=%d\n", fd, getpid());
     rb_vm_t *vm = GET_THREAD()->vm;
     struct waiting_fd *wfd = 0, *next;
     ccan_list_head_init(&busy->pending_fd_users);
@@ -2650,16 +2676,23 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
     {
         ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
             if (wfd->fd == fd) {
-                rb_thread_t *th = wfd->th;
-                VALUE err;
+                rb_execution_context_t *ec = wfd->ec;
+                rb_thread_t *th = rb_ec_thread_ptr(ec);
 
                 ccan_list_del(&wfd->wfd_node);
                 ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
 
                 wfd->busy = busy;
-                err = th->vm->special_exceptions[ruby_error_stream_closed];
-                rb_threadptr_pending_interrupt_enque(th, err);
-                rb_threadptr_interrupt(th);
+
+                VALUE error = th->vm->special_exceptions[ruby_error_stream_closed];
+
+                if (th->scheduler != Qnil) {
+                    rb_fiber_scheduler_fiber_interrupt(th->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
+                } else {
+                    // fprintf(stderr, "rb_notify_fd_close: Interrupting thread %p\n", (void *)th);
+                    rb_threadptr_pending_interrupt_enque(th, error);
+                    rb_threadptr_interrupt(th);
+                }
             }
         }
     }
@@ -4417,7 +4450,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     rb_execution_context_t *ec = GET_EC();
     rb_thread_t *th = rb_ec_thread_ptr(ec);
 
-    thread_io_setup_wfd(th, fd, &wfd);
+    thread_io_setup_wfd(ec, fd, &wfd);
 
     if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
         // fd is readable
@@ -4426,16 +4459,16 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
         errno = 0;
     }
     else {
-        EC_PUSH_TAG(wfd.th->ec);
+        EC_PUSH_TAG(ec);
         if ((state = EC_EXEC_TAG()) == TAG_NONE) {
             rb_hrtime_t *to, rel, end = 0;
-            RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+            RUBY_VM_CHECK_INTS_BLOCKING(ec);
             timeout_prepare(&to, &rel, &end, timeout);
             do {
                 nfds = numberof(fds);
-                result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno);
+                result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno);
 
-                RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+                RUBY_VM_CHECK_INTS_BLOCKING(ec);
             } while (wait_retryable(&result, lerrno, to, end));
         }
         EC_POP_TAG();
@@ -4444,7 +4477,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     thread_io_wake_pending_closer(&wfd);
 
     if (state) {
-        EC_JUMP_TAG(wfd.th->ec, state);
+        EC_JUMP_TAG(ec, state);
     }
 
     if (result < 0) {
@@ -4543,14 +4576,13 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     int r;
    VALUE ptr = (VALUE)&args;
     rb_execution_context_t *ec = GET_EC();
-    rb_thread_t *th = rb_ec_thread_ptr(ec);
 
     args.as.fd = fd;
     args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
     args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
     args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
     args.tv = timeout;
-    thread_io_setup_wfd(th, fd, &args.wfd);
+    thread_io_setup_wfd(ec, fd, &args.wfd);
 
     r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
     if (r == -1)