Manager

class
Superclass
Object
Included Modules
CZTop::Reactor::SignalHandling
Arborist::HashUtilities
Extended With
Configurability
Loggability
Arborist::MethodUtilities

The main Arborist process – responsible for coordinating all other activity.

Constants

QUEUE_SIGS

Signals the manager responds to

VALID_TREEAPI_ACTIONS

Array of actions supported by the Tree API

Attributes

checkpoint_timer[R]

The Timers::Timer that periodically checkpoints the manager's state (if it's configured to do so)

event_queue[R]

The queue of pending Event API events

event_socket[RW]

The ZeroMQ PUB socket that publishes events for the Event API

heartbeat_timer[R]

The Timers::Timer that periodically publishes a heartbeat event

linger[R]

The maximum amount of time to wait for pending events to be delivered during shutdown, in milliseconds.

nodes[RW]

The Hash of all loaded Nodes, keyed by their identifier

reactor[R]

The CZTop::Reactor that runs the event loop

root[RW]

The root node of the tree.

run_id[R]

A unique string used to identify different runs of the Manager

start_time[RW]

The time at which the manager began running.

subscriptions[RW]

The Hash of all Subscriptions, keyed by their subscription ID

tree_socket[RW]

The ZeroMQ socket REP socket that handles Tree API requests

Public Class Methods

anchor
new()

Create a new Arborist::Manager.

# File lib/arborist/manager.rb, line 97
def initialize
        @run_id = SecureRandom.hex( 16 )
        @root = Arborist::Node.create( :root )
        @nodes = { '_' => @root }

        @subscriptions = {}
        @tree_built = false

        @start_time   = nil

        @checkpoint_timer = nil
        @linger = self.class.linger
        self.log.info "Linger set to %p" % [ @linger ]

        @reactor = CZTop::Reactor.new
        @tree_socket = nil
        @event_socket = nil
        @event_queue = []

        @heartbeat_timer = nil
        @checkpoint_timer = nil
end

Public Instance Methods

anchor
tree_built()

Flag for marking when the tree is built successfully the first time

# File lib/arborist/manager.rb, line 163
attr_predicate_accessor :tree_built

Node state saving/reloading

↑ top

Public Instance Methods

anchor
cancel_checkpoint_timer()

Cancel the timer that saves tree snapshots.

# File lib/arborist/manager.rb, line 372
def cancel_checkpoint_timer
        self.reactor.remove_timer( self.checkpoint_timer )
end
anchor
cancel_heartbeat_timer()

Cancel the timer that publishes heartbeat events.

# File lib/arborist/manager.rb, line 340
def cancel_heartbeat_timer
        self.reactor.remove_timer( self.heartbeat_timer )
end
anchor
register_checkpoint_timer()

Register a periodic timer that will save a snapshot of the node tree's state to the state file on a configured interval if one is configured.

# File lib/arborist/manager.rb, line 353
def register_checkpoint_timer
        unless self.class.state_file
                self.log.info "No state file configured; skipping checkpoint timer setup."
                return nil
        end
        interval = self.class.checkpoint_frequency
        unless interval && interval.nonzero?
                self.log.info "Checkpoint frequency is %p; skipping checkpoint timer setup." % [ interval ]
                return nil
        end

        self.log.info "Setting up node state checkpoint every %0.3fs" % [ interval ]
        @checkpoint_timer = self.reactor.add_periodic_timer( interval ) do
                self.save_node_states
        end
end
anchor
register_heartbeat_timer()

Register a periodic timer that will publish a heartbeat event at a configurable interval.

# File lib/arborist/manager.rb, line 329
def register_heartbeat_timer
        interval = self.class.heartbeat_frequency

        self.log.info "Setting up to heartbeat every %ds" % [ interval ]
        @heartbeat_timer = self.reactor.add_periodic_timer( interval ) do
                self.publish_heartbeat_event
        end
end
anchor
restore_node_states()

Attempt to restore the state of loaded node from the configured state file. Returns true if it succeeded, or false if a state file wasn't configured, doesn't exist, isn't readable, or couldn't be unmarshalled.

