Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate DB connection pool before starting supervisor #457

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Validate DB connection pool before starting supervisor
Also, refactor a bit configuration validation to use ActiveModel's
validations.
  • Loading branch information
rosa committed Dec 17, 2024
commit a34889cf1d6f5c800446089d6f7b3afceded43d8
44 changes: 35 additions & 9 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

module SolidQueue
class Configuration
include ActiveModel::Model

validate :ensure_configured_processes
validate :ensure_valid_recurring_tasks
validate :ensure_correctly_sized_thread_pool

class Process < Struct.new(:kind, :attributes)
def instantiate
"SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
Expand Down Expand Up @@ -36,10 +42,6 @@ def configured_processes
end
end

def valid?
configured_processes.any? && (skip_recurring_tasks? || invalid_tasks.none?)
end

def error_messages
if configured_processes.none?
"No workers or processed configured. Exiting..."
Expand All @@ -54,14 +56,32 @@ def error_messages
end
end

def max_number_of_threads
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
workers_options.map { |options| options[:threads] }.max + 2
end

private
attr_reader :options

def ensure_configured_processes
unless configured_processes.any?
errors.add(:base, "No processes configured")
end
end

def ensure_valid_recurring_tasks
unless skip_recurring_tasks? || invalid_tasks.none?
error_messages = invalid_tasks.map do |task|
"- #{task.key}: #{task.errors.full_messages.join(", ")}"
end

errors.add(:base, "Invalid recurring tasks:\n#{error_messages.join("\n")}")
end
end

def ensure_correctly_sized_thread_pool
if (db_pool_size = SolidQueue::Record.connection_pool&.size) && db_pool_size < estimated_number_of_threads
errors.add(:base, "Solid Queue is configured to use #{estimated_number_of_threads} threads but the " +
"database connection pool is #{db_pool_size}. Increase it in `config/database.yml`")
end
end

def default_options
{
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
Expand Down Expand Up @@ -169,5 +189,11 @@ def load_config_from_file(file)
{}
end
end

def estimated_number_of_threads
# At most "threads" in each worker + 1 thread for the worker + 1 thread for the heartbeat task
thread_count = workers_options.map { |options| options.fetch(:threads, WORKER_DEFAULTS[:threads]) }.max
(thread_count || 1) + 2
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def start(**options)
if configuration.valid?
new(configuration).tap(&:start)
else
abort configuration.error_messages
abort configuration.errors.full_messages.join("\n") + "\nExiting..."
end
end
end
Expand Down
13 changes: 8 additions & 5 deletions test/dummy/config/recurring_with_invalid.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
periodic_store_result:
class: StoreResultJorrrrrrb
queue: default
args: [42, { status: "custom_status" }]
schedule: every second
periodic_invalid_class:
class: StoreResultJorrrrrrb
queue: default
args: [42, { status: "custom_status" }]
schedule: every second
periodic_incorrect_schedule:
class: StoreResultJob
schedule: every 1.minute
33 changes: 18 additions & 15 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ class ConfigurationTest < ActiveSupport::TestCase
assert_processes configuration, :worker, 2
end

test "max number of threads" do
configuration = SolidQueue::Configuration.new
assert 7, configuration.max_number_of_threads
end

test "mulitple workers with the same configuration" do
background_worker = { queues: "background", polling_interval: 10, processes: 3 }
configuration = SolidQueue::Configuration.new(workers: [ background_worker ])
Expand Down Expand Up @@ -90,22 +85,30 @@ class ConfigurationTest < ActiveSupport::TestCase
assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil
end

test "detects when there are invalid recurring tasks" do
test "validate configuration" do
# Valid and invalid recurring tasks
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_invalid))

assert_not configuration.valid?
end
assert configuration.errors.full_messages.one?
error = configuration.errors.full_messages.first

test "is valid when there are no recurring tasks" do
configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring))
assert error.include?("Invalid recurring tasks")
assert error.include?("periodic_invalid_class: Class name doesn't correspond to an existing class")
assert error.include?("periodic_incorrect_schedule: Schedule is not a supported recurring schedule")

assert configuration.valid?
end
assert SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring)).valid?
assert SolidQueue::Configuration.new(skip_recurring: true).valid?

test "is valid when recurring tasks are skipped" do
configuration = SolidQueue::Configuration.new(skip_recurring: true)
# No processes
configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [])
assert_not configuration.valid?
assert_equal [ "No processes configured" ], configuration.errors.full_messages

assert configuration.valid?
# Not enough DB connections
configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ])
assert_not configuration.valid?
assert_match /Solid Queue is configured to use \d+ threads but the database connection pool is \d+. Increase it in `config\/database.yml`/,
configuration.errors.full_messages.first
end

private
Expand Down
10 changes: 5 additions & 5 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,22 @@ class SupervisorTest < ActiveSupport::TestCase
end

test "start with empty configuration" do
pid, _out, err = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: [])
pid, _out, error = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: [])
sleep(0.5)
assert_no_registered_processes

assert_not process_exists?(pid)
assert_match %r{No workers or processed configured. Exiting...}, err
assert_match %r{No processes configured}, error
end

test "start with invalid configuration" do
pid, _out, err = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false)
test "start with invalid recurring tasks" do
pid, _out, error = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false)

sleep(0.5)
assert_no_registered_processes

assert_not process_exists?(pid)
assert_match %r{Invalid processes configured}, err
assert_match %r{Invalid recurring tasks}, error
end

test "create and delete pidfile" do
Expand Down
Loading