MonitorRunner

class
Superclass
Object
Included Modules
CZTop::Reactor::SignalHandling
Extended With
Loggability

An event-driven runner for Arborist::Monitors.

Constants

QUEUE_SIGS

Signals the runner handles

THREAD_CLEANUP_INTERVAL

Number of seconds between thread cleanup

Attributes

client[R]

The Arborist::Client that will provide the message packing and unpacking

handler[RW]

The ZMQ::Handler subclass that handles all async IO

monitors[R]

The Array of loaded Arborist::Monitors the runner should run.

reactor[RW]

The reactor (a CZTop::Reactor) the runner uses to drive everything

request_queue[R]

The Hash of pending requests, keyed by the callback that should be called with the results.

runner_threads[R]

A hash of monitor object -> thread used to contain and track running monitor threads.

Public Class Methods

anchor
new()

Create a new Arborist::MonitorRunner

# File lib/arborist/monitor_runner.rb, line 33
# Set up a runner with no monitors loaded, no handler, empty thread and
# request bookkeeping, and a fresh reactor and client.
def initialize
        # Plain bookkeeping state
        @monitors       = []
        @runner_threads = {}
        @request_queue  = {}
        @handler        = nil

        # Collaborators
        @reactor = CZTop::Reactor.new
        @client  = Arborist::Client.new
end

Public Instance Methods

anchor
add_interval_timer_for( monitor )

Create a periodic timer on the reactor that will run the specified monitor on its interval. A tick is skipped if the monitor still has a thread tracked in runner_threads.

# File lib/arborist/monitor_runner.rb, line 243
# Register a periodic timer on the reactor that fires every +monitor.interval+
# and runs the +monitor+. Returns whatever the reactor's #add_periodic_timer
# returns.
def add_interval_timer_for( monitor )
        period = monitor.interval
        self.log.info( "Creating timer for %p" % [monitor] )

        # Skip the tick while the monitor still has a thread tracked in
        # #runner_threads.
        self.reactor.add_periodic_timer( period ) do
                self.run_monitor( monitor ) unless self.runner_threads.key?( monitor )
        end
end
anchor
add_splay_timer_for( monitor )

Create a one-shot timer on the reactor that will register the interval timer for the specified monitor after a random number of seconds no greater than its splay.

# File lib/arborist/monitor_runner.rb, line 257
# Register a one-shot timer that installs the interval timer for +monitor+
# after a random delay derived from +monitor.splay+. Returns whatever the
# reactor's #add_oneshot_timer returns.
def add_splay_timer_for( monitor )
        wait = rand( monitor.splay )
        self.log.debug( "Splaying registration of %p for %ds" % [monitor, wait] )

        self.reactor.add_oneshot_timer( wait ) { self.add_interval_timer_for(monitor) }
end
anchor
add_thread_cleanup_timer()

Set up a timer to clean up monitor threads.

# File lib/arborist/monitor_runner.rb, line 268
# Register a periodic timer that calls #cleanup_monitor_threads every
# THREAD_CLEANUP_INTERVAL seconds. Returns whatever the reactor's
# #add_periodic_timer returns.
def add_thread_cleanup_timer
        self.log.debug( "Starting thread cleanup timer for %p." % [self.runner_threads] )

        self.reactor.add_periodic_timer( THREAD_CLEANUP_INTERVAL ) { self.cleanup_monitor_threads }
end
anchor
add_timer_for( monitor )

Register a timer for the specified monitor.

# File lib/arborist/monitor_runner.rb, line 231
# Register a timer for the specified +monitor+: if the monitor has a non-zero
# splay, registration of its interval timer is deferred via
# #add_splay_timer_for; otherwise the interval timer is added immediately.
# Returns the result of whichever registration method was called.
def add_timer_for( monitor )
        if monitor.splay.nonzero?
                self.add_splay_timer_for( monitor )
        else
                self.add_interval_timer_for( monitor )
        end
end
anchor
cleanup_monitor_threads()

Clean up any monitor runner threads that are dead.

# File lib/arborist/monitor_runner.rb, line 279
# Remove every dead thread from #runner_threads and join it, logging any
# exception the join raises. Returns the Array of threads that were
# cleaned up.
def cleanup_monitor_threads
        dead_threads = self.runner_threads.each_value.reject {|thread| thread.alive? }

        dead_threads.each do |thread|
                owner = self.runner_threads.key( thread )
                self.runner_threads.delete( owner )

                begin
                        # Joining a thread that died from an exception re-raises it here
                        thread.join
                rescue => err
                        self.log.error "%p while running %s: %s" %
                                [ err.class, thread[:monitor_desc], err.message ]
                end
        end
end
anchor
handle_io_event( event )

Reactor callback – handle the client's socket becoming writable.

# File lib/arborist/monitor_runner.rb, line 110
# Reactor callback for the tree API socket. On a writable +event+, sends the
# oldest queued request (if any) through the client and passes the response
# to that request's callback, then calls #unregister once the queue is empty.
# Raises a RuntimeError for any event that is not writable.
def handle_io_event( event )
        unless event.writable?
                raise "Unexpected %p on the tree API socket" % [ event ]
        end

        entry = self.request_queue.shift
        if entry
                callback, request = entry
                response = self.client.send_tree_api_request( request )
                callback.call( response )
        end

        self.unregister if self.request_queue.empty?
end
anchor
load_monitors( enumerator )

Load monitors from the specified enumerator.

# File lib/arborist/monitor_runner.rb, line 74
# Append everything the given +enumerator+ yields to the runner's list of
# monitors. Returns the result of the append (the monitors collection).
def load_monitors( enumerator )
        loaded = enumerator.to_a
        self.monitors.concat( loaded )
end
anchor
queue_request( request, &callback )

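Add the specified request to the queue of requests to be sent via the client, keyed by the callback that should be called with the response, and make sure the runner is registered for writing.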

# File lib/arborist/monitor_runner.rb, line 203
# Store the +request+ in the #request_queue under its +callback+, then make
# sure the runner is registered for write events. Returns the result of
# #register.
def queue_request( request, &callback )
        self.request_queue.store( callback, request )
        self.register
end
anchor
register()

Register the handler's pollitem as being ready to write if it isn't already.

# File lib/arborist/monitor_runner.rb, line 216
# Enable write events for the client's tree API socket on the reactor,
# unless they are already enabled.
def register
        return if self.registered?

        # self.log.debug "Registering for writing."
        self.reactor.enable_events( self.client.tree_api, :write )
end
anchor
registered?()

Returns true if the runner's client socket is currently registered for writing.

# File lib/arborist/monitor_runner.rb, line 210
# Ask the reactor whether write events are currently enabled for the
# client's tree API socket.
def registered?
        poller = self.reactor
        poller.event_enabled?( self.client.tree_api, :write )
end
anchor
restart()

Restart the runner

# File lib/arborist/monitor_runner.rb, line 95
# Restart the runner. Not implemented yet: every call raises
# NotImplementedError.
def restart
        # :TODO: Kill any running monitor children, cancel monitor timers, and reload
        # monitors from the monitor enumerator
        #
        # NOTE(review): in Ruby, NotImplementedError descends from ScriptError,
        # not StandardError, so a bare `rescue` in a caller will not catch it.
        raise NotImplementedError
end
anchor
run()

Run the specified monitors

# File lib/arborist/monitor_runner.rb, line 80
# Start the runner: add a timer for every loaded monitor, add the thread
# cleanup timer, then, with handling for QUEUE_SIGS set up, register the
# tree API socket for write events and start the reactor polling.
def run
        self.monitors.each {|monitor| self.add_timer_for(monitor) }
        self.add_thread_cleanup_timer

        io_callback = self.method( :handle_io_event )
        self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
                self.reactor.register( self.client.tree_api, :write, &io_callback )
                self.reactor.start_polling
        end
end
anchor
run_monitor( monitor )

Update nodes with the results of a monitor's run.

# File lib/arborist/monitor_runner.rb, line 127
# Search for the nodes matching the +monitor+'s criteria and, when the search
# yields any, start a thread that runs the monitor against them and queues an
# update with the results. The thread is tracked in #runner_threads under the
# monitor.
def run_monitor( monitor )
        include_criteria = monitor.positive_criteria
        exclude_criteria = monitor.negative_criteria
        skip_down        = monitor.exclude_down?
        properties       = monitor.node_properties

        self.search( include_criteria, skip_down, properties, exclude_criteria ) do |nodes|
                self.log.info "Running %p monitor for %d node(s)" % [ monitor.description, nodes.length ]

                # Nothing matched, so there is nothing to run
                next if nodes.empty?

                worker = Thread.new do
                        # Stored so the description can be read back from the thread later
                        Thread.current[:monitor_desc] = monitor.description
                        results = self.run_monitor_safely( monitor, nodes )

                        self.log.debug "  updating with results: %p" % [ results ]
                        self.update( results, monitor.key ) do
                                self.log.debug "Updated %d via the '%s' monitor" %
                                        [ results.length, monitor.description ]
                        end
                end
                self.runner_threads[ monitor ] = worker

                self.log.debug "THREAD: Started %p for %p" % [ worker, monitor ]
                self.log.debug "THREAD: Runner threads have: %p" % [ self.runner_threads.to_a ]
        end
end
anchor
run_monitor_safely( monitor, nodes )

Exec monitor against the provided nodes hash, treating runtime exceptions as an error condition. Returns an update hash, keyed by node identifier.

# File lib/arborist/monitor_runner.rb, line 161
# Run the +monitor+ against the provided +nodes+ hash and return its results.
# If the monitor raises a StandardError, the error and its backtrace are
# logged and a Hash mapping every node identifier to `{ error: message }` is
# returned instead.
def run_monitor_safely( monitor, nodes )
        monitor.run( nodes )
rescue => err
        errmsg = "Exception while running %p monitor: %s: %s" % [
                monitor.description,
                err.class.name,
                err.message
        ]
        self.log.error "%s\n%s" % [ errmsg, err.backtrace.join("\n  ") ]

        # Report the same failure for every node the monitor was asked to check
        nodes.keys.each_with_object({}) do |id, failures|
                failures[id] = { error: errmsg }
        end
end
anchor
search( criteria, exclude_down, properties, negative={}, &block )

Create a search request using the runner's client, then queue the request up with the specified block as the callback.

# File lib/arborist/monitor_runner.rb, line 182
# Build a search request from the given +criteria+ and options using the
# runner's client, and queue it with the +block+ as its callback. Returns
# the result of #queue_request.
def search( criteria, exclude_down, properties, negative={}, &block )
        options = {
                exclude_down: exclude_down,
                properties: properties,
                exclude: negative
        }
        request = self.client.make_search_request( criteria, **options )

        self.queue_request( request, &block )
end
anchor
stop()

Stop the runner.

# File lib/arborist/monitor_runner.rb, line 103
# Log that the runner is stopping, then tell the reactor to stop polling.
def stop
        self.log.info( "Stopping the runner." )
        self.reactor.stop_polling
end
anchor
unregister()

Unregister the handler's pollitem from the reactor when there's nothing ready to write.

# File lib/arborist/monitor_runner.rb, line 224
# Disable write events for the client's tree API socket on the reactor, if
# they are currently enabled.
def unregister
        return unless self.registered?

        # self.log.debug "Unregistering for writing."
        self.reactor.disable_events( self.client.tree_api, :write )
end
anchor
update( nodemap, monitor_key, &block )

Create an update request using the runner's client, then queue the request up with the specified block as the callback.

# File lib/arborist/monitor_runner.rb, line 194
# Build an update request for the +nodemap+ using the runner's client and
# queue it with the +block+ as its callback. Does nothing (and returns nil)
# when the +nodemap+ is empty.
def update( nodemap, monitor_key, &block )
        unless nodemap.empty?
                request = self.client.make_update_request( nodemap, monitor_key: monitor_key )
                self.queue_request( request, &block )
        end
end

Signal Handling

↑ top

Public Instance Methods

anchor
handle_signal( sig )

Handle signals.

# File lib/arborist/monitor_runner.rb, line 301
# Dispatch the signal +sig+ to the matching handler method: INT and TERM to
# #on_termination_signal, HUP to #on_hangup_signal, USR1 to #on_user1_signal.
# Any other signal is logged as unhandled.
def handle_signal( sig )
        self.log.debug( "Handling signal %s" % [sig] )

        case sig
        when :INT, :TERM then self.on_termination_signal( sig )
        when :HUP        then self.on_hangup_signal( sig )
        when :USR1       then self.on_user1_signal( sig )
        else
                self.log.warn( "Unhandled signal %s" % [sig] )
        end
end
anchor
on_hangup_signal( signo )

Handle a hangup by restarting the runner.

# File lib/arborist/monitor_runner.rb, line 330
# Log the hangup at warn level, then call #restart.
def on_hangup_signal( signo )
        self.log.warn( "Hangup (%p)" % [signo] )
        self.restart
end
anchor
on_interrupt_signal( signo )
anchor
on_termination_signal( signo )

Handle a TERM signal. Shuts the handler down after handling any current request/s. Also aliased to on_interrupt_signal.

# File lib/arborist/monitor_runner.rb, line 322
# Log the termination at warn level, then call #stop.
def on_termination_signal( signo )
        self.log.warn( "Terminated (%p)" % [signo] )
        self.stop
end
Also aliased as: on_interrupt_signal