# File lib/arborist/manager.rb, line 305
def restore_node_states
        path = self.class.state_file or return false
        return false unless path.readable?

        self.log.info "Restoring node state from %s" % [ path ]
        nodes = Marshal.load( path.open('r:binary') )

        nodes.each do |identifier, saved_node|
                self.log.debug "Loaded node: %p" % [ identifier ]
                if (( current_node = self.nodes[ identifier ] ))
                        self.log.debug "Restoring state of the %p node." % [ identifier ]
                        current_node.restore( saved_node )
                else
                        self.log.info "Not restoring state for the %s node: not present in the loaded tree." %
                                [ identifier ]
                end
        end

        return true
end
anchor
resume_checkpoint_timer()

Resume the timer that saves tree snapshots.

# File lib/arborist/manager.rb, line 378
def resume_checkpoint_timer
        self.reactor.resume_timer( self.checkpoint_timer )
end
anchor
resume_heartbeat_timer()

Resume the timer that publishes heartbeat events.

# File lib/arborist/manager.rb, line 346
def resume_heartbeat_timer
        self.reactor.resume_timer( self.heartbeat_timer )
end
anchor
save_node_states()

Write out the state of all the manager's nodes to the state_file if one is configured.

# File lib/arborist/manager.rb, line 279
def save_node_states
        start_time = Time.now
        path = self.class.state_file or return
        self.log.info "Saving current node state to %s" % [ path ]
        tmpfile = Tempfile.create(
                [path.basename.to_s.sub(path.extname, ''), path.extname],
                path.dirname.to_s,
                encoding: 'binary'
        )
        Marshal.dump( self.nodes, tmpfile )
        tmpfile.close

        File.rename( tmpfile.path, path.to_s )
        self.log.debug "Saved state file in %0.1f seconds." % [ Time.now - start_time ]

rescue SystemCallError => err
        self.log.error "%p while saving node state: %s" % [ err.class, err.message ]

ensure
        File.unlink( tmpfile.path ) if tmpfile && File.exist?( tmpfile.path )
end

Signal Handling

↑ top

Public Instance Methods

anchor
handle_signal( sig )

Handle signals.

# File lib/arborist/manager.rb, line 390
def handle_signal( sig )
        self.log.debug "Handling signal %s" % [ sig ]
        case sig
        when :INT, :TERM
                self.on_termination_signal( sig )

        when :HUP
                self.on_hangup_signal( sig )

        when :USR1
                self.on_user1_signal( sig )

        else
                self.log.warn "Unhandled signal %s" % [ sig ]
        end

end
anchor
on_hangup_signal( signo )

Handle a HUP signal. The default is to restart the handler.

# File lib/arborist/manager.rb, line 419
def on_hangup_signal( signo )
        self.log.warn "Hangup (%p)" % [ signo ]
        self.restart
end
anchor
on_interrupt_signal( signo )
anchor
on_termination_signal( signo )

Handle a TERM signal. Shuts the handler down after handling any current request/s. Also aliased to on_interrupt_signal.

# File lib/arborist/manager.rb, line 411
def on_termination_signal( signo )
        self.log.warn "Terminated (%p)" % [ signo ]
        self.stop
end
Also aliased as: on_interrupt_signal
anchor
on_user1_signal( signo )

Handle a USR1 signal. Writes a message to the log.

# File lib/arborist/manager.rb, line 426
def on_user1_signal( signo )
        self.log.info "Checkpoint: User signal."
        self.save_node_states
end

Startup/Shutdown

↑ top

Public Instance Methods

anchor
cancel_timers()

Register the Manager's timers.

# File lib/arborist/manager.rb, line 229
def cancel_timers
        self.cancel_heartbeat_timer
        self.cancel_checkpoint_timer
end
anchor
inspect()

Return a human-readable representation of the Manager suitable for debugging.

# File lib/arborist/manager.rb, line 263
def inspect
        return "#<%p:%#x {runid: %s} %d nodes>" % [
                self.class,
                self.object_id * 2,
                self.run_id,
                self.nodes.length,
        ]
end
anchor
register_timers()

Register the Manager's timers.

# File lib/arborist/manager.rb, line 222
def register_timers
        self.register_checkpoint_timer
        self.register_heartbeat_timer
