Skip to content

Commit faf5619

Browse files
committed
Introduce Fiber::Scheduler#fiber_interrupt and IO#interruptible_operation.
1 parent 617e860 commit faf5619

File tree

7 files changed

+144
-22
lines changed

7 files changed

+144
-22
lines changed

include/ruby/fiber/scheduler.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,12 @@ struct rb_fiber_scheduler_blocking_operation_state {
411411
*/
412412
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);
413413

414+
/**
415+
* Interrupt a fiber by raising an exception. You can construct an exception using `rb_make_exception`.
416+
*
417+
*/
418+
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception);
419+
414420
/**
415421
* Create and schedule a non-blocking fiber.
416422
*

include/ruby/io.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,19 @@ VALUE rb_io_get_io(VALUE io);
664664
*/
665665
VALUE rb_io_check_io(VALUE io);
666666

667+
/**
668+
* Invoke the given function with the given data, which may block the current
669+
* execution context.
670+
*
671+
* During the execution of the function, the current execution context may be
672+
* interrupted by an exception raised by another thread if the IO is closed.
673+
*
674+
* @param[in] io An IO.
675+
* @param[in] function A function to call.
676+
* @param[in] data Data to pass to the function.
677+
*/
678+
VALUE rb_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument);
679+
667680
/**
668681
* Queries the tied IO for writing. An IO can be duplexed. Fine. The thing
669682
* is, that characteristics could sometimes be achieved by the underlying

internal/thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ void *rb_thread_prevent_fork(void *(*func)(void *), void *data); /* for ext/sock
7676
VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd);
7777
VALUE rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events);
7878

79+
// Invoke the given function, with the specified argument, in a way that `IO#close` from another execution context can interrupt it.
80+
VALUE rb_thread_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument);
81+
7982
/* thread.c (export) */
8083
int ruby_thread_has_gvl_p(void); /* for ext/fiddle/closure.c */
8184

io.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,32 @@ VALUE rb_io_blocking_region(struct rb_io *io, rb_blocking_function_t *function,
233233
return rb_io_blocking_region_wait(io, function, argument, 0);
234234
}
235235

236+
VALUE
237+
rb_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument)
238+
{
239+
VALUE scheduler = rb_fiber_scheduler_current();
240+
241+
if (scheduler == Qnil) {
242+
return function(argument);
243+
} else {
244+
return rb_thread_io_interruptible_operation(self, function, argument);
245+
}
246+
}
247+
248+
static VALUE
249+
io_interruptible_operation_begin(VALUE block)
250+
{
251+
return rb_proc_call_with_block(block, 0, NULL, Qnil);
252+
}
253+
254+
static VALUE
255+
io_interruptible_operation(VALUE self)
256+
{
257+
VALUE block = rb_block_proc();
258+
259+
return rb_io_interruptible_operation(self, io_interruptible_operation_begin, block);
260+
}
261+
236262
struct argf {
237263
VALUE filename, current_file;
238264
long last_lineno; /* $. */
@@ -15700,6 +15726,8 @@ Init_IO(void)
1570015726

1570115727
rb_define_method(rb_cIO, "initialize", rb_io_initialize, -1);
1570215728

15729+
rb_define_method(rb_cIO, "interruptible_operation", io_interruptible_operation, 0);
15730+
1570315731
rb_output_fs = Qnil;
1570415732
rb_define_hooked_variable("$,", &rb_output_fs, 0, deprecated_str_setter);
1570515733

scheduler.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ static ID id_io_close;
3737
static ID id_address_resolve;
3838

3939
static ID id_blocking_operation_wait;
40+
static ID id_fiber_interrupt;
4041

4142
static ID id_fiber_schedule;
4243

@@ -108,14 +109,14 @@ Init_Fiber_Scheduler(void)
108109
id_io_pread = rb_intern_const("io_pread");
109110
id_io_write = rb_intern_const("io_write");
110111
id_io_pwrite = rb_intern_const("io_pwrite");
111-
112112
id_io_wait = rb_intern_const("io_wait");
113113
id_io_select = rb_intern_const("io_select");
114114
id_io_close = rb_intern_const("io_close");
115115

116116
id_address_resolve = rb_intern_const("address_resolve");
117117

118118
id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
119+
id_fiber_interrupt = rb_intern_const("fiber_interrupt");
119120

120121
id_fiber_schedule = rb_intern_const("fiber");
121122

@@ -766,6 +767,15 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi
766767
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
767768
}
768769

