Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 83 additions & 78 deletions lib/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class ThreadError < StandardError
Thread.abort_on_exception = true
end

unless defined?(Thread::RELY_ON_GVL)
Thread::RELY_ON_GVL = false
end

#
# ConditionVariable objects augment class Mutex. Using condition variables,
# it is possible to suspend while in the middle of a critical section until a
Expand Down Expand Up @@ -52,7 +56,8 @@ class ConditionVariable
# Creates a new ConditionVariable
#
def initialize
@waiters = []
@waiters = {}
@waiters.compare_by_identity
@waiters_mutex = Mutex.new
end

Expand All @@ -63,32 +68,49 @@ def initialize
# even if no other thread doesn't signal.
#
def wait(mutex, timeout=nil)
begin
# TODO: mutex should not be used
@waiters_mutex.synchronize do
@waiters.push(Thread.current)
end
mutex.sleep timeout
ensure
# Rely on GVL for sychronizing @waiters.push
@waiters[Thread.current] = true
mutex.sleep timeout do
# We could rely on GVL cause hash were set to compare_by_identity mode
@waiters.delete(Thread.current)
end
self
end if Thread::RELY_ON_GVL

def wait(mutex, timeout=nil) # :nodoc:
@waiters_mutex.synchronize do
@waiters[Thread.current] = true
end
mutex.sleep timeout do
@waiters_mutex.synchronize do
@waiters.delete(Thread.current)
end
end
self
end
end unless Thread::RELY_ON_GVL

#
# Wakes up the first thread in line waiting for this lock.
#
def signal
begin
t = @waiters_mutex.synchronize {@waiters.shift}
t, _ = @waiters.shift
t.run if t
rescue ThreadError
retry
end
self
end
end if Thread::RELY_ON_GVL

def signal # :nodoc:
begin
t, _ = @waiters_mutex.synchronize { @waiters.shift }
t.run if t
rescue ThreadError
retry
end
self
end unless Thread::RELY_ON_GVL

#
# Wakes up all threads waiting for this lock.
Expand All @@ -97,7 +119,7 @@ def broadcast
# TODO: incomplete
waiters0 = nil
@waiters_mutex.synchronize do
waiters0 = @waiters.dup
waiters0 = @waiters.keys
@waiters.clear
end
for t in waiters0
Expand Down Expand Up @@ -143,26 +165,29 @@ class Queue
#
def initialize
@que = []
@waiting = []
@waiting = {}
@waiting.compare_by_identity
@que.taint # enable tainted communication
@waiting.taint
self.taint
@mutex = Mutex.new
end

def push_no_sync(obj) # :nodoc:
@que.push obj
begin
t, _ = @waiting.shift
t.wakeup if t
rescue ThreadError
retry
end
end
private :push_no_sync
#
# Pushes +obj+ to the queue.
#
def push(obj)
@mutex.synchronize{
@que.push obj
begin
t = @waiting.shift
t.wakeup if t
rescue ThreadError
retry
end
}
@mutex.synchronize{ push_no_sync(obj) }
end

#
Expand All @@ -175,29 +200,27 @@ def push(obj)
#
alias enq push

def pop_no_sync(non_block) # :nodoc:
if non_block
raise ThreadError, "queue empty" if @que.empty?
else
while @que.empty?
@waiting[Thread.current] = true
@mutex.sleep
end
end
@que.shift
ensure
@waiting.delete(Thread.current)
end
private :pop_no_sync
#
# Retrieves data from the queue. If the queue is empty, the calling thread is
# suspended until data is pushed onto the queue. If +non_block+ is true, the
# thread isn't suspended, and an exception is raised.
#
def pop(non_block=false)
@mutex.synchronize{
begin
while true
if @que.empty?
raise ThreadError, "queue empty" if non_block
# @waiting.include? check is necessary for avoiding a race against
# Thread.wakeup [Bug 5195]
@waiting.push Thread.current unless @waiting.include?(Thread.current)
@mutex.sleep
else
return @que.shift
end
end
ensure
@waiting.delete(Thread.current)
end
}
@mutex.synchronize{ pop_no_sync(non_block) }
end