end
anchor
restart()

Restart the manager

# File lib/arborist/manager.rb, line 250
def restart
        raise NotImplementedError
end
anchor
run()

Setup sockets and start the event loop.

# File lib/arborist/manager.rb, line 185
def run
        self.log.info "Getting ready to start the manager."
        self.setup_sockets
        self.register_timers
        self.with_signal_handler( reactor, *QUEUE_SIGS ) do
                self.start_accepting_requests
        end
ensure
        self.shutdown_sockets
        self.save_node_states
end
anchor
running?()

Returns true if the Manager is running.

# File lib/arborist/manager.rb, line 214
def running?
        return self.reactor &&
                self.event_socket &&
                self.reactor.registered?( self.event_socket )
end
anchor
setup_sockets()

Create the sockets used by the manager and bind them to the appropriate endpoints.

# File lib/arborist/manager.rb, line 200
def setup_sockets
        self.setup_tree_socket
        self.setup_event_socket
end
anchor
shutdown_sockets()

Shut down the sockets used by the manager.

# File lib/arborist/manager.rb, line 207
def shutdown_sockets
        self.shutdown_tree_socket
        self.shutdown_event_socket
end
anchor
start_accepting_requests()

Start a loop, accepting a request and handling it.

# File lib/arborist/manager.rb, line 236
def start_accepting_requests
        self.log.debug "Starting the main loop"

        self.start_time = Time.now

        self.reactor.register( self.tree_socket, :read, &self.method(:on_tree_socket_event) )
        self.reactor.register( self.event_socket, :write, &self.method(:on_event_socket_event) )

        self.log.debug "Manager running."
        return self.reactor.start_polling( ignore_interrupts: true )
end
anchor
stop()

Stop the manager.

# File lib/arborist/manager.rb, line 256
def stop
        self.log.info "Stopping the manager."
        self.reactor.stop_polling
end

Tree API

↑ top

Public Instance Methods

anchor
add_node( node )

Add the specified node to the Manager.

# File lib/arborist/manager.rb, line 481
def add_node( node )
        identifier = node.identifier

        raise Arborist::NodeError, "Node %p already present." % [ identifier ] if self.nodes[ identifier ]
        self.nodes[ identifier ] = node

        if self.tree_built?
                self.link_node( node )
                self.publish_system_event( 'node_added', node: identifier )
        end
end
anchor
all_nodes( &block )

Yield each node in a depth-first traversal of the manager's tree to the specified block, or return an Enumerator if no block is given.

# File lib/arborist/manager.rb, line 914
def all_nodes( &block )
        iter = self.enumerator_for( self.root )
        return iter.each( &block ) if block
        return iter
end
anchor
ancestors_for( node )

Return the Array of all nodes above the specified node.

# File lib/arborist/manager.rb, line 970
def ancestors_for( node )
        parent_id = node.parent or return []
        parent = self.nodes[ parent_id ]
        return [ parent ] + self.ancestors_for( parent )
end
anchor
build_tree()

Build the tree out of all the loaded nodes.

# File lib/arborist/manager.rb, line 448
def build_tree
        self.log.info "Building tree from %d loaded nodes." % [ self.nodes.length ]

        # Build primary tree structure
        self.nodes.each_value do |node|
                next if node.operational?
                self.link_node_to_parent( node )
        end
        self.tree_built = true

        # Set up secondary dependencies
        self.nodes.each_value do |node|
                node.register_secondary_dependencies( self )
        end

        self.restore_node_states
end
anchor
create_subscription( identifier, event_pattern, criteria, negative_criteria={} )

Create a subscription that publishes to the Manager's event publisher for the node with the specified identifier and event_pattern, using the given criteria when considering an event.

# File lib/arborist/manager.rb, line 1083
def create_subscription( identifier, event_pattern, criteria, negative_criteria={} )
        sub = Arborist::Subscription.new( event_pattern, criteria, negative_criteria ) do |*args|
                self.publish( *args )
        end
        self.subscribe( identifier, sub )

        return sub
end
anchor
depth_limited_enumerator_for( start_node, depth, &filter )

Return a depth limited enumerator for the specified start_node.

