Skip to content

Commit

Permalink
Make Notifier::Fanout faster and safer
Browse files Browse the repository at this point in the history
This commit aims to improve ActiveSupport::Notifications::Fanout. There
are three main goals here: backwards compatibility, safety, and
performance.

* Backwards compatibility

This ActiveSupport::Notifications is an old and well used interface.
Over time it has collected a lot of features and flexibility, much of
which I suspect is not used anywhere by anyone, but it is hard to know
specifics and we would at minimum need a deprecation cycle.

For this reason this aims to fully maintain compatibility. This includes
both the ability to use an alternate notification implementation instead
of Fanout, the signatures received by all types of listeners, and the
interface used on the Instrumenter and Fanout itself (including the
sometimes problematic start/finish).

* Safety

There have been issues (both recent and past) with the "timestacks"
becoming invalid, particularly when subscribing and unsubscribing within
events. This is an issue when topics are subscribed/unsubscribed to
while they are in flight.

The previous implementation would record a separate timestamp or event
object for each listener in a thread local stack. This meant that it was
essential that the listeners to start and finish were identical.

This issue is avoided by passing the listeners used to `start` the event
to `finish` (`finish_with_state` in the Instrumenter), to ensure they
are the same set in `start`/`finish`.

This commit further avoids this issue. Instead of pushing individual
times onto a stack, we now push a single object, `Handle`, onto the
stack for an event. This object holds all the subscribers (recorded at
start time) and all their associated data. This means that as long as
start/stop calls are not interleaved.

This commit also exposes `build_handle` as a public interface. This
returns the Handle object which can have start/stop called at any time
and any order safely. The one reservation I have with making this public
is that existing "evented" listeners (those receiving start/stop) may
not be ready for that (ex. if they maintain an internal thread-local
stack).

* Performance

This aims to be faster and make fewer allocations then the existing
implementation.

For time-based and event-object-based listeners, the previous
implementation created a separate object for each listener, pushing
and popping it on a thread-local stack. This is slower both because we
need to access the thread local repeatedly (hash lookups) and because
we're allocating duplicate objects.

The new implementation works by grouping similar types of listeners
together and shares either the `Event` or start/stop times between all
of them. The grouping was done so that we didn't need to allocate Events
or Times for topics which did have a listener of that type.

This implementation is significantly faster for all cases, except for
evented, which is slower.

For topics with 10 subscriptions:

*main*:

               timed     66.739k (± 2.5%) i/s -    338.800k in   5.079883s
     timed_monotonic    138.265k (± 0.6%) i/s -    699.261k in   5.057575s
        event_object     48.650k (± 0.2%) i/s -    244.250k in   5.020614s
             evented    366.559k (± 1.0%) i/s -      1.851M in   5.049727s
        unsubscribed      3.696M (± 0.5%) i/s -     18.497M in   5.005335s

*This branch*:

               timed    259.031k (± 0.6%) i/s -      1.302M in   5.025612s
     timed_monotonic    327.439k (± 1.7%) i/s -      1.665M in   5.086815s
        event_object    228.991k (± 0.3%) i/s -      1.164M in   5.083539s
             evented    296.057k (± 0.3%) i/s -      1.501M in   5.070315s
        unsubscribed      3.670M (± 0.3%) i/s -     18.376M in   5.007095s

Co-authored-by: John Crepezzi <[email protected]>
Co-authored-by: Theo Julienne <[email protected]>
  • Loading branch information
3 people committed Jun 2, 2022
1 parent 920fcce commit dbf2edb
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 83 deletions.
283 changes: 203 additions & 80 deletions activesupport/lib/active_support/notifications/fanout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,29 @@ def initialize(exceptions)
end
end

module FanoutIteration # :nodoc:
def iterate_guarding_exceptions(listeners)
exceptions = nil

listeners.each do |s|
yield s
rescue Exception => e
exceptions ||= []
exceptions << e
end

if exceptions
if exceptions.size == 1
raise exceptions.first
else
raise InstrumentationSubscriberError.new(exceptions), cause: exceptions.first
end
end

listeners
end
end

# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
Expand All @@ -28,19 +51,25 @@ def initialize
@string_subscribers = Hash.new { |h, k| h[k] = [] }
@other_subscribers = []
@listeners_for = Concurrent::Map.new
@groups_for = Concurrent::Map.new
super
end

def inspect # :nodoc:
total_patterns = @string_subscribers.size + @other_subscribers.size
"#<#{self.class} (#{total_patterns} patterns)>"
end

def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
subscriber = Subscribers.new(pattern, callable || block, monotonic)
synchronize do
case pattern
when String
@string_subscribers[pattern] << subscriber
@listeners_for.delete(pattern)
clear_cache(pattern)
when NilClass, Regexp
@other_subscribers << subscriber
@listeners_for.clear
clear_cache
else
raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
end
Expand All @@ -53,61 +82,193 @@ def unsubscribe(subscriber_or_name)
case subscriber_or_name
when String
@string_subscribers[subscriber_or_name].clear
@listeners_for.delete(subscriber_or_name)
clear_cache(subscriber_or_name)
@other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
else
pattern = subscriber_or_name.try(:pattern)
if String === pattern
@string_subscribers[pattern].delete(subscriber_or_name)
@listeners_for.delete(pattern)
clear_cache(pattern)
else
@other_subscribers.delete(subscriber_or_name)
@listeners_for.clear
clear_cache
end
end
end
end

def inspect # :nodoc:
total_patterns = @string_subscribers.size + @other_subscribers.size
"#<#{self.class} (#{total_patterns} patterns)>"
def clear_cache(key = nil) # :nodoc:
if key
@listeners_for.delete(key)
@groups_for.delete(key)
else
@listeners_for.clear
@groups_for.clear
end
end

def start(name, id, payload)
iterate_guarding_exceptions(listeners_for(name)) { |s| s.start(name, id, payload) }
class BaseGroup # :nodoc:
include FanoutIteration

def initialize(listeners, name, id, payload)
@listeners = listeners
end

def each(&block)
iterate_guarding_exceptions(@listeners, &block)
end
end

def finish(name, id, payload, listeners = listeners_for(name))
iterate_guarding_exceptions(listeners) { |s| s.finish(name, id, payload) }
class BaseTimeGroup < BaseGroup # :nodoc:
def start(name, id, payload)
@start_time = now
end

def finish(name, id, payload)
stop_time = now
each do |listener|
listener.call(name, @start_time, stop_time, id, payload)
end
end
end

def publish(name, *args)
iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) }
class MonotonicTimedGroup < BaseTimeGroup # :nodoc:
private
def now
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end

def publish_event(event)
iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) }
class TimedGroup < BaseTimeGroup # :nodoc:
private
def now
Time.now
end
end

def iterate_guarding_exceptions(listeners)
exceptions = nil
class EventedGroup < BaseGroup # :nodoc:
def start(name, id, payload)
each do |s|
s.start(name, id, payload)
end
end

listeners.each do |s|
yield s
rescue Exception => e
exceptions ||= []
exceptions << e
def finish(name, id, payload)
each do |s|
s.finish(name, id, payload)
end
end
end

if exceptions
if exceptions.size == 1
raise exceptions.first
else
raise InstrumentationSubscriberError.new(exceptions), cause: exceptions.first
class EventObjectGroup < BaseGroup # :nodoc:
def start(name, id, payload)
@event = build_event(name, id, payload)
@event.start!
end

def finish(name, id, payload)
@event.payload = payload
@event.finish!

each do |s|
s.call(@event)
end
end

listeners
private
def build_event(name, id, payload)
ActiveSupport::Notifications::Event.new name, nil, nil, id, payload
end
end

def groups_for(name) # :nodoc:
@groups_for.compute_if_absent(name) do
listeners_for(name).group_by(&:group_class).transform_values do |s|
s.map(&:delegate)
end
end
end