770+
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
771+
{
772+
VALUE arguments[] = {
773+
fiber, exception
774+
};
775+
776+
return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
777+
}
778+
769779
/*
770780
* Document-method: Fiber::Scheduler#fiber
771781
* call-seq: fiber(&block)

test/fiber/scheduler.rb

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,15 @@ def next_timeout
6868
def run
6969
# $stderr.puts [__method__, Fiber.current].inspect
7070

71+
readable = writable = nil
72+
7173
while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
7274
# May only handle file descriptors up to 1024...
73-
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
75+
begin
76+
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
77+
rescue IOError
78+
# Ignore - this can happen if the IO is closed while we are waiting.
79+
end
7480

7581
# puts "readable: #{readable}" if readable&.any?
7682
# puts "writable: #{writable}" if writable&.any?
@@ -290,6 +296,30 @@ def unblock(blocker, fiber)
290296
io.write_nonblock('.')
291297
end
292298

299+
class FiberInterrupt
300+
def initialize(fiber, exception)
301+
@fiber = fiber
302+
@exception = exception
303+
end
304+
305+
def alive?
306+
@fiber.alive?
307+
end
308+
309+
def transfer
310+
@fiber.raise(@exception)
311+
end
312+
end
313+
314+
def fiber_interrupt(fiber, exception)
315+
@lock.synchronize do
316+
@ready << FiberInterrupt.new(fiber, exception)
317+
end
318+
319+
io = @urgent.last
320+
io.write_nonblock('.')
321+
end
322+
293323
# This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
294324
# it to create scheduled fibers, but it is not required in practice;
295325
# `Fiber.new` is usually sufficient.

thread.c

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ static rb_internal_thread_specific_key_t specific_key_count;
150150

151151
struct waiting_fd {
152152
struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
153-
rb_thread_t *th;
153+
rb_execution_context_t *ec;
154154
int fd;
155155
struct rb_io_close_wait_list *busy;
156156
};
@@ -1691,12 +1691,14 @@ waitfd_to_waiting_flag(int wfd_event)
16911691
}
16921692

16931693
static void
1694-
thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
1694+
thread_io_setup_wfd(rb_execution_context_t *ec, int fd, struct waiting_fd *wfd)
16951695
{
16961696
wfd->fd = fd;
1697-
wfd->th = th;
1697+
wfd->ec = ec;
16981698
wfd->busy = NULL;
16991699

1700+
rb_thread_t *th = rb_ec_thread_ptr(ec);
1701+
17001702
RB_VM_LOCK_ENTER();
17011703
{
17021704
ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
@@ -1730,6 +1732,29 @@ thread_io_wake_pending_closer(struct waiting_fd *wfd)
17301732
}
17311733
}
17321734

1735+
1736+
static VALUE
1737+
rb_thread_io_interruptible_operation_ensure(VALUE _argument)
1738+
{
1739+
struct waiting_fd *wfd = (struct waiting_fd *)_argument;
1740+
thread_io_wake_pending_closer(wfd);
1741+
return Qnil;
1742+
}
1743+
1744+
VALUE
1745+
rb_thread_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument)
1746+
{
1747+
struct rb_io *io;
1748+
RB_IO_POINTER(self, io);
1749+
1750+
rb_execution_context_t *ec = GET_EC();
1751+
1752+
struct waiting_fd waiting_fd;
1753+
thread_io_setup_wfd(ec, io->fd, &waiting_fd);
1754+
1755+
return rb_ensure(function, argument, rb_thread_io_interruptible_operation_ensure, (VALUE)&waiting_fd);
1756+
}
1757+
17331758
static bool
17341759
thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
17351760
{
@@ -1815,18 +1840,18 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
18151840
// `func` or not (as opposed to some previously set value).
18161841
errno = 0;
18171842

1818-
thread_io_setup_wfd(th, fd, &waiting_fd);
1843+
thread_io_setup_wfd(ec, fd, &waiting_fd);
18191844
{
18201845
EC_PUSH_TAG(ec);
18211846
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
18221847
volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */
18231848
retry:
1824-
BLOCKING_REGION(waiting_fd.th, {
1849+
BLOCKING_REGION(th, {
18251850
val = func(data1);
18261851
saved_errno = errno;
1827-
}, ubf_select, waiting_fd.th, FALSE);
1852+
}, ubf_select, th, FALSE);
18281853