# File lib/arborist/manager.rb, line 945
def depth_limited_enumerator_for( start_node, depth, &filter )
        return Enumerator.new do |yielder|
                traverse = ->( node, current_depth ) do
                        self.log.debug "Enumerating nodes from %s at depth: %p" %
                                [ node.identifier, current_depth ]

                        if !filter || filter.call( node )
                                yielder.yield( node )
                                node.each do |child|
                                        traverse[ child, current_depth - 1 ]
                                end if current_depth > 0
                        end
                end
                traverse.call( start_node, depth )
        end
end
anchor
descendants_for( node )

Return an Array of all nodes below the specified node.

# File lib/arborist/manager.rb, line 964
def descendants_for( node )
        return self.enumerator_for( node ).to_a
end
anchor
dispatch_request( raw_request )

Handle the specified raw_request and return a response.

# File lib/arborist/manager.rb, line 614
def dispatch_request( raw_request )
        raise "Manager is shutting down" unless self.running?

        header, body = Arborist::TreeAPI.decode( raw_request )
        handler = self.lookup_tree_request_action( header )

        return handler.call( header, body )

rescue => err
        self.log.error "%p: %s" % [ err.class, err.message ]
        err.backtrace.each {|frame| self.log.debug "  #{frame}" }

        errtype = case err
                when Arborist::MessageError,
                     Arborist::ConfigError,
                     Arborist::NodeError
                        'client'
                else
                        'server'
                end

        return Arborist::TreeAPI.error_response( errtype, err.message )
end
anchor
enumerator_for( start_node, &filter )

Return an enumerator for the specified start_node.

# File lib/arborist/manager.rb, line 931
def enumerator_for( start_node, &filter )
        return Enumerator.new do |yielder|
                traverse = ->( node ) do
                        if !filter || filter.call( node )
                                yielder.yield( node )
                                node.each( &traverse )
                        end
                end
                traverse.call( start_node )
        end
end
anchor
find_matching_node_states( filter, return_values, exclude_down=false, negative_filter={} )

Traverse the node tree and return the specified return_values from any nodes which match the given filter, skipping downed nodes and all their children if exclude_down is set. If return_values is set to nil, then all values from the node will be returned.

# File lib/arborist/manager.rb, line 542
def find_matching_node_states( filter, return_values, exclude_down=false, negative_filter={} )
        nodes_iter = if exclude_down
                        self.reachable_nodes
                else
                        self.all_nodes
                end

        states = nodes_iter.
                select {|node| node.matches?(filter) }.
                reject {|node| !negative_filter.empty? && node.matches?(negative_filter) }.
                each_with_object( {} ) do |node, hash|
                        hash[ node.identifier ] = node.fetch_values( return_values )
                end

        return states
end
anchor
handle_ack_request( header, body )

Acknowledge a node

# File lib/arborist/manager.rb, line 870
def handle_ack_request( header, body )
        self.log.info "ACK: %p" % [ header ]

        identifier = header[ 'identifier' ] or
                return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for ACK.' )
        node = self.nodes[ identifier ] or
                return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )

        self.log.debug "Acking the %s node: %p" % [ identifier, body ]

        body = symbolify_keys( body )
        events = node.acknowledge( **body )
        self.propagate_events( node, events )

        return Arborist::TreeAPI.successful_response( nil )
end
anchor
handle_deps_request( header, body )

Return a response to the `deps` action.

# File lib/arborist/manager.rb, line 728
def handle_deps_request( header, body )
        self.log.info "DEPS: %p" % [ header ]
        from = header['from'] || '_'

        deps = self.merge_dependencies_from( from )
        deps.delete( from )

        return Arborist::TreeAPI.successful_response({ deps: deps.to_a })

rescue Arborist::ClientError => err
        return Arborist::TreeAPI.error_response( 'client', err.message )
end
anchor
handle_fetch_request( header, body )

Return a repsonse to the `fetch` action.

