Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ Note: We're only listing outstanding class updates.
* Update Unicode to Version 16.0.0 and Emoji Version 16.0.
[[Feature #19908]][[Feature #20724]] (also applies to Regexp)

* Fiber::Scheduler

* Introduce `Fiber::Scheduler#fiber_interrupt` to interrupt a fiber with a
given exception. The initial use case is to interrupt a fiber that is
waiting on a blocking IO operation when the IO is closed.
[[Feature #21166]]

## Stdlib updates

The following bundled gems are promoted from default gems.
Expand Down Expand Up @@ -134,6 +141,7 @@ The following bundled gems are updated.
[Feature #20724]: https://bugs.ruby-lang.org/issues/20724
[Feature #21047]: https://bugs.ruby-lang.org/issues/21047
[Bug #21049]: https://bugs.ruby-lang.org/issues/21049
[Feature #21166]: https://bugs.ruby-lang.org/issues/21166
[Feature #21216]: https://bugs.ruby-lang.org/issues/21216
[Feature #21258]: https://bugs.ruby-lang.org/issues/21258
[Feature #21287]: https://bugs.ruby-lang.org/issues/21287
13 changes: 13 additions & 0 deletions benchmark/io_close.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Benchmark: closing IO objects that no other thread is blocked on.
# The prelude builds 1000 batches of 100 pipes up front; every benchmark
# iteration pops one batch and closes both ends of each pipe in it.
prelude: |
  ios = 1000.times.map do
    100.times.map{IO.pipe}
  end
benchmark:
  # Close IO
  io_close: |
    # Process each batch of ios per iteration of the benchmark.
    ios.pop.each do |r, w|
      r.close
      w.close
    end
# 100 iterations consume 100 of the 1000 batches prepared by the prelude.
loop_count: 100
21 changes: 21 additions & 0 deletions benchmark/io_close_contended.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Benchmark: closing IO objects while other threads are blocked reading them.
# The prelude builds 100 batches of 10 pipes and starts one reader thread per
# pipe. Each reader calls `r.read` and ignores the IOError it receives when the
# pipe is closed underneath it.
prelude: |
  ios = 100.times.map do
    10.times.map do
      IO.pipe.tap do |r, w|
        Thread.new do
          r.read
        rescue IOError
          # Ignore
        end
      end
    end
  end
benchmark:
  # Close IO
  io_close_contended: |
    # Process each batch of ios per iteration of the benchmark.
    ios.pop.each do |r, w|
      r.close
      w.close
    end
# 10 iterations consume 10 of the 100 batches prepared by the prelude.
loop_count: 10
58 changes: 58 additions & 0 deletions doc/fiber.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,64 @@ I/O. Windows is a notable example where socket I/O can be non-blocking but pipe
I/O is blocking. Provided that there *is* a scheduler and the current thread *is
non-blocking*, the operation will invoke the scheduler.

##### `IO#close`

Closing an IO interrupts all blocking operations on that IO. When a thread calls `IO#close`, it first attempts to interrupt any threads or fibers that are blocked on that IO. The closing thread waits until all blocked threads and fibers have been properly interrupted and removed from the IO's blocking list. Each interrupted thread or fiber receives an `IOError` and is cleanly removed from the blocking operation. Only after all blocking operations have been interrupted and cleaned up will the actual file descriptor be closed, ensuring proper resource cleanup and preventing potential race conditions.

For fibers managed by a scheduler, the interruption process involves calling `rb_fiber_scheduler_fiber_interrupt`, which invokes the scheduler's `fiber_interrupt` hook with the blocked fiber and the exception to raise. Because the thread calling `IO#close` may not be the thread that owns the scheduler, this hook may be invoked from a different thread. This allows the scheduler to handle the interruption in a way that's appropriate for its event loop implementation. The scheduler can then notify the fiber, which will receive an `IOError` and be removed from the blocking operation. This mechanism ensures that fiber-based concurrency works correctly with IO operations, even when those operations are interrupted by `IO#close`.

```mermaid
sequenceDiagram
participant ThreadB
participant ThreadA
participant Scheduler
participant IO
participant Fiber1
participant Fiber2

Note over ThreadA: Thread A has a fiber scheduler
activate Scheduler
ThreadA->>Fiber1: Schedule Fiber 1
activate Fiber1
Fiber1->>IO: IO.read
IO->>Scheduler: rb_thread_io_blocking_region
deactivate Fiber1

ThreadA->>Fiber2: Schedule Fiber 2
activate Fiber2
Fiber2->>IO: IO.read
IO->>Scheduler: rb_thread_io_blocking_region
deactivate Fiber2

Note over Fiber1,Fiber2: Both fibers blocked on same IO

Note over ThreadB: IO.close
activate ThreadB
ThreadB->>IO: thread_io_close_notify_all
Note over ThreadB: rb_mutex_sleep

IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber1)
Scheduler->>Fiber1: fiber_interrupt with IOError
activate Fiber1
Note over IO: fiber_interrupt causes removal from blocking list
Fiber1->>IO: rb_io_blocking_operation_exit()
IO-->>ThreadB: Wakeup thread
deactivate Fiber1

IO->>Scheduler: rb_fiber_scheduler_fiber_interrupt(Fiber2)
Scheduler->>Fiber2: fiber_interrupt with IOError
activate Fiber2
Note over IO: fiber_interrupt causes removal from blocking list
Fiber2->>IO: rb_io_blocking_operation_exit()
IO-->>ThreadB: Wakeup thread
deactivate Fiber2
deactivate Scheduler

Note over ThreadB: Blocking operations list empty
ThreadB->>IO: close(fd)
deactivate ThreadB
```

#### Mutex

The `Mutex` class can be used in a non-blocking context and is fiber specific.
Expand Down
10 changes: 10 additions & 0 deletions include/ruby/fiber/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout);
/**
* Wakes up a fiber previously blocked using rb_fiber_scheduler_block().
*
* This function may be called from a different thread.
*
* @param[in] scheduler Target scheduler.
* @param[in] blocker What was awaited for.
* @param[in] fiber What to unblock.
Expand Down Expand Up @@ -411,6 +413,14 @@ struct rb_fiber_scheduler_blocking_operation_state {
*/
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);

/**
 * Interrupt a fiber by raising an exception. You can construct an exception using `rb_make_exception`.
 *
 * This hook may be invoked by a different thread.
 *
 * @param[in]  scheduler    Target scheduler.
 * @param[in]  fiber        The fiber to interrupt.
 * @param[in]  exception    The exception to raise in `fiber`.
 * @retval     RUBY_Qundef  `scheduler` doesn't have `#fiber_interrupt`.
 * @return     otherwise, what `scheduler.fiber_interrupt` returns.
 */
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception);

/**
* Create and schedule a non-blocking fiber.
*
Expand Down
3 changes: 3 additions & 0 deletions internal/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ void *rb_thread_prevent_fork(void *(*func)(void *), void *data); /* for ext/sock
VALUE rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, void *data1);
VALUE rb_thread_io_blocking_call(struct rb_io *io, rb_blocking_function_t *func, void *data1, int events);

// Invoke the given function, with the specified argument, in a way that `IO#close` from another execution context can interrupt it.
// `self` is the IO object the operation is performed on; the fiber scheduler hooks pass the `io` they are about to wait on, read from or write to.
// NOTE(review): presumably returns whatever `function(argument)` returns — confirm against the definition.
VALUE rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument);

/* thread.c (export) */
int ruby_thread_has_gvl_p(void); /* for ext/fiddle/closure.c */

Expand Down
10 changes: 9 additions & 1 deletion io_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -2733,7 +2733,6 @@ io_buffer_blocking_region_ensure(VALUE _argument)
static VALUE
io_buffer_blocking_region(VALUE io, struct rb_io_buffer *buffer, rb_blocking_function_t *function, void *data)
{
io = rb_io_get_io(io);
struct rb_io *ioptr;
RB_IO_POINTER(io, ioptr);

Expand Down Expand Up @@ -2798,6 +2797,8 @@ io_buffer_read_internal(void *_argument)
VALUE
rb_io_buffer_read(VALUE self, VALUE io, size_t length, size_t offset)
{
io = rb_io_get_io(io);

VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_read(scheduler, io, self, length, offset);
Expand Down Expand Up @@ -2915,6 +2916,8 @@ io_buffer_pread_internal(void *_argument)
VALUE
rb_io_buffer_pread(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset)
{
io = rb_io_get_io(io);

VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, self, length, offset);
Expand Down Expand Up @@ -3035,6 +3038,8 @@ io_buffer_write_internal(void *_argument)
VALUE
rb_io_buffer_write(VALUE self, VALUE io, size_t length, size_t offset)
{
io = rb_io_get_write_io(rb_io_get_io(io));

VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_write(scheduler, io, self, length, offset);
Expand Down Expand Up @@ -3099,6 +3104,7 @@ io_buffer_write(int argc, VALUE *argv, VALUE self)

return rb_io_buffer_write(self, io, length, offset);
}

struct io_buffer_pwrite_internal_argument {
// The file descriptor to write to:
int descriptor;
Expand Down Expand Up @@ -3144,6 +3150,8 @@ io_buffer_pwrite_internal(void *_argument)
VALUE
rb_io_buffer_pwrite(VALUE self, VALUE io, rb_off_t from, size_t length, size_t offset)
{
io = rb_io_get_write_io(rb_io_get_io(io));

VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, self, length, offset);
Expand Down
84 changes: 75 additions & 9 deletions scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ static ID id_io_close;
static ID id_address_resolve;

static ID id_blocking_operation_wait;
static ID id_fiber_interrupt;

static ID id_fiber_schedule;

Expand Down Expand Up @@ -116,6 +117,7 @@ Init_Fiber_Scheduler(void)
id_address_resolve = rb_intern_const("address_resolve");

id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
id_fiber_interrupt = rb_intern_const("fiber_interrupt");

id_fiber_schedule = rb_intern_const("fiber");

Expand Down Expand Up @@ -442,10 +444,21 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
* Expected to return the subset of events that are ready immediately.
*
*/
/*
 * Trampoline handed to rb_thread_io_blocking_operation() by
 * rb_fiber_scheduler_io_wait(). `_argument` is a pointer to a VALUE array
 * laid out as { scheduler, io, events, timeout }: slot 0 is the receiver and
 * the remaining three slots are forwarded to its `io_wait` hook.
 */
static VALUE
fiber_scheduler_io_wait(VALUE _argument)
{
    VALUE *packed = (VALUE *)_argument;
    VALUE receiver = packed[0];
    const VALUE *hook_argv = &packed[1];

    return rb_funcallv(receiver, id_io_wait, 3, hook_argv);
}

VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
VALUE arguments[] = {
scheduler, io, events, timeout
};

return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
}

VALUE
Expand Down Expand Up @@ -515,14 +528,25 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
*
* The method should be considered _experimental_.
*/
/*
 * Trampoline handed to rb_thread_io_blocking_operation() by
 * rb_fiber_scheduler_io_read(). `_argument` is a pointer to a VALUE array
 * laid out as { scheduler, io, buffer, length, offset }: slot 0 is the
 * receiver and the remaining four slots are forwarded to its `io_read` hook.
 */
static VALUE
fiber_scheduler_io_read(VALUE _argument)
{
    VALUE *packed = (VALUE *)_argument;
    VALUE receiver = packed[0];
    const VALUE *hook_argv = &packed[1];

    return rb_funcallv(receiver, id_io_read, 4, hook_argv);
}

VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_read)) {
return RUBY_Qundef;
}

VALUE arguments[] = {
io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};

return rb_check_funcall(scheduler, id_io_read, 4, arguments);
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
}

/*
Expand All @@ -539,14 +563,25 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
*
* The method should be considered _experimental_.
*/
/*
 * Trampoline handed to rb_thread_io_blocking_operation() by
 * rb_fiber_scheduler_io_pread(). `_argument` is a pointer to a VALUE array
 * laid out as { scheduler, io, buffer, from, length, offset }: slot 0 is the
 * receiver and the remaining five slots are forwarded to its `io_pread` hook.
 */
static VALUE
fiber_scheduler_io_pread(VALUE _argument)
{
    VALUE *packed = (VALUE *)_argument;
    VALUE receiver = packed[0];
    const VALUE *hook_argv = &packed[1];

    return rb_funcallv(receiver, id_io_pread, 5, hook_argv);
}

VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_pread)) {
return RUBY_Qundef;
}

VALUE arguments[] = {
io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};

return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
}

/*
Expand Down Expand Up @@ -577,14 +612,25 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff
*
* The method should be considered _experimental_.
*/
/*
 * Trampoline handed to rb_thread_io_blocking_operation() by
 * rb_fiber_scheduler_io_write(). `_argument` is a pointer to a VALUE array
 * laid out as { scheduler, io, buffer, length, offset }: slot 0 is the
 * receiver and the remaining four slots are forwarded to its `io_write` hook.
 */
static VALUE
fiber_scheduler_io_write(VALUE _argument)
{
    VALUE *packed = (VALUE *)_argument;
    VALUE receiver = packed[0];
    const VALUE *hook_argv = &packed[1];

    return rb_funcallv(receiver, id_io_write, 4, hook_argv);
}

VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_write)) {
return RUBY_Qundef;
}

VALUE arguments[] = {
io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};

return rb_check_funcall(scheduler, id_io_write, 4, arguments);
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
}

/*
Expand All @@ -602,14 +648,25 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
* The method should be considered _experimental_.
*
*/
/*
 * Trampoline handed to rb_thread_io_blocking_operation() by
 * rb_fiber_scheduler_io_pwrite(). `_argument` is a pointer to a VALUE array
 * laid out as { scheduler, io, buffer, from, length, offset }: slot 0 is the
 * receiver and the remaining five slots are forwarded to its `io_pwrite` hook.
 */
static VALUE
fiber_scheduler_io_pwrite(VALUE _argument)
{
    VALUE *packed = (VALUE *)_argument;
    VALUE receiver = packed[0];
    const VALUE *hook_argv = &packed[1];

    return rb_funcallv(receiver, id_io_pwrite, 5, hook_argv);
}

VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
if (!rb_respond_to(scheduler, id_io_pwrite)) {
return RUBY_Qundef;
}

VALUE arguments[] = {
io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};

return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
}

VALUE
Expand Down Expand Up @@ -766,6 +823,15 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
}

/*
 * Ask `scheduler` to interrupt `fiber` by calling its `fiber_interrupt` hook
 * with (fiber, exception). The call goes through rb_check_funcall(), so a
 * scheduler that does not implement the hook yields RUBY_Qundef instead of
 * raising NoMethodError; otherwise the hook's own return value is passed back.
 */
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
    VALUE hook_argv[2];

    hook_argv[0] = fiber;
    hook_argv[1] = exception;

    return rb_check_funcall(scheduler, id_fiber_interrupt, 2, hook_argv);
}

/*
* Document-method: Fiber::Scheduler#fiber
* call-seq: fiber(&block)
Expand Down
Loading
Loading