Skip to content

Commit 5694eb1

Browse files
committed
Allow IO#close to interrupt fiber scheduler.
1 parent 2ca8769 commit 5694eb1

File tree

6 files changed

+169
-23
lines changed

6 files changed

+169
-23
lines changed

include/ruby/fiber/scheduler.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,12 @@ struct rb_fiber_scheduler_blocking_operation_state {
411411
*/
412412
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);
413413

414+
/**
415+
* Interrupt a fiber by raising an exception. You can construct an exception using `rb_make_exception`.
416+
*
417+
*/
418+
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception);
419+
414420
/**
415421
* Create and schedule a non-blocking fiber.
416422
*

internal/thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ void *rb_thread_prevent_fork(void *(*func)(void *), void *data); /* for ext/sock
7272
VALUE rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, void *data1);
7373
VALUE rb_thread_io_blocking_call(struct rb_io *io, rb_blocking_function_t *func, void *data1, int events);
7474

75+
// Invoke the given function, with the specified argument, in a way that `IO#close` from another execution context can interrupt it.
76+
VALUE rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument);
77+
7578
/* thread.c (export) */
7679
int ruby_thread_has_gvl_p(void); /* for ext/fiddle/closure.c */
7780

io_buffer.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2733,7 +2733,6 @@ io_buffer_blocking_region_ensure(VALUE _argument)
27332733
static VALUE
27342734
io_buffer_blocking_region(VALUE io, struct rb_io_buffer *buffer, rb_blocking_function_t *function, void *data)
27352735
{
2736-
io = rb_io_get_io(io);
27372736
struct rb_io *ioptr;
27382737
RB_IO_POINTER(io, ioptr);
27392738

@@ -2798,6 +2797,8 @@ io_buffer_read_internal(void *_argument)
27982797
VALUE
27992798
rb_io_buffer_read(VALUE self, VALUE io, size_t length, size_t offset)
28002799
{
2800+
io = rb_io_get_io(io);
2801+
28012802
VALUE scheduler = rb_fiber_scheduler_current();
28022803
if (scheduler != Qnil) {
28032804
VALUE result = rb_fiber_scheduler_io_read(scheduler, io, self, length, offset);
@@ -2915,6 +2916,8 @@ io_buffer_pread_internal(void *_argument)
29152916
VALUE
29162917
rb_io_buffer_pread(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset)
29172918
{
2919+
io = rb_io_get_io(io);
2920+
29182921
VALUE scheduler = rb_fiber_scheduler_current();
29192922
if (scheduler != Qnil) {
29202923
VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, self, length, offset);
@@ -3035,6 +3038,8 @@ io_buffer_write_internal(void *_argument)
30353038
VALUE
30363039
rb_io_buffer_write(VALUE self, VALUE io, size_t length, size_t offset)
30373040
{
3041+
io = rb_io_get_write_io(rb_io_get_io(io));
3042+
30383043
VALUE scheduler = rb_fiber_scheduler_current();
30393044
if (scheduler != Qnil) {
30403045
VALUE result = rb_fiber_scheduler_io_write(scheduler, io, self, length, offset);
@@ -3099,6 +3104,7 @@ io_buffer_write(int argc, VALUE *argv, VALUE self)
30993104

31003105
return rb_io_buffer_write(self, io, length, offset);
31013106
}
3107+
31023108
struct io_buffer_pwrite_internal_argument {
31033109
// The file descriptor to write to:
31043110
int descriptor;
@@ -3144,6 +3150,8 @@ io_buffer_pwrite_internal(void *_argument)
31443150
VALUE
31453151
rb_io_buffer_pwrite(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset)
31463152
{
3153+
io = rb_io_get_write_io(rb_io_get_io(io));
3154+
31473155
VALUE scheduler = rb_fiber_scheduler_current();
31483156
if (scheduler != Qnil) {
31493157
VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, self, length, offset);

scheduler.c

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ static ID id_io_close;
3737
static ID id_address_resolve;
3838

3939
static ID id_blocking_operation_wait;
40+
static ID id_fiber_interrupt;
4041

4142
static ID id_fiber_schedule;
4243

@@ -116,6 +117,7 @@ Init_Fiber_Scheduler(void)
116117
id_address_resolve = rb_intern_const("address_resolve");
117118

118119
id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
120+
id_fiber_interrupt = rb_intern_const("fiber_interrupt");
119121

120122
id_fiber_schedule = rb_intern_const("fiber");
121123

@@ -442,10 +444,21 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
442444
* Expected to return the subset of events that are ready immediately.
443445
*
444446
*/
447+
static VALUE
448+
fiber_scheduler_io_wait(VALUE _argument) {
449+
VALUE *arguments = (VALUE*)_argument;
450+
451+
return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
452+
}
453+
445454
VALUE
446455
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
447456
{
448-
return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
457+
VALUE arguments[] = {
458+
scheduler, io, events, timeout
459+
};
460+
461+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
449462
}
450463

451464
VALUE
@@ -515,14 +528,25 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
515528
*
516529
* The method should be considered _experimental_.
517530
*/
531+
static VALUE
532+
fiber_scheduler_io_read(VALUE _argument) {
533+
VALUE *arguments = (VALUE*)_argument;
534+
535+
return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
536+
}
537+
518538
VALUE
519539
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
520540
{
541+
if (!rb_respond_to(scheduler, id_io_read)) {
542+
return RUBY_Qundef;
543+
}
544+
521545
VALUE arguments[] = {
522-
io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
546+
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
523547
};
524548

525-
return rb_check_funcall(scheduler, id_io_read, 4, arguments);
549+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
526550
}
527551

528552
/*
@@ -539,14 +563,25 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
539563
*
540564
* The method should be considered _experimental_.
541565
*/
566+
static VALUE
567+
fiber_scheduler_io_pread(VALUE _argument) {
568+
VALUE *arguments = (VALUE*)_argument;
569+
570+
return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
571+
}
572+
542573
VALUE
543574
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
544575
{
576+
if (!rb_respond_to(scheduler, id_io_pread)) {
577+
return RUBY_Qundef;
578+
}
579+
545580
VALUE arguments[] = {
546-
io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
581+
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
547582
};
548583

549-
return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
584+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
550585
}
551586

552587
/*
@@ -577,14 +612,25 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff
577612
*
578613
* The method should be considered _experimental_.
579614
*/
615+
static VALUE
616+
fiber_scheduler_io_write(VALUE _argument) {
617+
VALUE *arguments = (VALUE*)_argument;
618+
619+
return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
620+
}
621+
580622
VALUE
581623
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
582624
{
625+
if (!rb_respond_to(scheduler, id_io_write)) {
626+
return RUBY_Qundef;
627+
}
628+
583629
VALUE arguments[] = {
584-
io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
630+
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
585631
};
586632

587-
return rb_check_funcall(scheduler, id_io_write, 4, arguments);
633+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
588634
}
589635

590636
/*
@@ -602,14 +648,25 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
602648
* The method should be considered _experimental_.
603649
*
604650
*/
651+
static VALUE
652+
fiber_scheduler_io_pwrite(VALUE _argument) {
653+
VALUE *arguments = (VALUE*)_argument;
654+
655+
return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
656+
}
657+
605658
VALUE
606659
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
607660
{
661+
if (!rb_respond_to(scheduler, id_io_pwrite)) {
662+
return RUBY_Qundef;
663+
}
664+
608665
VALUE arguments[] = {
609-
io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
666+
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
610667
};
611668

612-
return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
669+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
613670
}
614671

615672
VALUE
@@ -766,6 +823,15 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi
766823
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
767824
}
768825

826+
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
827+
{
828+
VALUE arguments[] = {
829+
fiber, exception
830+
};
831+
832+
return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
833+
}
834+
769835
/*
770836
* Document-method: Fiber::Scheduler#fiber
771837
* call-seq: fiber(&block)

test/fiber/scheduler.rb

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,15 @@ def next_timeout
6868
def run
6969
# $stderr.puts [__method__, Fiber.current].inspect
7070

71+
readable = writable = nil
72+
7173
while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
7274
# May only handle file descriptors up to 1024...
73-
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
75+
begin
76+
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
77+
rescue IOError
78+
# Ignore - this can happen if the IO is closed while we are waiting.
79+
end
7480

7581
# puts "readable: #{readable}" if readable&.any?
7682
# puts "writable: #{writable}" if writable&.any?
@@ -290,6 +296,30 @@ def unblock(blocker, fiber)
290296
io.write_nonblock('.')
291297
end
292298

299+
class FiberInterrupt
300+
def initialize(fiber, exception)
301+
@fiber = fiber
302+
@exception = exception
303+
end
304+
305+
def alive?
306+
@fiber.alive?
307+
end
308+
309+
def transfer
310+
@fiber.raise(@exception)
311+
end
312+
end
313+
314+
def fiber_interrupt(fiber, exception)
315+
@lock.synchronize do
316+
@ready << FiberInterrupt.new(fiber, exception)
317+
end
318+
319+
io = @urgent.last
320+
io.write_nonblock('.')
321+
end
322+
293323
# This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
294324
# it to create scheduled fibers, but it is not required in practice;
295325
# `Fiber.new` is usually sufficient.

thread.c

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1731,6 +1731,37 @@ rb_io_blocking_operation_release(struct rb_io *io, struct rb_io_blocking_operati
17311731
}
17321732
}
17331733

1734+
1735+
static VALUE
1736+
rb_thread_io_blocking_operation_ensure(VALUE _argument)
1737+
{
1738+
struct io_blocking_operation_arguments *arguments = (void*)_argument;
1739+
1740+
rb_io_blocking_operation_release(arguments->io, arguments->blocking_operation);
1741+
1742+
return Qnil;
1743+
}
1744+
1745+
VALUE
1746+
rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument)
1747+
{
1748+
struct rb_io *io;
1749+
RB_IO_POINTER(self, io);
1750+
1751+
rb_execution_context_t *ec = GET_EC();
1752+
struct rb_io_blocking_operation blocking_operation = {
1753+
.ec = ec,
1754+
};
1755+
ccan_list_add(&io->blocking_operations, &blocking_operation.list);
1756+
1757+
struct io_blocking_operation_arguments io_blocking_operation_arguments = {
1758+
.io = io,
1759+
.blocking_operation = &blocking_operation
1760+
};
1761+
1762+
return rb_ensure(function, argument, rb_thread_io_blocking_operation_ensure, (VALUE)&io_blocking_operation_arguments);
1763+
}
1764+
17341765
static bool
17351766
thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
17361767
{
@@ -1832,7 +1863,7 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
18321863
saved_errno = errno;
18331864
}, ubf_select, th, FALSE);
18341865

1835-
th = rb_ec_thread_ptr(ec);
1866+
RUBY_ASSERT(th == rb_ec_thread_ptr(ec));
18361867
if (events &&
18371868
blocking_call_retryable_p((int)val, saved_errno) &&
18381869
thread_io_wait_events(th, fd, events, NULL)) {
@@ -2636,10 +2667,10 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
26362667
return 1;
26372668
}
26382669

2639-
static size_t
2640-
thread_io_close_notify_all(struct rb_io *io)
2670+
static VALUE
2671+
thread_io_close_notify_all(VALUE _io)
26412672
{
2642-
RUBY_ASSERT_CRITICAL_SECTION_ENTER();
2673+
struct rb_io *io = (struct rb_io *)_io;
26432674

26442675
size_t count = 0;
26452676
rb_vm_t *vm = io->closing_ec->thread_ptr->vm;
@@ -2650,17 +2681,17 @@ thread_io_close_notify_all(struct rb_io *io)
26502681
rb_execution_context_t *ec = blocking_operation->ec;
26512682

26522683
rb_thread_t *thread = ec->thread_ptr;
2653-
rb_threadptr_pending_interrupt_enque(thread, error);
2654-
2655-
// This operation is slow:
2656-
rb_threadptr_interrupt(thread);
2684+
if (thread->scheduler != Qnil) {
2685+
rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
2686+
} else {
2687+
rb_threadptr_pending_interrupt_enque(thread, error);
2688+
rb_threadptr_interrupt(thread);
2689+
}
26572690

26582691
count += 1;
26592692
}
26602693

2661-
RUBY_ASSERT_CRITICAL_SECTION_LEAVE();
2662-
2663-
return count;
2694+
return (VALUE)count;
26642695
}
26652696

26662697
size_t
@@ -2683,7 +2714,9 @@ rb_thread_io_close_interrupt(struct rb_io *io)
26832714
// This is used to ensure the correct execution context is woken up after the blocking operation is interrupted:
26842715
io->wakeup_mutex = rb_mutex_new();
26852716

2686-
return thread_io_close_notify_all(io);
2717+
VALUE result = rb_mutex_synchronize(io->wakeup_mutex, thread_io_close_notify_all, (VALUE)io);
2718+
2719+
return (size_t)result;
26872720
}
26882721

26892722
void

0 commit comments

Comments
 (0)