# File lib/arborist/manager.rb, line 701
def handle_fetch_request( header, body )
        self.log.info "FETCH: %p" % [ header ]
        from  = header['from'] || '_'
        depth = header['depth']
        tree  = header['tree']

        start_node = self.nodes[ from ] or
                return Arborist::TreeAPI.error_response( 'client', "No such node %s." % [from] )
        self.log.debug "  Listing nodes under %p" % [ start_node ]

        if tree
                iter = [ start_node.to_h(depth: (depth || -1)) ]
        elsif depth
                self.log.debug "    depth limited to %d" % [ depth ]
                iter = self.depth_limited_enumerator_for( start_node, depth )
        else
                self.log.debug "    no depth limit"
                iter = self.enumerator_for( start_node )
        end
        data = iter.map( &:to_h )
        self.log.debug "  got data for %d nodes" % [ data.length ]

        return Arborist::TreeAPI.successful_response( data )
end
anchor
handle_graft_request( header, body )

Add a node

# File lib/arborist/manager.rb, line 811
def handle_graft_request( header, body )
        self.log.info "GRAFT: %p" % [ header ]

        identifier = header[ 'identifier' ] or
                return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for GRAFT.' )

        if self.nodes[ identifier ]
                return Arborist::TreeAPI.error_response( 'client', "Node %p already exists." % [identifier] )
        end

        type = header[ 'type' ] or
                return Arborist::TreeAPI.error_response( 'client', 'No type specified for GRAFT.' )
        parent = header[ 'parent' ] || '_'
        parent_node = self.nodes[ parent ] or
                return Arborist::TreeAPI.error_response( 'client', 'No parent node found for %s.' % [parent] )

        self.log.debug "Grafting a new %s node under %p" % [ type, parent_node ]

        # If the parent has a factory method for the node type, use it, otherwise
        # use the Pluggability factory
        node = if parent_node.respond_to?( type )
                        parent_node.method( type ).call( identifier, body )
                else
                        body.merge!( parent: parent )
                        Arborist::Node.create( type, identifier, body )
                end

        self.add_node( node )

        return Arborist::TreeAPI.successful_response( node ? {identifier: node.identifier} : nil )
end
anchor
handle_modify_request( header, body )

Modify a node's operational attributes

# File lib/arborist/manager.rb, line 845
def handle_modify_request( header, body )
        self.log.info "MODIFY: %p" % [ header ]

        identifier = header[ 'identifier' ] or
                return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for MODIFY.' )
        return Arborist::TreeAPI.error_response( 'client', "Unable to MODIFY root node." ) if identifier == '_'
        node = self.nodes[ identifier ] or
                return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )

        self.log.debug "Modifying operational attributes of the %s node: %p" % [ identifier, body ]

        if new_parent_identifier = body.delete( 'parent' )
                old_parent = self.nodes[ node.parent ]
                new_parent = self.nodes[ new_parent_identifier ] or
                        return Arborist::TreeAPI.error_response( 'client', "No such parent node: %p" % [new_parent_identifier] )
                node.reparent( old_parent, new_parent )
        end

        node.modify( body )

        return Arborist::TreeAPI.successful_response( nil )
end
anchor
handle_prune_request( header, body )

Remove a node and its children.

# File lib/arborist/manager.rb, line 799
def handle_prune_request( header, body )
        self.log.info "PRUNE: %p" % [ header ]

        identifier = header[ 'identifier' ] or
                return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for PRUNE.' )
        node = self.remove_node( identifier )

        return Arborist::TreeAPI.successful_response( node ? node.to_h : nil )
end
anchor
handle_search_request( header, body )

Return a response to the 'search' action.

# File lib/arborist/manager.rb, line 762
def handle_search_request( header, body )
        self.log.info "SEARCH: %p" % [ header ]

        exclude_down = header['exclude_down']
        values = if header.key?( 'return' )
                        header['return'] || []
                else
                        nil
                end

        body = [ body ] unless body.is_a?( Array )
        positive = body.shift
        negative = body.shift || {}
        states = self.find_matching_node_states( positive, values, exclude_down, negative )

        return Arborist::TreeAPI.successful_response( states )
end
anchor
handle_status_request( header, body )

Return a response to the `status` action.

# File lib/arborist/manager.rb, line 655
def handle_status_request( header, body )
        self.log.info "STATUS: %p" % [ header ]
        return Arborist::TreeAPI.successful_response(
                server_version: Arborist::VERSION,
                state: self.running? ? 'running' : 'not running',
                uptime: self.uptime,
                nodecount: self.nodecount
        )
