How does Sidekiq work?

2024-02-04

This article was originally published on DanSvetlov.me and is republished here with permission of the author. This article is relevant to Sidekiq v7.

Sidekiq is one of the most ubiquitous1 Ruby background job processors out there. To anybody who has worked with Ruby on and off Rails, it needs no introduction. Sidekiq has a 10+ year track record of being an efficient, battle-tested and simple-to-use solution for offloading the execution of application logic into the background.

It utilizes a threaded model for job processing, uses Redis as a backend and claims to have an ‘at least once’ semantic when it comes to processing jobs (with a caveat) in a free open-source version. Sidekiq also offers 2 additional paid versions - Pro and Enterprise, each of them introducing additional features and extensions. For obvious reasons, I will not go into the details of these versions.

This article will delve into the internals of Sidekiq to highlight its key aspects, as well as design and implementation decisions that I personally find interesting or peculiar, by diving directly into the source code and following a job through its full lifecycle.

I will not cover the user-facing API and general “how-to’s” of Sidekiq. Its wiki provides a better resource for this. Basic acquaintance with the library, as well as Ruby, is expected.

The code examined in this article was sourced from Sidekiq version 7.2 and Ruby 3.2 on MRI/cruby. Even though the code discussed may become obsolete with newer Sidekiq versions, it would take a drastic philosophy change on the maintainers’ part for the general principles and architecture to be become outdated. Nonetheless, the article could serve as a valuable resource for anyone looking to practice their systems programming skills by building a job processor.

It’s worth noting that I’m neither a maintainer nor a contributor of Sidekiq. All of my observations and conclusions stem from reading the source code, comments, and discussions from issues and pull requests sourced via git blame.

Booting up

I believe that the best way to get acquainted with code is by examining its entry points first. Sidekiq, being a background job processor, must be initiated as a separate process. bin/sidekiq is the script that is used to initiate this process, with its primary responsibility being the instantiation of the Sidekiq::CLI singleton class.

The CLI object parses the configuration passed to the process as command line arguments via Ruby’s OptionParser, and initialises the global default Sidekiq::Config.

If a valid path to a YAML config is passed via --config/-C CLI argument, it will be used; otherwise, it will assume that this configuration can be found under the relative ./config path. Alternatively it’s also possible to supply a custom --require/-R argument that is supposed to point to the root directory of an application where the job classes are located. Configuration in that case would be sourced from <path-specified-by-require>/config.

If the configuration file exists, Sidekiq will evaluate it using ERB. This allows defining the config file as a template. However, one notable caveat is that this file will be evaluated before the Rails application or a file specified via --require is required, which makes referencing constants defined in the application code impossible.2

Then, queues and concurrency configuration options are populated if their values were not explicitly set. With default values, Sidekiq will process only the default queue and set its concurrency to the value of the RAILS_MAX_THREADS environment variable if Sidekiq is being used with a Rails application.

As a penultimate step in the configuration process, both the queues and concurrency options get set on every capsule. Capsules will be examined in detail in the following sections, but for now, capsules can be thought of as compartmentalised groups of configuration options. In a basic Sidekiq setup, capsules are not exposed to the user; however, a default capsule is implicitly used. It is possible to define custom capsules in the configuration YAML file and via Sidekiq.configure_server.

Finally, CLI instance validates that the --require argument points to an existing file, or, in case it points to a directory, that config/application.rb exists. It also checks that concurrency and timeout are positive integer numbers.

Entering the main loop

After loading and validating the configuration, the bin/sidekiq executable calls run on the CLI instance. At this point, the application gets loaded.

If the path in the require config param is a directory, which is . by default unless explicitly configured, Sidekiq assumes that it’s being used in a Rails application and requires rails and sidekiq/rails, followed by a require File.expand_path("#{@config[:require]}/config/environment.rb").

Otherwise, if require points to a file, that file gets required. This means that the require config param always has to be explicitly supplied and point to an entry point that loads the application code if used in a non-Rails application.

Calls to Sidekiq.configure_server and Sidekiq.configure_client also get evaluated during the application boot since they’re typically part of the eagerly loadable application code. This setup most commonly occurs in a config/initializers/sidekiq.rb Rails initializer.

Signal handling

Before proceeding to the next step in the run method, it might be worth refreshing our memory on what signals are.

In essence, signals can be considered a type of inter-process communication. It’s possible to define custom handlers for most signals, which will execute arbitrary logic once the process receives a corresponding one. One use case for signals is controlling long-lived daemon processes, such as Sidekiq. For example, Kubernetes sends a SIGTERM to pods so that they can shut down gracefully and perform necessary cleanup, which is relevant for a job processor like Sidekiq. Most modern orchestration tools and hosting platforms support graceful termination by sending either a SIGINT or SIGTERM to running processes.

With theory out of the way, let’s see how Sidekiq interacts with signals here:

self_read, self_write = IO.pipe
sigs = %w[INT TERM TTIN TSTP]
# ...
sigs.each do |sig|
  old_handler = Signal.trap(sig) do
    if old_handler.respond_to?(:call)
      begin
        old_handler.call
      rescue Exception => exc
        # signal handlers can't use Logger so puts only
        puts ["Error in #{sig} handler", exc].inspect
      end
    end

    self_write.puts(sig)
  end
rescue ArgumentError
  puts "Signal #{sig} not supported"
end

The first interesting thing to note is the creation of an unnamed pipe using the IO.pipe API provided by Ruby. Its return value is a 2-member array of IO instances wrapping the file descriptors returned by the underlying system call (pipe2 or pipe on most UNIX-like systems):

int
rb_cloexec_pipe(int descriptors[2])
{
#ifdef HAVE_PIPE2
    int result = pipe2(descriptors, O_CLOEXEC | O_NONBLOCK);
#else
    int result = pipe(descriptors);
#endif

    if (result < 0)
        return result;

// ...
}

Unnamed pipes are extremely handy in forking environments since they can be used by the parent process to communicate with its children and vice versa without much complicated setup. However, Sidekiq does not employ a forking model (in a free OS version) and instead utilizes threads extensively, which all share the process’s memory. So why use a pipe? Before we answer this question, let’s examine what happens in Sidekiq’s signal handlers.

As evident from the Ruby snippet above, Sidekiq sets up handlers for SIGINT, SIGTERM, SIGTTIN, and SIGTSTP. Each handler is defined sequentially using Signal.trap, which, on UNIX-like systems, internally constructs a handler and invokes the sigaction system call:

static sighandler_t
ruby_signal(int signum, sighandler_t handler)
{
    struct sigaction sigact, old;

  // Code that populates the sigact struct is omitted

  // Register the signal handler
    if (sigaction(signum, &sigact, &old) < 0) {
        return SIG_ERR;
    }

    if (old.sa_flags & SA_SIGINFO)
        handler = (sighandler_t)old.sa_sigaction;
    else
        handler = old.sa_handler;
    ASSUME(handler != SIG_ERR);

    // Return the previous handler of the same signal
    return handler;
}

As observed, this function returns an instance of a handler previously registered for the same signal.

Let’s revisit the signal registration in Sidekiq::CLI:

old_handler = Signal.trap(sig) do
  if old_handler.respond_to?(:call)
    begin
      old_handler.call
    rescue Exception => exc
      puts ["Error in #{sig} handler", exc].inspect
    end
  end
  self_write.puts(sig)
end

This code might appear confusing at first glance - how is old_handler being called from within a block when old_handler itself is the return value of the said block? To unravel this little Inception moment, consider the following facts:

So, Signal.trap converts the passed block into an instance of Proc, does not evaluate it, and registers it in the global trap_list instead. When a signal is received, the corresponding handler gets called. old_handler will evaluate to either a string signifying the handler behavior (e.g., DEFAULT or IGNORE), which will be the case if no custom signal traps were defined previously, or an old custom handler. Sidekiq politely handles previously defined trap handlers since it cannot make assumptions about the environment it is running in. It’s possible that a developer or another library already declared a signal handler that is expected to be called.

