An event-driven runner for Arborist::Monitors.
Signals the runner handles
Number of seconds between thread cleanup
The Arborist::Client that will provide the message packing and unpacking
The ZMQ::Handler subclass that handles all async IO
The Array of loaded Arborist::Monitors the runner should run.
The reactor (a CZTop::Reactor) the runner uses to drive everything
The Hash of pending requests, keyed by the callback that should be called with the results.
A hash of monitor object -> thread used to contain and track running monitor threads.
Create a new Arborist::MonitorRunner
# File lib/arborist/monitor_runner.rb, line 33
# Set up a runner with no monitors loaded and nothing queued.
#
# The runner starts with an empty monitor list, no handler, a fresh
# CZTop::Reactor, a fresh Arborist::Client, and empty Hashes for tracking
# running monitor threads and pending requests.
def initialize
  # Plain bookkeeping state
  @monitors       = []
  @runner_threads = {}
  @request_queue  = {}
  @handler        = nil

  # Collaborators: the reactor is created before the client, as before.
  @reactor = CZTop::Reactor.new
  @client  = Arborist::Client.new
end
Create a repeating reactor timer that will run the specified monitor on its interval, skipping a run if a thread for that monitor is still being tracked.
# File lib/arborist/monitor_runner.rb, line 243
# Add a periodic timer to the reactor that runs the given +monitor+ every
# <tt>monitor.interval</tt>. A tick is skipped while the monitor still has an
# entry in #runner_threads, so runs of the same monitor never overlap.
#
# Returns whatever the reactor's #add_periodic_timer returns.
def add_interval_timer_for( monitor )
  period = monitor.interval
  self.log.info "Creating timer for %p" % [ monitor ]

  self.reactor.add_periodic_timer( period ) do
    self.run_monitor( monitor ) unless self.runner_threads.key?( monitor )
  end
end
Create a one-shot reactor timer that will register the interval timer for the
specified monitor after a random number of seconds no greater than its splay.
# File lib/arborist/monitor_runner.rb, line 257
# Add a one-shot timer to the reactor that registers the interval timer for
# +monitor+ after a random delay bounded by <tt>monitor.splay</tt>.
#
# Kernel#rand truncates a Float maximum to an Integer, so a fractional splay
# such as 0.25 would previously yield delays of up to a full second (and 1.5
# would always yield 0). Float splays are therefore scaled from a unit random
# instead; any other splay value is passed to Kernel#rand as before.
#
# Returns whatever the reactor's #add_oneshot_timer returns.
def add_splay_timer_for( monitor )
  max_delay = monitor.splay

  delay =
    if max_delay.is_a?( Float )
      # Uniform in [0, |splay|), keeping the sub-second resolution.
      rand() * max_delay.abs
    else
      rand( max_delay )
    end

  self.log.debug "Splaying registration of %p for %ds" % [ monitor, delay ]
  self.reactor.add_oneshot_timer( delay ) do
    self.add_interval_timer_for( monitor )
  end
end
Set up a timer to clean up monitor threads.
# File lib/arborist/monitor_runner.rb, line 268
# Add a periodic timer to the reactor that calls #cleanup_monitor_threads
# every THREAD_CLEANUP_INTERVAL.
#
# Returns whatever the reactor's #add_periodic_timer returns.
def add_thread_cleanup_timer
  message = "Starting thread cleanup timer for %p." % [ self.runner_threads ]
  self.log.debug( message )

  self.reactor.add_periodic_timer( THREAD_CLEANUP_INTERVAL ) { self.cleanup_monitor_threads }
end
Register a timer for the specified monitor.
# File lib/arborist/monitor_runner.rb, line 231
# Register a timer for +monitor+. If the monitor has a non-zero splay, its
# registration is delayed via #add_splay_timer_for; otherwise the interval
# timer is added immediately via #add_interval_timer_for.
#
# Returns the result of whichever of those two methods was called.
def add_timer_for( monitor )
  # The interval itself is read by #add_interval_timer_for; it isn't needed here.
  if monitor.splay.nonzero?
    self.add_splay_timer_for( monitor )
  else
    self.add_interval_timer_for( monitor )
  end
end
Clean up any monitor runner threads that are dead.
# File lib/arborist/monitor_runner.rb, line 279
# Reap every tracked monitor thread that is no longer alive: remove its entry
# from #runner_threads and join it. Joining re-raises any exception the
# thread died with, which is rescued and logged as an error using the
# thread's +:monitor_desc+ thread-local.
#
# Returns the Array of threads that were reaped.
def cleanup_monitor_threads
  # Snapshot the dead entries first so the tracking Hash isn't mutated while
  # it is being iterated.
  finished = self.runner_threads.reject {|_monitor, thread| thread.alive? }

  finished.each do |monitor, thread|
    self.runner_threads.delete( monitor )

    begin
      thread.join
    rescue => err
      self.log.error "%p while running %s: %s" %
        [ err.class, thread[:monitor_desc], err.message ]
    end
  end

  finished.values
end
Reactor callback – handle the client's socket becoming writable.
# File lib/arborist/monitor_runner.rb, line 110
# Reactor callback for events on the client's tree API socket.
#
# Only writable events are expected; anything else raises a RuntimeError.
# On a writable event the oldest queued request (if any) is sent through the
# client and its callback is called with the response. Once the queue is
# empty, the runner unregisters itself for write events.
def handle_io_event( event )
  unless event.writable?
    raise "Unexpected %p on the tree API socket" % [ event ]
  end

  pending = self.request_queue.shift
  if pending
    callback, request = pending
    response = self.client.send_tree_api_request( request )
    callback.call( response )
  end

  self.unregister if self.request_queue.empty?
end
Load monitors from the specified enumerator.
# File lib/arborist/monitor_runner.rb, line 74
# Append every monitor produced by +enumerator+ (anything that responds to
# #to_a) to the runner's list of monitors.
#
# Returns the runner's monitors Array.
def load_monitors( enumerator )
  loaded = enumerator.to_a
  self.monitors.concat( loaded )
end
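Add the specified request to the Hash of pending requests, keyed by the given
callback, and register the client's socket for writing so the request can be
sent. The callback is called with the response once the request has been sent.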
# File lib/arborist/monitor_runner.rb, line 203
# Store +request+ in the pending-request Hash under its +callback+, then make
# sure the runner is registered for write events so the request gets sent.
#
# Returns the result of #register.
def queue_request( request, &callback )
  self.request_queue.store( callback, request )
  self.register
end
Enable write events for the client's tree API socket on the reactor if they aren't already enabled.
# File lib/arborist/monitor_runner.rb, line 216
# Enable write events on the reactor for the client's tree API socket, unless
# they are already enabled.
#
# Returns +nil+ if already registered, otherwise the result of the reactor's
# #enable_events.
def register
  return if self.registered?

  # self.log.debug "Registering for writing."
  self.reactor.enable_events( self.client.tree_api, :write )
end
Returns true if the runner's client socket is currently registered for writing.
# File lib/arborist/monitor_runner.rb, line 210
# Ask the reactor whether write events are currently enabled for the
# client's tree API socket.
def registered?
  self.reactor.event_enabled?( self.client.tree_api, :write )
end
Restart the runner
# File lib/arborist/monitor_runner.rb, line 95
# Restart the runner. Not implemented yet.
#
# Always raises NotImplementedError, now with a message naming the method so
# the failure is identifiable in logs. Note that NotImplementedError descends
# from ScriptError rather than StandardError, so a bare +rescue+ will not
# catch it.
def restart
  # :TODO: Kill any running monitor children, cancel monitor timers, and reload
  # monitors from the monitor enumerator
  raise NotImplementedError, "%s#restart is not implemented yet" % [ self.class ]
end
Run the loaded monitors: add a timer for each one, add the thread cleanup timer, then start the reactor.
# File lib/arborist/monitor_runner.rb, line 80
# Start the runner: add a timer for every loaded monitor, add the thread
# cleanup timer, then, inside the signal handler for QUEUE_SIGS, register
# #handle_io_event for write events on the client's tree API socket and start
# the reactor polling.
#
# Returns whatever #with_signal_handler returns.
def run
  self.monitors.each {|monitor| self.add_timer_for(monitor) }
  self.add_thread_cleanup_timer

  self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
    io_callback = self.method( :handle_io_event )
    self.reactor.register( self.client.tree_api, :write, &io_callback )
    self.reactor.start_polling
  end
end
Update nodes with the results of a monitor's run.
# File lib/arborist/monitor_runner.rb, line 127
# Search for the nodes +monitor+ applies to and, if there are any, run the
# monitor against them in a new thread which then queues an update with the
# results.
#
# The thread is recorded in #runner_threads under the monitor, and carries
# the monitor's description in its +:monitor_desc+ thread-local. Nothing is
# started when the search comes back empty.
def run_monitor( monitor )
  criteria   = monitor.positive_criteria
  exclusions = monitor.negative_criteria
  skip_down  = monitor.exclude_down?
  properties = monitor.node_properties

  self.search( criteria, skip_down, properties, exclusions ) do |nodes|
    self.log.info "Running %p monitor for %d node(s)" % [
      monitor.description,
      nodes.length
    ]
    next if nodes.empty?

    thread = Thread.new do
      Thread.current[:monitor_desc] = monitor.description

      results = self.run_monitor_safely( monitor, nodes )
      self.log.debug " updating with results: %p" % [ results ]

      self.update( results, monitor.key ) do
        self.log.debug "Updated %d via the '%s' monitor" %
          [ results.length, monitor.description ]
      end
    end
    self.runner_threads[ monitor ] = thread

    self.log.debug "THREAD: Started %p for %p" % [ self.runner_threads[monitor], monitor ]
    self.log.debug "THREAD: Runner threads have: %p" % [ self.runner_threads.to_a ]
  end
end
Exec monitor against the provided nodes hash, treating runtime exceptions as an
error condition. Returns an update hash, keyed by node identifier.
# File lib/arborist/monitor_runner.rb, line 161
# Run +monitor+ against the +nodes+ Hash and return its results.
#
# If the monitor raises a StandardError, the error and its backtrace are
# logged and an update Hash is returned instead in which every node
# identifier from +nodes+ maps to <tt>{ error: message }</tt>.
def run_monitor_safely( monitor, nodes )
  monitor.run( nodes )
rescue => err
  errmsg = "Exception while running %p monitor: %s: %s" % [
    monitor.description,
    err.class.name,
    err.message
  ]
  self.log.error "%s\n%s" % [ errmsg, err.backtrace.join("\n ") ]

  # Report the same failure for every node the monitor was asked to check.
  nodes.keys.each_with_object( {} ) do |id, failures|
    failures[ id ] = { error: errmsg }
  end
end
Create a search request using the runner's client, then queue the
request up with the specified block as the callback.
# File lib/arborist/monitor_runner.rb, line 182
# Build a search request with the runner's client from the given +criteria+,
# +exclude_down+ flag, +properties+ and +negative+ (exclusion) criteria, and
# queue it with +block+ as the callback for the response.
#
# Returns the result of #queue_request.
def search( criteria, exclude_down, properties, negative={}, &block )
  request = self.client.make_search_request(
    criteria,
    exclude_down: exclude_down,
    properties: properties,
    exclude: negative
  )

  self.queue_request( request, &block )
end
Stop the runner.
# File lib/arborist/monitor_runner.rb, line 103
# Log that the runner is stopping, then tell the reactor to stop polling.
#
# Returns whatever the reactor's #stop_polling returns.
def stop
  self.log.info( "Stopping the runner." )
  self.reactor.stop_polling
end
Disable write events for the client's tree API socket on the reactor when there's nothing ready to write.
# File lib/arborist/monitor_runner.rb, line 224
# Disable write events on the reactor for the client's tree API socket, but
# only if they are currently enabled.
#
# Returns +nil+ if not registered, otherwise the result of the reactor's
# #disable_events.
def unregister
  return unless self.registered?

  # self.log.debug "Unregistering for writing."
  self.reactor.disable_events( self.client.tree_api, :write )
end
Create an update request using the runner's client, then queue the
request up with the specified block as the callback. Does nothing if the node map is empty.
# File lib/arborist/monitor_runner.rb, line 194
# Build an update request for +nodemap+ with the runner's client, tagged with
# +monitor_key+, and queue it with +block+ as the callback for the response.
#
# Returns +nil+ without queueing anything when +nodemap+ is empty; otherwise
# returns the result of #queue_request.
def update( nodemap, monitor_key, &block )
  unless nodemap.empty?
    request = self.client.make_update_request( nodemap, monitor_key: monitor_key )
    self.queue_request( request, &block )
  end
end
Handle signals.
# File lib/arborist/monitor_runner.rb, line 301
# Dispatch the signal +sig+ to its handler method:
#
# * +:INT+ and +:TERM+ go to #on_termination_signal
# * +:HUP+ goes to #on_hangup_signal
# * +:USR1+ goes to #on_user1_signal
#
# Any other signal is logged as a warning and otherwise ignored.
def handle_signal( sig )
  self.log.debug "Handling signal %s" % [ sig ]

  handler =
    case sig
    when :INT, :TERM then :on_termination_signal
    when :HUP        then :on_hangup_signal
    when :USR1       then :on_user1_signal
    end

  return self.log.warn( "Unhandled signal %s" % [ sig ] ) unless handler

  self.__send__( handler, sig )
end
Handle a hangup by restarting the runner.
# File lib/arborist/monitor_runner.rb, line 330
# Handle a hangup signal: log a warning that includes +signo+, then call
# #restart.
#
# Returns (or raises) whatever #restart does.
def on_hangup_signal( signo )
  self.log.warn( format("Hangup (%p)", signo) )
  self.restart
end
Handle a TERM signal. Shuts the handler down after handling any current request/s. Also aliased to on_interrupt_signal.
# File lib/arborist/monitor_runner.rb, line 322
# Handle a termination signal: log a warning that includes +signo+, then
# call #stop.
#
# Returns whatever #stop returns.
def on_termination_signal( signo )
  self.log.warn( format("Terminated (%p)", signo) )
  self.stop
end