Skip to content

Commit

Permalink
Validate DB connection pool before starting supervisor
Browse files Browse the repository at this point in the history
Also, refactor a bit configuration validation to use ActiveModel's
validations.
  • Loading branch information
rosa committed Dec 17, 2024
1 parent 019d7c7 commit 3b367e2
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 30 deletions.
43 changes: 34 additions & 9 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

module SolidQueue
class Configuration
include ActiveModel::Model

validate :ensure_configured_processes
validate :ensure_valid_recurring_tasks
validate :ensure_correctly_sized_thread_pool

class Process < Struct.new(:kind, :attributes)
def instantiate
"SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
Expand Down Expand Up @@ -36,10 +42,6 @@ def configured_processes
end
end

def valid?
configured_processes.any? && (skip_recurring_tasks? || invalid_tasks.none?)
end

def error_messages
if configured_processes.none?
"No workers or processed configured. Exiting..."
Expand All @@ -54,14 +56,32 @@ def error_messages
end
end

def max_number_of_threads
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
workers_options.map { |options| options[:threads] }.max + 2
end

private
attr_reader :options

def ensure_configured_processes
unless configured_processes.any?
errors.add(:base, "No processes configured")
end
end

def ensure_valid_recurring_tasks
unless skip_recurring_tasks? || invalid_tasks.none?
error_messages = invalid_tasks.map do |task|
"- #{task.key}: #{task.errors.full_messages.join(", ")}"
end

errors.add(:base, "Invalid recurring tasks:\n#{error_messages.join("\n")}")
end
end

def ensure_correctly_sized_thread_pool
if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads
errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " +
"database connection pool is #{db_pool_size}. Increase it in `config/database.yml`")
end
end

def default_options
{
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
Expand Down Expand Up @@ -169,5 +189,10 @@ def load_config_from_file(file)
{}
end
end

def estimated_number_of_threads
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
(workers_options.map { |options| options[:threads] }.max || 1) + 2
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def start(**options)
if configuration.valid?
new(configuration).tap(&:start)
else
abort configuration.error_messages
abort configuration.errors.full_messages
end
end
end
Expand Down
13 changes: 8 additions & 5 deletions test/dummy/config/recurring_with_invalid.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
periodic_store_result:
class: StoreResultJorrrrrrb
queue: default
args: [42, { status: "custom_status" }]
schedule: every second
periodic_invalid_class:
class: StoreResultJorrrrrrb
queue: default
args: [42, { status: "custom_status" }]
schedule: every second
periodic_incorrect_schedule:
class: StoreResultJob
schedule: every 1.minute
33 changes: 18 additions & 15 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ class ConfigurationTest < ActiveSupport::TestCase
assert_processes configuration, :worker, 2
end

test "max number of threads" do
configuration = SolidQueue::Configuration.new
assert 7, configuration.max_number_of_threads
end

test "mulitple workers with the same configuration" do
background_worker = { queues: "background", polling_interval: 10, processes: 3 }
configuration = SolidQueue::Configuration.new(workers: [ background_worker ])
Expand Down Expand Up @@ -90,22 +85,30 @@ class ConfigurationTest < ActiveSupport::TestCase
assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil
end

test "detects when there are invalid recurring tasks" do
test "validate configuration" do
# Valid and invalid recurring tasks
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_invalid))

assert_not configuration.valid?
end
assert configuration.errors.full_messages.one?
error = configuration.errors.full_messages.first

test "is valid when there are no recurring tasks" do
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring))
assert error.include?("Invalid recurring tasks")
assert error.include?("periodic_invalid_class: Class name doesn't correspond to an existing class")
assert error.include?("periodic_incorrect_schedule: Schedule is not a supported recurring schedule")

assert configuration.valid?
end
assert SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring)).valid?
assert SolidQueue::Configuration.new(skip_recurring: true).valid?

test "is valid when recurring tasks are skipped" do
configuration = SolidQueue::Configuration.new(skip_recurring: true)
# No processes
configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [])
assert_not configuration.valid?
assert_equal [ "No processes configured" ], configuration.errors.full_messages

assert configuration.valid?
# Not enough DB connections
configuration = SolidQueue::Configuration.new(workers: [{ queues: "background", threads: 50, polling_interval: 10 }])
assert_not configuration.valid?
assert_equal [ "Solid Queue is configured to use 52 threads but the database connection pool is 20. Increase it in `config/database.yml`" ],
configuration.errors.full_messages
end

private
Expand Down

0 comments on commit 3b367e2

Please sign in to comment.