However, the trap context ends with the signal name being written to the unnamed pipe created earlier. The fact that the exception is emitted directly to STDOUT in case it’s raised instead of being logged using Sidekiq.logger could serve as a hint as to why the handler logic is not being evaluated directly. For the exact reason why this happens and for the actual behavior of SIGINT, SIGTERM, and other handlers, read on.

The rest of run’s method execution is spent on eagerly loading resources, namely the Redis connection pool and the server middleware chain. This pattern is commonly used to avoid race conditions during the initialization of global resources in multi-threaded environments. The run method is the perfect opportunity to do so, since at this point there’s only one main thread, making synchronization redundant for the creation of state. Alternatively, if the resource were lazily loadable, its accessor would have to be wrapped in a synchronization primitive such as a mutex, which would incur performance penalties for every happy path triggered, where the resource is already allocated and just has to be referenced.

Finally, run calls launch and passes the reader end of the pipe as an argument. launch is the method where the main thread will spend the rest of its time while the Sidekiq process is running:

def launch(self_read)
  @launcher = Sidekiq::Launcher.new(@config)

  begin
    launcher.run

    while self_read.wait_readable
      signal = self_read.gets.strip
      handle_signal(signal)
    end
  rescue Interrupt
    launcher.stop

    exit(0)
  end
end

A Sidekiq::Launcher instance is created, and run is called on it. The block where this happens is wrapped in a rescue Interrupt.

By default, Interrupt is raised only when the process does not have a custom SIGINT handler registered, which is not the case here as Signal.trap was already explicitly called with SIGINT. In that case, let’s see where it might be getting raised from.

After the launcher gets commanded to run, the main thread enters an endless loop that calls self_read.wait_readable. This method, without a timeout argument, waits indefinitely until the underlying file descriptor has any data available to read. Whenever any signal that was trapped with Signal.trap earlier gets sent to the process, it will eventually be written to the writer end of the pipe and get read here. handle_signal will call a corresponding handler defined in the Sidekiq::CLI::SIGNAL_HANDLERS hash:

SIGNAL_HANDLERS = {
  "INT" => ->(cli) { raise Interrupt },
  "TERM" => ->(cli) { raise Interrupt },
  "TSTP" => ->(cli) {
    cli.logger.info "Received TSTP, no longer accepting new work"
    cli.launcher.quiet
  },
  "TTIN" => ->(cli) {
    Thread.list.each do |thread|
      cli.logger.warn "Thread TID-#{(thread.object_id ^ ::Process.pid).to_s(36)} #{thread.name}"
      if thread.backtrace
        cli.logger.warn thread.backtrace.join("\n")
      else
        cli.logger.warn "<no backtrace available>"
      end
    end
  }
}
UNHANDLED_SIGNAL_HANDLER = ->(cli) { cli.logger.info "No signal handler registered, ignoring" }
SIGNAL_HANDLERS.default = UNHANDLED_SIGNAL_HANDLER

def handle_signal(sig)
  logger.debug "Got #{sig} signal"
  SIGNAL_HANDLERS[sig].call(self)
end

The key signal handlers to note are SIGINT, SIGTERM, and SIGTSTP.3 The former two handlers consist only of a raise Interrupt, which will be rescued in the CLI.launch loop we looked at previously. SIGTSTP is a bit different - it calls quiet on the Sidekiq::Launcher instance.

Before we dive deeper into the launcher, let’s first answer why the signal handlers are not being executed directly from trap contexts and instead get sent to a pipe and processed in a main thread loop.

Any code invoked from a trap context must be reentrant. What this means in practice is that a handful of Ruby constructs and methods cannot be used inside Signal.trap blocks, most notably Mutex.

The reason why such operations are not permitted in the trap context is simple: custom signal handlers defined in Ruby (i.e., Signal.trap called with a block) can get executed at mostly any point in a Ruby program. This means that even thread-safe code, which may be considered ‘correct’ when executed concurrently, will encounter deadlocks if the signal trap code accesses any synchronization resources shared not only by other threads, but also by the main thread itself.4

None of the Sidekiq signal handlers reference a mutex directly. However, almost all of them emit messages using Sidekiq.logger, which by default is a subclass of the plain Ruby Logger class. Logger, in turn, uses a mutex to synchronize writes internally:

def write(message)
  begin
    # ...
    synchronize do # A convenience method that calls Monitor#synchronize
      @dev.write(message)
    end
    # ...
  rescue Exception => ignored
    warn("log writing failed. #{ignored}")
  end
end

This means that it’s possible for the signal handler, which accesses the same logger instance, to be called while the main thread owns the mutex during logging.

Keeping the signal traps short and deferring the actual logic by notifying the main thread through a pipe allows for lifting restrictions that are usually applied to signal handlers.

Managing the lifecycle

As covered in the previous section, the Sidekiq::CLI instance instantiates a Sidekiq::Launcher instance, calls run on it, and enters an infinite loop waiting for incoming signals. SIGINT and SIGTERM call stop, and SIGTSTP calls quiet on the launcher instance, and so far that’s the extent of our knowledge of the launcher. Let’s dissect it.

It’s worth to note the creation of managers during initialization of a launcher:

# Launcher

def initialize(config)
  # ...
  @managers = config.capsules.values.map do |cap|
    Sidekiq::Manager.new(cap)
  end
  # ...
end

Here’s where the capsules come fully into the spotlight. Their configuration is done in the Sidekiq::CLI instance, and now they’re being used to instantiate instances of Sidekiq::Manager. Capsules can be thought of as compartments inside a Sidekiq process. They are represented as Ruby objects containing individual concurrency and queues configuration parameters.

Without custom configuration, Sidekiq uses a single ‘default’ capsule implicitly, and it’s possible to define custom capsules if the need arises.

Before we proceed to researching the launcher lifecycle methods, let’s first go deeper down the stack for a moment and examine the key components relevant to the main processing loop.

Each capsule is manifested as a manager, serving as a container that oversees the lifecycle of a set of processors. The number of processors is determined by the concurrency setting. Managers provide control over their processors through publicly exposed methods such as start, quiet, and stop.

A processor serves as the unit of execution within Sidekiq, responsible for performing jobs. Its public API includes methods such as terminate, kill, and start, providing control over its execution lifecycle to its manager.

Starting up

Let’s now return to the launcher. The following is its simplified run method:

# Launcher

def run
  @thread = safe_thread("heartbeat", &method(:start_heartbeat))
  @managers.each(&:start)
end

It initially starts a heartbeat thread that dumps some stats into Redis every 10 seconds.

safe_thread is essentially a helper method provided by Sidekiq, which wraps the passed proc in a rescue block, calling the internal exception handler via Config#handle_exception. By default, this handler simply logs the exception without re-raising it.

Manager’s start method looks like this:

# Manager

def start
  @workers.each(&:start)
end

@workers contains instances of the Processor class.

Digging one stack frame deeper, here’s the processor’s start method:

# Processor

def start
  @thread ||= safe_thread("#{config.name}/processor", &method(:run))
end

In a similar fashion to the launcher, a processor starts a thread that executes the Processor#run method. For now, let’s consider it as a black box and just assume that it starts an endless loop that picks jobs from the queue and executes them.

At this point, the Sidekiq process actually starts performing enqueued jobs. However, this is not sufficient for stability, as shutdown is an inevitable part of every process lifecycle. Handling them gracefully in a job processor such as Sidekiq is arguably even more important than it is in web servers, and the reasons for this will be covered in the following sections.

Quieting down

As we have seen already, certain signals translate into calls to the launcher - namely quiet and stop. Let’s start with quiet first as it usually should precede stop.

Here’s a trimmed version of Launcher#quiet:

# Launcher

def quiet
  return if @done

  @done = true
  @managers.each(&:quiet)
end

The early return is there in order to handle a case where the process receives several extra SIGTSTP signals.

Manager#quiet takes the same precaution:

# Manager

def quiet
  return if @done
  @done = true

  @workers.each(&:terminate)
