A task group for the 'longlived' work model.
Create a LongLived task group for the specified task_class that will run a maximum of max_workers workers.
# File lib/symphony/task_group/longlived.rb, line 15
# Set up the task group for +task_class+, allowing at most +max_workers+
# workers. Both arguments are handed to the parent class's initializer
# unchanged.
def initialize( task_class, max_workers )
  super( task_class, max_workers )

  # The message-counting queue is created lazily by
  # #get_message_counting_queue, so start without one.
  @queue = nil
end
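Sample the queue status and, unless the group is throttled, start one more worker if one is needed. Returns an Array containing the new worker's PID, or nil if no worker was started.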
# File lib/symphony/task_group/longlived.rb, line 26
# Record a fresh queue sample, then start a single additional worker if the
# group is not throttled and #needs_a_worker? says one is required.
#
# Returns a one-element Array holding the new worker's PID when a worker was
# started, and nil otherwise (including when a Timeout::Error is raised
# anywhere during the adjustment).
def adjust_workers
  sample_queue_status

  return nil if throttled?
  return nil unless needs_a_worker?

  notice = "Too few workers for (%s); spinning one up." % [ task_class.name ]
  log.info( notice )

  # The flag passed to #start_worker is true only when other workers are
  # already running.
  have_workers = !workers.empty?
  return [ start_worker(have_workers) ]
rescue Timeout::Error => timeout
  log.warn( "%p while adjusting workers: %s" % [ timeout.class, timeout.message ] )
  return nil
end
Get a queue for counting the number of messages in the queue for this worker.
# File lib/symphony/task_group/longlived.rb, line 84
# Return the memoized queue object used for counting messages, creating it
# on first use by passively declaring the task class's queue on the channel
# provided by Symphony::Queue.
#
# If the queue's channel is no longer open, Symphony::Queue is reset, the
# memoized queue is discarded, and nil is returned. nil is also returned
# (after a Symphony::Queue reset) when Bunny::NotFound or
# Bunny::ChannelAlreadyClosed is raised.
def get_message_counting_queue
  unless @queue
    log.debug "Creating the message-counting queue."
    @queue = Symphony::Queue.amqp_channel.
      queue( task_class.queue_name, passive: true, prefetch: 0 )
  end

  return @queue if @queue.channel.open?

  # The channel was closed underneath us: drop the stale queue so the next
  # call builds a new one.
  log.info "Message-counting queue's channel was closed: resetting."
  Symphony::Queue.reset
  @queue = nil

  return nil
rescue Bunny::NotFound, Bunny::ChannelAlreadyClosed
  log.info "Child hasn't created the queue yet; deferring"
  Symphony::Queue.reset

  return nil
end
Return true if the task group should scale up by one.
# File lib/symphony/task_group/longlived.rb, line 45
# Decide whether one more worker should be started.
#
# Returns true immediately when there are no workers at all. Returns false
# when the message-counting queue is unavailable, or when the queue's
# consumer count has reached max_workers. Otherwise the answer is true only
# if the mean job count is above 1 and the sampled values are not
# decreasing.
def needs_a_worker?
  return true if workers.empty?

  counting_queue = get_message_counting_queue
  return false unless counting_queue

  # Calculate the number of workers across the whole broker
  consumers = counting_queue.consumer_count
  if consumers >= max_workers
    log.debug( "%p: Already at max workers (%d)" % [ task_class, max_workers ] )
    return false
  end

  log.debug( "%p: Not yet at max workers (have %d)" % [ task_class, consumers ] )
  log.debug( "Mean jobcount is %0.2f" % [ mean_jobcount ] )

  backlog = mean_jobcount > 1
  return backlog && !sample_values_decreasing?
end
Add the current number of messages in the queue to the samples.
# File lib/symphony/task_group/longlived.rb, line 63
# Record the queue's current message count as a sample.
#
# Does nothing (and returns nil) when there are no workers or when the
# message-counting queue is unavailable; otherwise returns whatever
# #add_sample returns.
def sample_queue_status
  return if workers.empty?

  counting_queue = get_message_counting_queue
  return unless counting_queue

  add_sample( counting_queue.message_count )
end
Overridden to log the PID of each newly started worker.
# File lib/symphony/task_group/longlived.rb, line 74
# Start a worker via the parent implementation (forwarding +exit_on_idle+
# as given), log the PID it returned, and return that PID.
def start_worker( exit_on_idle=false )
  super.tap { |worker_pid| log.info( "Start a new worker at pid %d" % [ worker_pid ] ) }
end