ObserverRunner

class
Superclass
Object
Included Modules
CZTop::Reactor::SignalHandling
Extended With
Loggability

An event-driven runner for Arborist::Observers.

Constants

QUEUE_SIGS

Signals the observer runner responds to

Attributes

client[R]

The Arborist::Client that will be used for creating and tearing down subscriptions

observers[R]

The Array of loaded Arborist::Observers the runner should run.

reactor[RW]

The reactor (a CZTop::Reactor) the runner uses to drive everything

subscriptions[R]

A map of subscription IDs to the Observer each subscription was created for.

timers[R]

The Array of timers the runner has registered with its reactor.

Public Class Methods

anchor
new()

Create a new Arborist::ObserverRunner

# File lib/arborist/observer_runner.rb, line 30
# Set up a runner with empty observer, timer, and subscription collections,
# plus a fresh reactor and client.
def initialize
        # Run ID from the Manager's most recent heartbeat; nil until one is seen.
        @manager_last_runid = nil

        @subscriptions = {}
        @timers        = []
        @observers     = []

        @reactor = CZTop::Reactor.new
        @client  = Arborist::Client.new
end

Public Instance Methods

anchor
add_observer( observer )

Add the specified observer and subscribe to the events it wishes to receive.

# File lib/arborist/observer_runner.rb, line 169
# Subscribe +observer+ to every event it declares, recording which
# subscription ID belongs to it so incoming events can be routed back.
def add_observer( observer )
        self.log.info "Adding observer: %s" % [ observer.description ]

        observer.subscriptions.each do |criteria|
                # Request the subscription through the client, remember its owner,
                # then start listening for that ID on the event socket.
                subscription_id = self.client.subscribe( criteria )
                self.subscriptions[ subscription_id ] = observer
                self.client.event_api.subscribe( subscription_id )

                message = "  subscribed to %p with subscription %s" % [ criteria, subscription_id ]
                self.log.debug( message )
        end
end
anchor
add_timers_for( observer )

Register a periodic timer with the reactor for each of the specified observer's timers.

# File lib/arborist/observer_runner.rb, line 140
# Register a periodic reactor timer for every interval/callback pair the
# +observer+ declares, and remember each timer in #timers.
#
# Returns whatever +observer.timers.each+ returns.
def add_timers_for( observer )
        observer.timers.each do |interval, callback|
                # The interval is formatted with %s rather than %d: %d truncates a
                # fractional interval, so 0.5 would be logged as "every 0s".
                self.log.info "Creating timer for %s observer to run %p every %ss" %
                        [ observer.description, callback, interval ]
                timer = self.reactor.add_periodic_timer( interval, &callback )
                self.timers << timer
        end
end
anchor
handle_system_event( event_type, event )

Handle a `sys.` event from the Manager being observed.

# File lib/arborist/observer_runner.rb, line 217
# React to a system event published by the Manager.
#
# Only heartbeats trigger any work: when a heartbeat carries a run ID that
# differs from the previously recorded one, all subscriptions are reset and
# the observers are registered again. Every other event type is ignored.
def handle_system_event( event_type, event )
        self.log.debug "Got a %s event from the Manager: %p" % [ event_type, event ]

        # Node additions/removals and any other system events need no action.
        return unless event_type == 'sys.heartbeat'

        current_runid  = event['run_id']
        previous_runid = @manager_last_runid

        # The very first heartbeat has nothing to compare against.
        if previous_runid && current_runid != previous_runid
                self.log.warn "Manager run ID changed: re-subscribing"
                self.reset
                self.register_observers
        end

        @manager_last_runid = current_runid
end
anchor
load_observers( enumerator )

Load observers from the specified enumerator.

# File lib/arborist/observer_runner.rb, line 61
# Append every observer yielded by +enumerator+ to the runner's list of
# observers. Returns the observers Array.
def load_observers( enumerator )
        loaded = enumerator.to_a
        self.observers.concat( loaded )
end
anchor
on_subscription_event( event )

