Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
thread_sync.c: avoid reaching across stacks of dead threads
rb_ensure is insufficient cleanup for fork and we must
reinitialize all waitqueues in the child process.

Unfortunately this increases the footprint of ConditionVariable,
Queue and SizedQueue by 8 bytes on 32-bit (16 bytes on 64-bit).

[ruby-core:86316] [Bug ruby#14634]

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@62934 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
  • Loading branch information
normal authored and tenderlove committed Jul 24, 2018
commit f0a8547a9003d16a74cd0e0f957ea43bff06ecaf
19 changes: 19 additions & 0 deletions test/thread/test_cv.rb
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,23 @@ def test_dump
Marshal.dump(condvar)
end
end

def test_condvar_fork
mutex = Mutex.new
condvar = ConditionVariable.new
thrs = (1..10).map do
Thread.new { mutex.synchronize { condvar.wait(mutex) } }
end
thrs.each { 3.times { Thread.pass } }
pid = fork do
mutex.synchronize { condvar.broadcast }
exit!(0)
end
_, s = Process.waitpid2(pid)
assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'
until thrs.empty?
mutex.synchronize { condvar.broadcast }
thrs.delete_if { |t| t.join(0.01) }
end
end if Process.respond_to?(:fork)
end
48 changes: 48 additions & 0 deletions test/thread/test_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -565,4 +565,52 @@ def test_queue_with_trap
puts 'exit'
INPUT
end

def test_fork_while_queue_waiting
q = Queue.new
sq = SizedQueue.new(1)
thq = Thread.new { q.pop }
thsq = Thread.new { sq.pop }
Thread.pass until thq.stop? && thsq.stop?

pid = fork do
exit!(1) if q.num_waiting != 0
exit!(2) if sq.num_waiting != 0
exit!(6) unless q.empty?
exit!(7) unless sq.empty?
q.push :child_q
sq.push :child_sq
exit!(3) if q.pop != :child_q
exit!(4) if sq.pop != :child_sq
exit!(0)
end
_, s = Process.waitpid2(pid)
assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'

q.push :thq
sq.push :thsq
assert_equal :thq, thq.value
assert_equal :thsq, thsq.value

sq.push(1)
th = Thread.new { q.pop; sq.pop }
thsq = Thread.new { sq.push(2) }
Thread.pass until th.stop? && thsq.stop?
pid = fork do
exit!(1) if q.num_waiting != 0
exit!(2) if sq.num_waiting != 0
exit!(3) unless q.empty?
exit!(4) if sq.empty?
exit!(5) if sq.pop != 1
exit!(0)
end
_, s = Process.waitpid2(pid)
assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'

assert_predicate thsq, :stop?
assert_equal 1, sq.pop
assert_same sq, thsq.value
q.push('restart th')
assert_equal 2, th.value
end if Process.respond_to?(:fork)
end
2 changes: 2 additions & 0 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -4188,6 +4188,8 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
}
rb_vm_living_threads_init(vm);
rb_vm_living_threads_insert(vm, th);
rb_thread_sync_reset_all();

vm->sleeper = 0;
clear_coverage();
}
Expand Down
75 changes: 70 additions & 5 deletions thread_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;

/*
* keep these globally so we can walk and reinitialize them at fork
* in the child process
*/
static LIST_HEAD(szqueue_list);
static LIST_HEAD(queue_list);
static LIST_HEAD(condvar_list);

/* sync_waiter is always on-stack */
struct sync_waiter {
rb_thread_t *th;
Expand Down Expand Up @@ -54,6 +62,7 @@ typedef struct rb_mutex_struct {
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
static void rb_thread_sync_reset_all(void);
#endif
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th);

Expand Down Expand Up @@ -535,14 +544,17 @@ void rb_mutex_allow_trap(VALUE self, int val)
/* Queue */

#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
#define queue_live(q) UNALIGNED_MEMBER_PTR(q, live)
PACKED_STRUCT_UNALIGNED(struct rb_queue {
struct list_node live;
struct list_head waitq;
const VALUE que;
int num_waiting;
});

#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
#define szqueue_live(sq) UNALIGNED_MEMBER_PTR(sq, q.live)
PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
struct rb_queue q;
int num_waiting_push;
Expand All @@ -559,6 +571,14 @@ queue_mark(void *ptr)
rb_gc_mark(q->que);
}