1829-
th = rb_ec_thread_ptr(ec);
1854+
RUBY_ASSERT(th == rb_ec_thread_ptr(ec));
18301855
if (events &&
18311856
blocking_call_retryable_p((int)val, saved_errno) &&
18321857
thread_io_wait_events(th, fd, events, NULL)) {
@@ -2640,6 +2665,7 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
26402665
int
26412666
rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
26422667
{
2668+
// fprintf(stderr, "rb_notify_fd_close(%d) pid=%d\n", fd, getpid());
26432669
rb_vm_t *vm = GET_THREAD()->vm;
26442670
struct waiting_fd *wfd = 0, *next;
26452671
ccan_list_head_init(&busy->pending_fd_users);
@@ -2650,16 +2676,23 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
26502676
{
26512677
ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
26522678
if (wfd->fd == fd) {
2653-
rb_thread_t *th = wfd->th;
2654-
VALUE err;
2679+
rb_execution_context_t *ec = wfd->ec;
2680+
rb_thread_t *th = rb_ec_thread_ptr(ec);
26552681

26562682
ccan_list_del(&wfd->wfd_node);
26572683
ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
26582684

26592685
wfd->busy = busy;
2660-
err = th->vm->special_exceptions[ruby_error_stream_closed];
2661-
rb_threadptr_pending_interrupt_enque(th, err);
2662-
rb_threadptr_interrupt(th);
2686+
2687+
VALUE error = th->vm->special_exceptions[ruby_error_stream_closed];
2688+
2689+
if (th->scheduler != Qnil) {
2690+
rb_fiber_scheduler_fiber_interrupt(th->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
2691+
} else {
2692+
// fprintf(stderr, "rb_notify_fd_close: Interrupting thread %p\n", (void *)th);
2693+
rb_threadptr_pending_interrupt_enque(th, error);
2694+
rb_threadptr_interrupt(th);
2695+
}
26632696
}
26642697
}
26652698
}
@@ -4417,7 +4450,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44174450
rb_execution_context_t *ec = GET_EC();
44184451
rb_thread_t *th = rb_ec_thread_ptr(ec);
44194452

4420-
thread_io_setup_wfd(th, fd, &wfd);
4453+
thread_io_setup_wfd(ec, fd, &wfd);
44214454

44224455
if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
44234456
// fd is readable
@@ -4426,16 +4459,16 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44264459
errno = 0;
44274460
}
44284461
else {
4429-
EC_PUSH_TAG(wfd.th->ec);
4462+
EC_PUSH_TAG(ec);
44304463
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
44314464
rb_hrtime_t *to, rel, end = 0;
4432-
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
4465+
RUBY_VM_CHECK_INTS_BLOCKING(ec);
44334466
timeout_prepare(&to, &rel, &end, timeout);
44344467
do {
44354468
nfds = numberof(fds);
4436-
result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno);
4469+
result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno);
44374470

4438-
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
4471+
RUBY_VM_CHECK_INTS_BLOCKING(ec);
44394472
} while (wait_retryable(&result, lerrno, to, end));
44404473
}
44414474
EC_POP_TAG();
@@ -4444,7 +4477,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44444477
thread_io_wake_pending_closer(&wfd);
44454478

44464479
if (state) {
4447-
EC_JUMP_TAG(wfd.th->ec, state);
4480+
EC_JUMP_TAG(ec, state);
44484481
}
44494482

44504483
if (result < 0) {
@@ -4543,14 +4576,13 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
45434576
int r;
45444577
VALUE ptr = (VALUE)&args;
45454578
rb_execution_context_t *ec = GET_EC();
4546-
rb_thread_t *th = rb_ec_thread_ptr(ec);
45474579

45484580
args.as.fd = fd;
45494581
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
45504582
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
45514583
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
45524584
args.tv = timeout;
4553-
thread_io_setup_wfd(th, fd, &args.wfd);
4585+
thread_io_setup_wfd(ec, fd, &args.wfd);
45544586

45554587
r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
45564588
if (r == -1)

0 commit comments

Comments
 (0)