end

And this is how processors handle terminate:

# Processor

def terminate
  @done = true
end

That’s it. Despite its name, the method doesn’t perform anything more drastic than setting an instance variable. This is where we pull the curtain on the Processor#run method:

# Processor

def run
  process_one until @done
  @callback.call(self)
rescue Sidekiq::Shutdown
  @callback.call(self)
rescue Exception => ex
  @callback.call(self, ex)
end

This aligns with our previous assumption: the run method is executed within a thread spawned by the processor in its start method, and it enters a loop where the break condition is the @done instance variable being false.

It’s important to realize that invoking terminate does not immediately stop the processor from processing the current job. Instead, it will continue processing the current job until it completes.

So, when terminate is called, the run method will exit the loop after finishing the current job. At this point, a @callback is invoked. This callback is actually a Method instance that wraps Manager#processor_result, which is passed by the manager during the initialization of each processor.

class Manager
  def initialize(capsule)
    @done = false
    @workers = Set.new
    @plock = Mutex.new
    @count.times do
      @workers << Processor.new(@config, &method(:processor_result))
    end
  end
  
  # ...

  def processor_result(processor, reason = nil)
    @plock.synchronize do
      @workers.delete(processor)
      unless @done
        p = Processor.new(@config, &method(:processor_result))
        @workers << p
        p.start
      end
    end
  end
end

Reading through the source code of processor_result, it becomes clear that processors themselves are responsible for checking out of the Manager’s @workers set. Therefore, if a processor encounters a Sidekiq::Shutdown or any other uncaught exception, it will remove itself from the set, create a new processor in a similar manner to how it’s done in Manager#initialize, add that processor to the set, and call start on it.

However, in the code path we’re examining, which follows a call to quiet, the callback will only remove the processor from the workers set. Therefore, invoking quiet will cause processors to stop picking up new jobs to run, and eventually, the worker set will become empty. As we’ve already noticed, the jobs currently being executed by processors will not be abruptly halted; instead, they will continue until completion naturally.

This makes the SIGTSTP signal the perfect candidate to be sent some time before the process is terminated. Launcher#quiet is a way to inform Sidekiq that processors should avoid picking up new jobs after they finish processing their current ones, as the process is about to be terminated. This minimizes the need for intrusive actions when a process needs to fully stop.

Stopping

The Launcher#stop method begins with a calculation of a deadline utilizing POSIX clock_gettime. CLOCK_MONOTONIC is employed to acquire the current absolute elapsed time rather than wall clock time, which can vary unpredictably:

# Launcher

def stop
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + @config[:timeout]

  quiet
  stoppers = @managers.map do |mgr|
    Thread.new do
      mgr.stop(deadline)
    end
  end

  stoppers.each(&:join)
end

The default timeout config is set to 25 seconds and this number is not picked randomly; historically a lot of hosting platforms and orchestrators have been using a 30 second grace period for processes to react to SIGTERM gracefully. Sidekiq makes this timeout a bit lower in order to have better chances at finishing the required cleanup work.

quiet gets called as the first step in the stop method in order to minimize work that will have to be waited on in the next step. Another purpose of calling quiet here is to ensure that processors stop picking up new jobs in case SIGTSTP was not explicitly sent prior to SIGINT or SIGTERM:

# Manager

def stop(deadline)
  quiet

  return if @workers.empty?

  logger.info { "Pausing to allow jobs to finish..." }
  wait_for(deadline) { @workers.empty? }
  return if @workers.empty?

  hard_shutdown
ensure
  capsule.stop
end

In case processors manage to finish their current work in time, they will remove themselves from their manager’s @workers array after completing their current job. The 2 conditionals in stop are there to handle the happy path by preemptively terminating execution without additional unnecessary work in such cases. This is why it’s important to make sure that quiet gets called first.

If the manager sees that there are any processors that couldn’t finish immediately before the first return if @workers.empty? check, it must wait for them in its stop method. Remember that the deadline, set to 25 seconds by default, is passed down from the launcher. The wait_for method executes the block in a loop until the block’s return value is truthy:

# Manager

def wait_for(deadline, &condblock)
  remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  while remaining > PAUSE_TIME
    return if condblock.call
    sleep PAUSE_TIME
    remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  end
end

If there are still active processors lingering after the deadline, hard_shutdown is called. Here’s the body of this method with the original comments preserved:

# Manager

def hard_shutdown
  # We've reached the timeout and we still have busy threads.
  # They must die but their jobs shall live on.
  cleanup = nil
  @plock.synchronize do
    cleanup = @workers.dup
  end

  if cleanup.size > 0
    jobs = cleanup.map { |p| p.job }.compact
    # ...
    capsule.fetcher.bulk_requeue(jobs)
  end

  cleanup.each do |processor|
    processor.kill
  end

  # when this method returns, we immediately call `exit` which may not give
  # the remaining threads time to run `ensure` blocks, etc. We pause here up
  # to 3 seconds to give threads a minimal amount of time to run `ensure` blocks.
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + 3
  wait_for(deadline) { @workers.empty? }
end

We won’t explore the functionality of capsule.fetcher.bulk_requeue at this point. This is one of the most critical pieces of logic in Sidekiq and for now, let’s assume that, as the method’s name implies, it requeues jobs.

Every processor that did not check itself out of the @workers array by the time hard_shutdown was invoked gets killed. Here’s what Processor#kill does:

# Processor

def kill(wait = false)
  @done = true
  return unless @thread

  @thread.raise ::Sidekiq::Shutdown
end

It’s similar to quiet, but it additionally raises a Sidekiq::Shutdown, a subclass of Interrupt, on the processor thread.

This exception being raised, however, does not impose an upper time bound on when the thread will be truly finished. That’s why right after every process has been killed, the manager waits 3 seconds to give them an opportunity to terminate gracefully. We’ll get to what this graceful termination consists of later.

We’ve explored how Sidekiq establishes the necessary class hierarchy to ensure the smooth handling of the process lifecycle. The launcher oversees managers, which in turn manage processors responsible for fetching and processing jobs. Let’s now look at how the processors obtain jobs to execute.

Queue processing

Sidekiq jobs get routed to queues. At the very least each Sidekiq installation will use a default queue. The queues that Sidekiq should be processing can be defined globally or on a per-capsule basis. You can specify the set of queues using the config/sidekiq.yml configuration file, the -q command-line argument, or Sidekiq.configure_server. This configuration populates the queues parameter of the default capsule, unless specific configuration was provided for a particular capsule in the config file or server configuration.

Eventually, all provided queue lists end up in the Capsule#queues= setter method, which is crucial for the queue processing logic. Here’s its contents copied verbatim with the original comment:

# Sidekiq checks queues in three modes:
# - :strict - all queues have 0 weight and are checked strictly in order
# - :weighted - queues have arbitrary weight between 1 and N
# - :random - all queues have weight of 1
def queues=(val)
  @weights = {}
  @queues = Array(val).each_with_object([]) do |qstr, memo|
    arr = qstr
    arr = qstr.split(",") if qstr.is_a?(String)
    name, weight = arr
    @weights[name] = weight.to_i
    [weight.to_i, 1].max.times do
      memo << name
    end
  end
  @mode = if @weights.values.all?(&:zero?)
    :strict
  elsif @weights.values.all? { |x| x == 1 }
    :random
  else
    :weighted
  end
end

There are three options for how queues are polled. If none of the queues have a weight defined (using the default,10 notation), the mode is set to strict. If all of the queues have their weights set to 1 explicitly, the mode is set to random. In any other case, the mode is set to weighted.

One peculiar thing to note is that the @queues array will contain m copies of each queue, where m is their weight. For example, if the queues are configured in a weighted manner with default,10 queue_one,5 queue_two,3, @queues will consist of 10 :default elements, 5 :queue_one elements, and 3 :queue_two elements.

Let’s come back over to Processor#run. As we have already seen, this method makes the processor enter a loop that calls Processor#process_one until terminate or kill is called:

# Processor