#
Expand Down Expand Up @@ -257,7 +280,8 @@ class SizedQueue < Queue
def initialize(max)
raise ArgumentError, "queue size must be positive" unless max > 0
@max = max
@queue_wait = []
@queue_wait = {}
@queue_wait.compare_by_identity
@queue_wait.taint # enable tainted comunication
super()
end
Expand All @@ -269,30 +293,26 @@ def max
@max
end

def wakeup_queue_waiter # :nodoc:
t, _ = @queue_wait.shift
t.wakeup if t
rescue ThreadError
retry
end
private :wakeup_queue_waiter

#
# Sets the maximum size of the queue.
#
def max=(max)
raise ArgumentError, "queue size must be positive" unless max > 0
diff = nil
@mutex.synchronize {
if max <= @max
@max = max
else
diff = max - @max
@max = max
diff = max - @max
@max = max
if diff > 0
diff.times { wakeup_queue_waiter }
end
}
if diff
diff.times do
begin
t = @queue_wait.shift
t.run if t
rescue ThreadError
retry
end
end
end
max
end

Expand All @@ -303,22 +323,14 @@ def max=(max)
def push(obj)
@mutex.synchronize{
begin
while true
break if @que.length < @max
@queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
while @que.length >= @max
@queue_wait[Thread.current] = true
@mutex.sleep
end
ensure
@queue_wait.delete(Thread.current)
end

@que.push obj
begin
t = @waiting.shift
t.wakeup if t
rescue ThreadError
retry
end
push_no_sync obj
}
end

Expand All @@ -335,19 +347,12 @@ def push(obj)
#
# Retrieves data from the queue and runs a waiting thread, if any.
#
def pop(*args)
retval = super
def pop(non_block=false)
@mutex.synchronize {
if @que.length < @max
begin
t = @queue_wait.shift
t.wakeup if t
rescue ThreadError
retry
end
end
retval = pop_no_sync(non_block)
wakeup_queue_waiter if @que.length < @max
retval
}
retval
end

#
Expand Down
71 changes: 71 additions & 0 deletions test/ruby/test_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,77 @@ def test_condvar_nolock_3
end.join
end

class Serializer
def initialize
@mutex = Mutex.new
@condvar = ConditionVariable.new
end
def wait
@mutex.synchronize{
@condvar.wait(@mutex)
}
end
def signal
@mutex.synchronize{ @condvar.signal }
end
end

def test_condvar_wait_timeout
serialize = Serializer.new

mutex = Mutex.new
condvar = ConditionVariable.new
def condvar.waiters
@waiters
end
thread = Thread.new do
serialize.signal
mutex.synchronize do
condvar.wait(mutex, 0.001)
end
end
serialize.wait
mutex.synchronize do
sleep(0.01)
assert_not_includes(condvar.waiters, thread)
end
end

def test_condvar_wait_timeout_2
serialize = Serializer.new

mutex = Mutex.new
condvar = ConditionVariable.new

wait_timeout = Thread.new do
serialize.signal
mutex.synchronize do
condvar.wait(mutex, 0.001)
end
end
serialize.wait

wait_forever = Thread.new do
serialize.signal
mutex.synchronize do
condvar.wait(mutex)
end
end
serialize.wait

mutex.synchronize do
sleep(0.01)
condvar.signal
end
# If wait_timeout thread didn't remove himself
# from condvar sleepers, than signale tries to
# wakeup wait_timeout instead of wait_forever
# and fatal deadlock occures in a line below
wait_timeout.join
wait_forever.join

end

def test_local_barrier
dir = File.dirname(__FILE__)
lbtest = File.join(dir, "lbtest.rb")
Expand Down
Loading