Task

class
Superclass
Object
Included Modules
Symphony::SignalHandling

A task is the subclassable unit of work that Symphony loads when it starts up.

Constants

DEFAULT_IDLE_TIMEOUT

The default number of seconds to wait for work

IDLE_CHECK_INTERVAL

The number of seconds between checks to see if the worker is idle. Note that this is not the same as the idle timeout – it's how often to check to see if the task has been idle for too long.

SIGNALS

Signal to reset to defaults for the child

WORK_MODELS

Valid work model types

Public Class Methods

anchor
after_fork()

Prepare the process after being forked from the Daemon.

# File lib/symphony/task.rb, line 78
def self::after_fork
        self.log.debug "After fork [%d]: Threads: %p" % [ Process.pid, ThreadGroup::Default.list ]
        Symphony::Queue.reset
        Process.setpgrp
        Symphony.config.install if Symphony.config
end
anchor
before_fork()

Prepare the process to be forked.

# File lib/symphony/task.rb, line 71
def self::before_fork
        self.log.debug "Before fork [%d]: Threads: %p" % [ Process.pid, ThreadGroup::Default.list ]
        Symphony::Queue.reset
end
anchor
consumer_tag()

Return a consumer tag for this task's queue consumer.

# File lib/symphony/task.rb, line 121
def self::consumer_tag
        return "%s.%s.%d" % [
                self.queue_name,
                Socket.gethostname.gsub(/\..*$/, ''),
                Process.pid,
        ]
end
anchor
default_queue_name()

Return an queue name derived from the name of the task class.

# File lib/symphony/task.rb, line 114
def self::default_queue_name
        name = self.name || "anonymous task %d" % [ self.object_id ]
        return name.gsub( /\W+/, '.' ).downcase
end
anchor
inherited( subclass )

Inheritance hook – set some defaults on subclasses.

# File lib/symphony/task.rb, line 87
def self::inherited( subclass )
        super

        subclass.instance_variable_set( :@queue, nil )
        subclass.instance_variable_set( :@queue_type, nil )
        subclass.instance_variable_set( :@always_rebind, false )
        subclass.instance_variable_set( :@routing_keys, Set.new )
        subclass.instance_variable_set( :@acknowledge, true )
        subclass.instance_variable_set( :@work_model, :longlived )
        subclass.instance_variable_set( :@prefetch, 10 )
        subclass.instance_variable_set( :@timeout_action, :reject )
        subclass.instance_variable_set( :@timeout, nil )
        subclass.instance_variable_set( :@persistent, false )
        subclass.instance_variable_set( :@idle_timeout, DEFAULT_IDLE_TIMEOUT )
end
anchor
queue()

Fetch the Symphony::Queue for this task, creating it if necessary.

# File lib/symphony/task.rb, line 105
def self::queue
        unless @queue
                @queue = Symphony::Queue.for_task( self )
        end
        return @queue
end
anchor
run( exit_on_idle=false )

Create a new Task object and listen for work. Exits with the code returned by start when it's done.

# File lib/symphony/task.rb, line 60
def self::run( exit_on_idle=false )
        if self.subscribe_to.empty?
                raise ScriptError,
                        "No subscriptions defined. Add one or more patterns using subscribe_to."
        end

        exit self.new( self.queue, exit_on_idle ).start
end

Declarative Methods

↑ top

Attributes

idle_checker[RW]

The Thread that checks for idle timeout

last_worked[RW]

The Time the task was last running

queue[R]

The queue that the task consumes messages from

signal_handler[R]

The signal handler thread

Public Class Methods

anchor
acknowledge( new_setting=nil )

Enable or disable acknowledgements.

# File lib/symphony/task.rb, line 185
def self::acknowledge( new_setting=nil )
        unless new_setting.nil?
                self.log.info "Turning task acknowlegement %s." % [ new_setting ? "on" : "off" ]
                @acknowledge = new_setting
        end

        return @acknowledge
end
anchor
always_rebind( new_setting=nil )

Get/set the “always re-bind” flag that causes the queue the task uses to always re-bind to its exchange when the task starts. The normal behavior is that the queue is only bound if it didn't already exist, which can mean that you'd need to destroy the queue to force it to rebind if you change a task's routing keys.

# File lib/symphony/task.rb, line 174
def self::always_rebind( new_setting=nil )
        unless new_setting.nil?
                self.log.info "%s forced re-binding." % [ new_setting ? "Enabled" : "Disabled" ]
                @always_rebind = new_setting
        end

        return @always_rebind
end
anchor
idle_timeout( seconds=nil, options={} )

Get/set the maximum number of seconds the worker should wait for events to arrive before exiting.

# File lib/symphony/task.rb, line 260
def self::idle_timeout( seconds=nil, options={} )
        unless seconds.nil?
                self.log.info "Setting the idle timeout to %0.2fs." % [ seconds.to_f ]
                @idle_timeout = seconds.to_f
        end

        return @idle_timeout
end
anchor
new( queue, exit_on_idle=false )

