class ActiveSupport::Notifications::Fanout
This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.
This class is thread-safe. All methods are reentrant.
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/active_support/notifications/fanout.rb, line 13
# Starts with an empty subscriber list and an empty per-name listener
# cache, then hands off to the superclass initializer.
def initialize
  @subscribers = []
  @listeners_for = Concurrent::Map.new
  super
end
Public Instance Methods
finish(name, id, payload, listeners = listeners_for(name))
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 45
# Forwards +finish+ with +name+, +id+ and +payload+ to every listener.
#
# +listeners+ defaults to +listeners_for(name)+; pass it explicitly to
# reuse a list that was already looked up. Returns +listeners+.
def finish(name, id, payload, listeners = listeners_for(name))
  listeners.each { |s| s.finish(name, id, payload) }
end
listeners_for(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 53
# Returns the subscribers for which +subscribed_to?(name)+ is true.
#
# The result is cached per +name+ in @listeners_for; the subscriber list
# is only scanned (inside +synchronize+) when no cached entry exists.
def listeners_for(name)
  # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
  @listeners_for[name] || synchronize do
    # use synchronisation when accessing @subscribers
    @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
  end
end
listening?(name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 61
# Returns true when +listeners_for(name)+ yields at least one listener.
def listening?(name)
  listeners_for(name).any?
end
publish(name, *args)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 49
# Forwards +publish+ with +name+ and any extra arguments to every
# listener returned by +listeners_for(name)+.
def publish(name, *args)
  listeners_for(name).each { |s| s.publish(name, *args) }
end
start(name, id, payload)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 41
# Forwards +start+ with +name+, +id+ and +payload+ to every listener
# returned by +listeners_for(name)+.
def start(name, id, payload)
  listeners_for(name).each { |s| s.start(name, id, payload) }
end
subscribe(pattern = nil, block = Proc.new)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 19
# Registers a new subscriber for +pattern+ and returns it.
#
# The callback may be given as the second positional argument or as a
# block; the positional argument wins when both are present.
# Raises ArgumentError when neither is supplied.
def subscribe(pattern = nil, block = nil, &implicit_block)
  # A bare Proc.new used to capture the caller's block here; Ruby 3.0
  # removed that behaviour, so the block is captured explicitly instead.
  block ||= implicit_block
  raise ArgumentError, "tried to create Proc object without a block" unless block

  subscriber = Subscribers.new pattern, block
  synchronize do
    @subscribers << subscriber
    # cached listener lists no longer reflect @subscribers
    @listeners_for.clear
  end
  subscriber
end
unsubscribe(subscriber_or_name)
click to toggle source
# File lib/active_support/notifications/fanout.rb, line 28
# Removes subscribers and clears the cached listener lists.
#
# Given a String, drops every subscriber whose +matches?+ returns true
# for it; given anything else, deletes that object from the subscriber
# list. The whole operation runs inside +synchronize+.
def unsubscribe(subscriber_or_name)
  synchronize do
    case subscriber_or_name
    when String
      @subscribers.reject! { |s| s.matches?(subscriber_or_name) }
    else
      @subscribers.delete(subscriber_or_name)
    end

    # cached listener lists no longer reflect @subscribers
    @listeners_for.clear
  end
end
wait()
click to toggle source
This is a sync queue, so there is no waiting.
# File lib/active_support/notifications/fanout.rb, line 66
# No-op: does nothing and returns +nil+ immediately.
def wait
end