# A Handle is used to record the start and finish time of event
#
# Both `#start` and `#finish` must each be called exactly once
#
# Where possible, it's best to the block form, +ActiveSupport::Notifications.instrument+
# +Handle+ is a low-level API intended for cases where the block form can't be used.
#
# handle = ActiveSupport::Notifications.instrumenter.build_handle("my.event", {})
# begin
# handle.start
# # work to be instrumented
# ensure
# handle.finish
# end
class Handle
def initialize(notifier, name, id, payload) # :nodoc:
@name = name
@id = id
@payload = payload
@groups = notifier.groups_for(name).map do |group_klass, grouped_listeners|
group_klass.new(grouped_listeners, name, id, payload)
end
@state = :initialized
end

def start
ensure_state! :initialized
@state = :started

@groups.each do |group|
group.start(@name, @id, @payload)
end
end

def finish
finish_with_values(@name, @id, @payload)
end

def finish_with_values(name, id, payload) # :nodoc:
ensure_state! :started
@state = :finished

@groups.each do |group|
group.finish(name, id, payload)
end
end

private
def ensure_state!(expected)
if @state != expected
raise ArgumentError, "expected state to be #{expected.inspect} but was #{@state.inspect}"
end
end
end

include FanoutIteration

def build_handle(name, id, payload)
Handle.new(self, name, id, payload)
end

def start(name, id, payload)
handle_stack = (IsolatedExecutionState[:_fanout_handle_stack] ||= [])
handle = build_handle(name, id, payload)
handle_stack << handle
handle.start
end

def finish(name, id, payload, listeners = nil)
handle_stack = IsolatedExecutionState[:_fanout_handle_stack]
handle = handle_stack.pop
handle.finish_with_values(name, id, payload)
end

def publish(name, *args)
iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) }
end

def publish_event(event)
iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) }
end

def listeners_for(name)
Expand Down Expand Up @@ -185,7 +346,7 @@ def unsubscribe!(*)
end

class Evented # :nodoc:
attr_reader :pattern
attr_reader :pattern, :delegate

def initialize(pattern, delegate)
@pattern = Matcher.wrap(pattern)
Expand All @@ -194,6 +355,10 @@ def initialize(pattern, delegate)
@can_publish_event = delegate.respond_to?(:publish_event)
end

def group_class
EventedGroup
end

def publish(name, *args)
if @can_publish
@delegate.publish name, *args
Expand All @@ -208,14 +373,6 @@ def publish_event(event)
end
end

def start(name, id, payload)
@delegate.start name, id, payload
end

def finish(name, id, payload)
@delegate.finish name, id, payload
end

def subscribed_to?(name)
pattern === name
end
Expand All @@ -226,63 +383,29 @@ def unsubscribe!(name)
end

class Timed < Evented # :nodoc:
def publish(name, *args)
@delegate.call name, *args
def group_class
TimedGroup
end

def start(name, id, payload)
timestack = IsolatedExecutionState[:_timestack] ||= []
timestack.push Time.now
end

def finish(name, id, payload)
timestack = IsolatedExecutionState[:_timestack]
started = timestack.pop
@delegate.call(name, started, Time.now, id, payload)
end
end

class MonotonicTimed < Evented # :nodoc:
def publish(name, *args)
@delegate.call name, *args
end
end

def start(name, id, payload)
timestack = IsolatedExecutionState[:_timestack_monotonic] ||= []
timestack.push Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def finish(name, id, payload)
timestack = IsolatedExecutionState[:_timestack_monotonic]
started = timestack.pop
@delegate.call(name, started, Process.clock_gettime(Process::CLOCK_MONOTONIC), id, payload)
class MonotonicTimed < Timed # :nodoc:
def group_class
MonotonicTimedGroup
end
end

class EventObject < Evented
def start(name, id, payload)
stack = IsolatedExecutionState[:_event_stack] ||= []
event = build_event name, id, payload
event.start!
stack.push event
end

def finish(name, id, payload)
stack = IsolatedExecutionState[:_event_stack]
event = stack.pop
event.payload = payload
event.finish!
@delegate.call event
def group_class
EventObjectGroup
end

def publish_event(event)
@delegate.call event
end

private
def build_event(name, id, payload)
ActiveSupport::Notifications::Event.new name, nil, nil, id, payload
end
end
end
end
Expand Down
Loading

0 comments on commit dbf2edb

Please sign in to comment.