end
anchor
handle_subscribe_request( header, body )

Return a response to the `subscribe` action.

# File lib/arborist/manager.rb, line 667
def handle_subscribe_request( header, body )
        self.log.info "SUBSCRIBE: %p" % [ header ]
        event_type      = header[ 'event_type' ]
        node_identifier = header[ 'identifier' ]

        body = [ body ] unless body.is_a?( Array )
        positive = body.shift
        negative = body.shift || {}

        subscription = self.create_subscription( node_identifier, event_type, positive, negative )
        self.log.info "Subscription to %s events at or under %s: %p" %
                [ event_type || 'all', node_identifier || 'the root node', subscription ]

        return Arborist::TreeAPI.successful_response( id: subscription.id )
end
anchor
handle_unack_request( header, body )

Un-acknowledge a node

# File lib/arborist/manager.rb, line 889
def handle_unack_request( header, body )
        self.log.info "UNACK: %p" % [ header ]

        identifier = header[ 'identifier' ] or
                return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNACK.' )
        node = self.nodes[ identifier ] or
                return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )

        self.log.debug "Unacking the %s node: %p" % [ identifier, body ]

        events = node.unacknowledge
        self.propagate_events( node, events )

        return Arborist::TreeAPI.successful_response( nil )
end
anchor
handle_unsubscribe_request( header, body )

Return a response to the `unsubscribe` action.

# File lib/arborist/manager.rb, line 685
def handle_unsubscribe_request( header, body )
        self.log.info "UNSUBSCRIBE: %p" % [ header ]
        subscription_id = header[ 'subscription_id' ] or
                return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNSUBSCRIBE.' )
        subscription = self.remove_subscription( subscription_id ) or
                return Arborist::TreeAPI.successful_response( nil )

        self.log.info "Destroyed subscription: %p" % [ subscription ]
        return Arborist::TreeAPI.successful_response(
                event_type: subscription.event_type,
                criteria: subscription.criteria
        )
end
anchor
handle_update_request( header, body )

Update nodes using the data from the update request's body.

# File lib/arborist/manager.rb, line 782
def handle_update_request( header, body )
        self.log.info "UPDATE: %p" % [ header ]

        unless body.respond_to?( :each )
                return Arborist::TreeAPI.error_response( 'client', 'Malformed update: body does not respond to #each' )
        end

        monitor_key = header['monitor_key']
        body.each do |identifier, properties|
                self.update_node( identifier, properties, monitor_key )
        end

        return Arborist::TreeAPI.successful_response( nil )
end
anchor anchor anchor
load_tree( enumerator )

Add nodes yielded from the specified enumerator into the manager's tree.

# File lib/arborist/manager.rb, line 439
def load_tree( enumerator )
        enumerator.each do |node|
                self.add_node( node )
        end
        self.build_tree
end
anchor
lookup_tree_request_action( header )

Given a request header, return a call-able object that can handle the response.

# File lib/arborist/manager.rb, line 640
def lookup_tree_request_action( header )
        raise Arborist::MessageError, "unsupported version %d" % [ header['version'] ] unless
                header['version'] == 1

        action = header['action'] or
                raise Arborist::MessageError, "missing required header 'action'"
        raise Arborist::MessageError, "No such action '%s'" % [ action ] unless
                VALID_TREEAPI_ACTIONS.include?( action )

        handler_name = "handle_%s_request" % [ action ]
        return self.method( handler_name )
end
anchor
merge_dependencies_from( from, deps_set=Set.new )

Recurse into the children and secondary dependencies of the from node and merge the identifiers of the traversed nodes into the deps_set.

# File lib/arborist/manager.rb, line 744
def merge_dependencies_from( from, deps_set=Set.new )
        return deps_set unless deps_set.add?( from )

        start_node = self.nodes[ from ] or
                raise Arborist::ClientError "No such node %s." % [ from ]

        self.enumerator_for( start_node ).each do |subnode|
                deps_set.add( subnode.identifier )
                subnode.node_subscribers.each do |subdep|
                        self.merge_dependencies_from( subdep, deps_set )
                end
        end

        return deps_set