def get_one
  uow = capsule.fetcher.retrieve_work
  if @down
    logger.info { "Redis is online, #{::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - @down} sec downtime" }
    @down = nil
  end
  uow
rescue Sidekiq::Shutdown
rescue => ex
  handle_fetch_exception(ex)
end

The conditional branch is there in case Sidekiq recovers from a backoff initiated by handle_fetch_exception in the rescue block. That method simply sleeps for 1 second if there’s a transient connectivity issue when contacting Redis.

Sidekiq::Shutdown is muted here to reduce noise created by the error handler invoked from handle_fetch_exception. Processing of new jobs stops regardless since @done ivar is set to false at this point.

The fetcher method in the Capsule class extracts the fetcher object. Here’s its implementation:

def fetcher
  @fetcher ||= begin
    inst = (config[:fetch_class] || Sidekiq::BasicFetch).new(self)
    inst.setup(config[:fetch_setup]) if inst.respond_to?(:setup)
    inst
  end
end

The OS version of Sidekiq comes with a single fetcher option, BasicFetch.

The Pro version includes SuperFetch, which offers a more robust durability guarantee for Sidekiq jobs. However, the specifics of this fetcher are not covered in this article.

Let’s examine the internals of BasicFetch. Here are the relevant methods, with original comments preserved:

class BasicFetch
  # We want the fetch operation to timeout every few seconds so the thread
  # can check if the process is shutting down.
  TIMEOUT = 2
  
  UnitOfWork = Struct.new(:queue, :job, :config) {
    def acknowledge
      # nothing to do
    end
  
    def queue_name
      queue.delete_prefix("queue:")
    end
  
    def requeue
      config.redis do |conn|
        conn.rpush(queue, job)
      end
    end
  }
  
  # ...
  
  def retrieve_work
    qs = queues_cmd
    # 4825 Sidekiq Pro with all queues paused will return an
    # empty set of queues
    if qs.size <= 0
      sleep(TIMEOUT)
      return nil
    end
  
    queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qs, TIMEOUT) }
    UnitOfWork.new(queue, job, config) if queue
  end
  
  # ...
  
  # Creating the Redis#brpop command takes into account any
  # configured queue weights. By default Redis#brpop returns
  # data from the first queue that has pending elements. We
  # recreate the queue command each time we invoke Redis#brpop
  # to honor weights and avoid queue starvation.
  def queues_cmd
    if @strictly_ordered_queues
      @queues
    else
      permute = @queues.shuffle
      permute.uniq!
      permute
    end
  end
end

True to its name, the logic in this class is straightforward: construct an array of queues (keep in mind that queue weights result in duplicate entries in the @queues array), shuffle it if there’s no strict ordering, and then pass it to Redis’ BRPOP command.

It might seem counterintuitive to call #uniq! on the resulting array when there are weighted queues, but it’s not. The more identical entries there are in the array, the higher the chance that one of these entries will appear at the head of the array. uniq! preserves the ordering of elements.

BRPOP takes multiple list names as arguments and pops the tail element of the first non-empty list in a blocking manner. This means that the first queue in the @queues array gets priority. An important detail here is the timeout. As the comment above the TIMEOUT constant suggests, Sidekiq wants these operations to time out regularly so that a process can check if terminate or kill was called in the meantime.

If the command successfully fetches a job from the list, the return values are wrapped in a UnitOfWork DTO.

The critical aspect of BasicFetch is that BRPOP removes the job from the list. Despite Sidekiq’s efforts to be as resilient as possible and cover most failure scenarios, this has a significant implication: jobs can be lost. We’ll explore some of the scenarios where this might occur and how Sidekiq attempts to mitigate them in a later section.

As you have probably noticed, the acknowledge method is empty for a BasicFetch UnitOfWork. While I haven’t had the opportunity to explore the Pro or Enterprise licensed versions of Sidekiq myself, it’s reasonable to assume that this method is relevant for the SuperFetch fetcher. SuperFetch preserves the job in Redis in some form instead of removing it immediately after fetching, which contributes to its enhanced reliability, but makes it necessary to somehow mark the jobs as being successfully processed. SuperFetch’s acknowledge most likely does exactly that. We will encounter situations where the acknowledge method becomes relevant as we dive deeper.

Back in Processor#process_one, if the fetch was successful, Processor#process is invoked. Since this method is lengthy, we’ll examine modified smaller segments at a time.

# Processor

def process(uow)
  jobstr = uow.job
  queue = uow.queue_name

  job_hash = nil
  begin
    job_hash = JSON.parse(jobstr)
  rescue => ex
    handle_exception(ex, {context: "Invalid JSON for job", jobstr: jobstr})
    now = Time.now.to_f
    redis do |conn|
      conn.multi do |xa|
        xa.zadd("dead", now.to_s, jobstr)
        xa.zremrangebyscore("dead", "-inf", now - @capsule.config[:dead_timeout_in_seconds])
        xa.zremrangebyrank("dead", 0, - @capsule.config[:dead_max_jobs])
      end
    end
    return uow.acknowledge
  end

  # ...
end

The job fetched from Redis is a JSON payload represented as a string, so it needs to be parsed first. Here, we encounter the concept of the morgue and dead jobs. Trying to process a malformed job won’t yield positive results no matter how many times it is retried, so marking the job as ‘dead’ is a practical approach.

This is achieved by the ZADD command, which adds a member to a sorted set, where sorting key is the time at which the job was pronounced dead.

The next 2 commands are there to remove dead jobs that surpassed the allowed time in the morgue, which is 6 months by default, and preserve only the latest 10000 jobs in the set. All of this is done in a Redis transaction using the MULTI command.

After the job payload gets parsed successfully, it needs to be executed. The rest of the #process method does exactly that (comments are preserved):

# Processor

def process(uow)
  # ...

  ack = false
  Thread.handle_interrupt(Sidekiq::Shutdown => :never) do
    Thread.handle_interrupt(Sidekiq::Shutdown => :immediate) do
      dispatch(job_hash, queue, jobstr) do |inst|
        config.server_middleware.invoke(inst, job_hash, queue) do
          execute_job(inst, job_hash["args"])
        end
      end
      ack = true
    rescue Sidekiq::Shutdown
      # Had to force kill this job because it didn't finish
      # within the timeout.  Don't acknowledge the work since
      # we didn't properly finish it.
    rescue Sidekiq::JobRetry::Handled => h
      # this is the common case: job raised error and Sidekiq::JobRetry::Handled
      # signals that we created a retry successfully.  We can acknowlege the job.
      ack = true
      e = h.cause || h
      handle_exception(e, {context: "Job raised exception", job: job_hash})
      raise e
    rescue Exception => ex
      # Unexpected error!  This is very bad and indicates an exception that got past
      # the retry subsystem (e.g. network partition).  We won't acknowledge the job
      # so it can be rescued when using Sidekiq Pro.
      handle_exception(ex, {context: "Internal exception!", job: job_hash, jobstr: jobstr})
      raise ex
    end
  ensure
    if ack
      uow.acknowledge
    end
  end
end

First thing that stands out is the use of Thread.handle_interrupt. In short, it allows to alter the handling of internal Ruby asynchronous events such as Thread#raise and Thread#kill - the former is exactly what is being used by Processor#kill as we have already seen.

In this particular case, first handle_interrupt is called with Sidekiq::Shutdown => :never and wraps the ensure block that calls #acknowledge on the unit of work. The second call is nested directly after the first one, and it resets the behaviour back to :immediate.

