-
Notifications
You must be signed in to change notification settings - Fork 116
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement a check-in event scheduler, which schedules check-in events to be transmitted in a separate thread, with a minimum wait period of ten seconds between requests, and a wait period of a tenth of a second before the first request, referred to as "debounce" periods. This is a relatively minor improvement for the existing cron check-ins, but it is a requirement for the heartbeat check-ins, both to avoid slowing down customers' applications with blocking requests, and to avoid misuse of the feature from spamming our servers. The scheduler also acts as a deduplicator, removing "similar enough" check-in events -- again, not particularly interesting for cron check-ins, but a requirement to minimise damage when heartbeat check-ins are misused. Implement support for NDJSON payloads in the transmitter in order to send all the check-ins in a single request. The whole setup is lazily instantiated, so no threads will be spawned until a check-in event actually needs to be transmitted. Internally, a queue is used to communicate with the thread, which is suspended until a message is pushed into the queue. A second thread is used to keep track of the debounce period, and is killed after a debounce period does not cause any events to be sent. When `Appsignal.stop` is called, the scheduler is gracefully stopped, blocking the call to `Appsignal.stop` until it has transmitted all enqueued check-in events. During this time, new check-in events cannot be scheduled.
- Loading branch information
Showing
15 changed files
with
1,036 additions
and
180 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
--- | ||
bump: patch | ||
type: change | ||
--- | ||
|
||
Send check-ins concurrently. When calling `Appsignal::CheckIn.cron`, instead of blocking the current thread while the check-in events are sent, schedule them to be sent in a separate thread. | ||
|
||
When shutting down your application manually, call `Appsignal.stop` to block until all scheduled check-ins have been sent. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
# frozen_string_literal: true | ||
|
||
module Appsignal | ||
module CheckIn | ||
class Scheduler | ||
INITIAL_DEBOUNCE_SECONDS = 0.1 | ||
BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 10 | ||
|
||
def initialize | ||
# The mutex is used to synchronize access to the events array, the | ||
# waker thread and the main thread, as well as queue writes | ||
# (which depend on the events array) and closes (so they do not | ||
# happen at the same time that an event is added to the scheduler) | ||
@mutex = Mutex.new | ||
# The transmitter thread will be started when an event is first added. | ||
@thread = nil | ||
@queue = Thread::Queue.new | ||
# Scheduled events that have not been sent to the transmitter thread | ||
# yet. A copy of this array is pushed to the queue by the waker thread | ||
# after it has awaited the debounce period. | ||
@events = [] | ||
# The waker thread is used to schedule debounces. It will be started | ||
# when an event is first added. | ||
@waker = nil | ||
# For internal testing purposes. | ||
@transmitted = 0 | ||
end | ||
|
||
def schedule(event) | ||
unless Appsignal.active? | ||
Appsignal.internal_logger.debug( | ||
"Cannot transmit #{describe([event])}: AppSignal is not active" | ||
) | ||
return | ||
end | ||
|
||
@mutex.synchronize do | ||
if @queue.closed? | ||
Appsignal.internal_logger.debug( | ||
"Cannot transmit #{describe([event])}: AppSignal is stopped" | ||
) | ||
return | ||
end | ||
add_event(event) | ||
# If we're not already waiting to be awakened from a scheduled | ||
# debounce, schedule a short debounce, which will push the events | ||
# to the queue and schedule a long debounce. | ||
start_waker(INITIAL_DEBOUNCE_SECONDS) if @waker.nil? | ||
|
||
Appsignal.internal_logger.debug( | ||
"Scheduling #{describe([event])} to be transmitted" | ||
) | ||
|
||
# Make sure to start the thread after an event has been added. | ||
@thread ||= Thread.new(&method(:run)) | ||
end | ||
end | ||
|
||
def stop | ||
@mutex.synchronize do | ||
# Flush all events before closing the queue. | ||
push_events | ||
rescue ClosedQueueError | ||
# The queue is already closed (by a previous call to `#stop`) | ||
# so it is not possible to push events to it anymore. | ||
ensure | ||
# Ensure calling `#stop` closes the queue and kills | ||
# the waker thread, disallowing any further events from being | ||
# scheduled with `#schedule`. | ||
stop_waker | ||
@queue.close | ||
|
||
# Block until the thread has finished. | ||
@thread&.join | ||
end | ||
end | ||
|
||
# @api private | ||
# For internal testing purposes. | ||
attr_reader :thread, :waker, :queue, :events, :transmitted | ||
|
||
private | ||
|
||
def run | ||
loop do | ||
events = @queue.pop | ||
break if events.nil? | ||
|
||
transmit(events) | ||
@transmitted += 1 | ||
end | ||
end | ||
|
||
def transmit(events) | ||
description = describe(events) | ||
|
||
begin | ||
response = CheckIn.transmitter.transmit(events, :format => :ndjson) | ||
|
||
if (200...300).include?(response.code.to_i) | ||
Appsignal.internal_logger.debug( | ||
"Transmitted #{description}" | ||
) | ||
else | ||
Appsignal.internal_logger.error( | ||
"Failed to transmit #{description}: #{response.code} status code" | ||
) | ||
end | ||
rescue => e | ||
Appsignal.internal_logger.error("Failed to transmit #{description}: #{e.message}") | ||
end | ||
end | ||
|
||
def describe(events) | ||
if events.empty? | ||
# This shouldn't happen. | ||
"no check-in events" | ||
elsif events.length > 1 | ||
"#{events.length} check-in events" | ||
else | ||
event = events.first | ||
if event[:check_in_type] == "cron" | ||
"cron check-in `#{event[:identifier] || "unknown"}` " \ | ||
"#{event[:kind] || "unknown"} event (digest #{event[:digest] || "unknown"})" \ | ||
else | ||
"unknown check-in event" | ||
end | ||
end | ||
end | ||
|
||
# Must be called from within a `@mutex.synchronize` block. | ||
def add_event(event) | ||
# Remove redundant events, keeping the newly added one, which | ||
# should be the one with the most recent timestamp. | ||
if event[:check_in_type] == "cron" | ||
# Remove any existing cron check-in event with the same identifier, | ||
# digest and kind as the one we're adding. | ||
@events.reject! do |existing_event| | ||
next unless existing_event[:identifier] == event[:identifier] && | ||
existing_event[:digest] == event[:digest] && | ||
existing_event[:kind] == event[:kind] && | ||
existing_event[:check_in_type] == "cron" | ||
|
||
Appsignal.internal_logger.debug( | ||
"Replacing previously scheduled #{describe([existing_event])}" | ||
) | ||
|
||
true | ||
end | ||
end | ||
|
||
@events << event | ||
end | ||
|
||
# Must be called from within a `@mutex.synchronize` block. | ||
def start_waker(debounce) | ||
stop_waker | ||
|
||
@waker = Thread.new do | ||
sleep(debounce) | ||
|
||
@mutex.synchronize do | ||
# Make sure this waker doesn't get killed, so it can push | ||
# events and schedule a new waker. | ||
@waker = nil | ||
push_events | ||
end | ||
end | ||
end | ||
|
||
# Must be called from within a `@mutex.synchronize` block. | ||
def stop_waker | ||
@waker&.kill | ||
@waker&.join | ||
@waker = nil | ||
end | ||
|
||
# Must be called from within a `@mutex.synchronize` block. | ||
def push_events | ||
return if @events.empty? | ||
|
||
# Push a copy of the events to the queue, and clear the events array. | ||
# This ensures that `@events` always contains events that have not | ||
# yet been pushed to the queue. | ||
@queue.push(@events.dup) | ||
@events.clear | ||
|
||
start_waker(BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS) | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
# frozen_string_literal: true | ||
|
||
module Appsignal | ||
module Utils | ||
class NDJSON | ||
class << self | ||
def generate(body) | ||
body.map do |element| | ||
Appsignal::Utils::JSON.generate(element) | ||
end.join("\n") | ||
end | ||
end | ||
end | ||
end | ||
end |
Oops, something went wrong.