static void
queue_free(void *ptr)
{
struct rb_queue *q = ptr;
list_del(queue_live(q));
ruby_xfree(ptr);
}

static size_t
queue_memsize(const void *ptr)
{
Expand All @@ -567,7 +587,7 @@ queue_memsize(const void *ptr)

static const rb_data_type_t queue_data_type = {
"queue",
{queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
{queue_mark, queue_free, queue_memsize,},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

Expand All @@ -579,6 +599,7 @@ queue_alloc(VALUE klass)

obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
list_head_init(queue_waitq(q));
list_add(&queue_list, queue_live(q));
return obj;
}

Expand All @@ -601,6 +622,14 @@ szqueue_mark(void *ptr)
queue_mark(&sq->q);
}

static void
szqueue_free(void *ptr)
{
struct rb_szqueue *sq = ptr;
list_del(szqueue_live(sq));
ruby_xfree(ptr);
}

static size_t
szqueue_memsize(const void *ptr)
{
Expand All @@ -609,7 +638,7 @@ szqueue_memsize(const void *ptr)

static const rb_data_type_t szqueue_data_type = {
"sized_queue",
{szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
{szqueue_mark, szqueue_free, szqueue_memsize,},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

Expand All @@ -621,6 +650,7 @@ szqueue_alloc(VALUE klass)
&szqueue_data_type, sq);
list_head_init(szqueue_waitq(sq));
list_head_init(szqueue_pushq(sq));
list_add(&szqueue_list, szqueue_live(sq));
return obj;
}

Expand Down Expand Up @@ -866,7 +896,7 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
list_add_tail(&qw.as.q->waitq, &qw.w.node);
qw.as.q->num_waiting++;

rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw);
rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
}
}

Expand Down Expand Up @@ -1108,7 +1138,7 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
list_add_tail(pushq, &qw.w.node);
sq->num_waiting_push++;

rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
}
}

Expand Down Expand Up @@ -1212,6 +1242,7 @@ rb_szqueue_empty_p(VALUE self)
/* TODO: maybe this can be IMEMO */
struct rb_condvar {
struct list_head waitq;
struct list_node live;
};

/*
Expand Down Expand Up @@ -1242,6 +1273,14 @@ struct rb_condvar {
* }
*/

static void
condvar_free(void *ptr)
{
struct rb_condvar *cv = ptr;
list_del(&cv->live);
ruby_xfree(ptr);
}

static size_t
condvar_memsize(const void *ptr)
{
Expand All @@ -1250,7 +1289,7 @@ condvar_memsize(const void *ptr)

static const rb_data_type_t cv_data_type = {
"condvar",
{0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
{0, condvar_free, condvar_memsize,},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

Expand All @@ -1272,6 +1311,7 @@ condvar_alloc(VALUE klass)

obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
list_head_init(&cv->waitq);
list_add(&condvar_list, &cv->live);

return obj;
}
Expand Down Expand Up @@ -1385,6 +1425,31 @@ define_thread_class(VALUE outer, const char *name, VALUE super)
return klass;
}

#if defined(HAVE_WORKING_FORK)
/* we must not reference stacks of dead threads in a forked child */
static void
rb_thread_sync_reset_all(void)
{
struct rb_queue *q = 0;
struct rb_szqueue *sq = 0;
struct rb_condvar *cv = 0;

list_for_each(&queue_list, q, live) {
list_head_init(queue_waitq(q));
q->num_waiting = 0;
}
list_for_each(&szqueue_list, sq, q.live) {
list_head_init(szqueue_waitq(sq));
list_head_init(szqueue_pushq(sq));
sq->num_waiting_push = 0;
sq->q.num_waiting = 0;
}
list_for_each(&condvar_list, cv, live) {
list_head_init(&cv->waitq);
}
}
#endif

static void
Init_thread_sync(void)
{
Expand Down