
Iteration

Mike Perham edited this page Sep 15, 2024 · 14 revisions

Starting with v7.3, Sidekiq allows jobs to declare themselves as Iterable. This means that the job can be decomposed into a sequence of elements to process; Sidekiq can stop and restart the job anywhere within this sequence using a cursor. This makes deployments with long-running jobs safer and cleaner, as long as those long-running jobs are Iterable.

This functionality was donated and adapted from @fatkodima's sidekiq-iteration gem which itself was heavily influenced by Shopify's job-iteration gem. Many thanks to him and the Shopify engineers who designed and developed this API.

Setup

Your job class must include the Sidekiq::IterableJob module, not Sidekiq::Job. You will need to provide two methods:

  • build_enumerator accepts any arguments to the job along with a cursor keyword argument. You return an Enumerator which yields (item, updated_cursor) for each item to process.
  • each_iteration receives one item from the Enumerator, followed by the job's original arguments.
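As a rough illustration of the (item, updated_cursor) contract, the following plain-Ruby sketch builds an Enumerator that yields [item, cursor] pairs and can start from a saved cursor. It does not load Sidekiq. The helper name pairs_from is hypothetical, and treating the cursor as the index of the yielded item is an assumption made for this sketch.

```ruby
# Hypothetical helper (not part of Sidekiq): returns an Enumerator that
# yields [item, cursor] pairs, where the cursor is assumed to be the
# item's index. Passing a saved cursor skips everything before it.
def pairs_from(items, cursor:)
  start = cursor || 0 # a nil cursor means "start from the beginning"

  Enumerator.new do |yielder|
    items.each_with_index do |item, index|
      next if index < start
      yielder << [item, index]
    end
  end
end

fresh = pairs_from(%w[a b c d], cursor: nil).to_a
resumed = pairs_from(%w[a b c d], cursor: 2).to_a
```

A build_enumerator method could return an Enumerator shaped like this when none of the built-in helpers fit the data source.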

Here we create 50 PostCreator jobs, each of which creates 1000 DB records. Those 50 jobs may execute concurrently, but each can be interrupted at any time during its 1000-record loop. Job 0 creates Posts 0-999, Job 1 creates Posts 1000-1999, etc.

50.times { |idx| PostCreator.perform_async(idx*1000, 1000) }

class PostCreator
  include Sidekiq::IterableJob

  # Each PostCreator job creates N Posts but is interruptible; upon Ctrl-C
  # it will save the Post it is creating, save the current cursor
  # and schedule itself to restart shortly in the future.
  # It will resume at the next cursor value, not at 0.
  def build_enumerator(start_at, count, **kwargs)
    @start_at = start_at
    @count = count
    logger.info { "Creating posts for #{start_at}" }
    array_enumerator((start_at...(start_at + count)).to_a, **kwargs)
  end

  def each_iteration(pid, *_unused_args)
    Post.create!(id: pid, title: "Post #{pid}", body: "Body of post #{pid}")
  end

  # Once our 1000 Posts have been created, we can do some operation to
  # those 1000 Posts in bulk. Keep in mind that only the 1000 for this job are done,
  # not all 50,000. If you want to know when all 50k of the Posts have been created,
  # you'll need Sidekiq Pro's Batch feature.
  def on_complete
    logger.info { "#{@start_at} complete, updating..." }
    PostUpdater.perform_async(@start_at, @count)
  end
end

Enumerator Support

The iteration subsystem provides helper methods to create Enumerators for a few common use cases. See lib/sidekiq/job/iterable/enumerators.rb for APIs for ActiveRecord queries, batched AR queries, CSV file processing, etc. If you want to efficiently process millions of DB records in groups of 1000 at a time, there's an API for you!
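To make the idea of batched iteration concrete, here is a minimal plain-Ruby sketch that models it on an Array. It is not one of the helpers in enumerators.rb. The name batch_pairs is hypothetical, and the cursor is assumed to be the offset of each batch's first item.

```ruby
# Hypothetical helper (not part of Sidekiq): yields [batch, cursor] pairs,
# where the cursor is assumed to be the offset of the batch's first item.
def batch_pairs(items, batch_size:, cursor:)
  start = cursor || 0

  Enumerator.new do |yielder|
    items.drop(start).each_slice(batch_size).with_index do |batch, n|
      yielder << [batch, start + n * batch_size]
    end
  end
end

ids = (1..10).to_a
all_batches = batch_pairs(ids, batch_size: 4, cursor: nil).to_a
resumed = batch_pairs(ids, batch_size: 4, cursor: 8).to_a
```

With an enumerator of this shape, each_iteration would receive a whole batch at a time, so one interruption check covers many records.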

Callbacks

Sidekiq::IterableJob includes callback methods you can override:

  • on_start - the first time this job is started
  • on_resume - any future time this job is started after interruption
  • on_stop - whenever the job stops processing for now, whether due to completion or interruption
  • on_complete - when your Enumerator has finished

How It Works

Sidekiq stores a Hash of iteration data in Redis at the key it-#{jid}:

{
  "ex" => @_executions,               # number of times this job has been started
  "c" => Sidekiq.dump_json(@_cursor), # serialized cursor
  "rt" => @_runtime                   # total job runtime in seconds
}

The cursor stores the current point in the dataset being processed. For instance, if you are processing a CSV file, the cursor might be a row number. For an Array, it would be the current index.

While the job is executing, the cursor is persisted to Redis every five seconds and whenever the job raises an error. The cursor can be anything (an Integer index, a more complex Hash structure, etc.) but it must be serializable to JSON. See Enumerator Support for examples.
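The JSON requirement has a practical consequence: a cursor only survives a save and reload if it round-trips through JSON unchanged. The sketch below uses Ruby's standard json library as a stand-in for Sidekiq.dump_json (an assumption; round_trip is a hypothetical helper) to show which cursor shapes come back intact.

```ruby
require "json"

# Round-trips a candidate cursor through JSON, roughly what happens when
# the cursor is saved and later loaded again.
def round_trip(cursor)
  JSON.parse(JSON.generate(cursor))
end

integer_cursor = round_trip(42)                              # comes back as 42
hash_cursor = round_trip({ "page" => 3, "last_id" => 1500 }) # unchanged
symbol_cursor = round_trip({ page: 3 })                      # keys become Strings
```

Integers and Hashes with String keys come back unchanged, while Symbol keys come back as Strings, so a cursor Hash is safest when it uses String keys from the start.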

After every iteration, Sidekiq calls Sidekiq::Job#interrupted?. This is a new method available to all Sidekiq::Jobs which returns true if the Sidekiq process is shutting down. If interrupted? is true, the IterableJob will flush its current state to Redis and raise Sidekiq::Job::Interrupted, so it can be re-enqueued and restarted with the latest cursor. The retry subsystem ignores this exception but other subsystems (like Batches) will treat it as a failure.
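The interrupt-and-resume cycle described above can be modeled in a few lines of plain Ruby. This is a toy, not Sidekiq's implementation: the Interrupted class, the run_once method, and the shutting_down lambda are all hypothetical stand-ins. A Hash plays the role of the iteration data in Redis, and the cursor is assumed to be the index of the next unprocessed item.

```ruby
# Stand-in for Sidekiq::Job::Interrupted; this sketch does not load Sidekiq.
class Interrupted < StandardError; end

# Toy runner. `state` is a Hash standing in for the iteration data in Redis.
def run_once(items, state, shutting_down:)
  state["ex"] = state.fetch("ex", 0) + 1 # count this execution
  position = state.fetch("c", 0)         # resume from the saved cursor

  while position < items.size
    yield items[position]                # plays the role of each_iteration
    position += 1

    # Check for shutdown after every iteration, as described above.
    if position < items.size && shutting_down.call
      state["c"] = position              # flush the cursor before stopping
      raise Interrupted
    end
  end

  state["c"] = position
  :complete
end

items = %w[a b c d e]
processed = []
state = {}

begin
  # Pretend the process starts shutting down once three items are done.
  run_once(items, state, shutting_down: -> { processed.size == 3 }) { |item| processed << item }
rescue Interrupted
  # A real job would be re-enqueued here; the toy simply runs again below.
end
state_after_interrupt = state.dup

result = run_once(items, state, shutting_down: -> { false }) { |item| processed << item }
```

The second run picks up at the saved cursor, so every item is processed exactly once in this model. A crash, unlike a clean interruption, would not get the chance to flush the cursor first.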

Since Sidekiq gives jobs 25 seconds to finish by default (-t 25), each iteration in your job must not take more than 25 seconds, or the process supervisor (e.g. systemd, Heroku, k8s) may kill -9 your Sidekiq process, causing job loss.

If your job crashes the Sidekiq process, it's possible for the cursor to be reset to the last savepoint and items to be processed more than once. As always, leverage DB transactions to ensure idempotency and consistent data.
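A small simulation shows why idempotent writes make this replay harmless. In the sketch below an in-memory Hash stands in for a table with a unique key on id; in a real app the equivalent might be an upsert or a unique constraint inside a transaction. The crash point and the saved cursor value are assumptions chosen for illustration.

```ruby
# In-memory stand-in for a table with a unique key on id.
posts = {}
attempts = 0

# Idempotent write: creating the same id twice leaves a single record.
create_post = lambda do |id|
  attempts += 1
  posts[id] ||= { title: "Post #{id}" }
end

# First run: items 0..6 are processed, but assume the process crashes
# when the last saved cursor was 5.
(0..6).each(&create_post)

# Restart from the savepoint: items 5 and 6 are processed a second time.
(5..9).each(&create_post)
```

Twelve writes are attempted but only ten records exist afterwards, because the two replayed items map onto records that are already there.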

The iteration data in Redis is removed after on_complete is fired. It will expire after 30 days if the job dies.

More Examples

Have a good example of an IterableJob in your app? Open an issue and include your code; we'll clean it up and post it here!

CSV

require "csv"
require "tempfile"

class ExampleJob
  include Sidekiq::IterableJob

  # Here we have a Record in our DB which has an associated CSV file.
  # We want to iterate through that file and create a child record for each row.
  def build_enumerator(record_id, cursor:)
    @record = Example::Record.find(record_id)

    # Download the file locally to process
    Tempfile.create do |tempfile|
      tempfile.write(@record.file.download)
      csv = CSV.open(tempfile.path, headers: true, col_sep: ';')
      CsvEnumerator.new(csv).rows(cursor:)
    end
  end

  def each_iteration(row, _record_id)
    name, identifier, status_code = row.values_at('name', 'identifier', 'status_code')
    ApplicationRecord.transaction do
      Example::Creator.create!(
        source: Example::Source.create!(
          response_type: 'batch_webhook',
          identifier:,
          data: {
            example_id: @record.example_id,
            record_id: @record.id,
            name:,
            status_code:
          }
        )
      )
    end
  end

  def on_complete
    verified_file_path = "#{VERIFIED_PATH}#{@record.file_name}"

    # Once processed, we move the CSV file to the verified bucket.
    Tempfile.create do |tempfile|
      tempfile.write(@record.file.download)
      tempfile.flush # make sure the bytes are on disk before uploading by path
      YourCloud.bucket(BUCKET).create_file(
        tempfile.path,
        verified_file_path
      )
    end
  end
end
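To see what a row-number cursor looks like for CSV data without any Sidekiq or cloud dependencies, here is a minimal sketch using only Ruby's csv library. It is not Sidekiq's CSV enumerator. The csv_pairs helper and the sample data are hypothetical, and the cursor is assumed to be the zero-based row number with the header line excluded.

```ruby
require "csv"

# Hypothetical helper (not Sidekiq's CSV enumerator): yields [row, cursor]
# pairs, where the cursor is assumed to be the zero-based row number
# (headers excluded). A saved cursor skips the rows before it.
def csv_pairs(csv_text, cursor:)
  start = cursor || 0
  table = CSV.parse(csv_text, headers: true, col_sep: ";")

  Enumerator.new do |yielder|
    table.each_with_index do |row, index|
      next if index < start
      yielder << [row, index]
    end
  end
end

data = <<~CSV
  name;identifier;status_code
  alpha;A1;200
  beta;B2;404
  gamma;C3;200
CSV

# Resume at row 1, skipping the already-processed "alpha" row.
resumed = csv_pairs(data, cursor: 1).map do |row, cursor|
  [cursor, *row.values_at("name", "identifier", "status_code")]
end
```

Note that this sketch parses the whole text up front; for large files, a streaming reader would be the better fit.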

Notes
