@@ -155,7 +155,7 @@ static rb_internal_thread_specific_key_t specific_key_count;
155155
156156struct waiting_fd {
157157 struct ccan_list_node wfd_node ; /* <=> vm.waiting_fds */
158- rb_thread_t * th ;
158+ rb_execution_context_t * ec ;
159159 int fd ;
160160 struct rb_io_close_wait_list * busy ;
161161};
@@ -1696,12 +1696,14 @@ waitfd_to_waiting_flag(int wfd_event)
16961696}
16971697
16981698static void
1699- thread_io_setup_wfd (rb_thread_t * th , int fd , struct waiting_fd * wfd )
1699+ thread_io_setup_wfd (rb_execution_context_t * ec , int fd , struct waiting_fd * wfd )
17001700{
17011701 wfd -> fd = fd ;
1702- wfd -> th = th ;
1702+ wfd -> ec = ec ;
17031703 wfd -> busy = NULL ;
17041704
1705+ rb_thread_t * th = rb_ec_thread_ptr (ec );
1706+
17051707 RB_VM_LOCK_ENTER ();
17061708 {
17071709 ccan_list_add (& th -> vm -> waiting_fds , & wfd -> wfd_node );
@@ -1735,6 +1737,29 @@ thread_io_wake_pending_closer(struct waiting_fd *wfd)
17351737 }
17361738}
17371739
1740+
1741+ static VALUE
1742+ rb_thread_io_interruptable_operation_ensure (VALUE _argument )
1743+ {
1744+ struct waiting_fd * wfd = (struct waiting_fd * )_argument ;
1745+ thread_io_wake_pending_closer (wfd );
1746+ return Qnil ;
1747+ }
1748+
1749+ VALUE
1750+ rb_thread_io_interruptable_operation (VALUE self , VALUE (* function )(VALUE ), VALUE argument , int flags )
1751+ {
1752+ struct rb_io * io ;
1753+ RB_IO_POINTER (self , io );
1754+
1755+ rb_execution_context_t * ec = GET_EC ();
1756+
1757+ struct waiting_fd waiting_fd ;
1758+ thread_io_setup_wfd (ec , io -> fd , & waiting_fd );
1759+
1760+ return rb_ensure (function , argument , rb_thread_io_interruptable_operation_ensure , (VALUE )& waiting_fd );
1761+ }
1762+
17381763static bool
17391764thread_io_mn_schedulable (rb_thread_t * th , int events , const struct timeval * timeout )
17401765{
@@ -1820,18 +1845,18 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
18201845 // `func` or not (as opposed to some previously set value).
18211846 errno = 0 ;
18221847
1823- thread_io_setup_wfd (th , fd , & waiting_fd );
1848+ thread_io_setup_wfd (ec , fd , & waiting_fd );
18241849 {
18251850 EC_PUSH_TAG (ec );
18261851 if ((state = EC_EXEC_TAG ()) == TAG_NONE ) {
18271852 volatile enum ruby_tag_type saved_state = state ; /* for BLOCKING_REGION */
18281853 retry :
1829- BLOCKING_REGION (waiting_fd . th , {
1854+ BLOCKING_REGION (th , {
18301855 val = func (data1 );
18311856 saved_errno = errno ;
1832- }, ubf_select , waiting_fd . th , FALSE);
1857+ }, ubf_select , th , FALSE);
18331858
1834- th = rb_ec_thread_ptr (ec );
1859+ RUBY_ASSERT ( th == rb_ec_thread_ptr (ec ) );
18351860 if (events &&
18361861 blocking_call_retryable_p ((int )val , saved_errno ) &&
18371862 thread_io_wait_events (th , fd , events , NULL )) {
@@ -2655,16 +2680,22 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
26552680 {
26562681 ccan_list_for_each_safe (& vm -> waiting_fds , wfd , next , wfd_node ) {
26572682 if (wfd -> fd == fd ) {
2658- rb_thread_t * th = wfd -> th ;
2659- VALUE err ;
2683+ rb_execution_context_t * ec = wfd -> ec ;
2684+ rb_thread_t * th = rb_ec_thread_ptr ( ec ) ;
26602685
26612686 ccan_list_del (& wfd -> wfd_node );
26622687 ccan_list_add (& busy -> pending_fd_users , & wfd -> wfd_node );
26632688
26642689 wfd -> busy = busy ;
2665- err = th -> vm -> special_exceptions [ruby_error_stream_closed ];
2666- rb_threadptr_pending_interrupt_enque (th , err );
2667- rb_threadptr_interrupt (th );
2690+
2691+ VALUE error = th -> vm -> special_exceptions [ruby_error_stream_closed ];
2692+
2693+ if (th -> scheduler != Qnil ) {
2694+ rb_fiber_scheduler_fiber_interrupt (th -> scheduler , rb_fiberptr_self (ec -> fiber_ptr ), error );
2695+ } else {
2696+ rb_threadptr_pending_interrupt_enque (th , error );
2697+ rb_threadptr_interrupt (th );
2698+ }
26682699 }
26692700 }
26702701 }
@@ -4422,7 +4453,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44224453 rb_execution_context_t * ec = GET_EC ();
44234454 rb_thread_t * th = rb_ec_thread_ptr (ec );
44244455
4425- thread_io_setup_wfd (th , fd , & wfd );
4456+ thread_io_setup_wfd (ec , fd , & wfd );
44264457
44274458 if (timeout == NULL && thread_io_wait_events (th , fd , events , NULL )) {
44284459 // fd is readable
@@ -4431,16 +4462,16 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44314462 errno = 0 ;
44324463 }
44334464 else {
4434- EC_PUSH_TAG (wfd . th -> ec );
4465+ EC_PUSH_TAG (ec );
44354466 if ((state = EC_EXEC_TAG ()) == TAG_NONE ) {
44364467 rb_hrtime_t * to , rel , end = 0 ;
4437- RUBY_VM_CHECK_INTS_BLOCKING (wfd . th -> ec );
4468+ RUBY_VM_CHECK_INTS_BLOCKING (ec );
44384469 timeout_prepare (& to , & rel , & end , timeout );
44394470 do {
44404471 nfds = numberof (fds );
4441- result = wait_for_single_fd_blocking_region (wfd . th , fds , nfds , to , & lerrno );
4472+ result = wait_for_single_fd_blocking_region (th , fds , nfds , to , & lerrno );
44424473
4443- RUBY_VM_CHECK_INTS_BLOCKING (wfd . th -> ec );
4474+ RUBY_VM_CHECK_INTS_BLOCKING (ec );
44444475 } while (wait_retryable (& result , lerrno , to , end ));
44454476 }
44464477 EC_POP_TAG ();
@@ -4449,7 +4480,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44494480 thread_io_wake_pending_closer (& wfd );
44504481
44514482 if (state ) {
4452- EC_JUMP_TAG (wfd . th -> ec , state );
4483+ EC_JUMP_TAG (ec , state );
44534484 }
44544485
44554486 if (result < 0 ) {
@@ -4548,14 +4579,13 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
45484579 int r ;
45494580 VALUE ptr = (VALUE )& args ;
45504581 rb_execution_context_t * ec = GET_EC ();
4551- rb_thread_t * th = rb_ec_thread_ptr (ec );
45524582
45534583 args .as .fd = fd ;
45544584 args .read = (events & RB_WAITFD_IN ) ? init_set_fd (fd , & rfds ) : NULL ;
45554585 args .write = (events & RB_WAITFD_OUT ) ? init_set_fd (fd , & wfds ) : NULL ;
45564586 args .except = (events & RB_WAITFD_PRI ) ? init_set_fd (fd , & efds ) : NULL ;
45574587 args .tv = timeout ;
4558- thread_io_setup_wfd (th , fd , & args .wfd );
4588+ thread_io_setup_wfd (ec , fd , & args .wfd );
45594589
45604590 r = (int )rb_ensure (select_single , ptr , select_single_cleanup , ptr );
45614591 if (r == -1 )
0 commit comments