Skip to content

Commit 54d287e

Browse files
committed
Introduce Fiber::Scheduler#fiber_interrupt.
1 parent cc7fafb commit 54d287e

File tree

7 files changed

+121
-22
lines changed

7 files changed

+121
-22
lines changed

include/ruby/fiber/scheduler.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,12 @@ struct rb_fiber_scheduler_blocking_operation_state {
411411
*/
412412
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);
413413

414+
/**
415+
* Interrupt a fiber by raising an exception. You can construct an exception using `rb_make_exception`.
416+
*
417+
*/
418+
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception);
419+
414420
/**
415421
* Create and schedule a non-blocking fiber.
416422
*

include/ruby/io.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,20 @@ VALUE rb_io_get_io(VALUE io);
664664
*/
665665
VALUE rb_io_check_io(VALUE io);
666666

667+
/**
668+
* Invoke the given function with the given data, which may block the current
669+
* execution context.
670+
*
671+
* During the execution of the function, the current execution context may be
672+
* interrupted by an exception raised by another thread if the IO is closed.
673+
*
674+
* @param[in] io An IO.
675+
* @param[in] function A function to call.
676+
* @param[in] data Data to pass to the function.
677+
* @param[in] flags Flags (must be 0, reserved for later use).
678+
*/
679+
VALUE rb_io_interruptable_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument, int flags);
680+
667681
/**
668682
* Queries the tied IO for writing. An IO can be duplexed. Fine. The thing
669683
* is, that characteristics could sometimes be achieved by the underlying

internal/thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ void *rb_thread_prevent_fork(void *(*func)(void *), void *data); /* for ext/sock
7676
VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd);
7777
VALUE rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events);
7878

79+
// Invoke the given function, with the specified argument, in a way that `IO#close` from another execution context can interrupt it.
80+
VALUE rb_thread_io_interruptable_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument, int flags);
81+
7982
/* thread.c (export) */
8083
int ruby_thread_has_gvl_p(void); /* for ext/fiddle/closure.c */
8184

io.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,12 @@ VALUE rb_io_blocking_region(struct rb_io *io, rb_blocking_function_t *function,
233233
return rb_io_blocking_region_wait(io, function, argument, 0);
234234
}
235235

236+
VALUE
237+
rb_io_interruptable_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument, int flags)
238+
{
239+
return rb_thread_io_interruptable_operation(self, function, argument, flags);
240+
}
241+
236242
struct argf {
237243
VALUE filename, current_file;
238244
long last_lineno; /* $. */

scheduler.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ static ID id_io_close;
3737
static ID id_address_resolve;
3838

3939
static ID id_blocking_operation_wait;
40+
static ID id_fiber_interrupt;
4041

4142
static ID id_fiber_schedule;
4243

@@ -108,14 +109,14 @@ Init_Fiber_Scheduler(void)
108109
id_io_pread = rb_intern_const("io_pread");
109110
id_io_write = rb_intern_const("io_write");
110111
id_io_pwrite = rb_intern_const("io_pwrite");
111-
112112
id_io_wait = rb_intern_const("io_wait");
113113
id_io_select = rb_intern_const("io_select");
114114
id_io_close = rb_intern_const("io_close");
115115

116116
id_address_resolve = rb_intern_const("address_resolve");
117117

118118
id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
119+
id_fiber_interrupt = rb_intern_const("fiber_interrupt");
119120

120121
id_fiber_schedule = rb_intern_const("fiber");
121122

@@ -766,6 +767,15 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi
766767
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
767768
}
768769

