A task is the subclassable unit of work that Symphony loads when it starts up.
The default number of seconds to wait for work
The number of seconds between checks to see if the worker is idle. Note that this is not the same as the idle timeout – it's how often to check to see if the task has been idle for too long.
Signal to reset to defaults for the child
Valid work model types
Prepare the process after being forked from the Daemon.
# File lib/symphony/task.rb, line 78
def self::after_fork
  self.log.debug "After fork [%d]: Threads: %p" % [ Process.pid, ThreadGroup::Default.list ]
  Symphony::Queue.reset
  Process.setpgrp
  Symphony.config.install if Symphony.config
end
Prepare the process to be forked.
# File lib/symphony/task.rb, line 71
def self::before_fork
  self.log.debug "Before fork [%d]: Threads: %p" % [ Process.pid, ThreadGroup::Default.list ]
  Symphony::Queue.reset
end
Return a consumer tag for this task's queue consumer.
# File lib/symphony/task.rb, line 121
def self::consumer_tag
  return "%s.%s.%d" % [
    self.queue_name,
    Socket.gethostname.gsub(/\..*$/, ''),
    Process.pid,
  ]
end
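The tag is the queue name, the short hostname, and the process ID joined with dots. A minimal sketch of that formatting outside Symphony follows; the helper name `example_consumer_tag`, the queue name, and the hostname are hypothetical and exist only for illustration.

```ruby
require 'socket'

# Hypothetical stand-alone helper that mirrors the format string used by
# consumer_tag. Hostname and PID are parameters so the result is predictable.
def example_consumer_tag( queue_name, hostname=Socket.gethostname, pid=Process.pid )
  short_host = hostname.gsub( /\..*$/, '' )   # drop everything from the first dot on
  return "%s.%s.%d" % [ queue_name, short_host, pid ]
end

puts example_consumer_tag( 'acme.reports.generator', 'worker01.example.com', 4242 )
# prints "acme.reports.generator.worker01.4242"
```

Because the PID is part of the tag, two worker processes on the same host consuming from the same queue get distinct tags.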
Return a queue name derived from the name of the task class.
# File lib/symphony/task.rb, line 114
def self::default_queue_name
  name = self.name || "anonymous task %d" % [ self.object_id ]
  return name.gsub( /\W+/, '.' ).downcase
end
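To make the transformation concrete, here is a small sketch that applies the same `gsub` and `downcase` steps to plain strings. The helper `example_queue_name` and the class name are hypothetical.

```ruby
# Hypothetical helper applying the same transformation as default_queue_name:
# every run of non-word characters becomes a single dot, then the result is
# lowercased.
def example_queue_name( class_name )
  return class_name.gsub( /\W+/, '.' ).downcase
end

puts example_queue_name( 'Acme::Reports::Generator' )  # prints "acme.reports.generator"
puts example_queue_name( 'anonymous task 70123' )      # prints "anonymous.task.70123"
```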
Inheritance hook – set some defaults on subclasses.
# File lib/symphony/task.rb, line 87
def self::inherited( subclass )
  super

  subclass.instance_variable_set( :@queue, nil )
  subclass.instance_variable_set( :@queue_type, nil )
  subclass.instance_variable_set( :@always_rebind, false )
  subclass.instance_variable_set( :@routing_keys, Set.new )
  subclass.instance_variable_set( :@acknowledge, true )
  subclass.instance_variable_set( :@work_model, :longlived )
  subclass.instance_variable_set( :@prefetch, 10 )
  subclass.instance_variable_set( :@timeout_action, :reject )
  subclass.instance_variable_set( :@timeout, nil )
  subclass.instance_variable_set( :@persistent, false )
  subclass.instance_variable_set( :@idle_timeout, DEFAULT_IDLE_TIMEOUT )
end
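One consequence of setting defaults in the hook is that a subclass of a subclass starts from the defaults again instead of picking up its parent's settings. The sketch below reproduces the pattern in miniature; `MiniTask`, `MiniParentTask`, and `MiniChildTask` are hypothetical classes, not part of Symphony.

```ruby
require 'set'

# Hypothetical miniature of the pattern: settings live in class-level instance
# variables, and the inherited hook initialises them on every new subclass.
class MiniTask
  def self.inherited( subclass )
    super
    subclass.instance_variable_set( :@prefetch, 10 )
    subclass.instance_variable_set( :@routing_keys, Set.new )
  end

  def self.prefetch( count=nil )
    @prefetch = count if count
    return @prefetch
  end

  def self.routing_keys
    return @routing_keys
  end
end

class MiniParentTask < MiniTask
  prefetch 50
end

# The hook runs again for this class, so it gets the default, not 50.
class MiniChildTask < MiniParentTask; end

puts MiniParentTask.prefetch  # prints 50
puts MiniChildTask.prefetch   # prints 10
```

Each subclass also receives its own `Set`, so routing keys added to one class never leak into another.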
Fetch the Symphony::Queue for this task, creating it if necessary.
# File lib/symphony/task.rb, line 105
def self::queue
  unless @queue
    @queue = Symphony::Queue.for_task( self )
  end
  return @queue
end
Create a new Task object and listen for work. Exits with the code returned by start when it's done.
# File lib/symphony/task.rb, line 60
def self::run( exit_on_idle=false )
  if self.subscribe_to.empty?
    raise ScriptError,
      "No subscriptions defined. Add one or more patterns using subscribe_to."
  end

  exit self.new( self.queue, exit_on_idle ).start
end
The Thread that checks for idle timeout
The Time the task was last running
The queue that the task consumes messages from
The signal handler thread
Enable or disable acknowledgements.
# File lib/symphony/task.rb, line 185
def self::acknowledge( new_setting=nil )
  unless new_setting.nil?
    self.log.info "Turning task acknowlegement %s." % [ new_setting ? "on" : "off" ]
    @acknowledge = new_setting
  end

  return @acknowledge
end
Get/set the “always re-bind” flag that causes the queue the task uses to always re-bind to its exchange when the task starts. The normal behavior is that the queue is only bound if it didn't already exist, which can mean that you'd need to destroy the queue to force it to rebind if you change a task's routing keys.
# File lib/symphony/task.rb, line 174
def self::always_rebind( new_setting=nil )
  unless new_setting.nil?
    self.log.info "%s forced re-binding." % [ new_setting ? "Enabled" : "Disabled" ]
    @always_rebind = new_setting
  end

  return @always_rebind
end
Get/set the maximum number of seconds the worker should wait for events to arrive before exiting.
# File lib/symphony/task.rb, line 260
def self::idle_timeout( seconds=nil, options={} )
  unless seconds.nil?
    self.log.info "Setting the idle timeout to %0.2fs." % [ seconds.to_f ]
    @idle_timeout = seconds.to_f
  end

  return @idle_timeout
end
Create a worker that will listen on the specified queue for a job.
# File lib/symphony/task.rb, line 276
def initialize( queue, exit_on_idle=false )
  @queue          = queue
  @signal_handler = nil
  @shutting_down  = false
  @restarting     = false
  @idle_checker   = nil
  @last_worked    = Time.now
  @exit_on_idle   = exit_on_idle
end
Create the queue the task consumes from as a persistent queue, so it will continue to receive events even if the task is no longer consuming them. This only affects queues which are not already declared, so if the bindings for the queue change you'll need to delete the existing queue before starting up to have them take effect.
# File lib/symphony/task.rb, line 250
def self::persistent( new_setting=nil )
  if new_setting
    @persistent = new_setting
  end
  return @persistent
end
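Note the guard: this setter stores only truthy values, whereas acknowledge and always_rebind store anything that is not nil. Passing `false` here therefore leaves the current value untouched. Since every subclass starts with `persistent` set to `false`, this rarely matters in practice, but the difference is easy to miss. The sketch below contrasts the two guard styles; `GuardDemo` and its method names are hypothetical.

```ruby
# Hypothetical class contrasting the two guard styles used by the declarative
# setters in this file.
class GuardDemo
  @truthy_guarded = true
  @nil_guarded    = true

  # Same guard as persistent, prefetch, and queue_name: only truthy values
  # are stored.
  def self.truthy_guarded( new_setting=nil )
    @truthy_guarded = new_setting if new_setting
    return @truthy_guarded
  end

  # Same guard as acknowledge and always_rebind: anything but nil is stored.
  def self.nil_guarded( new_setting=nil )
    @nil_guarded = new_setting unless new_setting.nil?
    return @nil_guarded
  end
end

p GuardDemo.truthy_guarded( false )  # prints true (the false was ignored)
p GuardDemo.nil_guarded( false )     # prints false
```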
Set the maximum number of messages to prefetch. Ignored if the work_model is :oneshot.
# File lib/symphony/task.rb, line 237
def self::prefetch( count=nil )
  if count
    @prefetch = count
  end
  return @prefetch
end
Get/set the name of the queue to consume.
# File lib/symphony/task.rb, line 137
def self::queue_name( new_name=nil )
  if new_name
    @queue_name = new_name
  end

  @queue_name ||= self.default_queue_name
  return @queue_name
end
Specify an x-queue-type for the underlying queue
# File lib/symphony/task.rb, line 148
def self::queue_type( type=nil )
  if type
    @queue_type = type
  end
  return @queue_type
end
Set up one or more topic key patterns to use when binding the Task's queue to the exchange.
# File lib/symphony/task.rb, line 159
def self::subscribe_to( *routing_keys )
  unless routing_keys.empty?
    self.log.info "Setting task routing keys to: %p." % [ routing_keys ]
    @routing_keys.replace( routing_keys )
  end

  return @routing_keys
end
Get/set the maximum number of seconds the job should work on a single message before giving up.
# File lib/symphony/task.rb, line 197
def self::timeout( seconds=nil, options={} )
  unless seconds.nil?
    self.log.info "Setting the task timeout to %0.2fs." % [ seconds.to_f ]
    @timeout = seconds.to_f
    self.timeout_action( options[:action] )
  end

  return @timeout
end
Set the action taken when work times out.
# File lib/symphony/task.rb, line 209
def self::timeout_action( new_value=nil )
  if new_value
    @timeout_action = new_value.to_sym
  end

  return @timeout_action
end
Alter the work model between oneshot or longlived.
# File lib/symphony/task.rb, line 219
def self::work_model( new_setting=nil )
  if new_setting
    new_setting = new_setting.to_sym
    unless WORK_MODELS.include?( new_setting )
      raise "Unknown work_model %p (must be one of: %s)" %
        [ new_setting, WORK_MODELS.join(', ') ]
    end

    self.log.info "Setting task work model to: %p." % [ new_setting ]
    @work_model = new_setting
  end

  return @work_model
end
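The normalise-then-validate steps can be tried in isolation with the sketch below. The contents of `EXAMPLE_WORK_MODELS` are an assumption based on the two models named in the description; the real `WORK_MODELS` constant is not shown in this file. The helper `checked_work_model` is hypothetical.

```ruby
# Assumed contents of WORK_MODELS, based on the two models the description names.
EXAMPLE_WORK_MODELS = [ :longlived, :oneshot ].freeze

# Hypothetical helper mirroring work_model: convert the argument to a Symbol,
# then reject anything that is not a known model.
def checked_work_model( new_setting )
  new_setting = new_setting.to_sym
  unless EXAMPLE_WORK_MODELS.include?( new_setting )
    raise "Unknown work_model %p (must be one of: %s)" %
      [ new_setting, EXAMPLE_WORK_MODELS.join(', ') ]
  end
  return new_setting
end

p checked_work_model( 'oneshot' )  # prints :oneshot

begin
  checked_work_model( :batch )
rescue RuntimeError => err
  puts err.message
  # prints "Unknown work_model :batch (must be one of: longlived, oneshot)"
end
```

Because of the `to_sym` call, both strings and symbols are accepted.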
Check to see if the last run was more than idle_timeout seconds ago, and cancel the task's consumer if so.
# File lib/symphony/task.rb, line 416
def check_for_idle_timeout
  # If it's unset, it means it's running now
  return unless self.last_worked && self.exit_on_idle?

  seconds_idle = Time.now - self.last_worked
  self.log.debug "%p: idle %0.2fs" % [ self.class, seconds_idle ]

  if seconds_idle > self.class.idle_timeout
    self.log.debug "Sending stop signal due to idle timeout"
    self.stop_gracefully
  end
end
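The decision itself is a comparison of elapsed time against the configured limit, with a nil `last_worked` meaning a message is being processed right now. A minimal sketch of just that comparison follows; `idle_too_long?` is a hypothetical helper and the 10-second limit is an arbitrary value, not Symphony's default.

```ruby
# Hypothetical helper: true when the task has been idle for longer than
# idle_timeout seconds. A nil last_worked means work is in progress.
def idle_too_long?( last_worked, idle_timeout, now=Time.now )
  return false unless last_worked
  return ( now - last_worked ) > idle_timeout
end

now = Time.now
p idle_too_long?( now - 20, 10.0, now )  # prints true
p idle_too_long?( now - 5, 10.0, now )   # prints false
p idle_too_long?( nil, 10.0, now )       # prints false
```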
Handle signals; called by the signal handler thread with a signal from the queue.
# File lib/symphony/task.rb, line 439
def handle_signal( sig )
  self.log.debug "Handling signal %s" % [ sig ]
  case sig
  when :TERM
    self.on_terminate
  when :INT
    self.on_interrupt
  when :HUP
    self.on_hangup
  else
    self.log.warn "Unhandled signal %s" % [ sig ]
  end
end
Return a consumer tag for this task's queue consumer.
# File lib/symphony/task.rb, line 475
def make_consumer_tag
  return "%s.%s.%d" % [
    self.queue_name,
    Socket.gethostname.gsub(/\..*$/, ''),
    Process.pid,
  ]
end
Handle a hangup signal by re-reading the config and restarting.
# File lib/symphony/task.rb, line 517
def on_hangup
  self.log.info "Hangup signal."
  self.restart
end
Handle a termination or interrupt signal.
# File lib/symphony/task.rb, line 524
def on_terminate
  self.log.debug "Signalled to shut down."

  if self.shutting_down?
    self.stop_immediately
  else
    self.stop_gracefully
  end
end
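The effect is a two-stage shutdown: the first signal asks for a graceful stop, and a second one while that stop is still in progress forces an immediate halt. The sketch below records which path each call takes; `ShutdownDemo` is a hypothetical stand-in that only logs the decision instead of touching a queue.

```ruby
# Hypothetical stand-in that records which stop method each signal triggers.
class ShutdownDemo
  attr_reader :actions

  def initialize
    @shutting_down = false
    @actions = []
  end

  def on_terminate
    if @shutting_down
      @actions << :stop_immediately
    else
      # stop_gracefully is what sets the shutting_down flag in the real task
      @shutting_down = true
      @actions << :stop_gracefully
    end
  end
end

demo = ShutdownDemo.new
2.times { demo.on_terminate }
p demo.actions  # prints [:stop_gracefully, :stop_immediately]
```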
Do any necessary pre-processing on the raw payload according to values in the given metadata.
# File lib/symphony/task.rb, line 456
def preprocess_payload( payload, metadata )
  self.log.debug "Got a %0.2fK %s payload" %
    [ payload.bytesize / 1024.0, metadata[:content_type] ]
  work_payload = case metadata[:content_type]
    when 'application/x-msgpack'
      MessagePack.unpack( payload )
    when 'application/json', 'text/javascript'
      JSON.parse( payload )
    when 'application/x-yaml', 'text/x-yaml'
      YAML.load( payload )
    else
      payload
    end

  return work_payload
end
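As a rough illustration of the dispatch, the sketch below decodes a payload by content type using only the standard library. `decode_payload` is a hypothetical helper; the MessagePack branch is left out because it needs an external gem, and `YAML.safe_load` is swapped in for `YAML.load` as a conservative choice for the example.

```ruby
require 'json'
require 'yaml'

# Hypothetical stand-alone version of the content-type dispatch.
def decode_payload( payload, content_type )
  case content_type
  when 'application/json', 'text/javascript'
    JSON.parse( payload )
  when 'application/x-yaml', 'text/x-yaml'
    YAML.safe_load( payload )
  else
    payload   # unrecognised types are passed through untouched
  end
end

p decode_payload( '{"id": 12}', 'application/json' )['id']      # prints 12
p decode_payload( "id: 12\n", 'application/x-yaml' )['id']      # prints 12
p decode_payload( 'raw bytes', 'application/octet-stream' )     # prints "raw bytes"
```

A `work` implementation therefore receives a Hash or Array for the structured types and the raw String for everything else.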
Return a string for setting the proc title
# File lib/symphony/task.rb, line 505
def procname
  return "%s %s: Symphony: %p (%s) -> %s" % [
    RUBY_ENGINE,
    RUBY_VERSION,
    self.class,
    self.class.work_model,
    self.class.queue_name
  ]
end
Restart the task after reloading the config.
# File lib/symphony/task.rb, line 338
def restart
  self.restarting = true
  self.log.warn "Restarting..."

  if Symphony.config.reload
    self.log.info " config reloaded"
  else
    self.log.info " no config changes"
  end

  self.log.info " resetting queue"
  Symphony::Queue.reset
  self.queue.shutdown
end
Set up the task and start handling messages.
# File lib/symphony/task.rb, line 315
def start
  rval = nil

  Process.setproctitle( self.procname )

  begin
    self.restarting = false
    rval = self.with_signal_handler( *SIGNALS ) do
      self.start_handling_messages
    end
  end while self.restarting?

  return rval ? 0 : 1
rescue Exception => err
  self.log.fatal "%p in %p: %s: %s" % [ err.class, self.class, err.message, err.backtrace.first ]
  self.log.debug { ' ' + err.backtrace.join(" \n") }
  return :software
end
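The `begin ... end while` form runs the body once before testing the condition, which is what lets a hangup received during message handling trigger another pass. The sketch below isolates that control flow; `run_until_done` is hypothetical, the block stands in for start_handling_messages, and returning `:restart` stands in for restart being called while messages are handled.

```ruby
# Hypothetical loop showing the restart behaviour. The flag is cleared at the
# top of each pass, just as start clears restarting before handling messages.
def run_until_done
  runs = 0
  begin
    restarting = false
    runs += 1
    restarting = ( yield(runs) == :restart )
  end while restarting
  return runs
end

p run_until_done {|n| n < 3 ? :restart : :done }  # prints 3
p run_until_done { :done }                        # prints 1
```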
Start consuming messages from the queue, calling work for each one.
# File lib/symphony/task.rb, line 372
def start_handling_messages
  oneshot = self.class.work_model == :oneshot
  rval = nil

  self.queue.wait_for_message( oneshot ) do |payload, metadata|
    begin
      self.last_worked = nil
      work_payload = self.preprocess_payload( payload, metadata )

      rval = if self.class.timeout
          self.work_with_timeout( work_payload, metadata )
        else
          self.work( work_payload, metadata )
        end
    ensure
      self.last_worked = Time.now
    end

    rval
  end

  return rval
end
Start the thread that will deliver signals once they're put on the queue, and check for the last time a job was handled by this task process.
# File lib/symphony/task.rb, line 399
def start_signal_handler
  @signal_handler = Thread.new do
    Thread.current.abort_on_exception = true
    loop do
      # self.log.debug "Signal handler: waiting for new signals in the queue."
      self.wait_for_signals( IDLE_CHECK_INTERVAL )
      self.check_for_idle_timeout
    end
  end
rescue => err
  self.log.fatal "Signal handler thread crashed: %p: %s" % [ err.class, err.message ]
  self.stop_immediately
end
Set the task to stop after what it's doing is completed.
# File lib/symphony/task.rb, line 364
def stop_gracefully
  self.log.warn "Attempting to shut down gracefully."
  self.shutting_down = true
  self.queue.shutdown
end
Stop the task immediately, e.g., when sent a second TERM signal.
# File lib/symphony/task.rb, line 355
def stop_immediately
  self.log.warn "Already in shutdown -- halting immediately."
  self.shutting_down = true
  self.ignore_signals( *SIGNALS )
  self.queue.halt
end
Stop the signal handler thread.
# File lib/symphony/task.rb, line 432
def stop_signal_handler
  @signal_handler.exit if @signal_handler
end
Do work based on the given message payload and metadata.
# File lib/symphony/task.rb, line 485
def work( payload, metadata )
  raise NotImplementedError,
    "%p doesn't implement required method #work" % [ self.class ]
end
Wrap a timeout around the call to work, and handle timeouts according to the configured timeout_action.
# File lib/symphony/task.rb, line 493
def work_with_timeout( payload, metadata )
  Timeout.timeout( self.class.timeout ) do
    return self.work( payload, metadata )
  end
rescue Timeout::Error
  self.log.error "Timed out while performing work"
  raise if self.class.timeout_action == :reject
  return false
end
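The two outcomes of a timeout can be seen with the stand-alone sketch below: with a `:reject` action the `Timeout::Error` propagates to the caller, and with any other action the method returns `false`. `work_with_limit` is a hypothetical helper whose block plays the role of work, and `:other` is a placeholder for any action that is not `:reject`.

```ruby
require 'timeout'

# Hypothetical stand-alone version of the timeout wrapper.
def work_with_limit( seconds, timeout_action )
  Timeout.timeout( seconds ) do
    return yield
  end
rescue Timeout::Error
  raise if timeout_action == :reject   # let the caller see the timeout
  return false                         # otherwise report a failed job
end

p work_with_limit( 1, :reject ) { true }            # prints true
p work_with_limit( 0.1, :other ) { sleep 1; true }  # prints false

begin
  work_with_limit( 0.1, :reject ) { sleep 1; true }
rescue Timeout::Error => err
  puts "re-raised: #{err.class}"                    # prints "re-raised: Timeout::Error"
end
```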