This achieves the following: if a processor is killed (using Thread#raise(Sidekiq::Shutdown)) when the ensure block is being evaluated, the raised exception will be ignored and the work will be acknowledged. Without setting the behaviour to :never, an acknowledgment would be missed in such case. The inner block resets it back to :immediate immediately afterward because this is the desired behavior - Sidekiq::Shutdown should interrupt the execution of the job’s code.

This is not that relevant in the OS version since BasicFetch’s UnitOfWork#acknowledge is a no op, but it’s important for SuperFetch, since its version of acknowledge most likely removes the job from Redis in order to mark it as processed.

The rescue blocks are self-explanatory thanks to the comments. One noteworthy aspect is that Sidekiq::JobRetry::Handled exceptions, which are raised when a retry is successfully created, get re-raised and propagated all the way up to Processor#run. This, in turn, triggers the callback supplied by the Manager, which deletes the processor and creates a new one. As a result, job retries will force the processor to be recreated, spawning a new Ruby thread in place of the original.

Let’s unwrap the Processor#dispatch and Processor#execute_job methods:

# Processor
# Relevant bits from #dispatch, #process & #execute_job combined together

@job_logger.prepare(job_hash) do
  @retrier.global(jobstr, queue) do
    @job_logger.call(job_hash, queue) do
      stats(jobstr, queue) do
        @reloader.call do
          klass = Object.const_get(job_hash["class"])
          inst = klass.new
          inst.jid = job_hash["jid"]
          @retrier.local(inst, jobstr, queue) do
            config.server_middleware.invoke(inst, job_hash, queue) do
              inst.perform(*job_hash["args"])
            end
          end
        end
      end
    end
  end
end

The @job_logger variable is an instance of Sidekiq::JobLogger. Its prepare method is responsible for putting job metadata into thread-local context, ensuring that logs emitted further down the stack contain useful correlation information.

Another crucial step in this pipeline is the @reloader.call invocation. This is where Sidekiq integrates with the Rails framework. A reloader, as the name suggests, is responsible for code reloading. This is relevant because jobs typically reside in the app/ directory of a Rails application, which should be reloadable. Additionally, the reloader handles ActiveRecord connection cleanup and other related concerns.

Sidekiq extracts the job class via Object.const_get(job_hash["class"]) and calls #perform on it after invoking the server middleware chain. Sidekiq’s middlewares are similar to those of Rack - arbitrary classes that respond to #call and yield if the chain should not be halted. This is the public API for integrations that want to augment or alter Sidekiq’s processing logic, which is exactly what monitoring libraries like Sentry, Datadog and others do.

Handling exceptions

Naturally, application code called from within jobs can raise exceptions. Sidekiq provides several ways to handle them, with the default behavior being to retry failed jobs up to 25 times with an exponential backoff.

Let’s examine the #local method of Processor’s @retrier ivar, an instance of Sidekiq::JobRetry which wraps the #perform method of every job class:

class JobRetry
  class Handled < ::RuntimeError; end
  class Skip < Handled; end
  
  # ...
  
  def local(jobinst, jobstr, queue)
    yield
  rescue Handled => ex
    raise ex
  rescue Sidekiq::Shutdown => ey
    raise ey
  rescue Exception => e
    raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)

    msg = JSON.parse(jobstr)
    if msg["retry"].nil?
      msg["retry"] = jobinst.class.get_sidekiq_options["retry"]
    end

    raise e unless msg["retry"]
    process_retry(jobinst, msg, queue, e)

    raise Skip
  end
end

This method is called “local” since it can be directly correlated to a particular instantiated job instance. We will see why it’s relevant once we get to #process_retry.

Sidekiq::Shutdown is re-raised because it should bypass the retry system, and Handled is being re-raised as well in case a developer decides to manually raise it from within a job.

Rescuing Exception (i.e. all exceptions) is the interesting part.

First of all, Sidekiq detects if an exception was raised due to the thread receiving a Sidekiq::Shutdown from Processor#kill. With this detection, only errors that are caused by a bug or some other transient error within the application code will trigger a retry. This prevents monitoring tools from being polluted unnecessarily and does not force well-behaving jobs into a retry set. We’ll skip the internals of #exception_caused_by_shutdown? for brevity, only mentioning that its secret sauce is Ruby’s exception#cause method.

Next, Sidekiq checks if the job was explicitly configured without retries - it simply re-raises the exception if that’s the case. By default, every Sidekiq job is retried up to 25 times.

Before we proceed to #process_retry, we need to look at the final line: raise Skip. Instead of re-raising the original exception, Sidekiq raises a subclass of JobRetry::Handled. We’ll see how it relates to the #global method once we get to it.

The original #process_retry method handles metadata management and formatting; the following snippet is a trimmed-down version of it:

# JobRetry

def process_retry(jobinst, msg, queue, exception)
  max_retry_attempts = retry_attempts_from(msg["retry"], @max_retries)

  msg["queue"] = (msg["retry_queue"] || queue)

  # Code that updates the `retry_count` job payload attribute, saves the `retried_at` and `failed_at` timestamps, filters and saves the backtrace, etc
  count = msg["retry_count"] # Increment logic of this value is omitted

  return retries_exhausted(jobinst, msg, exception) if count >= max_retry_attempts

  rf = msg["retry_for"]
  return retries_exhausted(jobinst, msg, exception) if rf && ((msg["failed_at"] + rf) < Time.now.to_f)

  strategy, delay = delay_for(jobinst, count, exception, msg)
  case strategy
  when :discard
    return
  when :kill
    return retries_exhausted(jobinst, msg, exception)
  end

  jitter = rand(10) * (count + 1)
  retry_at = Time.now.to_f + delay + jitter
  payload = JSON.parse(msg)
  redis do |conn|
    conn.zadd("retry", retry_at.to_s, payload)
  end
end

First, note that Sidekiq provides an ability to reroute the job into a different queue if it fails; this is what the second line of code is responsible for.

Immediately after, #retries_exhausted is called if the job exceeds its maximum retry count, or if the time frame during which it should be retried has passed.

If Sidekiq should attempt to retry the job, it calls #delay_for. This method returns a strategy and a delay in seconds. We’ll see where the strategy value comes from, but for now, understand that a job can be either discarded or killed, with the former causing Sidekiq to completely forget about the job instead of retrying it.

In case the job should actually be retried, the timestamp at which it should happen is calculated based on the obtained delay. Jitter is applied to this value to avoid the thundering herd problem, where a bunch of jobs are scheduled for a retry at the same second.

Finally, the job is added to a retry sorted set using ZADD, following a simillar process to dead jobs.

# JobRetry
def delay_for(jobinst, count, exception, msg)
  rv = begin
    block = jobinst&.sidekiq_retry_in_block
    # ...
    block&.call(count, exception, msg)
  rescue Exception => e
    # ...
    nil
  end

  rv = rv.to_i if rv.respond_to?(:to_i)
  delay = (count**4) + 15
  if Integer === rv && rv > 0
    delay = rv
  elsif rv == :discard
    return [:discard, nil]
  elsif rv == :kill
    return [:kill, nil]
  end

  [:default, delay]
end

#delay_for sources the strategy from an optional sidekiq_retry_in_block block defined on the job class by a developer. It can either return a symbol representing the strategy or an integer value representing the number of seconds for a delay.

If the delay value was not provided by a developer, it gets calculated with an exponential backoff formula, where count is the number of retries already conducted. With the default max retry count being 25, this means that a job (without a custom sidekiq_retry_in_block defined) will be retried 25 times over approximately 20 days.

Here’s what happens in the #retries_exhausted method:

# JobRetry

def retries_exhausted(jobinst, msg, exception)
  rv = begin
    block = jobinst&.sidekiq_retries_exhausted_block
    # ...
    block&.call(msg, exception)
  rescue => e
    # Log the error and do not reraise it
    # ...
  end

  return if rv == :discard
  unless msg["dead"] == false
    payload = Sidekiq.dump_json(msg)
    now = Time.now.to_f

    redis do |conn|
      conn.multi do |xa|
        xa.zadd("dead", now.to_s, payload)
        xa.zremrangebyscore("dead", "-inf", now - @capsule.config[:dead_timeout_in_seconds])
        xa.zremrangebyrank("dead", 0, - @capsule.config[:dead_max_jobs])
      end
    end
  end
end

Another optional user-supplied job-level configuration being used here is sidekiq_retries_exhausted_block, which allows discarding a job once its retries are exhausted. If it’s not provided, a job gets sent to the morgue in the exact same fashion as in Processor#process when a job payload is malformed.