Create a worker that will listen on the specified queue for a job.

# File lib/symphony/task.rb, line 276
def initialize( queue, exit_on_idle=false )
        @queue          = queue
        @signal_handler = nil
        @shutting_down  = false
        @restarting     = false
        @idle_checker   = nil
        @last_worked    = Time.now
        @exit_on_idle   = exit_on_idle
end
anchor
persistent( new_setting=nil )

Create the queue the task consumes from as a persistent queue, so it will continue to receive events even if the task is no longer consuming them. This only effects queues which are not already declared, so if the bindings for the queue change you'll need to delete the existing queue before starting up to have them take effect.

# File lib/symphony/task.rb, line 250
def self::persistent( new_setting=nil )
        if new_setting
                @persistent = new_setting
        end
        return @persistent
end
anchor
prefetch( count=nil )

Set the maximum number of messages to prefetch. Ignored if the work_model is :oneshot.

# File lib/symphony/task.rb, line 237
def self::prefetch( count=nil )
        if count
                @prefetch = count
        end
        return @prefetch
end
anchor
queue_name( new_name=nil )

Get/set the name of the queue to consume.

# File lib/symphony/task.rb, line 137
def self::queue_name( new_name=nil )
        if new_name
                @queue_name = new_name
        end

        @queue_name ||= self.default_queue_name
        return @queue_name
end
anchor
queue_type( type=nil )

Specify an x-queue-type for the underlying queue

# File lib/symphony/task.rb, line 148
def self::queue_type( type=nil )
        if type
                @queue_type = type
        end

        return @queue_type
end
anchor
routing_keys( *routing_keys )
Alias for: subscribe_to
anchor
subscribe_to( *routing_keys )

Set up one or more topic key patterns to use when binding the Task's queue to the exchange.

# File lib/symphony/task.rb, line 159
def self::subscribe_to( *routing_keys )
        unless routing_keys.empty?
                self.log.info "Setting task routing keys to: %p." % [ routing_keys ]
                @routing_keys.replace( routing_keys )
        end

        return @routing_keys
end
Also aliased as: routing_keys
anchor
timeout( seconds=nil, options={} )

Get/set the maximum number of seconds the job should work on a single message before giving up.

# File lib/symphony/task.rb, line 197
def self::timeout( seconds=nil, options={} )
        unless seconds.nil?
                self.log.info "Setting the task timeout to %0.2fs." % [ seconds.to_f ]
                @timeout = seconds.to_f
                self.timeout_action( options[:action] )
        end

        return @timeout
end
anchor
timeout_action( new_value=nil )

Set the action taken when work times out.

# File lib/symphony/task.rb, line 209
def self::timeout_action( new_value=nil )
        if new_value
                @timeout_action = new_value.to_sym
        end

        return @timeout_action
end
anchor
work_model( new_setting=nil )

Alter the work model between oneshot or longlived.

# File lib/symphony/task.rb, line 219
def self::work_model( new_setting=nil )
        if new_setting
                new_setting = new_setting.to_sym
                unless WORK_MODELS.include?( new_setting )
                        raise "Unknown work_model %p (must be one of: %s)" %
                                [ new_setting, WORK_MODELS.join(', ') ]
                end

                self.log.info "Setting task work model to: %p." % [ new_setting ]
                @work_model = new_setting
        end

        return @work_model
end

Public Instance Methods

anchor
check_for_idle_timeout()

Check to see if the last run was more than idle_timeout seconds ago, and cancelling the task's consumer if so.

# File lib/symphony/task.rb, line 416
def check_for_idle_timeout

        # If it's unset, it means it's running now
        return unless self.last_worked && self.exit_on_idle?

        seconds_idle = Time.now - self.last_worked
        self.log.debug "%p: idle %0.2fs" % [ self.class, seconds_idle ]

        if seconds_idle > self.class.idle_timeout
                self.log.debug "Sending stop signal due to idle timeout"
                self.stop_gracefully
        end
end
anchor
handle_signal( sig )

Handle signals; called by the signal handler thread with a signal from the queue.

# File lib/symphony/task.rb, line 439
def handle_signal( sig )
        self.log.debug "Handling signal %s" % [ sig ]
        case sig
        when :TERM
                self.on_terminate
        when :INT
                self.on_interrupt
        when :HUP
                self.on_hangup
        else
                self.log.warn "Unhandled signal %s" % [ sig ]
        end
end
anchor
make_consumer_tag()

Return a consumer tag for this task's queue consumer.

# File lib/symphony/task.rb, line 475
def make_consumer_tag
        return "%s.%s.%d" % [
                self.queue_name,
                Socket.gethostname.gsub(/\..*$/, ''),
                Process.pid,
        ]
end
anchor
on_hangup()

Handle a hangup signal by re-reading the config and restarting.

# File lib/symphony/task.rb, line 517
def on_hangup
        self.log.info "Hangup signal."
        self.restart
end
anchor
on_interrupt()
Alias for: on_terminate
anchor
on_terminate()