end
anchor
nodecount()

Return the number of nodes in the manager's tree.

# File lib/arborist/manager.rb, line 568
def nodecount
        return self.nodes.length
end
anchor
nodelist()

Return an Array of the identifiers of all nodes in the manager's tree.

# File lib/arborist/manager.rb, line 574
def nodelist
        return self.nodes.keys
end
anchor
on_event_socket_event( event )

IO event handler for the event socket.

# File lib/arborist/manager.rb, line 1033
def on_event_socket_event( event )
        if event.writable?
                if (( msg = self.event_queue.shift ))
                        # self.log.debug "Publishing event %p" % [ msg ]
                        event.socket << msg
                end
        else
                raise "Unhandled event %p on the event socket" % [ event ]
        end

        self.unregister_event_socket if self.event_queue.empty?
end
anchor
on_tree_socket_event( event )

ZMQ::Handler API – Read and handle an incoming request.

# File lib/arborist/manager.rb, line 602
def on_tree_socket_event( event )
        if event.readable?
                request = event.socket.receive
                msg = self.dispatch_request( request )
                event.socket << msg
        else
                raise "Unsupported event %p on tree API socket!" % [ event ]
        end
end
anchor
propagate_events( node, *events )

Propagate one or more events to the specified node and its ancestors in the tree, publishing them to matching subscriptions belonging to the nodes along the way.

# File lib/arborist/manager.rb, line 1103
def propagate_events( node, *events )
        node.publish_events( *events )

        if node.parent
                self.log.debug "Propagating %d events from %s -> %s" % [
                        events.length,
                        node.identifier,
                        node.parent
                ]
                parent = self.nodes[ node.parent ] or raise "couldn't find parent %p of node %p!" %
                        [ node.parent, node.identifier ]
                self.propagate_events( parent, *events )
        end
end
anchor
publish( identifier, event )

Publish the specified event.

# File lib/arborist/manager.rb, line 1010
def publish( identifier, event )
        self.event_queue << Arborist::EventAPI.encode( identifier, event.to_h )
        self.register_event_socket if self.running?
end
anchor
publish_heartbeat_event()

Publish a system event that observers can watch for to detect restarts.

# File lib/arborist/manager.rb, line 1048
def publish_heartbeat_event
        return unless self.start_time
        self.publish_system_event( 'heartbeat',
                run_id: self.run_id,
                start_time: self.start_time.iso8601,
                version: Arborist::VERSION
        )
end
anchor
publish_system_event( eventname, **data )

Publish an event with the specified eventname and data.

# File lib/arborist/manager.rb, line 1059
def publish_system_event( eventname, **data )
        eventname = eventname.to_s
        eventname = 'sys.' + eventname unless eventname.start_with?( 'sys.' )
        self.log.debug "Publishing %s event: %p." % [ eventname, data ]
        self.publish( eventname, data )
end
anchor
reachable_nodes( &block )

Yield each node that is not down to the specified block, or return an Enumerator if no block is given.

# File lib/arborist/manager.rb, line 923
def reachable_nodes( &block )
        iter = self.enumerator_for( self.root ) {|node| node.reachable? }
        return iter.each( &block ) if block
        return iter
end
anchor
register_event_socket()

Register the publisher with the reactor if it's not already.

# File lib/arborist/manager.rb, line 1017
def register_event_socket
        self.log.debug "Registering event socket for write events."
        self.reactor.enable_events( self.event_socket, :write ) unless
                self.reactor.event_enabled?( self.event_socket, :write )
end
anchor
remove_node( node )

Remove a node from the Manager. The node can either be the Arborist::Node to remove, or the identifier of a node.

# File lib/arborist/manager.rb, line 505
def remove_node( node )
        node = self.nodes[ node ] unless node.is_a?( Arborist::Node )
        return unless node

        raise "Can't remove an operational node" if node.operational?

        self.log.info "Removing node %p" % [ node ]
        self.publish_system_event( 'node_removed', node: node.identifier )
        node.children.each do |identifier, child_node|
                self.remove_node( child_node )
        end

        if parent_node = self.nodes[ node.parent || '_' ]
                parent_node.remove_child( node )
        end

        return self.nodes.delete( node.identifier )