Handle IO events from the reactor.

# File lib/arborist/observer_runner.rb, line 196
# Reactor callback for the event socket: read one message, decode it, and
# route it to the observer that owns the subscription, or to the system
# event handler for 'sys.' events. Events nobody owns are logged and dropped.
#
# Raises a RuntimeError if +event+ is not a readable event.
def on_subscription_event( event )
        # Anything other than a readable socket is unexpected here.
        raise "Unhandled event %p on the event socket" % [ event ] unless event.readable?

        message = event.socket.receive
        identifier, payload = Arborist::EventAPI.decode( message )
        owner = self.subscriptions[ identifier ]

        if owner
                self.log.debug "Got %p event for %p" % [ identifier, owner ]
                owner.handle_event( identifier, payload )
        elsif identifier.start_with?( 'sys.' )
                self.log.debug "System event! %p" % [ payload ]
                self.handle_system_event( identifier, payload )
        else
                self.log.warn "Ignoring event %p for which we have no observer." % [ identifier ]
        end
end
anchor
register_observer_timers()

Register timers for each Observer.

# File lib/arborist/observer_runner.rb, line 118
# Register the timers of every loaded observer by handing each observer to
# #add_timers_for. Returns the observers Array.
def register_observer_timers
        self.observers.each( &self.method(:add_timers_for) )
end
anchor
register_observers()

Add subscriptions for all of the observers loaded into the runner.

# File lib/arborist/observer_runner.rb, line 110
# Create subscriptions for every loaded observer by handing each one to
# #add_observer. Returns the observers Array.
def register_observers
        self.observers.each {|loaded| self.add_observer(loaded) }
end
anchor
remove_observer( observer )

Remove the specified observer after unsubscribing from its events.

# File lib/arborist/observer_runner.rb, line 181
# Tear down every subscription that belongs to +observer+: unsubscribe it
# through the client, forget it locally, and stop listening for it on the
# event socket. Subscriptions owned by other observers are left alone.
def remove_observer( observer )
        self.log.info "Removing observer: %s" % [ observer.description ]

        # Iterate over a snapshot of the keys, since entries are deleted below.
        self.subscriptions.keys.each do |subscription_id|
                owner = self.subscriptions[ subscription_id ]

                if owner == observer
                        self.client.unsubscribe( subscription_id )
                        self.subscriptions.delete( subscription_id )
                        self.client.event_api.unsubscribe( subscription_id )
                        self.log.debug "  unsubscribed from %p" % [ subscription_id ]
                end
        end
end
anchor
remove_timers()

Remove any registered timers.

# File lib/arborist/observer_runner.rb, line 151
# Remove every timer the runner has registered from the reactor, then forget
# them so a later call does not try to remove the same timers again.
#
# Returns the (now empty) timers Array.
def remove_timers
        self.timers.each do |timer|
                self.reactor.remove_timer( timer )
        end

        # Without this, removed timers would linger in the list and be handed
        # to the reactor again on every subsequent call.
        self.timers.clear
end
anchor
reset()

Unsubscribe from and clear all current subscriptions.

# File lib/arborist/observer_runner.rb, line 159
# Drop all current subscriptions: stop listening for each subscription ID on
# the event socket, then empty the subscription map. No unsubscribe request
# is sent through the client itself. Returns the emptied subscriptions Hash.
def reset
        self.log.warn "Resetting observer subscriptions."

        self.subscriptions.each_key do |subscription_id|
                self.client.event_api.unsubscribe( subscription_id )
        end

        self.subscriptions.clear
end
anchor
restart()

Restart the observer runner, resetting all of its observers' subscriptions.

# File lib/arborist/observer_runner.rb, line 91
# Restart the runner: pause the reactor's timers, drop and re-create every
# observer's subscriptions, then resume the timers.
#
# The timers are resumed even if unregistering or re-registering raises, so
# a failed restart does not leave them paused. The exception still
# propagates to the caller.
#
# Returns the result of resuming the timers.
def restart
        self.log.info "Restarting!"
        self.reactor.timers.pause

        begin
                self.unregister_observers
                self.register_observers
        ensure
                resumed = self.reactor.timers.resume
        end

        resumed