Handle a termination or interrupt signal.

# File lib/symphony/task.rb, line 524
def on_terminate
        self.log.debug "Signalled to shut down."

        if self.shutting_down?
                self.stop_immediately
        else
                self.stop_gracefully
        end
end
Also aliased as: on_interrupt
anchor
preprocess_payload( payload, metadata )

Do any necessary pre-processing on the raw payload according to values in the given metadata.

# File lib/symphony/task.rb, line 456
def preprocess_payload( payload, metadata )
        self.log.debug "Got a %0.2fK %s payload" %
                [ payload.bytesize / 1024.0, metadata[:content_type] ]
        work_payload = case metadata[:content_type]
                when 'application/x-msgpack'
                        MessagePack.unpack( payload )
                when 'application/json', 'text/javascript'
                        JSON.parse( payload )
                when 'application/x-yaml', 'text/x-yaml'
                        YAML.load( payload )
                else
                        payload
                end

        return work_payload
end
anchor
procname()

Return a string for setting the proc title

# File lib/symphony/task.rb, line 505
def procname
        return "%s %s: Symphony: %p (%s) -> %s" % [
                RUBY_ENGINE,
                RUBY_VERSION,
                self.class,
                self.class.work_model,
                self.class.queue_name
        ]
end
anchor
restart()

Restart the task after reloading the config.

# File lib/symphony/task.rb, line 338
def restart
        self.restarting = true
        self.log.warn "Restarting..."

        if Symphony.config.reload
                self.log.info "  config reloaded"
        else
                self.log.info "  no config changes"
        end

        self.log.info "  resetting queue"
        Symphony::Queue.reset
        self.queue.shutdown
end
anchor
start()

Set up the task and start handling messages.

# File lib/symphony/task.rb, line 315
def start
        rval = nil

        Process.setproctitle( self.procname )

        begin
                self.restarting = false
                rval = self.with_signal_handler( *SIGNALS ) do
                        self.start_handling_messages
                end
        end while self.restarting?

        return rval ? 0 : 1

rescue Exception => err
        self.log.fatal "%p in %p: %s: %s" % [ err.class, self.class, err.message, err.backtrace.first ]
        self.log.debug { '  ' + err.backtrace.join("  \n") }

        return :software
end
anchor
start_handling_messages()

Start consuming messages from the queue, calling work for each one.

# File lib/symphony/task.rb, line 372
def start_handling_messages
        oneshot = self.class.work_model == :oneshot

        rval = nil
        self.queue.wait_for_message( oneshot ) do |payload, metadata|
                begin
                        self.last_worked = nil
                        work_payload = self.preprocess_payload( payload, metadata )

                        rval = if self.class.timeout
                                self.work_with_timeout( work_payload, metadata )
                        else
                                self.work( work_payload, metadata )
                        end
                ensure
                        self.last_worked = Time.now
                end

                rval
        end

        return rval
end
anchor
start_signal_handler()

Start the thread that will deliver signals once they're put on the queue, and check for the last time a job was handled by this task process.

# File lib/symphony/task.rb, line 399
def start_signal_handler
        @signal_handler = Thread.new do
                Thread.current.abort_on_exception = true
                loop do
                        # self.log.debug "Signal handler: waiting for new signals in the queue."
                        self.wait_for_signals( IDLE_CHECK_INTERVAL )
                        self.check_for_idle_timeout
                end
        end
rescue => err
        self.log.fatal "Signal handler thread crashed: %p: %s" % [ err.class, err.message ]
        self.stop_immediately
end
anchor
stop_gracefully()

Set the task to stop after what it's doing is completed.

# File lib/symphony/task.rb, line 364
def stop_gracefully
        self.log.warn "Attempting to shut down gracefully."
        self.shutting_down = true
        self.queue.shutdown
end
anchor
stop_immediately()

Stop the task immediately, e.g., when sent a second TERM signal.

# File lib/symphony/task.rb, line 355
def stop_immediately
        self.log.warn "Already in shutdown -- halting immediately."
        self.shutting_down = true
        self.ignore_signals( *SIGNALS )
        self.queue.halt
end
anchor
stop_signal_handler()

Stop the signal handler thread.

# File lib/symphony/task.rb, line 432
def stop_signal_handler
        @signal_handler.exit if @signal_handler
end
anchor
work( payload, metadata )

Do work based on the given message payload and metadata.

# File lib/symphony/task.rb, line 485
def work( payload, metadata )
        raise NotImplementedError,
                "%p doesn't implement required method #work" % [ self.class ]
end
anchor
work_with_timeout( payload, metadata )

Wrap a timeout around the call to work, and handle timeouts according to the configured timeout_action.

# File lib/symphony/task.rb, line 493
def work_with_timeout( payload, metadata )
        Timeout.timeout( self.class.timeout ) do
                return self.work( payload, metadata )
        end
rescue Timeout::Error
        self.log.error "Timed out while performing work"
        raise if self.class.timeout_action == :reject
        return false
end