Skip to content

Commit

Permalink
Make created_at/enqueued_at storing integer milliseconds (#6564)
Browse files Browse the repository at this point in the history
Convert created_at and enqueued_at timestamps from Float (which are sketchy in JSON) to Integer (which are more predictable). Values are millisecond-resolution.
  • Loading branch information
fatkodima authored Dec 31, 2024
1 parent 541dd5b commit b7e116c
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 20 deletions.
57 changes: 49 additions & 8 deletions lib/sidekiq/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,19 @@ def fetch_stats_fast!
rescue
{}
end

now = Time.now.to_f
thence = job["enqueued_at"] || now
now - thence
enqueued_at = job["enqueued_at"]
if enqueued_at
if enqueued_at.is_a?(Float)
# old format
now - enqueued_at
else
now - enqueued_at / 1000.0
end
else
0.0
end
else
0.0
end
Expand Down Expand Up @@ -264,10 +274,20 @@ def latency
conn.lindex(@rname, -1)
}
return 0.0 unless entry

job = Sidekiq.load_json(entry)
now = Time.now.to_f
thence = job["enqueued_at"] || now
now - thence
enqueued_at = job["enqueued_at"]
if enqueued_at
if enqueued_at.is_a?(Float)
# old format
now - enqueued_at
else
now - enqueued_at / 1000.0
end
else
0.0
end
end

def each
Expand Down Expand Up @@ -420,11 +440,13 @@ def bid
end

def enqueued_at
self["enqueued_at"] ? Time.at(self["enqueued_at"]).utc : nil
if self["enqueued_at"]
time_from_timestamp(self["enqueued_at"])
end
end

def created_at
Time.at(self["created_at"] || self["enqueued_at"] || 0).utc
time_from_timestamp(self["created_at"] || self["enqueued_at"] || 0)
end

def tags
Expand All @@ -442,8 +464,18 @@ def error_backtrace
end

def latency
now = Time.now.to_f
now - (@item["enqueued_at"] || @item["created_at"] || now)
timestamp = @item["enqueued_at"] || @item["created_at"]
if timestamp
now = Time.now.to_f
if timestamp.is_a?(Float)
# old format
now - timestamp
else
now - timestamp / 1000.0
end
else
0.0
end
end

# Remove this job from the queue
Expand Down Expand Up @@ -492,6 +524,15 @@ def uncompress_backtrace(backtrace)
uncompressed = Zlib::Inflate.inflate(strict_base64_decoded)
Sidekiq.load_json(uncompressed)
end

def time_from_timestamp(timestamp)
if timestamp.is_a?(Float)
# old format, timestamps were stored as fractional seconds since the epoch
Time.at(timestamp).utc
else
Time.at(timestamp / 1000, timestamp % 1000, :millisecond)
end
end
end

# Represents a job within a Redis sorted set where the score
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def atomic_push(conn, payloads)
[at, Sidekiq.dump_json(hash)]
})
else
now = Time.now.to_f
now = (Time.now.to_f * 1000).floor # milliseconds since the epoch
grouped_queues = payloads.group_by { |job| job["queue"] }
conn.sadd("queues", grouped_queues.keys)
grouped_queues.each do |queue, grouped_payloads|
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/job_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def normalize_item(item)
item["class"] = item["class"].to_s
item["queue"] = item["queue"].to_s
item["retry_for"] = item["retry_for"].to_i if item["retry_for"]
item["created_at"] ||= Time.now.to_f
item["created_at"] ||= (Time.now.to_f * 1000).floor
item
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/testing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def atomic_push(conn, payloads)
if Sidekiq::Testing.fake?
payloads.each do |job|
job = Sidekiq.load_json(Sidekiq.dump_json(job))
job["enqueued_at"] = Time.now.to_f unless job["at"]
job["enqueued_at"] = (Time.now.to_f * 1000).floor unless job["at"]
Queues.push(job["queue"], job["class"], job)
end
true
Expand Down
14 changes: 7 additions & 7 deletions test/api_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ class JobWithTags