The last relevant method in the retrier is #global. It’s worth remembering that it wraps a call to #local, Processor#reloader, invocation of middlewares, and pretty much every other step in the Processor#process pipeline. It’s needed to catch any exceptions raised when processing a job, including those that are not raised from within application code, on a best-effort basis. Its only difference from #local is that it discards the job if it doesn’t have a retry attribute set, or if this attribute is set to false on the job class:

# JobRetry

def global(jobstr, queue)
  yield
rescue Handled => ex
  raise ex
rescue Sidekiq::Shutdown => ey
  raise ey
rescue Exception => e
  raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)

  msg = Sidekiq.load_json(jobstr)
  if msg["retry"]
    process_retry(nil, msg, queue, e)
  else
    @capsule.config.death_handlers.each do |handler|
      handler.call(msg, e)
    rescue => handler_ex
      handle_exception(handler_ex, {context: "Error calling death handler", job: msg})
    end
  end

  raise Handled
end

You might notice that similarly to #local, this method also rescues JobRetry::Handled and does not retry in such cases, since these exceptions are already handled in the #local rescue block that raises a JobRetry::Skip - a subclass of Handled. This avoids duplicate retry processing.

We have now explored the primary process of how Sidekiq handles job processing on the server. However, what about jobs that need to be retried? Also, doesn’t Sidekiq offer a mechanism for delaying the execution of jobs until a specified time in the future? We will learn how Sidekiq handles these in the section following the next one.

Enqueueing jobs

So far, we’ve looked at the internals of Sidekiq server: Launcher, Manager, and Processor constructs, which operate within a background process responsible for job processing. Now, let’s explore how jobs make their way into the queues, which leads us to Sidekiq::Client.

Although most Sidekiq users typically don’t interact directly with Sidekiq::Client, preferring higher-level abstractions like the Job API, all the convenience methods provided by this module rely on fundamental building blocks: Client#push and Client#push_bulk, which accept different combinations of arguments.

Let’s see what Client#push does:

# Client

def push(item)
  normed = normalize_item(item)
  payload = middleware.invoke(item["class"], normed, normed["queue"], @redis_pool) do
    normed
  end
  if payload
    verify_json(payload)
    raw_push([payload])
    payload["jid"]
  end
end

normalize_item performs several operations on the payload, including argument validation and generation of a unique job identifier.

Subsequently, the payload undergoes processing through the client middleware chain, mirroring the way Processor sends job payloads through server middlewares. This enables external integrations to enhance the job payload with extra attributes and implement custom logic around job dispatches.

verify_json might not need additional explanation as it does exactly what its name suggests, but it’s how it does it that makes it worthwhile to look into:

def verify_json(item)
  job_class = item["wrapped"] || item["class"]
  args = item["args"]
  mode = Sidekiq::Config::DEFAULTS[:on_complex_arguments]

  if mode == :raise || mode == :warn
    if (unsafe_item = json_unsafe?(args))
      msg = <<~EOM
        Job arguments to #{job_class} must be native JSON types, but #{unsafe_item.inspect} is a #{unsafe_item.class}.
        See https://github.com/sidekiq/sidekiq/wiki/Best-Practices
        To disable this error, add `Sidekiq.strict_args!(false)` to your initializer.
      EOM

      if mode == :raise
        raise(ArgumentError, msg)
      else
        warn(msg)
      end
    end
  end
end

# ...

RECURSIVE_JSON_UNSAFE = {
  Integer => ->(val) {},
  Float => ->(val) {},
  TrueClass => ->(val) {},
  FalseClass => ->(val) {},
  NilClass => ->(val) {},
  String => ->(val) {},
  Array => ->(val) {
    val.each do |e|
      unsafe_item = RECURSIVE_JSON_UNSAFE[e.class].call(e)
      return unsafe_item unless unsafe_item.nil?
    end
    nil
  },
  Hash => ->(val) {
    val.each do |k, v|
      return k unless String === k

      unsafe_item = RECURSIVE_JSON_UNSAFE[v.class].call(v)
      return unsafe_item unless unsafe_item.nil?
    end
    nil
  }
}

RECURSIVE_JSON_UNSAFE.default = ->(val) { val }
RECURSIVE_JSON_UNSAFE.compare_by_identity
private_constant :RECURSIVE_JSON_UNSAFE

def json_unsafe?(item)
  RECURSIVE_JSON_UNSAFE[item.class].call(item)
end

Boiled down, the verify_json method validates job arguments expected by the #perform method to ensure they adhere to valid JSON format. In versions predating Sidekiq 7.0, the json_unsafe? method employed a straightforward check by verifying if the arguments remained unchanged after being dumped and parsed as JSON: JSON.parse(JSON.dump(item["args"])) == item["args"]. However, this approach, involving the dumping and parsing of potentially large arguments for each job, clearly warranted performance enhancements.

To address this, the RECURSIVE_JSON_UNSAFE constant was introduced. Additional improvement lies in RECURSIVE_JSON_UNSAFE.compare_by_identity, which makes the hash resolve key objects based on their object_id instead of calling hash, resulting in a faster process. Extracting a static object_id that remains unchanged during an object’s lifetime, albeit recursively, is much faster than going through 2 iterations of JSON processing.

In this new version, every Ruby class known to be ‘JSON-safe’ returns a proc that returns nil when executed. Arrays and hashes receive special treatment since they contain other objects, making this hash recursive. If a job payload contains an object of any other class not declared as a key in the hash, the arguments are deemed unsafe, and an appropriate action will be taken, with the default being to raise an error.

raw_push is the place where job gets put into Redis:

# Client

def raw_push(payloads)
  @redis_pool.with do |conn|
    retryable = true
    begin
      conn.pipelined do |pipeline|
        atomic_push(pipeline, payloads)
      end
    rescue RedisClient::Error => ex
      if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
        conn.close
        retryable = false
        retry
      end
      raise
    end
  end
  true
end

def atomic_push(conn, payloads)
  if payloads.first.key?("at")
    conn.zadd("schedule", payloads.flat_map { |hash|
      at = hash.delete("at").to_s
      # ...
      [at, JSON.generate(hash)]
    })
  else
    queue = payloads.first["queue"]
    now = Time.now.to_f
    to_push = payloads.map { |entry|
      entry["enqueued_at"] = now
      JSON.generate(entry)
    }
    conn.sadd("queues", [queue])
    conn.lpush("queue:#{queue}", to_push)
  end
end

This method accepts an array of payloads to support #push_bulk. #push passes a single array element as an argument.

The rescue is included to handle Redis failovers that necessitate reopening the connection socket, attempting one transparent retry.

Subsequently, the Redis commands called by atomic_push are pipelined, reducing the round-trip time (RTT). Note that when jobs are pushed in a batch, all of them are submitted using a single LPUSH command, so pipelining here is not as critical as it could have been.5

atomic_push has 2 branches. The second branch is for immediate dispatches of jobs that will be processed as soon as possible. The SADD command is there to store a set of queues for monitoring purposes and is not necessarily relevant to job processing itself. However, the subsequent LPUSH command is crucial as it puts jobs into the list that is regularly fetched by a Processor instance on the server side, which we have already covered. An important detail is that LPUSH adds the elements to the front of the list - BasicFetch#retrieve_work pops the element from the tail, making Sidekiq queues employ a FIFO model.

It must be strongly emphasized here that despite the first-come first-serve nature of queues, Sidekiq does not guarantee the order of job execution. A subsequent job can and will be popped from the queue list before the preceding one may be finished by another processor. However, it is possible to set the concurrency setting of a specific capsule responsible for a set of specified queues to 1. This ensures that only 1 Processor is active at any time, making job fetching sequential. Nevertheless, this still does not guarantee ordering in case a job fails to be processed and is sent to the retry set, or in case a preceding job is lost. We will explore when the latter case can happen in the following section. In any case, if strict ordering of jobs is truly needed, Sidekiq is not the best option.

