Skip to content

Commit

Permalink
Validate DB connection pool before starting supervisor
Browse files Browse the repository at this point in the history
Also, refactor a bit configuration validation to use ActiveModel's
validations.
  • Loading branch information
rosa committed Dec 17, 2024
1 parent 019d7c7 commit a34889c
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 35 deletions.
44 changes: 35 additions & 9 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

module SolidQueue
class Configuration
include ActiveModel::Model

validate :ensure_configured_processes
validate :ensure_valid_recurring_tasks
validate :ensure_correctly_sized_thread_pool

class Process < Struct.new(:kind, :attributes)
def instantiate
"SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
Expand Down Expand Up @@ -36,10 +42,6 @@ def configured_processes
end
end

def valid?
configured_processes.any? && (skip_recurring_tasks? || invalid_tasks.none?)
end

def error_messages
if configured_processes.none?
"No workers or processed configured. Exiting..."
Expand All @@ -54,14 +56,32 @@ def error_messages
end
end

def max_number_of_threads
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
workers_options.map { |options| options[:threads] }.max + 2
end

private
attr_reader :options

def ensure_configured_processes
unless configured_processes.any?
errors.add(:base, "No processes configured")
end
end

def ensure_valid_recurring_tasks
unless skip_recurring_tasks? || invalid_tasks.none?
error_messages = invalid_tasks.map do |task|
"- #{task.key}: #{task.errors.full_messages.join(", ")}"
end

errors.add(:base, "Invalid recurring tasks:\n#{error_messages.join("\n")}")
end
end

def ensure_correctly_sized_thread_pool
if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads
errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " +
"database connection pool is #{db_pool_size}. Increase it in `config/database.yml`")
end
end

def default_options
{
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
Expand Down Expand Up @@ -169,5 +189,11 @@ def load_config_from_file(file)
{}
end
end

def estimated_number_of_threads
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
thread_count = workers_options.map { |options| options.fetch(:threads, WORKER_DEFAULTS[:threads]) }.max
(thread_count || 1) + 2
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def start(**options)
if configuration.valid?
new(configuration).tap(&:start)
else
abort configuration.error_messages
abort configuration.errors.full_messages.join("\n") + "\nExiting..."
end
end
end
Expand Down
13 changes: 8 additions & 5 deletions test/dummy/config/recurring_with_invalid.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
periodic_store_result:
class: StoreResultJorrrrrrb
queue: default
args: [42, { status: "custom_status" }]
schedule: every second
periodic_invalid_class:
class: StoreResultJorrrrrrb
queue: default
args: [42, { status: "custom_status" }]
schedule: every second
periodic_incorrect_schedule:
class: StoreResultJob
schedule: every 1.minute
33 changes: 18 additions & 15 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ class ConfigurationTest < ActiveSupport::TestCase
assert_processes configuration, :worker, 2
end

test "max number of threads" do
configuration = SolidQueue::Configuration.new
assert 7, configuration.max_number_of_threads
end

test "mulitple workers with the same configuration" do
background_worker = { queues: "background", polling_interval: 10, processes: 3 }
configuration = SolidQueue::Configuration.new(workers: [ background_worker ])
Expand Down Expand Up @@ -90,22 +85,30 @@ class ConfigurationTest < ActiveSupport::TestCase
assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil
end

test "detects when there are invalid recurring tasks" do
test "validate configuration" do
# Valid and invalid recurring tasks
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_invalid))

assert_not configuration.valid?
end
assert configuration.errors.full_messages.one?
error = configuration.errors.full_messages.first

test "is valid when there are no recurring tasks" do
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring))
assert error.include?("Invalid recurring tasks")
assert error.include?("periodic_invalid_class: Class name doesn't correspond to an existing class")
assert error.include?("periodic_incorrect_schedule: Schedule is not a supported recurring schedule")

assert configuration.valid?
end
assert SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring)).valid?
assert SolidQueue::Configuration.new(skip_recurring: true).valid?

test "is valid when recurring tasks are skipped" do
configuration = SolidQueue::Configuration.new(skip_recurring: true)
# No processes
configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [])
assert_not configuration.valid?
assert_equal [ "No processes configured" ], configuration.errors.full_messages

assert configuration.valid?
# Not enough DB connections
configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ])
assert_not configuration.valid?
assert_match /Solid Queue is configured to use \d+ threads but the database connection pool is \d+. Increase it in `config\/database.yml`/,
configuration.errors.full_messages.first
end

private
Expand Down
10 changes: 5 additions & 5 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,22 @@ class SupervisorTest < ActiveSupport::TestCase
end

test "start with empty configuration" do
pid, _out, err = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: [])
pid, _out, error = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: [])
sleep(0.5)
assert_no_registered_processes

assert_not process_exists?(pid)
assert_match %r{No workers or processed configured. Exiting...}, err
assert_match %r{No processes configured}, error
end

test "start with invalid configuration" do
pid, _out, err = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false)
test "start with invalid recurring tasks" do
pid, _out, error = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false)

sleep(0.5)
assert_no_registered_processes

assert_not process_exists?(pid)
assert_match %r{Invalid processes configured}, err
assert_match %r{Invalid recurring tasks}, error
end

test "create and delete pidfile" do
Expand Down

0 comments on commit a34889c

Please sign in to comment.