6 changes: 6 additions & 0 deletions include/ruby/fiber/scheduler.h
@@ -411,6 +411,12 @@ struct rb_fiber_scheduler_blocking_operation_state {
*/
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);

/**
* Interrupt the given fiber by raising an exception in it. The exception can
* be constructed using `rb_make_exception`.
*/
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception);

/**
* Create and schedule a non-blocking fiber.
*
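For orientation, the new entry point takes an already-constructed exception object and hands it to the scheduler. A minimal sketch of a call site, assuming `scheduler` and `fiber` are live, GC-reachable VALUEs; the helper name and message are illustrative, not part of this change:

```c
#include <ruby.h>
#include <ruby/fiber/scheduler.h>

/* Sketch only: build an IOError via rb_make_exception() and ask the
 * scheduler to raise it inside `fiber`. */
static VALUE
interrupt_with_ioerror(VALUE scheduler, VALUE fiber)
{
    VALUE argv[2] = { rb_eIOError, rb_str_new_cstr("stream closed") };
    VALUE exception = rb_make_exception(2, argv);

    return rb_fiber_scheduler_fiber_interrupt(scheduler, fiber, exception);
}
```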
13 changes: 13 additions & 0 deletions include/ruby/io.h
@@ -664,6 +664,19 @@ VALUE rb_io_get_io(VALUE io);
*/
VALUE rb_io_check_io(VALUE io);

/**
* Invoke the given function with the given argument; the call may block the
* current execution context.
*
* While the function is running, the current execution context may be
* interrupted by an exception raised from another thread if the IO is closed.
*
* @param[in] self An IO.
* @param[in] function The function to call.
* @param[in] argument The argument to pass to the function.
* @return The return value of `function`.
*/
VALUE rb_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument);

/**
* Queries the tied IO for writing. An IO can be duplexed. Fine. The thing
* is, that characteristics could sometimes be achieved by the underlying
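As context for the new declaration, a hypothetical C extension could use `rb_io_interruptible_operation` to wrap any call chain that may block on the IO's descriptor, so that `IO#close` from another thread interrupts it instead of leaving it stuck. The helper names below are illustrative only:

```c
#include <ruby.h>
#include <ruby/io.h>

/* Illustrative callback: any operation that may block on the IO could go here. */
static VALUE
blocking_read(VALUE io)
{
    return rb_funcall(io, rb_intern("read"), 1, INT2FIX(1024));
}

/* If the IO is closed from another execution context while blocking_read()
 * is in flight, the operation is interrupted with an exception rather than
 * blocking indefinitely. */
static VALUE
guarded_read(VALUE io)
{
    return rb_io_interruptible_operation(io, blocking_read, io);
}
```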
3 changes: 3 additions & 0 deletions internal/thread.h
@@ -76,6 +76,9 @@ void *rb_thread_prevent_fork(void *(*func)(void *), void *data); /* for ext/sock
VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd);
VALUE rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events);

// Invoke the given function with the specified argument in a way that `IO#close` from another execution context can interrupt it.
VALUE rb_thread_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument);

/* thread.c (export) */
int ruby_thread_has_gvl_p(void); /* for ext/fiddle/closure.c */

28 changes: 28 additions & 0 deletions io.c
@@ -233,6 +233,32 @@ VALUE rb_io_blocking_region(struct rb_io *io, rb_blocking_function_t *function,
return rb_io_blocking_region_wait(io, function, argument, 0);
}

VALUE
rb_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument)
{
VALUE scheduler = rb_fiber_scheduler_current();

if (scheduler == Qnil) {
return function(argument);
} else {
return rb_thread_io_interruptible_operation(self, function, argument);
}
}

static VALUE
io_interruptible_operation_begin(VALUE block)
{
return rb_proc_call_with_block(block, 0, NULL, Qnil);
}

static VALUE
io_interruptible_operation(VALUE self)
{
VALUE block = rb_block_proc();

return rb_io_interruptible_operation(self, io_interruptible_operation_begin, block);
}

struct argf {
VALUE filename, current_file;
long last_lineno; /* $. */
@@ -15700,6 +15726,8 @@ Init_IO(void)

rb_define_method(rb_cIO, "initialize", rb_io_initialize, -1);

rb_define_method(rb_cIO, "interruptible_operation", io_interruptible_operation, 0);

rb_output_fs = Qnil;
rb_define_hooked_variable("$,", &rb_output_fs, 0, deprecated_str_setter);

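The io.c change above also exposes the wrapper to Ruby as `IO#interruptible_operation`, which simply runs the given block under `rb_io_interruptible_operation`. A hypothetical way to drive that method from a C extension, using `rb_block_call`; names other than the method itself are illustrative:

```c
#include <ruby.h>

/* Block body: IO#interruptible_operation yields no arguments, so `yielded`
 * is ignored; `io` arrives as the callback argument. */
static VALUE
interruptible_body(RB_BLOCK_CALL_FUNC_ARGLIST(yielded, io))
{
    return rb_funcall(io, rb_intern("read"), 1, INT2FIX(64));
}

static VALUE
read_interruptibly(VALUE io)
{
    return rb_block_call(io, rb_intern("interruptible_operation"),
                         0, NULL, interruptible_body, io);
}
```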
12 changes: 11 additions & 1 deletion scheduler.c
@@ -37,6 +37,7 @@ static ID id_io_close;
static ID id_address_resolve;

static ID id_blocking_operation_wait;
static ID id_fiber_interrupt;

static ID id_fiber_schedule;

@@ -108,14 +109,14 @@
id_io_pread = rb_intern_const("io_pread");
id_io_write = rb_intern_const("io_write");
id_io_pwrite = rb_intern_const("io_pwrite");

id_io_wait = rb_intern_const("io_wait");
id_io_select = rb_intern_const("io_select");
id_io_close = rb_intern_const("io_close");

id_address_resolve = rb_intern_const("address_resolve");

id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
id_fiber_interrupt = rb_intern_const("fiber_interrupt");

id_fiber_schedule = rb_intern_const("fiber");

@@ -766,6 +767,15 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
}

VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
VALUE arguments[] = {
fiber, exception
};

return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
}

/*
* Document-method: Fiber::Scheduler#fiber
* call-seq: fiber(&block)
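Note that the C entry point dispatches with `rb_check_funcall`, so the hook is optional: a scheduler that does not define `fiber_interrupt` produces `Qundef` rather than raising `NoMethodError`. A hedged sketch of detecting that case (whether and how to fall back is up to the caller; the thread.c change below instead checks whether a scheduler is installed at all):

```c
#include <stdbool.h>
#include <ruby.h>
#include <ruby/fiber/scheduler.h>

/* Sketch: report whether the scheduler actually handled the interrupt.
 * rb_check_funcall() yields Qundef when #fiber_interrupt is not defined. */
static bool
try_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
    return rb_fiber_scheduler_fiber_interrupt(scheduler, fiber, exception) != Qundef;
}
```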
32 changes: 31 additions & 1 deletion test/fiber/scheduler.rb
@@ -68,9 +68,15 @@ def next_timeout
def run
# $stderr.puts [__method__, Fiber.current].inspect

readable = writable = nil

while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
# May only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
begin
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
rescue IOError
# Ignore - this can happen if the IO is closed while we are waiting.
end

# puts "readable: #{readable}" if readable&.any?
# puts "writable: #{writable}" if writable&.any?
@@ -290,6 +296,30 @@ def unblock(blocker, fiber)
io.write_nonblock('.')
end

class FiberInterrupt
def initialize(fiber, exception)
@fiber = fiber
@exception = exception
end

def alive?
@fiber.alive?
end

def transfer
@fiber.raise(@exception)
end
end

def fiber_interrupt(fiber, exception)
@lock.synchronize do
@ready << FiberInterrupt.new(fiber, exception)
end

io = @urgent.last
io.write_nonblock('.')
end

# This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
# it to create scheduled fibers, but it is not required in practice;
# `Fiber.new` is usually sufficient.
72 changes: 52 additions & 20 deletions thread.c
@@ -150,7 +150,7 @@ static rb_internal_thread_specific_key_t specific_key_count;

struct waiting_fd {
struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
rb_thread_t *th;
rb_execution_context_t *ec;
int fd;
struct rb_io_close_wait_list *busy;
};
@@ -1691,12 +1691,14 @@
}

static void
thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
thread_io_setup_wfd(rb_execution_context_t *ec, int fd, struct waiting_fd *wfd)
{
wfd->fd = fd;
wfd->th = th;
wfd->ec = ec;
wfd->busy = NULL;

rb_thread_t *th = rb_ec_thread_ptr(ec);

RB_VM_LOCK_ENTER();
{
ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
@@ -1730,6 +1732,29 @@
}
}


static VALUE
rb_thread_io_interruptible_operation_ensure(VALUE _argument)
{
struct waiting_fd *wfd = (struct waiting_fd *)_argument;
thread_io_wake_pending_closer(wfd);
return Qnil;
}

VALUE
rb_thread_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument)
{
struct rb_io *io;
RB_IO_POINTER(self, io);

rb_execution_context_t *ec = GET_EC();

struct waiting_fd waiting_fd;
thread_io_setup_wfd(ec, io->fd, &waiting_fd);

return rb_ensure(function, argument, rb_thread_io_interruptible_operation_ensure, (VALUE)&waiting_fd);
}

static bool
thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
{
@@ -1815,18 +1840,18 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
// `func` or not (as opposed to some previously set value).
errno = 0;

thread_io_setup_wfd(th, fd, &waiting_fd);
thread_io_setup_wfd(ec, fd, &waiting_fd);
{
EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */
retry:
BLOCKING_REGION(waiting_fd.th, {
BLOCKING_REGION(th, {
val = func(data1);
saved_errno = errno;
}, ubf_select, waiting_fd.th, FALSE);
}, ubf_select, th, FALSE);

th = rb_ec_thread_ptr(ec);
RUBY_ASSERT(th == rb_ec_thread_ptr(ec));
if (events &&
blocking_call_retryable_p((int)val, saved_errno) &&
thread_io_wait_events(th, fd, events, NULL)) {
@@ -2640,6 +2665,7 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
int
rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
{
// fprintf(stderr, "rb_notify_fd_close(%d) pid=%d\n", fd, getpid());
rb_vm_t *vm = GET_THREAD()->vm;
struct waiting_fd *wfd = 0, *next;
ccan_list_head_init(&busy->pending_fd_users);
@@ -2650,16 +2676,23 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
{
ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
if (wfd->fd == fd) {
rb_thread_t *th = wfd->th;
VALUE err;
rb_execution_context_t *ec = wfd->ec;
rb_thread_t *th = rb_ec_thread_ptr(ec);

ccan_list_del(&wfd->wfd_node);
ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);

wfd->busy = busy;
err = th->vm->special_exceptions[ruby_error_stream_closed];
rb_threadptr_pending_interrupt_enque(th, err);
rb_threadptr_interrupt(th);

VALUE error = th->vm->special_exceptions[ruby_error_stream_closed];

if (th->scheduler != Qnil) {
rb_fiber_scheduler_fiber_interrupt(th->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
} else {
// fprintf(stderr, "rb_notify_fd_close: Interrupting thread %p\n", (void *)th);
rb_threadptr_pending_interrupt_enque(th, error);
rb_threadptr_interrupt(th);
}
}
}
}
@@ -4417,7 +4450,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);

thread_io_setup_wfd(th, fd, &wfd);
thread_io_setup_wfd(ec, fd, &wfd);

if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
// fd is readable
@@ -4426,16 +4459,16 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
errno = 0;
}
else {
EC_PUSH_TAG(wfd.th->ec);
EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
rb_hrtime_t *to, rel, end = 0;
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
RUBY_VM_CHECK_INTS_BLOCKING(ec);
timeout_prepare(&to, &rel, &end, timeout);
do {
nfds = numberof(fds);
result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno);
result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno);

RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
RUBY_VM_CHECK_INTS_BLOCKING(ec);
} while (wait_retryable(&result, lerrno, to, end));
}
EC_POP_TAG();
@@ -4444,7 +4477,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
thread_io_wake_pending_closer(&wfd);

if (state) {
EC_JUMP_TAG(wfd.th->ec, state);
EC_JUMP_TAG(ec, state);
}

if (result < 0) {
@@ -4543,14 +4576,13 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
int r;
VALUE ptr = (VALUE)&args;
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);

args.as.fd = fd;
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
args.tv = timeout;
thread_io_setup_wfd(th, fd, &args.wfd);
thread_io_setup_wfd(ec, fd, &args.wfd);

r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
if (r == -1)