The first branch of atomic_push accommodates scheduled jobs, which we haven’t touched until now. Scheduled jobs enable users to defer their job’s execution until a specific time in the future. In order to achieve this, ZADD is used again with a sorted set, similar to dead and retry sets.

Retrying & scheduling

By now, we have explored most of the server’s internals, and we have also examined how Sidekiq#client pushes jobs into the queues, including an option to schedule a job for execution in the future. However, we have not yet explored the final component that enables the processing of scheduled jobs and jobs that need to be retried. We noted that both types of jobs get added into corresponding sorted sets using ZADD, but how do they get processed?

When we previously examined the Launcher, we intentionally glossed over Sidekiq::Scheduled::Poller. Whenever a Sidekiq process boots, it calls Launcher#run, which in turn initiates the poller thread.

def start
  @thread ||= safe_thread("scheduler") {
    initial_wait

    until @done
      enqueue
      wait
    end
  }
end

Its main loop is extremely similar to that of Processor, so I won’t cover where @done is set.

Before we delve into the details of the initial_wait, enqueue, and wait methods, let’s take a step back. Based on what we’ve seen so far, we can confidently say that multiple Sidekiq processes require no coordination at all. It’s common practice to use a single Redis instance and launch multiple Sidekiq processes, each sharing the same set of queues, to increase throughput and availability. These processes simply attempt to BRPOP jobs from queue lists atomically, and each processor gets its share of work.

However, the scheduled and retry sets operate differently. Jobs in these sets should not be processed immediately; they are pushed into the respective queues when their time comes for processing by the Processors. The Poller handles this task, but what if there are several Sidekiq processes? Each will have its own poller thread competing for access to shared Redis sets. While this isn’t an issue for normal processing of queue lists, as BRPOP operations take constant time, extracting desired keys from a scheduled or retry sorted set has a different computational complexity. Continuously bombarding Redis with commands used by the poller from several processes negatively impacts its health and performance. Therefore, the pollers must either coordinate to ensure that only one runs at a time, or their executions must be artificially spread out. Sidekiq opts for the latter option, and we’ll explore how it achieves that.

Sidekiq::Scheduled is one of the most commented modules in Sidekiq, so all of the original comments are preserved in following snippets. Here’s #initial_wait that gets called once per Sidekiq process:

# Scheduled::Poller

INITIAL_WAIT = 10

# ...

def initial_wait
  # Have all processes sleep between 5-15 seconds. 10 seconds to give time for
  # the heartbeat to register (if the poll interval is going to be calculated by the number
  # of workers), and 5 random seconds to ensure they don't all hit Redis at the same time.
  total = 0
  total += INITIAL_WAIT unless @config[:poll_interval_average]
  total += (5 * rand)

  @sleeper.pop(total)
rescue Timeout::Error
ensure
  # periodically clean out the `processes` set in Redis which can collect
  # references to dead processes over time. The process count affects how
  # often we scan for scheduled jobs.
  cleanup
end

Heartbeat is mentioned in the first comment - similar to a poller, it’s another thread that routinely updates some process metadata stored in Redis. The only thing relevant to the poller is the updating of the processes set, which contains unique identifiers of each Sidekiq process; we will see how it exactly comes into play shortly.

@sleeper.pop(total) simply makes the thread wait for a specified amount of time.

# Scheduled::Poller

def cleanup
  # dont run cleanup more than once per minute
  return 0 unless redis { |conn| conn.set("process_cleanup", "1", "NX", "EX", "60") }

  count = 0
  redis do |conn|
    procs = conn.sscan("processes").to_a
    heartbeats = conn.pipelined { |pipeline|
      procs.each do |key|
        pipeline.hget(key, "info")
      end
    }

    # the hash named key has an expiry of 60 seconds.
    # if it's not found, that means the process has not reported
    # in to Redis and probably died.
    to_prune = procs.select.with_index { |proc, i|
      heartbeats[i].nil?
    }
    count = conn.srem("processes", to_prune) unless to_prune.empty?
  end

  count
end

As can be seen, the poller is responsible for cleaning out dead processes’ metadata from Redis. Despite what the comment in #initial_wait says, this only happens once a poller starts, and, under normal circumstances, it starts only when a process boots up.

Why exactly this is necessary can be seen in the #wait method, which is being called in a loop after every #enqueue:

# Scheduled::Poller

def wait
  @sleeper.pop(random_poll_interval)
rescue Timeout::Error
  # expected
rescue => ex
  # if poll_interval_average hasn't been calculated yet, we can
  # raise an error trying to reach Redis.
  logger.error ex.message
  handle_exception(ex)
  sleep 5
end

def random_poll_interval
  # We want one Sidekiq process to schedule jobs every N seconds.  We have M processes
  # and **don't** want to coordinate.
  #
  # So in N*M second timespan, we want each process to schedule once.  The basic loop is:
  #
  # * sleep a random amount within that N*M timespan
  # * wake up and schedule
  #
  # We want to avoid one edge case: imagine a set of 2 processes, scheduling every 5 seconds,
  # so N*M = 10.  Each process decides to randomly sleep 8 seconds, now we've failed to meet
  # that 5 second average. Thankfully each schedule cycle will sleep randomly so the next
  # iteration could see each process sleep for 1 second, undercutting our average.
  #
  # So below 10 processes, we special case and ensure the processes sleep closer to the average.
  # In the example above, each process should schedule every 10 seconds on average. We special
  # case smaller clusters to add 50% so they would sleep somewhere between 5 and 15 seconds.
  # As we run more processes, the scheduling interval average will approach an even spread
  # between 0 and poll interval so we don't need this artifical boost.
  #
  count = process_count
  interval = poll_interval_average(count)
  
  if count < 10
    # For small clusters, calculate a random interval that is ±50% the desired average.
    interval * rand + interval.to_f / 2
  else
    # With 10+ processes, we should have enough randomness to get decent polling
    # across the entire timespan
    interval * rand
  end
end

# We do our best to tune the poll interval to the size of the active Sidekiq
# cluster.  If you have 30 processes and poll every 15 seconds, that means one
# Sidekiq is checking Redis every 0.5 seconds - way too often for most people
# and really bad if the retry or scheduled sets are large.
#
# Instead try to avoid polling more than once every 15 seconds.  If you have
# 30 Sidekiq processes, we'll poll every 30 * 15 or 450 seconds.
# To keep things statistically random, we'll sleep a random amount between
# 225 and 675 seconds for each poll or 450 seconds on average.  Otherwise restarting
# all your Sidekiq processes at the same time will lead to them all polling at
# the same time: the thundering herd problem.
#
# We only do this if poll_interval_average is unset (the default).
def poll_interval_average(count)
  @config[:poll_interval_average] || scaled_poll_interval(count)
end

# Calculates an average poll interval based on the number of known Sidekiq processes.
# This minimizes a single point of failure by dispersing check-ins but without taxing
# Redis if you run many Sidekiq processes.
def scaled_poll_interval(process_count)
  process_count * @config[:average_scheduled_poll_interval]
end

def process_count
  pcount = Sidekiq.redis { |conn| conn.scard("processes") }
  pcount = 1 if pcount == 0
  pcount
end

Poll interval logic is explained in detail thanks to the comprehensive comments. The amount of active Sidekiq processes consuming jobs from the same Redis installation is taken into consideration when calculating an interval with which pollers should contact Redis. It all boils down to minimizing concurrent poller operations.

Finally, let’s see what all of this effort is for. Poller#enqueue instantiates Sidekiq::Scheduled::Enq, which encapsulates scheduled dequeue logic and calls #enqueue_jobs on it:

