Skip to content

Commit 73c9d6c

Browse files
authored
Allow IO#close to interrupt IO operations on fibers using fiber_interrupt hook. (#12839)
1 parent e320547 commit 73c9d6c

File tree

10 files changed

+328
-24
lines changed

10 files changed

+328
-24
lines changed

NEWS.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ Note: We're only listing outstanding class updates.
3535
* Update Unicode to Version 16.0.0 and Emoji Version 16.0.
3636
[[Feature #19908]][[Feature #20724]] (also applies to Regexp)
3737

38+
* Fiber::Scheduler
39+
40+
* Introduce `Fiber::Scheduler#fiber_interrupt` to interrupt a fiber with a
41+
given exception. The initial use case is to interrupt a fiber that is
42+
waiting on a blocking IO operation when the IO operation is closed.
43+
[[Feature #21166]]
44+
3845
## Stdlib updates
3946

4047
The following bundled gems are promoted from default gems.
@@ -134,6 +141,7 @@ The following bundled gems are updated.
134141
[Feature #20724]: https://bugs.ruby-lang.org/issues/20724
135142
[Feature #21047]: https://bugs.ruby-lang.org/issues/21047
136143
[Bug #21049]: https://bugs.ruby-lang.org/issues/21049
144+
[Feature #21166]: https://bugs.ruby-lang.org/issues/21166
137145
[Feature #21216]: https://bugs.ruby-lang.org/issues/21216
138146
[Feature #21258]: https://bugs.ruby-lang.org/issues/21258
139147
[Feature #21287]: https://bugs.ruby-lang.org/issues/21287

benchmark/io_close.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
prelude: |
2+
ios = 1000.times.map do
3+
100.times.map{IO.pipe}
4+
end
5+
benchmark:
6+
# Close IO
7+
io_close: |
8+
# Process each batch of ios per iteration of the benchmark.
9+
ios.pop.each do |r, w|
10+
r.close
11+
w.close
12+
end
13+
loop_count: 100

benchmark/io_close_contended.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
prelude: |
2+
ios = 100.times.map do
3+
10.times.map do
4+
pipe = IO.pipe.tap do |r, w|
5+
Thread.new do
6+
r.read
7+
rescue IOError
8+
# Ignore
9+
end
10+
end
11+
end
12+
end
13+
benchmark:
14+
# Close IO
15+
io_close_contended: |
16+
# Process each batch of ios per iteration of the benchmark.
17+
ios.pop.each do |r, w|
18+
r.close
19+
w.close
20+
end
21+
loop_count: 10

doc/fiber.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,64 @@ I/O. Windows is a notable example where socket I/O can be non-blocking but pipe
212212
I/O is blocking. Provided that there *is* a scheduler and the current thread *is
213213
non-blocking*, the operation will invoke the scheduler.
214214

215+
##### `IO#close`
216+
217+
Closing an IO interrupts all blocking operations on that IO. When a thread calls `IO#close`, it first attempts to interrupt any threads or fibers that are blocked on that IO. The closing thread waits until all blocked threads and fibers have been properly interrupted and removed from the IO's blocking list. Each interrupted thread or fiber receives an `IOError` and is cleanly removed from the blocking operation. Only after all blocking operations have been interrupted and cleaned up will the actual file descriptor be closed, ensuring proper resource cleanup and preventing potential race conditions.
218+
219+
For fibers managed by a scheduler, the interruption process involves calling `rb_fiber_scheduler_fiber_interrupt` on the scheduler. This allows the scheduler to handle the interruption in a way that's appropriate for its event loop implementation. The scheduler can then notify the fiber, which will receive an `IOError` and be removed from the blocking operation. This mechanism ensures that fiber-based concurrency works correctly with IO operations, even when those operations are interrupted by `IO#close`.
220+
221+
```mermaid
222+
sequenceDiagram
223+
participant ThreadB
224+
participant ThreadA
225+
participant Scheduler
226+
participant IO
227+
participant Fiber1
228+
participant Fiber2
229+
230+
Note over ThreadA: Thread A has a fiber scheduler
231+
activate Scheduler
232+
ThreadA->>Fiber1: Schedule Fiber 1
233+
activate Fiber1
234+
Fiber1->>IO: IO.read
235+
IO->>Scheduler: rb_thread_io_blocking_region
236+
deactivate Fiber1
237+
238+
ThreadA->>Fiber2: Schedule Fiber 2
239+
activate Fiber2
240+
Fiber2->>IO: IO.read
241+
IO->>Scheduler: rb_thread_io_blocking_region
242+
deactivate Fiber2
243+
244+
Note over Fiber1,Fiber2: Both fibers blocked on same IO
245+
246+
Note over ThreadB: IO.close
247+
activate ThreadB
248+
ThreadB->>IO: thread_io_close_notify_all
249+
Note over ThreadB: rb_mutex_sleep
250+
251+
IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber1)
252+
Scheduler->>Fiber1: fiber_interrupt with IOError
253+
activate Fiber1
254+
Note over IO: fiber_interrupt causes removal from blocking list
255+
Fiber1->>IO: rb_io_blocking_operation_exit()
256+
IO-->>ThreadB: Wakeup thread
257+
deactivate Fiber1
258+
259+
IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber2)
260+
Scheduler->>Fiber2: fiber_interrupt with IOError
261+
activate Fiber2
262+
Note over IO: fiber_interrupt causes removal from blocking list
263+
Fiber2->>IO: rb_io_blocking_operation_exit()
264+
IO-->>ThreadB: Wakeup thread
265+
deactivate Fiber2
266+
deactivate Scheduler
267+
268+
Note over ThreadB: Blocking operations list empty
269+
ThreadB->>IO: close(fd)
270+
deactivate ThreadB
271+
```
272+
215273
#### Mutex
216274

217275
The `Mutex` class can be used in a non-blocking context and is fiber specific.

include/ruby/fiber/scheduler.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout);
199199
/**
200200
* Wakes up a fiber previously blocked using rb_fiber_scheduler_block().
201201
*
202+
* This function may be called from a different thread.
203+
*
202204
* @param[in] scheduler Target scheduler.
203205
* @param[in] blocker What was awaited for.
204206
* @param[in] fiber What to unblock.
@@ -411,6 +413,14 @@ struct rb_fiber_scheduler_blocking_operation_state {
411413
*/
412414
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);
413415

416+
/**
417+
* Interrupt a fiber by raising an exception. You can construct an exception using `rb_make_exception`.
418+
*
419+
* This hook may be invoked by a different thread.
420+
*
421+
*/
422+
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception);
423+
414424
/**
415425
* Create and schedule a non-blocking fiber.
416426
*

internal/thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ void *rb_thread_prevent_fork(void *(*func)(void *), void *data); /* for ext/sock
7272
VALUE rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, void *data1);
7373
VALUE rb_thread_io_blocking_call(struct rb_io *io, rb_blocking_function_t *func, void *data1, int events);
7474

75+
// Invoke the given function, with the specified argument, in a way that `IO#close` from another execution context can interrupt it.
76+
VALUE rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument);
77+
7578
/* thread.c (export) */
7679
int ruby_thread_has_gvl_p(void); /* for ext/fiddle/closure.c */
7780

io_buffer.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2733,7 +2733,6 @@ io_buffer_blocking_region_ensure(VALUE _argument)
27332733
static VALUE
27342734
io_buffer_blocking_region(VALUE io, struct rb_io_buffer *buffer, rb_blocking_function_t *function, void *data)
27352735
{
2736-
io = rb_io_get_io(io);
27372736
struct rb_io *ioptr;
27382737
RB_IO_POINTER(io, ioptr);
27392738

@@ -2798,6 +2797,8 @@ io_buffer_read_internal(void *_argument)
27982797
VALUE
27992798
rb_io_buffer_read(VALUE self, VALUE io, size_t length, size_t offset)
28002799
{
2800+
io = rb_io_get_io(io);
2801+
28012802
VALUE scheduler = rb_fiber_scheduler_current();
28022803
if (scheduler != Qnil) {
28032804
VALUE result = rb_fiber_scheduler_io_read(scheduler, io, self, length, offset);
@@ -2915,6 +2916,8 @@ io_buffer_pread_internal(void *_argument)
29152916
VALUE
29162917
rb_io_buffer_pread(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset)
29172918
{
2919+
io = rb_io_get_io(io);
2920+
29182921
VALUE scheduler = rb_fiber_scheduler_current();
29192922
if (scheduler != Qnil) {
29202923
VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, self, length, offset);
@@ -3035,6 +3038,8 @@ io_buffer_write_internal(void *_argument)
30353038
VALUE
30363039
rb_io_buffer_write(VALUE self, VALUE io, size_t length, size_t offset)
30373040
{
3041+
io = rb_io_get_write_io(rb_io_get_io(io));
3042+
30383043
VALUE scheduler = rb_fiber_scheduler_current();
30393044
if (scheduler != Qnil) {
30403045
VALUE result = rb_fiber_scheduler_io_write(scheduler, io, self, length, offset);
@@ -3099,6 +3104,7 @@ io_buffer_write(int argc, VALUE *argv, VALUE self)
30993104

31003105
return rb_io_buffer_write(self, io, length, offset);
31013106
}
3107+
31023108
struct io_buffer_pwrite_internal_argument {
31033109
// The file descriptor to write to:
31043110
int descriptor;
@@ -3144,6 +3150,8 @@ io_buffer_pwrite_internal(void *_argument)
31443150
VALUE
31453151
rb_io_buffer_pwrite(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset)
31463152
{
3153+
io = rb_io_get_write_io(rb_io_get_io(io));
3154+
31473155
VALUE scheduler = rb_fiber_scheduler_current();
31483156
if (scheduler != Qnil) {
31493157
VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, self, length, offset);

scheduler.c

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ static ID id_io_close;
3737
static ID id_address_resolve;
3838

3939
static ID id_blocking_operation_wait;
40+
static ID id_fiber_interrupt;
4041

4142
static ID id_fiber_schedule;
4243

@@ -116,6 +117,7 @@ Init_Fiber_Scheduler(void)
116117
id_address_resolve = rb_intern_const("address_resolve");
117118

118119
id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
120+
id_fiber_interrupt = rb_intern_const("fiber_interrupt");
119121

120122
id_fiber_schedule = rb_intern_const("fiber");
121123

@@ -442,10 +444,21 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
442444
* Expected to return the subset of events that are ready immediately.
443445
*
444446
*/
447+
static VALUE
448+
fiber_scheduler_io_wait(VALUE _argument) {
449+
VALUE *arguments = (VALUE*)_argument;
450+
451+
return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
452+
}
453+
445454
VALUE
446455
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
447456
{
448-
return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
457+
VALUE arguments[] = {
458+
scheduler, io, events, timeout
459+
};
460+
461+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
449462
}
450463

451464
VALUE
@@ -515,14 +528,25 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
515528
*
516529
* The method should be considered _experimental_.
517530
*/
531+
static VALUE
532+
fiber_scheduler_io_read(VALUE _argument) {
533+
VALUE *arguments = (VALUE*)_argument;
534+
535+
return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
536+
}
537+
518538
VALUE
519539
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
520540
{
541+
if (!rb_respond_to(scheduler, id_io_read)) {
542+
return RUBY_Qundef;
543+
}
544+
521545
VALUE arguments[] = {
522-
io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
546+
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
523547
};
524548

525-
return rb_check_funcall(scheduler, id_io_read, 4, arguments);
549+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
526550
}
527551

528552
/*
@@ -539,14 +563,25 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
539563
*
540564
* The method should be considered _experimental_.
541565
*/
566+
static VALUE
567+
fiber_scheduler_io_pread(VALUE _argument) {
568+
VALUE *arguments = (VALUE*)_argument;
569+
570+
return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
571+
}
572+
542573
VALUE
543574
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
544575
{
576+
if (!rb_respond_to(scheduler, id_io_pread)) {
577+
return RUBY_Qundef;
578+
}
579+
545580
VALUE arguments[] = {
546-
io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
581+
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
547582
};
548583

549-
return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
584+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
550585
}
551586

552587
/*
@@ -577,14 +612,25 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff
577612
*
578613
* The method should be considered _experimental_.
579614
*/
615+
static VALUE
616+
fiber_scheduler_io_write(VALUE _argument) {
617+
VALUE *arguments = (VALUE*)_argument;
618+
619+
return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
620+
}
621+
580622
VALUE
581623
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
582624
{
625+
if (!rb_respond_to(scheduler, id_io_write)) {
626+
return RUBY_Qundef;
627+
}
628+
583629
VALUE arguments[] = {
584-
io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
630+
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
585631
};
586632

587-
return rb_check_funcall(scheduler, id_io_write, 4, arguments);
633+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
588634
}
589635

590636
/*
@@ -602,14 +648,25 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
602648
* The method should be considered _experimental_.
603649
*
604650
*/
651+
static VALUE
652+
fiber_scheduler_io_pwrite(VALUE _argument) {
653+
VALUE *arguments = (VALUE*)_argument;
654+
655+
return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
656+
}
657+
605658
VALUE
606659
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
607660
{
661+
if (!rb_respond_to(scheduler, id_io_pwrite)) {
662+
return RUBY_Qundef;
663+
}
664+
608665
VALUE arguments[] = {
609-
io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
666+
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
610667
};
611668

612-
return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
669+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
613670
}
614671

615672
VALUE
@@ -766,6 +823,15 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi
766823
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
767824
}
768825

826+
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
827+
{
828+
VALUE arguments[] = {
829+
fiber, exception
830+
};
831+
832+
return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
833+
}
834+
769835
/*
770836
* Document-method: Fiber::Scheduler#fiber
771837
* call-seq: fiber(&block)

0 commit comments

Comments
 (0)