Transmit check-in events in thread
Implement a check-in event scheduler, which schedules check-in events
to be transmitted in a separate thread, with a minimum wait period
of ten seconds between requests, and a wait period of a tenth of a
second before the first request, referred to as "debounce" periods.

This is a relatively minor improvement for the existing cron
check-ins, but it is a requirement for the heartbeat check-ins, both
to avoid slowing down customers' applications with blocking requests
and to keep misuse of the feature from spamming our servers.

The scheduler also acts as a deduplicator, removing "similar enough"
check-in events -- again, not particularly interesting for cron
check-ins, but a requirement to minimise damage when heartbeat
check-ins are misused.
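
As a rough illustration of the deduplication described above, the following self-contained sketch keeps only the newest of any events that match on identifier, digest, kind and check-in type. The `add_deduplicated` helper, the `DEDUP_KEYS` constant and the `:timestamp` values are hypothetical names chosen for the example, and the sketch does not special-case cron check-ins the way the scheduler in this commit does.

```ruby
# Hypothetical constant: the fields that make two events "similar enough".
DEDUP_KEYS = %i[identifier digest kind check_in_type].freeze

# Hypothetical helper, not part of the gem.
def add_deduplicated(events, event)
  # Drop any pending event that matches the new one on every key,
  # then append the new event so that the most recent one wins.
  events.reject! do |existing|
    DEDUP_KEYS.all? { |key| existing[key] == event[key] }
  end
  events << event
end

pending_events = []
base = { :identifier => "backup", :digest => "abc", :check_in_type => "cron" }

# The :timestamp key is an assumption used to tell the events apart.
add_deduplicated(pending_events, base.merge(:kind => "start", :timestamp => 1))
add_deduplicated(pending_events, base.merge(:kind => "start", :timestamp => 2))
add_deduplicated(pending_events, base.merge(:kind => "finish", :timestamp => 3))
```

After these three calls, `pending_events` holds the second `start` event and the `finish` event; the first `start` event has been replaced.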

Implement support for NDJSON payloads in the transmitter in order to
send all the check-ins in a single request.
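
For readers unfamiliar with NDJSON, here is a minimal sketch of the format: each element is serialised as one JSON document, and the documents are joined with newlines. It uses the standard library `json` rather than the gem's own JSON wrapper, and `ndjson_generate` is a hypothetical name for the example.

```ruby
require "json"

# Hypothetical stand-in for an NDJSON generator: one JSON document per
# line, joined with newlines and without a trailing newline.
def ndjson_generate(elements)
  elements.map { |element| JSON.generate(element) }.join("\n")
end

events = [
  { :identifier => "backup", :kind => "start" },
  { :identifier => "backup", :kind => "finish" }
]

body = ndjson_generate(events)
puts body
```

Each line of `body` can be parsed independently by the receiver, which is what allows several check-in events to travel in a single request.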

The whole setup is lazily instantiated, so no threads will be spawned
until a check-in event actually needs to be transmitted. Internally,
a queue is used to communicate with the thread, which is suspended
until a message is pushed into the queue.
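
The lazy start-up can be sketched roughly as follows. This is a simplified, self-contained illustration rather than the scheduler itself: `LazyWorker` and its methods are hypothetical names, and it leaves out the debouncing and the HTTP transmission.

```ruby
# Hypothetical minimal worker: the thread is created on the first push and
# is suspended inside Queue#pop until a message arrives.
class LazyWorker
  attr_reader :thread, :processed

  def initialize
    @mutex = Mutex.new
    @queue = Thread::Queue.new
    @thread = nil
    @processed = []
  end

  def push(message)
    @mutex.synchronize do
      @queue.push(message)
      # Spawn the consumer thread only once there is work to do.
      @thread ||= Thread.new { run }
    end
  end

  def stop
    # Closing the queue makes `pop` return nil once the queue is empty,
    # which ends the loop in `run`.
    @queue.close
    @thread&.join
  end

  private

  def run
    while (message = @queue.pop)
      @processed << message
    end
  end
end

worker = LazyWorker.new
thread_before_push = worker.thread # nil: nothing has been spawned yet
worker.push("first")
worker.push("second")
worker.stop
```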

A second thread is used to keep track of the debounce period, and is
killed after a debounce period does not cause any events to be sent.
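
A simplified sketch of the debounce idea, under the assumption that a single short delay is enough to show it: events added while a waker thread is counting down are flushed together as one batch. `Debouncer` is a hypothetical class for illustration and omits the longer wait between transmissions that the scheduler in this commit applies.

```ruby
# Hypothetical debouncer: events added within `delay` seconds of the first
# one are flushed together by a short-lived waker thread.
class Debouncer
  attr_reader :batches

  def initialize(delay)
    @delay = delay
    @mutex = Mutex.new
    @events = []
    @batches = []
    @waker = nil
  end

  def add(event)
    @mutex.synchronize do
      @events << event
      # Only start a waker if one is not already counting down.
      @waker ||= Thread.new { wake }
    end
  end

  # Block until the current waker, if any, has flushed its batch.
  def wait
    waker = @mutex.synchronize { @waker }
    waker&.join
  end

  private

  def wake
    sleep(@delay)
    @mutex.synchronize do
      @batches << @events.dup
      @events.clear
      # Clear the reference so that the next `add` starts a new waker.
      @waker = nil
    end
  end
end

debouncer = Debouncer.new(0.1)
debouncer.add(:a)
debouncer.add(:b)
debouncer.wait
```

Both events are added well within the 0.1 second window, so they end up in the same batch.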

When `Appsignal.stop` is called, the scheduler is gracefully stopped,
blocking the call to `Appsignal.stop` until it has transmitted all
enqueued check-in events. During this time, new check-in events
cannot be scheduled.
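
The shutdown behaviour relies on how a closed `Thread::Queue` behaves: items that were already enqueued are still delivered, `pop` returns `nil` once the queue is empty, and further pushes raise `ClosedQueueError`. The following standalone sketch shows just that mechanism; the variable names are illustrative and nothing here calls into the gem.

```ruby
queue = Thread::Queue.new
drained = []

consumer = Thread.new do
  # `pop` returns nil once the queue is closed and empty, ending the loop.
  while (batch = queue.pop)
    drained << batch
  end
end

queue.push([:start])
queue.push([:finish])

queue.close   # no further pushes are accepted from here on
consumer.join # blocks until everything already enqueued has been consumed

rejected =
  begin
    queue.push([:late])
    false
  rescue ClosedQueueError
    true
  end
```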
unflxw committed Aug 16, 2024
1 parent 7c523cc commit 46d4ca7
Showing 15 changed files with 1,036 additions and 180 deletions.
8 changes: 8 additions & 0 deletions .changesets/send-check-ins-concurrently.md
@@ -0,0 +1,8 @@
---
bump: patch
type: change
---

Send check-ins concurrently. When calling `Appsignal::CheckIn.cron`, instead of blocking the current thread while the check-in events are sent, schedule them to be sent in a separate thread.

When shutting down your application manually, call `Appsignal.stop` to block until all scheduled check-ins have been sent.
1 change: 1 addition & 0 deletions lib/appsignal.rb
@@ -163,6 +163,7 @@ def stop(called_by = nil)
end
Appsignal::Extension.stop
Appsignal::Probes.stop
+Appsignal::CheckIn.stop
end

# Configure the AppSignal Ruby gem using a DSL.
18 changes: 18 additions & 0 deletions lib/appsignal/check_in.rb
@@ -39,8 +39,26 @@ def cron(identifier)
cron.finish
output
end

+# @api private
+def transmitter
+@transmitter ||= Transmitter.new(
+"#{Appsignal.config[:logging_endpoint]}/check_ins/json"
+)
+end
+
+# @api private
+def scheduler
+@scheduler ||= Scheduler.new
+end
+
+# @api private
+def stop
+scheduler&.stop
+end
end
end
end

+require "appsignal/check_in/scheduler"
require "appsignal/check_in/cron"
36 changes: 2 additions & 34 deletions lib/appsignal/check_in/cron.rb
@@ -3,15 +3,6 @@
module Appsignal
module CheckIn
class Cron
-class << self
-# @api private
-def transmitter
-@transmitter ||= Appsignal::Transmitter.new(
-"#{Appsignal.config[:logging_endpoint]}/check_ins/json"
-)
-end
-end

# @api private
attr_reader :identifier, :digest

@@ -21,11 +12,11 @@ def initialize(identifier:)
end

def start
-transmit_event("start")
+CheckIn.scheduler.schedule(event("start"))
end

def finish
-transmit_event("finish")
+CheckIn.scheduler.schedule(event("finish"))
end

private
@@ -39,29 +30,6 @@ def event(kind)
:check_in_type => "cron"
}
end

-def transmit_event(kind)
-unless Appsignal.active?
-Appsignal.internal_logger.debug(
-"AppSignal not active, not transmitting cron check-in event"
-)
-return
-end
-
-response = self.class.transmitter.transmit(event(kind))
-
-if response.code.to_i >= 200 && response.code.to_i < 300
-Appsignal.internal_logger.debug(
-"Transmitted cron check-in `#{identifier}` (#{digest}) #{kind} event"
-)
-else
-Appsignal.internal_logger.error(
-"Failed to transmit cron check-in #{kind} event: status code was #{response.code}"
-)
-end
-rescue => e
-Appsignal.internal_logger.error("Failed to transmit cron check-in #{kind} event: #{e}")
-end
end
end
end
192 changes: 192 additions & 0 deletions lib/appsignal/check_in/scheduler.rb
@@ -0,0 +1,192 @@
# frozen_string_literal: true

module Appsignal
module CheckIn
class Scheduler
INITIAL_DEBOUNCE_SECONDS = 0.1
BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS = 10

def initialize
# The mutex is used to synchronize access to the events array, the
# waker thread and the main thread, as well as queue writes
# (which depend on the events array) and closes (so they do not
# happen at the same time that an event is added to the scheduler)
@mutex = Mutex.new
# The transmitter thread will be started when an event is first added.
@thread = nil
@queue = Thread::Queue.new
# Scheduled events that have not been sent to the transmitter thread
# yet. A copy of this array is pushed to the queue by the waker thread
# after it has awaited the debounce period.
@events = []
# The waker thread is used to schedule debounces. It will be started
# when an event is first added.
@waker = nil
# For internal testing purposes.
@transmitted = 0
end

def schedule(event)
unless Appsignal.active?
Appsignal.internal_logger.debug(
"Cannot transmit #{describe([event])}: AppSignal is not active"
)
return
end

@mutex.synchronize do
if @queue.closed?
Appsignal.internal_logger.debug(
"Cannot transmit #{describe([event])}: AppSignal is stopped"
)
return
end
add_event(event)
# If we're not already waiting to be awakened from a scheduled
# debounce, schedule a short debounce, which will push the events
# to the queue and schedule a long debounce.
start_waker(INITIAL_DEBOUNCE_SECONDS) if @waker.nil?

Appsignal.internal_logger.debug(
"Scheduling #{describe([event])} to be transmitted"
)

# Make sure to start the thread after an event has been added.
@thread ||= Thread.new(&method(:run))
end
end

def stop
@mutex.synchronize do
# Flush all events before closing the queue.
push_events
rescue ClosedQueueError
# The queue is already closed (by a previous call to `#stop`)
# so it is not possible to push events to it anymore.
ensure
# Ensure calling `#stop` closes the queue and kills
# the waker thread, disallowing any further events from being
# scheduled with `#schedule`.
stop_waker
@queue.close

# Block until the thread has finished.
@thread&.join
end
end

# @api private
# For internal testing purposes.
attr_reader :thread, :waker, :queue, :events, :transmitted

private

def run
loop do
events = @queue.pop
break if events.nil?

transmit(events)
@transmitted += 1
end
end

def transmit(events)
description = describe(events)

begin
response = CheckIn.transmitter.transmit(events, :format => :ndjson)

if (200...300).include?(response.code.to_i)
Appsignal.internal_logger.debug(
"Transmitted #{description}"
)
else
Appsignal.internal_logger.error(
"Failed to transmit #{description}: #{response.code} status code"
)
end
rescue => e
Appsignal.internal_logger.error("Failed to transmit #{description}: #{e.message}")
end
end

def describe(events)
if events.empty?
# This shouldn't happen.
"no check-in events"
elsif events.length > 1
"#{events.length} check-in events"
else
event = events.first
if event[:check_in_type] == "cron"
"cron check-in `#{event[:identifier] || "unknown"}` " \
"#{event[:kind] || "unknown"} event (digest #{event[:digest] || "unknown"})"
else
"unknown check-in event"
end
end
end

# Must be called from within a `@mutex.synchronize` block.
def add_event(event)
# Remove redundant events, keeping the newly added one, which
# should be the one with the most recent timestamp.
if event[:check_in_type] == "cron"
# Remove any existing cron check-in event with the same identifier,
# digest and kind as the one we're adding.
@events.reject! do |existing_event|
next unless existing_event[:identifier] == event[:identifier] &&
existing_event[:digest] == event[:digest] &&
existing_event[:kind] == event[:kind] &&
existing_event[:check_in_type] == "cron"

Appsignal.internal_logger.debug(
"Replacing previously scheduled #{describe([existing_event])}"
)

true
end
end

@events << event
end

# Must be called from within a `@mutex.synchronize` block.
def start_waker(debounce)
stop_waker

@waker = Thread.new do
sleep(debounce)

@mutex.synchronize do
# Make sure this waker doesn't get killed, so it can push
# events and schedule a new waker.
@waker = nil
push_events
end
end
end

# Must be called from within a `@mutex.synchronize` block.
def stop_waker
@waker&.kill
@waker&.join
@waker = nil
end

# Must be called from within a `@mutex.synchronize` block.
def push_events
return if @events.empty?

# Push a copy of the events to the queue, and clear the events array.
# This ensures that `@events` always contains events that have not
# yet been pushed to the queue.
@queue.push(@events.dup)
@events.clear

start_waker(BETWEEN_TRANSMISSIONS_DEBOUNCE_SECONDS)
end
end
end
end
2 changes: 1 addition & 1 deletion lib/appsignal/cli/diagnose.rb
@@ -150,7 +150,7 @@ def transmit_report_to_appsignal
ENV.fetch("APPSIGNAL_DIAGNOSE_ENDPOINT", DIAGNOSE_ENDPOINT),
Appsignal.config
)
-response = transmitter.transmit(:diagnose => data)
+response = transmitter.transmit({ :diagnose => data })

unless response.code == "200"
puts " Error: Something went wrong while submitting the report " \
37 changes: 30 additions & 7 deletions lib/appsignal/transmitter.rb
@@ -9,7 +9,8 @@
module Appsignal
# @api private
class Transmitter
-CONTENT_TYPE = "application/json; charset=UTF-8"
+JSON_CONTENT_TYPE = "application/json; charset=UTF-8"
+NDJSON_CONTENT_TYPE = "application/x-ndjson; charset=UTF-8"

HTTP_ERRORS = [
EOFError,
@@ -53,17 +54,39 @@ def uri
end
end

-def transmit(payload)
-config.logger.debug "Transmitting payload to #{uri}"
-http_client.request(http_post(payload))
+def transmit(payload, format: :json)
+Appsignal.internal_logger.debug "Transmitting payload to #{uri}"
+http_client.request(http_post(payload, :format => format))
end

private

-def http_post(payload)
+def http_post(payload, format: :json)
Net::HTTP::Post.new(uri.request_uri).tap do |request|
-request["Content-Type"] = CONTENT_TYPE
-request.body = Appsignal::Utils::JSON.generate(payload)
+request["Content-Type"] = content_type_for(format)
+request.body = generate_body_for(format, payload)
end
end
+
+def content_type_for(format)
+case format
+when :json
+JSON_CONTENT_TYPE
+when :ndjson
+NDJSON_CONTENT_TYPE
+else
+raise ArgumentError, "Unknown Content-Type header for format: #{format}"
+end
+end
+
+def generate_body_for(format, payload)
+case format
+when :json
+Appsignal::Utils::JSON.generate(payload)
+when :ndjson
+Appsignal::Utils::NDJSON.generate(payload)
+else
+raise ArgumentError, "Unknown body generator for format: #{format}"
+end
+end

1 change: 1 addition & 0 deletions lib/appsignal/utils.rb
@@ -12,4 +12,5 @@ module Utils
require "appsignal/utils/hash_sanitizer"
require "appsignal/utils/integration_logger"
require "appsignal/utils/json"
+require "appsignal/utils/ndjson"
require "appsignal/utils/query_params_sanitizer"
15 changes: 15 additions & 0 deletions lib/appsignal/utils/ndjson.rb
@@ -0,0 +1,15 @@
# frozen_string_literal: true

module Appsignal
module Utils
class NDJSON
class << self
def generate(body)
body.map do |element|
Appsignal::Utils::JSON.generate(element)
end.join("\n")
end
end
end
end
end