770+
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
771+
{
772+
VALUE arguments[] = {
773+
fiber, exception
774+
};
775+
776+
return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
777+
}
778+
769779
/*
770780
* Document-method: Fiber::Scheduler#fiber
771781
* call-seq: fiber(&block)

test/fiber/scheduler.rb

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,15 @@ def next_timeout
6868
def run
6969
# $stderr.puts [__method__, Fiber.current].inspect
7070

71+
readable = writable = nil
72+
7173
while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
7274
# May only handle file descriptors up to 1024...
73-
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
75+
begin
76+
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
77+
rescue IOError => error
78+
# Ignore - this can happen if the IO is closed while we are waiting.
79+
end
7480

7581
# puts "readable: #{readable}" if readable&.any?
7682
# puts "writable: #{writable}" if writable&.any?
@@ -290,6 +296,30 @@ def unblock(blocker, fiber)
290296
io.write_nonblock('.')
291297
end
292298

299+
class FiberInterrupt
300+
def initialize(fiber, exception)
301+
@fiber = fiber
302+
@exception = exception
303+
end
304+
305+
def alive?
306+
@fiber.alive?
307+
end
308+
309+
def transfer
310+
@fiber.raise(@exception)
311+
end
312+
end
313+
314+
def fiber_interrupt(fiber, exception)
315+
@lock.synchronize do
316+
@ready << FiberInterrupt.new(fiber, exception)
317+
end
318+
319+
io = @urgent.last
320+
io.write_nonblock('.')
321+
end
322+
293323
# This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
294324
# it to create scheduled fibers, but it is not required in practice;
295325
# `Fiber.new` is usually sufficient.

thread.c

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ static rb_internal_thread_specific_key_t specific_key_count;
155155

156156
struct waiting_fd {
157157
struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
158-
rb_thread_t *th;
158+
rb_execution_context_t *ec;
159159
int fd;
160160
struct rb_io_close_wait_list *busy;
161161
};
@@ -1696,12 +1696,14 @@ waitfd_to_waiting_flag(int wfd_event)
16961696
}
16971697

16981698
static void
1699-
thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
1699+
thread_io_setup_wfd(rb_execution_context_t *ec, int fd, struct waiting_fd *wfd)
17001700
{
17011701
wfd->fd = fd;
1702-
wfd->th = th;
1702+
wfd->ec = ec;
17031703
wfd->busy = NULL;
17041704

1705+
rb_thread_t *th = rb_ec_thread_ptr(ec);
1706+
17051707
RB_VM_LOCK_ENTER();
17061708
{
17071709
ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
@@ -1735,6 +1737,29 @@ thread_io_wake_pending_closer(struct waiting_fd *wfd)
17351737
}
17361738
}
17371739

1740+
1741+
static VALUE
1742+
rb_thread_io_interruptable_operation_ensure(VALUE _argument)
1743+
{
1744+
struct waiting_fd *wfd = (struct waiting_fd *)_argument;
1745+
thread_io_wake_pending_closer(wfd);
1746+
return Qnil;
1747+
}
1748+
1749+
VALUE
1750+
rb_thread_io_interruptable_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument, int flags)
1751+
{
1752+
struct rb_io *io;
1753+
RB_IO_POINTER(self, io);
1754+
1755+
rb_execution_context_t *ec = GET_EC();
1756+
1757+
struct waiting_fd waiting_fd;
1758+
thread_io_setup_wfd(ec, io->fd, &waiting_fd);
1759+
1760+
return rb_ensure(function, argument, rb_thread_io_interruptable_operation_ensure, (VALUE)&waiting_fd);
1761+
}
1762+
17381763
static bool
17391764
thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
17401765
{
@@ -1820,18 +1845,18 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
18201845
// `func` or not (as opposed to some previously set value).
18211846
errno = 0;
18221847

1823-
thread_io_setup_wfd(th, fd, &waiting_fd);
1848+
thread_io_setup_wfd(ec, fd, &waiting_fd);
18241849
{
18251850
EC_PUSH_TAG(ec);
18261851
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
18271852
volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */
18281853
retry:
1829-
BLOCKING_REGION(waiting_fd.th, {
1854+
BLOCKING_REGION(th, {
18301855
val = func(data1);
18311856
saved_errno = errno;
1832-
}, ubf_select, waiting_fd.th, FALSE);
1857+
}, ubf_select, th, FALSE);
18331858

1834-
th = rb_ec_thread_ptr(ec);
1859+
RUBY_ASSERT(th == rb_ec_thread_ptr(ec));
18351860
if (events &&
18361861
blocking_call_retryable_p((int)val, saved_errno) &&
18371862
thread_io_wait_events(th, fd, events, NULL)) {
@@ -2655,16 +2680,22 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
26552680
{
26562681
ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
26572682
if (wfd->fd == fd) {
2658-
rb_thread_t *th = wfd->th;
2659-
VALUE err;
2683+
rb_execution_context_t *ec = wfd->ec;
2684+
rb_thread_t *th = rb_ec_thread_ptr(ec);
26602685

26612686
ccan_list_del(&wfd->wfd_node);
26622687
ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
26632688

26642689
wfd->busy = busy;
2665-
err = th->vm->special_exceptions[ruby_error_stream_closed];
2666-
rb_threadptr_pending_interrupt_enque(th, err);
2667-
rb_threadptr_interrupt(th);
2690+
2691+
VALUE error = th->vm->special_exceptions[ruby_error_stream_closed];
2692+
2693+
if (th->scheduler != Qnil) {
2694+
rb_fiber_scheduler_fiber_interrupt(th->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
2695+
} else {
2696+
rb_threadptr_pending_interrupt_enque(th, error);
2697+
rb_threadptr_interrupt(th);
2698+
}
26682699
}
26692700
}
26702701
}
@@ -4422,7 +4453,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44224453
rb_execution_context_t *ec = GET_EC();
44234454
rb_thread_t *th = rb_ec_thread_ptr(ec);
44244455

4425-
thread_io_setup_wfd(th, fd, &wfd);
4456+
thread_io_setup_wfd(ec, fd, &wfd);
44264457

44274458
if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
44284459
// fd is readable
@@ -4431,16 +4462,16 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44314462
errno = 0;
44324463
}
44334464
else {
4434-
EC_PUSH_TAG(wfd.th->ec);
4465+
EC_PUSH_TAG(ec);
44354466
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
44364467
rb_hrtime_t *to, rel, end = 0;
4437-
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
4468+
RUBY_VM_CHECK_INTS_BLOCKING(ec);
44384469
timeout_prepare(&to, &rel, &end, timeout);
44394470
do {
44404471
nfds = numberof(fds);
4441-
result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno);
4472+
result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno);
44424473

4443-
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
4474+
RUBY_VM_CHECK_INTS_BLOCKING(ec);
44444475
} while (wait_retryable(&result, lerrno, to, end));
44454476
}
44464477
EC_POP_TAG();
@@ -4449,7 +4480,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44494480
thread_io_wake_pending_closer(&wfd);
44504481

44514482
if (state) {
4452-
EC_JUMP_TAG(wfd.th->ec, state);
4483+
EC_JUMP_TAG(ec, state);
44534484
}
44544485

44554486
if (result < 0) {
@@ -4548,14 +4579,13 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
45484579
int r;
45494580
VALUE ptr = (VALUE)&args;
45504581
rb_execution_context_t *ec = GET_EC();
4551-
rb_thread_t *th = rb_ec_thread_ptr(ec);
45524582

45534583
args.as.fd = fd;
45544584
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
45554585
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
45564586
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
45574587
args.tv = timeout;
4558-
thread_io_setup_wfd(th, fd, &args.wfd);
4588+
thread_io_setup_wfd(ec, fd, &args.wfd);
45594589

45604590
r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
45614591
if (r == -1)

0 commit comments

Comments
 (0)