class Enq
  LUA_ZPOPBYSCORE = <<~LUA
    local key, now = KEYS[1], ARGV[1]
    local jobs = redis.call("zrange", key, "-inf", now, "byscore", "limit", 0, 1)
    if jobs[1] then
      redis.call("zrem", key, jobs[1])
      return jobs[1]
    end
  LUA

  # ...

  def enqueue_jobs(sorted_sets = %w[retry schedule])
    # A job's "score" in Redis is the time at which it should be processed.
    # Just check Redis for the set of jobs with a timestamp before now.
    redis do |conn|
      sorted_sets.each do |sorted_set|
        # Get next item in the queue with score (time to execute) <= now.
        # We need to go through the list one at a time to reduce the risk of something
        # going wrong between the time jobs are popped from the scheduled queue and when
        # they are pushed onto a work queue and losing the jobs.
        while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s]))
          @client.push(JSON.generate(job))
        end
      end
    end
  end

  # ...

  private

  def zpopbyscore(conn, keys: nil, argv: nil)
    if @lua_zpopbyscore_sha.nil?
      @lua_zpopbyscore_sha = conn.script(:load, LUA_ZPOPBYSCORE)
    end

    conn.call("EVALSHA", @lua_zpopbyscore_sha, keys.size, *keys, *argv)
  rescue RedisClient::CommandError => e
    raise unless e.message.start_with?("NOSCRIPT")

    @lua_zpopbyscore_sha = nil
    retry
  end
end

First thing to note is the LUA_ZPOPBYSCORE string, along with SCRIPT and EVALSHA Redis commands. This is Sidekiq making use of a Redis feature that allows execution of atomic Lua scripts on the server. The script is preloaded and cached in Redis using conn.script so that subsequent commands utilizing it can benefit from increased performance.

The script itself takes an array of sets (retry and schedule) and a current timestamp (note that Lua array indexing starts with 1). It then calls ZRANGE, which returns a single job with the lowest timestamp that should be dispatched to the queue. The job element is then removed from the set using ZREM.

The complexity of ZRANGE is O(log(N) + M), where N is the total number of elements in the set and M is the number of elements returned. However, Sidekiq dequeues one scheduled or to-be-retried job at a time to be safe, as the comment suggests, meaning that the overall complexity will be O(M * log(N)).

ZREM itself is already O(M * log(N)), so the overall complexity doesn’t change by the fact that the whole script is called for every relevant job.

For every successfully dequeued job, Client#push is called to dispatch the job to its target queue.

This, along with the fact that Redis is mostly single-threaded, makes it clear why spreading out the pollers in time is so important. With several processes and a large enough retry or scheduled set (and in my experience, retry set sizes can easily be in the order of thousands, if not more), executing this logic concurrently can significantly impair Redis’s performance or even grind it to a halt.

Job loss potential

Now that we’ve explored the complete lifecycle of a job as it’s processed by the server, let’s examine potential scenarios where a job might become lost.

We know that the OS version of Sidekiq only provides BasicFetch as a means to dequeue jobs from Redis, which effectively removes them from the corresponding queue lists. This means that if a Sidekiq process is terminated abruptly, such as with a SIGKILL, all in-progress jobs will be lost. For instance, this can occur if the process exhausts its memory, prompting Kubernetes to forcefully terminate the pod with a kill -9 command, or if other orchestrators and hosting providers take similar actions.

But how does Sidekiq manage hanging, long-running, or simply in-progress jobs during a graceful termination?

Let’s revisit Manager#hard_shutdown, which is called when a Sidekiq process receives a SIGTERM or SIGINT:

# Manager

def hard_shutdown
  cleanup = nil
  @plock.synchronize do
    cleanup = @workers.dup
  end

  if cleanup.size > 0
    jobs = cleanup.map { |p| p.job }.compact

    capsule.fetcher.bulk_requeue(jobs)
  end

  cleanup.each do |processor|
    processor.kill
  end

  # ...
end

capsule.fetcher.bulk_requeue(jobs) is called before processor threads are terminated. At this point, waiting for them to handle the Sidekiq::Shutdown exception and gracefully exit is not feasible.6 Therefore, the best option is to requeue all jobs that are in progress, even though it means that some jobs that finish just before the process exits will also be requeued.

BasicFetch#bulk_requeue simply takes the job payloads extracted from active processors and puts them back into the corresponding Redis lists:

# BasicFetch

def bulk_requeue(inprogress)
  return if inprogress.empty?

  # ...
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue] ||= []
    jobs_to_requeue[unit_of_work.queue] << unit_of_work.job
  end

  redis do |conn|
    conn.pipelined do |pipeline|
      jobs_to_requeue.each do |queue, jobs|
        pipeline.rpush(queue, jobs)
      end
    end
  end
  # ...
end

BasicFetch’s requeue logic is a potential scenario where job loss can occur. Redis might run out of memory while a job is being processed, making it impossible to put the job back into the queue. This is also a concern for ordinary retries.

Additionally, although not directly related to Sidekiq, jobs can be lost due to specific Redis server misconfigurations. The eviction policy, which determines how Redis handles memory when it’s full, plays a significant role here. Any setting other than noevict may result in jobs being evicted from Redis under certain circumstances. Furthermore, Redis can encounter failures and lose data after a reboot, particularly with certain persistence configurations. This situation can lead to Sidekiq clients incorrectly assuming that a job will be processed after a successful push, when in fact it may not reach the Sidekiq server at all.

It is also possible for the SIGTERM or SIGINT handler not to be executed within the predefined timeout period, which the orchestrator running Sidekiq waits for before sending SIGKILL. Redis connectivity issues are the most likely culprit here. Theoretically, GC pauses and the condition of the host machine can also impede the Sidekiq process during this crucial task, but I personally have never encountered, nor have I heard of, such cases where a Ruby program is stalled for 25 seconds.

It goes without saying that killing the Sidekiq process with SIGKILL without having sent SIGINT or SIGTERM prior will result in all of the in-progress jobs being lost.

It should be noted that the decision to design BasicFetch in its current manner is intentional. By using a single BRPOP command to fetch a job, Sidekiq remains extremely simple and lightweight in terms of its load on Redis. However, this also means that if your jobs absolutely must never be lost, relying solely on Sidekiq might not be the best choice. Nevertheless, it may be possible to refactor the application code and business logic that warrants strong durability guarantees to include a liveness component, such as a recurring cron job, that would attempt to ensure the completion of a specific workflow by repeatedly enqueuing a job until the desired final state is achieved.

This makes Sidekiq employ an ‘at least once’ delivery semantic, with the possibility of losing jobs during severe degradation of the system’s health. However, the Pro version ensures that job loss is impossible, as jobs never leave Redis until they are acknowledged with SuperFetch enabled, effectively making it truly ‘at least once’ without any caveats.

Sidekiq leverages simple Redis data structures like lists with constant-time push and pop operations, along with adjustable thread counts, to achieve scalability, efficiency, and ease of management. In the OS version, scaling can involve deploying multiple Sidekiq processes, each consuming jobs from the same Redis installation. The Enterprise version offers a forking model for multi-process execution, providing an alternative way to scale. For fine-grained control over queues, capsules can be used to organize processing with specific concurrency settings. Additionally, you can prioritize certain queues, enforce strict queue processing order (distinct from job ordering), or evenly poll queues. However, it’s important to note that job ordering within queues isn’t supported. Nevertheless, employing a single processor per queue could provide a rudimentary form of ordering, which may suffice depending on your application’s requirements.


  1. Based on my personal experience of having contributed to 6+ Ruby projects, as well as Github ‘used by’ stats. By no means do I claim this to be the ultimate and definitive metric. ↩︎

  2. Technically, it is possible to require the Rails application or a particular file right in the configuration template, but one might question if this is a wise decision. ↩︎

  3. The SIGTTIN handler is there for ease of debugging. The process will dump stack traces of all active threads once it receives it. ↩︎

  4. A helpful guide to what operations are not permitted in Signal.trap context on MRI can be found here↩︎

  5. Previously, this paragraph stated that pipelining in the case of atomic_push helps avoid RTT for every job submitted in a batch, which is not true. Thanks to Igor Wiedler for spotting this error. ↩︎

  6. Waiting for a thread to process an exception raised using Thread#raise can take an indefinite amount of time in certain cases. ↩︎