Skip to content

Commit

Permalink
Specialize Dispatcher and Worker looping
Browse files Browse the repository at this point in the history
The Worker and Dispatcher share the same poll loop logic
(Poller#start_loop) while having different functional requirements.

The Worker is poll looping despite not being able to execute new
jobs if at capacity.

The Dispatcher does require polling, but is reliant on shared
logic in Poller#start_loop for a Dispatcher specific optimization.

Changes:

Move the logic controlling the sleep interval per poll from
Poller#start_loop into Worker#poll and Dispatcher#poll by
requiring #poll to return the `delay` value passed into
interruptible_sleep.

Poller#start_loop:
* Removes the test based on the number of rows processed by #poll. This
  was Dispatcher specific logic.

Worker#poll:
* When Worker at full capacity: return a large value (10.minutes)
  effectively transforming Poller#start_loop from polling to wake-on-event.
* When Worker < capacity: return `polling_interval` and maintain the poll
  timing until ReadyExecutions become available.

Dispatcher#poll:
* When `due` ScheduledExecutions.zero? return `polling_interval` and
  maintain the existing poll timing when no ScheduledExecutions
  are available to process.
* When `due` ScheduledExecutions.postive? return 0. This results in
  interruptible_sleep(0) which returns immediately and without introducing
  any delays/sleeps between polls. This also allows for the existing behavior
  of looping through ScheduledExecutions via poll in order to check for
  shutdown requests between dispatch_next_batch interations.
  • Loading branch information
hms authored and rosa committed Dec 19, 2024
1 parent eb75d09 commit 6f10ab3
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 11 deletions.
3 changes: 2 additions & 1 deletion lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def metadata
private
def poll
batch = dispatch_next_batch
batch.size

batch.size.zero? ? polling_interval : 0.seconds
end

def dispatch_next_batch
Expand Down
14 changes: 9 additions & 5 deletions lib/solid_queue/processes/interruptible.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ def interrupt
queue << true
end

# Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up.
# @param time [Numeric] the time to sleep. 0 returns immediately.
# @return [true, nil]
# * returns `true` if an interrupt was requested via #wake_up between the
# last call to `interruptible_sleep` and now, resulting in an early return.
# * returns `nil` if it slept the full `time` and was not interrupted.
def interruptible_sleep(time)
# Invoking from the main thread can result in a 35% slowdown (at least when running the test suite).
# Using some form of Async (Futures) addresses this performance issue.
# Invoking this from the main thread may result in significant slowdown.
# Utilizing asynchronous execution (Futures) addresses this performance issue.
Concurrent::Promises.future(time) do |timeout|
if timeout > 0 && queue.pop(timeout:)
queue.clear
end
queue.pop(timeout:).tap { queue.clear }
end.value
end

Expand Down
8 changes: 4 additions & 4 deletions lib/solid_queue/processes/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ def start_loop
loop do
break if shutting_down?

wrap_in_app_executor do
unless poll > 0
interruptible_sleep(polling_interval)
end
delay = wrap_in_app_executor do
poll
end

interruptible_sleep(delay)
end
ensure
SolidQueue.instrument(:shutdown_process, process: self) do
Expand Down
3 changes: 2 additions & 1 deletion lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class Worker < Processes::Poller
after_boot :run_start_hooks
before_shutdown :run_stop_hooks


attr_accessor :queues, :pool

def initialize(**options)
Expand All @@ -29,7 +30,7 @@ def poll
pool.post(execution)
end

executions.size
pool.idle? ? polling_interval : 10.minutes
end
end

Expand Down
24 changes: 24 additions & 0 deletions test/unit/dispatcher_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ class DispatcherTest < ActiveSupport::TestCase
another_dispatcher&.stop
end

test "sleeps `0.seconds` between polls if there are ready to dispatch jobs" do
dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3)
dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once

3.times { AddToBufferJob.set(wait: 0.1).perform_later("I'm scheduled") }
assert_equal 3, SolidQueue::ScheduledExecution.count
sleep 0.1

dispatcher.start
wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? }

assert_equal 0, SolidQueue::ScheduledExecution.count
assert_equal 3, SolidQueue::ReadyExecution.count
end

test "sleeps `polling_interval` between polls if there are no un-dispatched jobs" do
dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
dispatcher.expects(:interruptible_sleep).with(0.seconds).never
dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once
dispatcher.start
sleep 0.1
end

private
def with_polling(silence:)
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence
Expand Down
20 changes: 20 additions & 0 deletions test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,26 @@ class WorkerTest < ActiveSupport::TestCase
SolidQueue.process_heartbeat_interval = old_heartbeat_interval
end

test "sleeps `10.minutes` if at capacity" do
3.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }

@worker.expects(:interruptible_sleep).with(10.minutes).at_least_once
@worker.expects(:interruptible_sleep).with(@worker.polling_interval).never

@worker.start
sleep 1.second
end

test "sleeps `polling_interval` if worker not at capacity" do
2.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }

@worker.expects(:interruptible_sleep).with(@worker.polling_interval).at_least_once
@worker.expects(:interruptible_sleep).with(10.minutes).never

@worker.start
sleep 1.second
end

private
def with_polling(silence:)
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence
Expand Down

0 comments on commit 6f10ab3

Please sign in to comment.