end
anchor
remove_subscription( subscription_identifier )

Remove the subscription with the specified subscription_identifier from the node it's attached to and from the manager, and return it.

# File lib/arborist/manager.rb, line 1095
def remove_subscription( subscription_identifier )
        node = self.subscriptions.delete( subscription_identifier ) or return nil
        return node.remove_subscription( subscription_identifier )
end
anchor
root_node()

Return the current root node.

# File lib/arborist/manager.rb, line 907
def root_node
        return self.nodes[ '_' ]
end
anchor
setup_event_socket()

Set up the ZMQ PUB socket for published events.

# File lib/arborist/manager.rb, line 982
def setup_event_socket
        @event_socket = CZTop::Socket::PUB.new
        self.log.info "  binding the event socket (%#0x) to %p" %
                [ @event_socket.object_id * 2, Arborist.event_api_url ]
        @event_socket.options.linger = ( self.linger * 1000 ).ceil
        @event_socket.bind( Arborist.event_api_url )
end
anchor
setup_tree_socket()

Set up the ZeroMQ REP socket for the Tree API.

# File lib/arborist/manager.rb, line 585
def setup_tree_socket
        @tree_socket = CZTop::Socket::REP.new
        self.log.info "  binding the tree API socket (%#0x) to %p" %
                [ @tree_socket.object_id * 2, Arborist.tree_api_url ]
        @tree_socket.options.linger = 0
        @tree_socket.bind( Arborist.tree_api_url )
end
anchor
shutdown_event_socket()

Stop accepting events to be published

# File lib/arborist/manager.rb, line 992
def shutdown_event_socket
        start   = Time.now
        timeout = start + (self.linger.to_f / 2.0)

        self.log.warn "Waiting to empty the event queue..."
        until self.event_queue.empty?
                sleep 0.1
                break if Time.now > timeout
        end
        self.log.warn "  ... waited %0.1f seconds" % [ Time.now - start ]

        @event_socket.options.linger = 0
        @event_socket.unbind( @event_socket.last_endpoint )
        @event_socket = nil
end
anchor
shutdown_tree_socket()

Tear down the ZeroMQ REP socket.

# File lib/arborist/manager.rb, line 595
def shutdown_tree_socket
        @tree_socket.unbind( @tree_socket.last_endpoint )
        @tree_socket = nil
end
anchor
subscribe( identifier, subscription )

Add the specified subscription to the node corresponding with the given identifier.

# File lib/arborist/manager.rb, line 1068
def subscribe( identifier, subscription )
        identifier ||= '_'
        node = self.nodes[ identifier ] or raise ArgumentError, "no such node %p" % [ identifier ]

        self.log.debug "Registering subscription %p" % [ subscription ]
        node.add_subscription( subscription )
        self.log.debug " adding '%s' to the subscriptions hash." % [ subscription.id ]
        self.subscriptions[ subscription.id ] = node
        self.log.debug "  subscriptions hash: %#0x" % [ self.subscriptions.object_id ]
end
anchor
unregister_event_socket()

Unregister the event publisher socket from the reactor if it's registered.

# File lib/arborist/manager.rb, line 1025
def unregister_event_socket
        self.log.debug "Unregistering event socket for write events."
        self.reactor.disable_events( self.event_socket, :write ) if
                self.reactor.event_enabled?( self.event_socket, :write )
end
anchor
update_node( identifier, new_properties, monitor_key='_' )

Update the node with the specified identifier with the given new_properties and propagate any events generated by the update to the node and its ancestors.

# File lib/arborist/manager.rb, line 527
def update_node( identifier, new_properties, monitor_key='_' )
        unless (( node = self.nodes[identifier] ))
                self.log.warn "Update for non-existent node %p ignored." % [ identifier ]
                return []
        end

        events = node.update( new_properties, monitor_key )
        self.propagate_events( node, events )
end
anchor
uptime()

Return the duration the manager has been running in seconds.

# File lib/arborist/manager.rb, line 561
def uptime
        return 0 unless self.start_time
        return Time.now - self.start_time
end