SERIALIZED_JOBS = {
"5.x" => [
'{"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper","wrapped":"ApiAjJob","queue":"default","args":[{"job_class":"ApiAjJob","job_id":"f1bde53f-3852-4ae4-a879-c12eacebbbb0","provider_job_id":null,"queue_name":"default","priority":null,"arguments":[1,2,3],"executions":0,"locale":"en"}],"retry":true,"jid":"099eee72911085a511d0e312","created_at":1568305542.339916,"enqueued_at":1568305542.339947}',
'{"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper","wrapped":"ActionMailer::DeliveryJob","queue":"mailers","args":[{"job_class":"ActionMailer::DeliveryJob","job_id":"19cc0115-3d1c-4bbe-a51e-bfa1385895d1","provider_job_id":null,"queue_name":"mailers","priority":null,"arguments":["ApiMailer","test_email","deliver_now",1,2,3],"executions":0,"locale":"en"}],"retry":true,"jid":"37436e5504936400e8cf98db","created_at":1568305542.370133,"enqueued_at":1568305542.370241}'
'{"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper","wrapped":"ApiAjJob","queue":"default","args":[{"job_class":"ApiAjJob","job_id":"f1bde53f-3852-4ae4-a879-c12eacebbbb0","provider_job_id":null,"queue_name":"default","priority":null,"arguments":[1,2,3],"executions":0,"locale":"en"}],"retry":true,"jid":"099eee72911085a511d0e312","created_at":1568305542339,"enqueued_at":1568305542339}',
'{"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper","wrapped":"ActionMailer::DeliveryJob","queue":"mailers","args":[{"job_class":"ActionMailer::DeliveryJob","job_id":"19cc0115-3d1c-4bbe-a51e-bfa1385895d1","provider_job_id":null,"queue_name":"mailers","priority":null,"arguments":["ApiMailer","test_email","deliver_now",1,2,3],"executions":0,"locale":"en"}],"retry":true,"jid":"37436e5504936400e8cf98db","created_at":1568305542370,"enqueued_at":1568305542370}'
],
"6.x" => [
'{"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper","wrapped":"ApiAjJob","queue":"default","args":[{"job_class":"ApiAjJob","job_id":"ff2b48d4-bdce-4825-af6b-ef8c11ab651e","provider_job_id":null,"queue_name":"default","priority":null,"arguments":[1,2,3],"executions":0,"exception_executions":{},"locale":"en","timezone":"UTC","enqueued_at":"2019-09-12T16:28:37Z"}],"retry":true,"jid":"ce121bf77b37ae81fe61b6dc","created_at":1568305717.9469702,"enqueued_at":1568305717.947005}',
'{"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper","wrapped":"ActionMailer::MailDeliveryJob","queue":"mailers","args":[{"job_class":"ActionMailer::MailDeliveryJob","job_id":"2f967da1-a389-479c-9a4e-5cc059e6d65c","provider_job_id":null,"queue_name":"mailers","priority":null,"arguments":["ApiMailer","test_email","deliver_now",{"params":{"user":{"_aj_globalid":"gid://app/User/1"}, "_aj_symbol_keys":["user"]},"args":[1,2,3],"_aj_symbol_keys":["params", "args"]}],"executions":0,"exception_executions":{},"locale":"en","timezone":"UTC","enqueued_at":"2019-09-12T16:28:37Z"}],"retry":true,"jid":"469979df52bb9ef9f48b49e1","created_at":1568305717.9457421,"enqueued_at":1568305717.9457731}'
'{"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper","wrapped":"ApiAjJob","queue":"default","args":[{"job_class":"ApiAjJob","job_id":"ff2b48d4-bdce-4825-af6b-ef8c11ab651e","provider_job_id":null,"queue_name":"default","priority":null,"arguments":[1,2,3],"executions":0,"exception_executions":{},"locale":"en","timezone":"UTC","enqueued_at":"2019-09-12T16:28:37Z"}],"retry":true,"jid":"ce121bf77b37ae81fe61b6dc","created_at":1568305717946,"enqueued_at":1568305717947}',
'{"class":"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper","wrapped":"ActionMailer::MailDeliveryJob","queue":"mailers","args":[{"job_class":"ActionMailer::MailDeliveryJob","job_id":"2f967da1-a389-479c-9a4e-5cc059e6d65c","provider_job_id":null,"queue_name":"mailers","priority":null,"arguments":["ApiMailer","test_email","deliver_now",{"params":{"user":{"_aj_globalid":"gid://app/User/1"}, "_aj_symbol_keys":["user"]},"args":[1,2,3],"_aj_symbol_keys":["params", "args"]}],"executions":0,"exception_executions":{},"locale":"en","timezone":"UTC","enqueued_at":"2019-09-12T16:28:37Z"}],"retry":true,"jid":"469979df52bb9ef9f48b49e1","created_at":1568305717945,"enqueued_at":1568305717945}'
],
"8.x" => [
'{"class":"Sidekiq::ActiveJob::Wrapper","wrapped":"ApiAjJob","queue":"default","args":[{"job_class":"ApiAjJob","job_id":"37649eb0-c437-4e00-8a29-85cc12da1440","provider_job_id":null,"queue_name":"default","priority":null,"arguments":[1,2,3],"executions":0,"exception_executions":{},"locale":"en","timezone":null,"enqueued_at":"2024-09-19T16:39:26.609737000Z","scheduled_at":null}],"retry":true,"jid":"ec42f101ed1b16f27c6a6188","created_at":1726763966.610073,"enqueued_at":1726763966.6101718}',
'{"class":"Sidekiq::ActiveJob::Wrapper","wrapped":"ActionMailer::MailDeliveryJob","queue":"mailers","args":[{"job_class":"ActionMailer::MailDeliveryJob","job_id":"d6573e12-dd78-454a-83d5-67df94934c82","provider_job_id":null,"queue_name":"mailers","priority":null,"arguments":["ApiMailer","test_email","deliver_now",{"args":[1,2,3],"_aj_ruby2_keywords":["args"]}],"executions":0,"exception_executions":{},"locale":"en","timezone":null,"enqueued_at":"2024-09-19T16:45:38.673195000Z","scheduled_at":null}],"retry":true,"jid":"2cd7874f7651115f453d6315","created_at":1726764338.673335,"enqueued_at":1726764338.673404}'
'{"class":"Sidekiq::ActiveJob::Wrapper","wrapped":"ApiAjJob","queue":"default","args":[{"job_class":"ApiAjJob","job_id":"37649eb0-c437-4e00-8a29-85cc12da1440","provider_job_id":null,"queue_name":"default","priority":null,"arguments":[1,2,3],"executions":0,"exception_executions":{},"locale":"en","timezone":null,"enqueued_at":"2024-09-19T16:39:26.609737000Z","scheduled_at":null}],"retry":true,"jid":"ec42f101ed1b16f27c6a6188","created_at":1726763966610,"enqueued_at":1726763966610}',
'{"class":"Sidekiq::ActiveJob::Wrapper","wrapped":"ActionMailer::MailDeliveryJob","queue":"mailers","args":[{"job_class":"ActionMailer::MailDeliveryJob","job_id":"d6573e12-dd78-454a-83d5-67df94934c82","provider_job_id":null,"queue_name":"mailers","priority":null,"arguments":["ApiMailer","test_email","deliver_now",{"args":[1,2,3],"_aj_ruby2_keywords":["args"]}],"executions":0,"exception_executions":{},"locale":"en","timezone":null,"enqueued_at":"2024-09-19T16:45:38.673195000Z","scheduled_at":null}],"retry":true,"jid":"2cd7874f7651115f453d6315","created_at":1726764338673,"enqueued_at":1726764338673}'
]
}