end
anchor
run()

Run the loaded observers.

# File lib/arborist/observer_runner.rb, line 67
# Start the runner: register the observers' subscriptions and timers,
# subscribe to system events, hook the event socket into the reactor, and
# then poll under the signal handler for QUEUE_SIGS.
def run
        self.log.info "Starting!"

        self.register_observers
        self.register_observer_timers
        self.subscribe_to_system_events

        # Route readable events on the event socket to #on_subscription_event.
        event_socket  = self.client.event_api
        event_handler = self.method( :on_subscription_event )
        self.reactor.register( event_socket, :read, &event_handler )

        self.with_signal_handler( self.reactor, *QUEUE_SIGS ) do
                self.reactor.start_polling( ignore_interrupts: true )
        end
end
anchor
running?()

Returns true if the ObserverRunner is running.

# File lib/arborist/observer_runner.rb, line 102
# Report whether the runner is running, i.e. whether it has both a reactor
# and a client and the client's event socket is registered with the reactor.
# Returns a falsey value when either is missing.
def running?
        return self.reactor unless self.reactor
        return self.client unless self.client

        self.reactor.registered?( self.client.event_api )
end
anchor
stop()

Stop the observer runner.

# File lib/arborist/observer_runner.rb, line 82
# Shut the runner down: remove its timers, unsubscribe all observers, and
# finally tell the reactor to stop polling.
def stop
        log.info( "Stopping!" )

        # Tear down timers and subscriptions before halting the poll loop.
        remove_timers
        unregister_observers

        reactor.stop_polling
end
anchor
subscribe_to_system_events()

Subscribe the runner to system events published by the Manager.

# File lib/arborist/observer_runner.rb, line 134
# Start listening on the client's event socket for events whose identifier
# begins with 'sys.', which is how the Manager's system events are named.
def subscribe_to_system_events
        event_socket = self.client.event_api
        event_socket.subscribe( 'sys.' )
end
anchor
unregister_observers()

Remove the subscriptions belonging to the loaded observers.

# File lib/arborist/observer_runner.rb, line 126
# Remove the subscriptions of every loaded observer by handing each one to
# #remove_observer. Returns the observers Array.
def unregister_observers
        self.observers.each( &self.method(:remove_observer) )
end

Signal Handling

↑ top

Public Instance Methods

anchor
handle_signal( sig )

Handle signals.

# File lib/arborist/observer_runner.rb, line 245
# Dispatch a signal delivered by the reactor: INT and TERM go to
# #on_termination_signal, HUP goes to #on_hangup_signal, and anything else
# is logged as unhandled.
def handle_signal( sig )
        self.log.debug "Handling signal %s" % [ sig ]

        if [ :INT, :TERM ].include?( sig )
                self.on_termination_signal( sig )
        elsif sig == :HUP
                self.on_hangup_signal( sig )
        else
                self.log.warn "Unhandled signal %s" % [ sig ]
        end
end
anchor
on_hangup_signal( signo )

Handle a HUP signal. The default is to restart the runner.

# File lib/arborist/observer_runner.rb, line 271
# Respond to a hangup signal by logging it and restarting the runner.
def on_hangup_signal( signo )
        notice = "Hangup (%p)" % [ signo ]
        log.warn( notice )

        restart
end
anchor
on_interrupt_signal( signo )
anchor
on_termination_signal( signo )

Handle a TERM signal by stopping the runner. Also aliased to on_interrupt_signal.

# File lib/arborist/observer_runner.rb, line 263
# Respond to a termination signal by logging it and stopping the runner.
def on_termination_signal( signo )
        notice = "Terminated (%p)" % [ signo ]
        log.warn( notice )

        stop
end
Also aliased as: on_interrupt_signal