The main Arborist process – responsible for coordinating all other activity.
Signals the manager responds to
Array of actions supported by the Tree API
The Timers::Timer that periodically checkpoints the manager's state (if it's configured to do so)
The queue of pending Event API events
The ZeroMQ PUB socket that publishes events for the Event API
The Timers::Timer that periodically publishes a heartbeat event
The maximum amount of time to wait for pending events to be delivered during shutdown, in seconds (it is multiplied by 1000 when applied as the event socket's linger option).
The Hash of all loaded Nodes, keyed by their identifier
The CZTop::Reactor that runs the event loop
The root node of the tree.
A unique string used to identify different runs of the Manager
The time at which the manager began running.
The Hash of all Subscriptions, keyed by their subscription ID
The ZeroMQ REP socket that handles Tree API requests
Create a new Arborist::Manager.
# File lib/arborist/manager.rb, line 97
def initialize
@run_id = SecureRandom.hex( 16 )
@root = Arborist::Node.create( :root )
@nodes = { '_' => @root }
@subscriptions = {}
@tree_built = false
@start_time = nil
@checkpoint_timer = nil
@linger = self.class.linger
self.log.info "Linger set to %p" % [ @linger ]
@reactor = CZTop::Reactor.new
@tree_socket = nil
@event_socket = nil
@event_queue = []
@heartbeat_timer = nil
@checkpoint_timer = nil
end
Flag for marking when the tree is built successfully the first time
# File lib/arborist/manager.rb, line 163
attr_predicate_accessor :tree_built
Cancel the timer that saves tree snapshots.
# File lib/arborist/manager.rb, line 372
def cancel_checkpoint_timer
self.reactor.remove_timer( self.checkpoint_timer )
end
Cancel the timer that publishes heartbeat events.
# File lib/arborist/manager.rb, line 340
def cancel_heartbeat_timer
self.reactor.remove_timer( self.heartbeat_timer )
end
Register a periodic timer that will save a snapshot of the node tree's state to the state file on a configured interval if one is configured.
# File lib/arborist/manager.rb, line 353
def register_checkpoint_timer
unless self.class.state_file
self.log.info "No state file configured; skipping checkpoint timer setup."
return nil
end
interval = self.class.checkpoint_frequency
unless interval && interval.nonzero?
self.log.info "Checkpoint frequency is %p; skipping checkpoint timer setup." % [ interval ]
return nil
end
self.log.info "Setting up node state checkpoint every %0.3fs" % [ interval ]
@checkpoint_timer = self.reactor.add_periodic_timer( interval ) do
self.save_node_states
end
end
Register a periodic timer that will publish a heartbeat event at a configurable interval.
# File lib/arborist/manager.rb, line 329
def register_heartbeat_timer
interval = self.class.heartbeat_frequency
self.log.info "Setting up to heartbeat every %ds" % [ interval ]
@heartbeat_timer = self.reactor.add_periodic_timer( interval ) do
self.publish_heartbeat_event
end
end
Attempt to restore the state of the loaded nodes from the configured state file. Returns true if it succeeded, or false if a state file wasn't configured, doesn't exist, isn't readable, or couldn't be unmarshalled.
# File lib/arborist/manager.rb, line 305
def restore_node_states
path = self.class.state_file or return false
return false unless path.readable?
self.log.info "Restoring node state from %s" % [ path ]
nodes = Marshal.load( path.open('r:binary') )
nodes.each do |identifier, saved_node|
self.log.debug "Loaded node: %p" % [ identifier ]
if (( current_node = self.nodes[ identifier ] ))
self.log.debug "Restoring state of the %p node." % [ identifier ]
current_node.restore( saved_node )
else
self.log.info "Not restoring state for the %s node: not present in the loaded tree." %
[ identifier ]
end
end
return true
end
Resume the timer that saves tree snapshots.
# File lib/arborist/manager.rb, line 378
def resume_checkpoint_timer
self.reactor.resume_timer( self.checkpoint_timer )
end
Resume the timer that publishes heartbeat events.
# File lib/arborist/manager.rb, line 346
def resume_heartbeat_timer
self.reactor.resume_timer( self.heartbeat_timer )
end
Write out the state of all the manager's nodes to the state_file if one is configured.
# File lib/arborist/manager.rb, line 279
def save_node_states
start_time = Time.now
path = self.class.state_file or return
self.log.info "Saving current node state to %s" % [ path ]
tmpfile = Tempfile.create(
[path.basename.to_s.sub(path.extname, ''), path.extname],
path.dirname.to_s,
encoding: 'binary'
)
Marshal.dump( self.nodes, tmpfile )
tmpfile.close
File.rename( tmpfile.path, path.to_s )
self.log.debug "Saved state file in %0.1f seconds." % [ Time.now - start_time ]
rescue SystemCallError => err
self.log.error "%p while saving node state: %s" % [ err.class, err.message ]
ensure
File.unlink( tmpfile.path ) if tmpfile && File.exist?( tmpfile.path )
end
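The write-to-tempfile-then-rename approach used by save_node_states can be sketched in isolation. This is a minimal standalone illustration rather than the Manager's own code: `StateFileSketch` and its `save` and `load` methods are hypothetical names, and a plain Hash stands in for the node table.

```ruby
require 'tempfile'
require 'pathname'
require 'tmpdir'

# Hypothetical helper illustrating an atomic state-file write.
module StateFileSketch

  # Marshal +data+ into a tempfile created next to +path+, then rename it
  # into place so a reader never observes a partially-written file.
  def self.save( path, data )
    path = Pathname( path )
    tmpfile = Tempfile.create(
      [ path.basename( path.extname ).to_s, path.extname ],
      path.dirname.to_s,
      encoding: 'binary'
    )
    Marshal.dump( data, tmpfile )
    tmpfile.close
    File.rename( tmpfile.path, path.to_s )
    return true
  ensure
    # Clean up the tempfile if the rename did not happen.
    File.unlink( tmpfile.path ) if tmpfile && File.exist?( tmpfile.path )
  end

  # Load previously-saved data, or return nil if the file is not readable.
  def self.load( path )
    path = Pathname( path )
    return nil unless path.readable?
    return path.open( 'rb' ) {|io| Marshal.load(io) }
  end

end
```

Creating the tempfile in the target's own directory keeps the rename on a single filesystem, which is what allows it to replace the old file in one step.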
Handle signals.
# File lib/arborist/manager.rb, line 390
def handle_signal( sig )
self.log.debug "Handling signal %s" % [ sig ]
case sig
when :INT, :TERM
self.on_termination_signal( sig )
when :HUP
self.on_hangup_signal( sig )
when :USR1
self.on_user1_signal( sig )
else
self.log.warn "Unhandled signal %s" % [ sig ]
end
end
Handle a HUP signal. The default is to restart the manager.
# File lib/arborist/manager.rb, line 419
def on_hangup_signal( signo )
self.log.warn "Hangup (%p)" % [ signo ]
self.restart
end
Handle a TERM or INT signal. Shuts the manager down after handling any current requests. Also aliased to on_interrupt_signal.
# File lib/arborist/manager.rb, line 411
def on_termination_signal( signo )
self.log.warn "Terminated (%p)" % [ signo ]
self.stop
end
Handle a USR1 signal. Writes a message to the log and saves a checkpoint of the node states.
# File lib/arborist/manager.rb, line 426
def on_user1_signal( signo )
self.log.info "Checkpoint: User signal."
self.save_node_states
end
Cancel the Manager's timers.
# File lib/arborist/manager.rb, line 229
def cancel_timers
self.cancel_heartbeat_timer
self.cancel_checkpoint_timer
end
Return a human-readable representation of the Manager suitable for debugging.
# File lib/arborist/manager.rb, line 263
def inspect
return "#<%p:%#x {runid: %s} %d nodes>" % [
self.class,
self.object_id * 2,
self.run_id,
self.nodes.length,
]
end
Register the Manager's timers.
# File lib/arborist/manager.rb, line 222
def register_timers
self.register_checkpoint_timer
self.register_heartbeat_timer
end
Restart the manager
# File lib/arborist/manager.rb, line 250
def restart
raise NotImplementedError
end
Setup sockets and start the event loop.
# File lib/arborist/manager.rb, line 185
def run
self.log.info "Getting ready to start the manager."
self.setup_sockets
self.register_timers
self.with_signal_handler( reactor, *QUEUE_SIGS ) do
self.start_accepting_requests
end
ensure
self.shutdown_sockets
self.save_node_states
end
Returns true if the Manager is running.
# File lib/arborist/manager.rb, line 214
def running?
return self.reactor &&
self.event_socket &&
self.reactor.registered?( self.event_socket )
end
Create the sockets used by the manager and bind them to the appropriate endpoints.
# File lib/arborist/manager.rb, line 200
def setup_sockets
self.setup_tree_socket
self.setup_event_socket
end
Shut down the sockets used by the manager.
# File lib/arborist/manager.rb, line 207
def shutdown_sockets
self.shutdown_tree_socket
self.shutdown_event_socket
end
Start a loop, accepting a request and handling it.
# File lib/arborist/manager.rb, line 236
def start_accepting_requests
self.log.debug "Starting the main loop"
self.start_time = Time.now
self.reactor.register( self.tree_socket, :read, &self.method(:on_tree_socket_event) )
self.reactor.register( self.event_socket, :write, &self.method(:on_event_socket_event) )
self.log.debug "Manager running."
return self.reactor.start_polling( ignore_interrupts: true )
end
Stop the manager.
# File lib/arborist/manager.rb, line 256
def stop
self.log.info "Stopping the manager."
self.reactor.stop_polling
end
Add the specified node to the Manager.
# File lib/arborist/manager.rb, line 481
def add_node( node )
identifier = node.identifier
raise Arborist::NodeError, "Node %p already present." % [ identifier ] if self.nodes[ identifier ]
self.nodes[ identifier ] = node
if self.tree_built?
self.link_node( node )
self.publish_system_event( 'node_added', node: identifier )
end
end
Yield each node in a depth-first traversal of the manager's tree to the specified block, or return an Enumerator if no block is given.
# File lib/arborist/manager.rb, line 914
def all_nodes( &block )
iter = self.enumerator_for( self.root )
return iter.each( &block ) if block
return iter
end
Return the Array of all nodes above the specified node.
# File lib/arborist/manager.rb, line 970
def ancestors_for( node )
parent_id = node.parent or return []
parent = self.nodes[ parent_id ]
return [ parent ] + self.ancestors_for( parent )
end
Build the tree out of all the loaded nodes.
# File lib/arborist/manager.rb, line 448
def build_tree
self.log.info "Building tree from %d loaded nodes." % [ self.nodes.length ]
# Build primary tree structure
self.nodes.each_value do |node|
next if node.operational?
self.link_node_to_parent( node )
end
self.tree_built = true
# Set up secondary dependencies
self.nodes.each_value do |node|
node.register_secondary_dependencies( self )
end
self.restore_node_states
end
Create a subscription that publishes to the Manager's event publisher for the node with the specified identifier and event_pattern, using the given criteria when considering an event.
# File lib/arborist/manager.rb, line 1083
def create_subscription( identifier, event_pattern, criteria, negative_criteria={} )
sub = Arborist::Subscription.new( event_pattern, criteria, negative_criteria ) do |*args|
self.publish( *args )
end
self.subscribe( identifier, sub )
return sub
end
Return a depth-limited enumerator for the specified start_node.
# File lib/arborist/manager.rb, line 945
def depth_limited_enumerator_for( start_node, depth, &filter )
return Enumerator.new do |yielder|
traverse = ->( node, current_depth ) do
self.log.debug "Enumerating nodes from %s at depth: %p" %
[ node.identifier, current_depth ]
if !filter || filter.call( node )
yielder.yield( node )
node.each do |child|
traverse[ child, current_depth - 1 ]
end if current_depth > 0
end
end
traverse.call( start_node, depth )
end
end
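The depth limit can be illustrated with a small standalone sketch. `DepthSketchNode` and `depth_limited_sketch` are hypothetical stand-ins, not part of Arborist; the only assumption carried over from the listing above is that a node yields its children from `each`. The filter argument is left out to keep the example short.

```ruby
# Hypothetical stand-in for a tree node: it only knows its identifier and
# how to iterate over its children.
class DepthSketchNode
  attr_reader :identifier

  def initialize( identifier, *children )
    @identifier = identifier
    @children = children
  end

  def each( &block )
    return @children.each( &block )
  end
end

# Yield +start_node+ and its descendants depth-first, descending at most
# +depth+ levels below the starting node.
def depth_limited_sketch( start_node, depth )
  return Enumerator.new do |yielder|
    traverse = ->( node, remaining ) do
      yielder.yield( node )
      node.each {|child| traverse.call( child, remaining - 1 ) } if remaining > 0
    end
    traverse.call( start_node, depth )
  end
end
```

With this shape, a depth of 0 yields only the starting node, and each additional level of depth admits one more generation of children.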
Return an Array of all nodes below the specified node.
# File lib/arborist/manager.rb, line 964
def descendants_for( node )
return self.enumerator_for( node ).to_a
end
Handle the specified raw_request and return a response.
# File lib/arborist/manager.rb, line 614
def dispatch_request( raw_request )
raise "Manager is shutting down" unless self.running?
header, body = Arborist::TreeAPI.decode( raw_request )
handler = self.lookup_tree_request_action( header )
return handler.call( header, body )
rescue => err
self.log.error "%p: %s" % [ err.class, err.message ]
err.backtrace.each {|frame| self.log.debug " #{frame}" }
errtype = case err
when Arborist::MessageError,
Arborist::ConfigError,
Arborist::NodeError
'client'
else
'server'
end
return Arborist::TreeAPI.error_response( errtype, err.message )
end
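The rescue clause above sorts failures into 'client' and 'server' categories by exception class. A minimal sketch of that idea follows; the error classes, the `guarded_dispatch_sketch` method, and the response Hash layout are all hypothetical and do not reflect the real Arborist::TreeAPI response format.

```ruby
# Hypothetical error classes standing in for the library's own.
class SketchMessageError < StandardError; end
class SketchNodeError < StandardError; end

# Run the given block and wrap its result; on failure, report whether the
# caller (client) or the service (server) is considered at fault.
def guarded_dispatch_sketch
  return { 'success' => true, 'result' => yield }
rescue => err
  category = case err
    when SketchMessageError, SketchNodeError then 'client'
    else 'server'
    end
  return { 'success' => false, 'category' => category, 'reason' => err.message }
end
```

Because the bare `rescue` only catches StandardError and its subclasses, anything unexpected inside a handler is still reported back to the requester as a 'server' error instead of escaping the request loop.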
Return an enumerator for the specified start_node.
# File lib/arborist/manager.rb, line 931
def enumerator_for( start_node, &filter )
return Enumerator.new do |yielder|
traverse = ->( node ) do
if !filter || filter.call( node )
yielder.yield( node )
node.each( &traverse )
end
end
traverse.call( start_node )
end
end
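One consequence of the traversal above is that a node rejected by the filter also hides its whole subtree, since the recursion only continues beneath nodes that pass. The following sketch shows this with hypothetical stand-ins (`FilterSketchNode`, `filtered_enumerator_sketch`); it assumes only that a node yields its children from `each`.

```ruby
# Hypothetical stand-in node with an up/down flag and a list of children.
class FilterSketchNode
  attr_reader :identifier

  def initialize( identifier, up: true, children: [] )
    @identifier = identifier
    @up = up
    @children = children
  end

  def up?
    return @up
  end

  def each( &block )
    return @children.each( &block )
  end
end

# Depth-first enumerator that prunes any subtree whose root fails +filter+.
def filtered_enumerator_sketch( start_node, &filter )
  return Enumerator.new do |yielder|
    traverse = ->( node ) do
      if !filter || filter.call( node )
        yielder.yield( node )
        node.each( &traverse )
      end
    end
    traverse.call( start_node )
  end
end
```

This is the behavior reachable_nodes relies on: children of an unreachable node are skipped even if they would pass the filter themselves.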
Traverse the node tree and return the specified return_values from any nodes which match the given filter, skipping downed nodes and all their children if exclude_down is set. If return_values is set to nil, then all values from the node will be returned.
# File lib/arborist/manager.rb, line 542
def find_matching_node_states( filter, return_values, exclude_down=false, negative_filter={} )
nodes_iter = if exclude_down
self.reachable_nodes
else
self.all_nodes
end
states = nodes_iter.
select {|node| node.matches?(filter) }.
reject {|node| !negative_filter.empty? && node.matches?(negative_filter) }.
each_with_object( {} ) do |node, hash|
hash[ node.identifier ] = node.fetch_values( return_values )
end
return states
end
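The select/reject pipeline above can be tried out on plain Hashes. In this sketch the matcher is a hypothetical simple equality check (the real Node#matches? is richer), and `matching_states_sketch` and `sketch_matches?` are illustrative names only.

```ruby
# Hypothetical matcher: every criteria key must equal the node's value.
# Note that empty criteria match every node.
def sketch_matches?( node, criteria )
  return criteria.all? {|key, val| node[key] == val }
end

# Keep nodes matching +filter+, drop those matching a non-empty
# +negative_filter+, and collect the requested values by identifier.
def matching_states_sketch( nodes, filter, return_values, negative_filter={} )
  return nodes.
    select {|node| sketch_matches?( node, filter ) }.
    reject {|node| !negative_filter.empty? && sketch_matches?( node, negative_filter ) }.
    each_with_object( {} ) do |node, hash|
      hash[ node[:identifier] ] = return_values ? node.slice( *return_values ) : node
    end
end
```

The `empty?` guard on the negative filter matters in this sketch: without it, empty negative criteria would match, and therefore reject, every node.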
Acknowledge a node
# File lib/arborist/manager.rb, line 870
def handle_ack_request( header, body )
self.log.info "ACK: %p" % [ header ]
identifier = header[ 'identifier' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for ACK.' )
node = self.nodes[ identifier ] or
return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
self.log.debug "Acking the %s node: %p" % [ identifier, body ]
body = symbolify_keys( body )
events = node.acknowledge( **body )
self.propagate_events( node, events )
return Arborist::TreeAPI.successful_response( nil )
end
Return a response to the `deps` action.
# File lib/arborist/manager.rb, line 728
def handle_deps_request( header, body )
self.log.info "DEPS: %p" % [ header ]
from = header['from'] || '_'
deps = self.merge_dependencies_from( from )
deps.delete( from )
return Arborist::TreeAPI.successful_response({ deps: deps.to_a })
rescue Arborist::ClientError => err
return Arborist::TreeAPI.error_response( 'client', err.message )
end
Return a response to the `fetch` action.
# File lib/arborist/manager.rb, line 701
def handle_fetch_request( header, body )
self.log.info "FETCH: %p" % [ header ]
from = header['from'] || '_'
depth = header['depth']
tree = header['tree']
start_node = self.nodes[ from ] or
return Arborist::TreeAPI.error_response( 'client', "No such node %s." % [from] )
self.log.debug " Listing nodes under %p" % [ start_node ]
if tree
iter = [ start_node.to_h(depth: (depth || -1)) ]
elsif depth
self.log.debug " depth limited to %d" % [ depth ]
iter = self.depth_limited_enumerator_for( start_node, depth )
else
self.log.debug " no depth limit"
iter = self.enumerator_for( start_node )
end
data = iter.map( &:to_h )
self.log.debug " got data for %d nodes" % [ data.length ]
return Arborist::TreeAPI.successful_response( data )
end
Add a node
# File lib/arborist/manager.rb, line 811
def handle_graft_request( header, body )
self.log.info "GRAFT: %p" % [ header ]
identifier = header[ 'identifier' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for GRAFT.' )
if self.nodes[ identifier ]
return Arborist::TreeAPI.error_response( 'client', "Node %p already exists." % [identifier] )
end
type = header[ 'type' ] or
return Arborist::TreeAPI.error_response( 'client', 'No type specified for GRAFT.' )
parent = header[ 'parent' ] || '_'
parent_node = self.nodes[ parent ] or
return Arborist::TreeAPI.error_response( 'client', 'No parent node found for %s.' % [parent] )
self.log.debug "Grafting a new %s node under %p" % [ type, parent_node ]
# If the parent has a factory method for the node type, use it, otherwise
# use the Pluggability factory
node = if parent_node.respond_to?( type )
parent_node.method( type ).call( identifier, body )
else
body.merge!( parent: parent )
Arborist::Node.create( type, identifier, body )
end
self.add_node( node )
return Arborist::TreeAPI.successful_response( node ? {identifier: node.identifier} : nil )
end
Modify a node's operational attributes
# File lib/arborist/manager.rb, line 845
def handle_modify_request( header, body )
self.log.info "MODIFY: %p" % [ header ]
identifier = header[ 'identifier' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for MODIFY.' )
return Arborist::TreeAPI.error_response( 'client', "Unable to MODIFY root node." ) if identifier == '_'
node = self.nodes[ identifier ] or
return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
self.log.debug "Modifying operational attributes of the %s node: %p" % [ identifier, body ]
if new_parent_identifier = body.delete( 'parent' )
old_parent = self.nodes[ node.parent ]
new_parent = self.nodes[ new_parent_identifier ] or
return Arborist::TreeAPI.error_response( 'client', "No such parent node: %p" % [new_parent_identifier] )
node.reparent( old_parent, new_parent )
end
node.modify( body )
return Arborist::TreeAPI.successful_response( nil )
end
Remove a node and its children.
# File lib/arborist/manager.rb, line 799
def handle_prune_request( header, body )
self.log.info "PRUNE: %p" % [ header ]
identifier = header[ 'identifier' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for PRUNE.' )
node = self.remove_node( identifier )
return Arborist::TreeAPI.successful_response( node ? node.to_h : nil )
end
Return a response to the 'search' action.
# File lib/arborist/manager.rb, line 762
def handle_search_request( header, body )
self.log.info "SEARCH: %p" % [ header ]
exclude_down = header['exclude_down']
values = if header.key?( 'return' )
header['return'] || []
else
nil
end
body = [ body ] unless body.is_a?( Array )
positive = body.shift
negative = body.shift || {}
states = self.find_matching_node_states( positive, values, exclude_down, negative )
return Arborist::TreeAPI.successful_response( states )
end
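The handling of the `return` header and of the request body above has three distinct cases that are easy to miss. The sketch below isolates them; `parse_search_sketch` is a hypothetical helper, and unlike the handler it does not modify the body it is given.

```ruby
# Return [ values, positive_criteria, negative_criteria ] for a search.
#
# - No 'return' header at all     -> values is nil (all values)
# - 'return' present but nil      -> values is an empty list
# - Body is a single criteria Hash -> treated as [ positive ]
def parse_search_sketch( header, body )
  values = header.key?( 'return' ) ? ( header['return'] || [] ) : nil
  body = [ body ] unless body.is_a?( Array )
  positive, negative = body
  return values, positive, ( negative || {} )
end
```

A body of `[ positive, negative ]` supplies both sets of criteria, while a bare Hash is shorthand for positive criteria only.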
Return a response to the `status` action.
# File lib/arborist/manager.rb, line 655
def handle_status_request( header, body )
self.log.info "STATUS: %p" % [ header ]
return Arborist::TreeAPI.successful_response(
server_version: Arborist::VERSION,
state: self.running? ? 'running' : 'not running',
uptime: self.uptime,
nodecount: self.nodecount
)
end
Return a response to the `subscribe` action.
# File lib/arborist/manager.rb, line 667
def handle_subscribe_request( header, body )
self.log.info "SUBSCRIBE: %p" % [ header ]
event_type = header[ 'event_type' ]
node_identifier = header[ 'identifier' ]
body = [ body ] unless body.is_a?( Array )
positive = body.shift
negative = body.shift || {}
subscription = self.create_subscription( node_identifier, event_type, positive, negative )
self.log.info "Subscription to %s events at or under %s: %p" %
[ event_type || 'all', node_identifier || 'the root node', subscription ]
return Arborist::TreeAPI.successful_response( id: subscription.id )
end
Un-acknowledge a node
# File lib/arborist/manager.rb, line 889
def handle_unack_request( header, body )
self.log.info "UNACK: %p" % [ header ]
identifier = header[ 'identifier' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNACK.' )
node = self.nodes[ identifier ] or
return Arborist::TreeAPI.error_response( 'client', "No such node %p" % [identifier] )
self.log.debug "Unacking the %s node: %p" % [ identifier, body ]
events = node.unacknowledge
self.propagate_events( node, events )
return Arborist::TreeAPI.successful_response( nil )
end
Return a response to the `unsubscribe` action.
# File lib/arborist/manager.rb, line 685
def handle_unsubscribe_request( header, body )
self.log.info "UNSUBSCRIBE: %p" % [ header ]
subscription_id = header[ 'subscription_id' ] or
return Arborist::TreeAPI.error_response( 'client', 'No identifier specified for UNSUBSCRIBE.' )
subscription = self.remove_subscription( subscription_id ) or
return Arborist::TreeAPI.successful_response( nil )
self.log.info "Destroyed subscription: %p" % [ subscription ]
return Arborist::TreeAPI.successful_response(
event_type: subscription.event_type,
criteria: subscription.criteria
)
end
Update nodes using the data from the update request's body.
# File lib/arborist/manager.rb, line 782
def handle_update_request( header, body )
self.log.info "UPDATE: %p" % [ header ]
unless body.respond_to?( :each )
return Arborist::TreeAPI.error_response( 'client', 'Malformed update: body does not respond to #each' )
end
monitor_key = header['monitor_key']
body.each do |identifier, properties|
self.update_node( identifier, properties, monitor_key )
end
return Arborist::TreeAPI.successful_response( nil )
end
Link the node to other nodes in the tree.
# File lib/arborist/manager.rb, line 495
def link_node( node )
raise "Tree is not built yet" unless self.tree_built?
self.link_node_to_parent( node )
node.register_secondary_dependencies( self )
end
Link the specified node to its parent. Raises an error if the specified node's parent is not yet loaded.
# File lib/arborist/manager.rb, line 469
def link_node_to_parent( node )
self.log.debug "Linking node %p to its parent" % [ node ]
parent_id = node.parent || '_'
parent_node = self.nodes[ parent_id ] or
raise "no parent '%s' node loaded for %p" % [ parent_id, node ]
self.log.debug "adding %p as a child of %p" % [ node, parent_node ]
parent_node.add_child( node )
end
Add nodes yielded from the specified enumerator into the manager's tree.
# File lib/arborist/manager.rb, line 439
def load_tree( enumerator )
enumerator.each do |node|
self.add_node( node )
end
self.build_tree
end
Given a request header, return a callable object that can handle the request.
# File lib/arborist/manager.rb, line 640
def lookup_tree_request_action( header )
raise Arborist::MessageError, "unsupported version %d" % [ header['version'] ] unless
header['version'] == 1
action = header['action'] or
raise Arborist::MessageError, "missing required header 'action'"
raise Arborist::MessageError, "No such action '%s'" % [ action ] unless
VALID_TREEAPI_ACTIONS.include?( action )
handler_name = "handle_%s_request" % [ action ]
return self.method( handler_name )
end
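The lookup above turns an action name into a bound Method object, but only after checking it against an allow-list. A self-contained sketch of that pattern follows; `ActionDispatchSketch`, its two handlers, and its action list are hypothetical and much smaller than the real Tree API.

```ruby
# Hypothetical dispatcher: maps an 'action' header onto a handler method
# by name, but only for actions on an explicit allow-list.
class ActionDispatchSketch
  VALID_ACTIONS = %w[ status fetch ].freeze

  class MessageError < StandardError; end

  # Return a bound Method for the header's action, or raise MessageError.
  # (%p is used for the version so that a missing header formats cleanly.)
  def lookup( header )
    raise MessageError, "unsupported version %p" % [ header['version'] ] unless
      header['version'] == 1
    action = header['action'] or
      raise MessageError, "missing required header 'action'"
    raise MessageError, "No such action '%s'" % [ action ] unless
      VALID_ACTIONS.include?( action )
    return self.method( "handle_%s_request" % [ action ] )
  end

  def handle_status_request( header, body )
    return { 'state' => 'running' }
  end

  def handle_fetch_request( header, body )
    return []
  end
end
```

Checking the allow-list before calling `method` means a request cannot name an arbitrary method, and an unknown action produces a descriptive error rather than a NameError.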
Recurse into the children and secondary dependencies of the from node and merge the identifiers of the traversed nodes into the deps_set.
# File lib/arborist/manager.rb, line 744
def merge_dependencies_from( from, deps_set=Set.new )
return deps_set unless deps_set.add?( from )
start_node = self.nodes[ from ] or
raise Arborist::ClientError, "No such node %s." % [ from ]
self.enumerator_for( start_node ).each do |subnode|
deps_set.add( subnode.identifier )
subnode.node_subscribers.each do |subdep|
self.merge_dependencies_from( subdep, deps_set )
end
end
return deps_set
end
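The guard on `Set#add?` is what keeps this recursion from looping forever when dependencies are circular. The sketch below shows the same guard on a plain Hash graph; `merge_deps_sketch` and the graph layout (identifier mapped to the identifiers that depend on it) are assumptions made for illustration.

```ruby
require 'set'

# Collect +from+ and everything that transitively depends on it.
# +graph+ is a hypothetical Hash of identifier => Array of dependents.
def merge_deps_sketch( graph, from, deps_set=Set.new )
  # Set#add? returns nil when the element is already present, which stops
  # the recursion on cycles and on nodes reachable by more than one path.
  return deps_set unless deps_set.add?( from )

  dependents = graph[ from ] or raise ArgumentError, "No such node %s." % [ from ]
  dependents.each {|dep| merge_deps_sketch( graph, dep, deps_set ) }

  return deps_set
end
```

The resulting set includes the starting identifier itself, which is why handle_deps_request deletes `from` before building its response.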
Return the number of nodes in the manager's tree.
# File lib/arborist/manager.rb, line 568
def nodecount
return self.nodes.length
end
Return an Array of the identifiers of all nodes in the manager's tree.
# File lib/arborist/manager.rb, line 574
def nodelist
return self.nodes.keys
end
IO event handler for the event socket.
# File lib/arborist/manager.rb, line 1033
def on_event_socket_event( event )
if event.writable?
if (( msg = self.event_queue.shift ))
# self.log.debug "Publishing event %p" % [ msg ]
event.socket << msg
end
else
raise "Unhandled event %p on the event socket" % [ event ]
end
self.unregister_event_socket if self.event_queue.empty?
end
ZMQ::Handler API – Read and handle an incoming request.
# File lib/arborist/manager.rb, line 602
def on_tree_socket_event( event )
if event.readable?
request = event.socket.receive
msg = self.dispatch_request( request )
event.socket << msg
else
raise "Unsupported event %p on tree API socket!" % [ event ]
end
end
Propagate one or more events to the specified node and its ancestors in the tree, publishing them to matching subscriptions belonging to the nodes along the way.
# File lib/arborist/manager.rb, line 1103
def propagate_events( node, *events )
node.publish_events( *events )
if node.parent
self.log.debug "Propagating %d events from %s -> %s" % [
events.length,
node.identifier,
node.parent
]
parent = self.nodes[ node.parent ] or raise "couldn't find parent %p of node %p!" %
[ node.parent, node.identifier ]
self.propagate_events( parent, *events )
end
end
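The upward walk can be sketched without any of the subscription machinery. Here `PropagationSketchNode` is a hypothetical stand-in that simply records what it receives, and `propagate_sketch` takes the identifier-to-node Hash as an explicit argument instead of reading it from the Manager.

```ruby
# Hypothetical stand-in node that records the events delivered to it.
class PropagationSketchNode
  attr_reader :identifier, :parent, :received

  def initialize( identifier, parent=nil )
    @identifier = identifier
    @parent = parent
    @received = []
  end

  # Stand-in for delivering events to this node's subscriptions.
  def publish_events( *events )
    @received.concat( events )
  end
end

# Deliver +events+ to +node+, then to each ancestor in turn until a node
# with no parent (the root) has been reached.
def propagate_sketch( nodes, node, *events )
  node.publish_events( *events )
  return unless node.parent
  parent = nodes[ node.parent ] or
    raise "couldn't find parent %p of node %p!" % [ node.parent, node.identifier ]
  propagate_sketch( nodes, parent, *events )
end
```

Only the direct line of ancestors sees the events; siblings and unrelated branches are never visited.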
Publish the specified event.
# File lib/arborist/manager.rb, line 1010
def publish( identifier, event )
self.event_queue << Arborist::EventAPI.encode( identifier, event.to_h )
self.register_event_socket if self.running?
end
Publish a system event that observers can watch for to detect restarts.
# File lib/arborist/manager.rb, line 1048
def publish_heartbeat_event
return unless self.start_time
self.publish_system_event( 'heartbeat',
run_id: self.run_id,
start_time: self.start_time.iso8601,
version: Arborist::VERSION
)
end
Publish an event with the specified eventname and data.
# File lib/arborist/manager.rb, line 1059
def publish_system_event( eventname, **data )
eventname = eventname.to_s
eventname = 'sys.' + eventname unless eventname.start_with?( 'sys.' )
self.log.debug "Publishing %s event: %p." % [ eventname, data ]
self.publish( eventname, data )
end
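The name normalization in publish_system_event is idempotent, which a short sketch makes easy to check. `system_event_name_sketch` is a hypothetical helper that extracts only the naming step.

```ruby
# Return +eventname+ as a String in the 'sys.' namespace, adding the
# prefix only if it is not already there.
def system_event_name_sketch( eventname )
  eventname = eventname.to_s
  eventname = 'sys.' + eventname unless eventname.start_with?( 'sys.' )
  return eventname
end
```

Callers can therefore pass either a bare name such as 'heartbeat' or an already-qualified one and end up with the same event name.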
Yield each node that is not down to the specified block, or return an Enumerator if no block is given.
# File lib/arborist/manager.rb, line 923
def reachable_nodes( &block )
iter = self.enumerator_for( self.root ) {|node| node.reachable? }
return iter.each( &block ) if block
return iter
end
Register the publisher with the reactor if it's not already.
# File lib/arborist/manager.rb, line 1017
def register_event_socket
self.log.debug "Registering event socket for write events."
self.reactor.enable_events( self.event_socket, :write ) unless
self.reactor.event_enabled?( self.event_socket, :write )
end
Remove a node from the Manager. The node can either be the Arborist::Node to remove, or the identifier of a node.
# File lib/arborist/manager.rb, line 505
def remove_node( node )
node = self.nodes[ node ] unless node.is_a?( Arborist::Node )
return unless node
raise "Can't remove an operational node" if node.operational?
self.log.info "Removing node %p" % [ node ]
self.publish_system_event( 'node_removed', node: node.identifier )
node.children.each do |identifier, child_node|
self.remove_node( child_node )
end
if parent_node = self.nodes[ node.parent || '_' ]
parent_node.remove_child( node )
end
return self.nodes.delete( node.identifier )
end
Remove the subscription with the specified subscription_identifier from the node it's attached to and from the manager, and return it.
# File lib/arborist/manager.rb, line 1095
def remove_subscription( subscription_identifier )
node = self.subscriptions.delete( subscription_identifier ) or return nil
return node.remove_subscription( subscription_identifier )
end
Return the current root node.
# File lib/arborist/manager.rb, line 907
def root_node
return self.nodes[ '_' ]
end
Set up the ZMQ PUB socket for published events.
# File lib/arborist/manager.rb, line 982
def setup_event_socket
@event_socket = CZTop::Socket::PUB.new
self.log.info " binding the event socket (%#0x) to %p" %
[ @event_socket.object_id * 2, Arborist.event_api_url ]
@event_socket.options.linger = ( self.linger * 1000 ).ceil
@event_socket.bind( Arborist.event_api_url )
end
Set up the ZeroMQ REP socket for the Tree API.
# File lib/arborist/manager.rb, line 585
def setup_tree_socket
@tree_socket = CZTop::Socket::REP.new
self.log.info " binding the tree API socket (%#0x) to %p" %
[ @tree_socket.object_id * 2, Arborist.tree_api_url ]
@tree_socket.options.linger = 0
@tree_socket.bind( Arborist.tree_api_url )
end
Stop accepting events to be published
# File lib/arborist/manager.rb, line 992
def shutdown_event_socket
start = Time.now
timeout = start + (self.linger.to_f / 2.0)
self.log.warn "Waiting to empty the event queue..."
until self.event_queue.empty?
sleep 0.1
break if Time.now > timeout
end
self.log.warn " ... waited %0.1f seconds" % [ Time.now - start ]
@event_socket.options.linger = 0
@event_socket.unbind( @event_socket.last_endpoint )
@event_socket = nil
end
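The bounded wait in shutdown_event_socket can be expressed as a small helper that reports whether the queue actually drained. This is a sketch under assumptions: `wait_for_drain_sketch` and its `poll_interval` parameter are hypothetical, and it checks the deadline before sleeping, whereas the listing above sleeps first.

```ruby
# Wait up to half of +linger+ seconds for +queue+ to become empty,
# polling every +poll_interval+ seconds. Returns true if the queue
# drained in time, false if the deadline passed first.
def wait_for_drain_sketch( queue, linger, poll_interval: 0.1 )
  deadline = Time.now + ( linger.to_f / 2.0 )
  until queue.empty?
    return false if Time.now > deadline
    sleep poll_interval
  end
  return true
end
```

Returning a boolean lets a caller log whether events were left undelivered, which the original signals only indirectly through the elapsed time it reports.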
Tear down the ZeroMQ REP socket.
# File lib/arborist/manager.rb, line 595
def shutdown_tree_socket
@tree_socket.unbind( @tree_socket.last_endpoint )
@tree_socket = nil
end
Add the specified subscription to the node corresponding with the given identifier.
# File lib/arborist/manager.rb, line 1068
def subscribe( identifier, subscription )
identifier ||= '_'
node = self.nodes[ identifier ] or raise ArgumentError, "no such node %p" % [ identifier ]
self.log.debug "Registering subscription %p" % [ subscription ]
node.add_subscription( subscription )
self.log.debug " adding '%s' to the subscriptions hash." % [ subscription.id ]
self.subscriptions[ subscription.id ] = node
self.log.debug " subscriptions hash: %#0x" % [ self.subscriptions.object_id ]
end
Unregister the event publisher socket from the reactor if it's registered.
# File lib/arborist/manager.rb, line 1025
def unregister_event_socket
self.log.debug "Unregistering event socket for write events."
self.reactor.disable_events( self.event_socket, :write ) if
self.reactor.event_enabled?( self.event_socket, :write )
end
Update the node with the specified identifier with the given new_properties and propagate any events generated by the update to the node and its ancestors.
# File lib/arborist/manager.rb, line 527
def update_node( identifier, new_properties, monitor_key='_' )
unless (( node = self.nodes[identifier] ))
self.log.warn "Update for non-existent node %p ignored." % [ identifier ]
return []
end
events = node.update( new_properties, monitor_key )
self.propagate_events( node, events )
end
Return the duration the manager has been running in seconds.
# File lib/arborist/manager.rb, line 561
def uptime
return 0 unless self.start_time
return Time.now - self.start_time
end