Expand Down Expand Up @@ -142,7 +142,7 @@ class JobWithTags
describe "enqueued" do
it "handles latency for good jobs" do
@cfg.redis do |conn|
conn.rpush "queue:default", "{\"enqueued_at\": #{Time.now.to_f}}"
conn.rpush "queue:default", "{\"enqueued_at\": #{(Time.now.to_f * 1000).floor}}"
conn.sadd "queues", ["default"]
end
s = Sidekiq::Stats.new
Expand Down
2 changes: 1 addition & 1 deletion test/scheduled_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def call(worker_class, job, queue, r)
%w[queue:queue_1 queue:queue_2 queue:queue_4 queue:queue_5].each do |queue_name|
assert_equal 1, conn.llen(queue_name)
job = Sidekiq.load_json(conn.lrange(queue_name, 0, -1)[0])
assert_equal enqueued_time.to_f, job["enqueued_at"]
assert_equal enqueued_time.to_i * 1000, job["enqueued_at"]
assert_equal created_time.to_f, job["created_at"]
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/testing_fake_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def perform(a, b)
it "stubs the async call" do
assert_equal 0, DirectJob.jobs.size
assert DirectJob.perform_async(1, 2)
assert_in_delta Time.now.to_f, DirectJob.jobs.last["enqueued_at"], 0.1
assert_in_delta (Time.now.to_f * 1000).floor, DirectJob.jobs.last["enqueued_at"], 0.1
assert_equal 1, DirectJob.jobs.size
assert DirectJob.perform_in(10, 1, 2)
refute DirectJob.jobs.last["enqueued_at"]
Expand Down

0 comments on commit b7e116c

Please sign in to comment.