LongLived

class
Superclass
Symphony::TaskGroup
Included Modules
Symphony::Statistics

A task group for the 'longlived' work model.

Public Class Methods

anchor
new( task_class, max_workers )

Create a LongLived task group for the specified task_class that will run a maximum of max_workers.

# File lib/symphony/task_group/longlived.rb, line 15
# Set up the task group via the superclass, forwarding both arguments
# unchanged, and start with no message-counting queue.
def initialize( task_class, max_workers )
        super( task_class, max_workers )

        # Filled in lazily by #get_message_counting_queue.
        @queue = nil
end

Public Instance Methods

anchor
adjust_workers()

Start one new worker if the group is not throttled and needs another. Returns an Array containing the new worker's pid, or nil if no worker was started.

# File lib/symphony/task_group/longlived.rb, line 26
# Record a fresh queue sample, then start a single worker if the group is
# not throttled and reports that it needs one.
#
# Returns a one-element Array holding the new worker's pid, or nil when no
# worker was started (including when a Timeout::Error was raised and logged).
def adjust_workers
        self.sample_queue_status

        # Throttling is checked first so #needs_a_worker? is never consulted
        # while the group is throttled.
        return nil if self.throttled? || !self.needs_a_worker?

        self.log.info "Too few workers for (%s); spinning one up." % [ self.task_class.name ]

        # The flag is false for the first worker of the group and true for
        # every worker started while others already exist.
        exit_on_idle = !self.workers.empty?
        [ self.start_worker(exit_on_idle) ]
rescue Timeout::Error => err
        self.log.warn "%p while adjusting workers: %s" % [ err.class, err.message ]
        nil
end
anchor
get_message_counting_queue()

Get the queue used for counting the messages waiting for this group's task class. Returns nil if the queue does not exist yet or its channel has closed.

# File lib/symphony/task_group/longlived.rb, line 84
# Return the memoized queue object used to inspect this task class's queue,
# creating it on first use from Symphony::Queue.amqp_channel.
#
# Returns nil (after calling Symphony::Queue.reset) when the memoized
# queue's channel is no longer open, or when declaring the queue raises
# Bunny::NotFound or Bunny::ChannelAlreadyClosed.
def get_message_counting_queue
        unless @queue
                self.log.debug "Creating the message-counting queue."
                amqp_channel = Symphony::Queue.amqp_channel

                # Declared with passive: true, as in the original call.
                @queue = amqp_channel.queue( self.task_class.queue_name, passive: true, prefetch: 0 )
        end

        return @queue if @queue.channel.open?

        # Forget the stale queue so the next call builds a new one.
        self.log.info "Message-counting queue's channel was closed: resetting."
        Symphony::Queue.reset
        @queue = nil

        nil
rescue Bunny::NotFound, Bunny::ChannelAlreadyClosed
        self.log.info "Child hasn't created the queue yet; deferring"
        Symphony::Queue.reset

        nil
end
anchor
needs_a_worker?()

Return true if the task group should scale up by one.

# File lib/symphony/task_group/longlived.rb, line 45
# Decide whether the group should start one more worker.
#
# Returns true when there are no workers at all. Otherwise returns false if
# the message-counting queue is unavailable or its consumer count has
# reached max_workers. Failing those, returns true only when the mean
# jobcount is above 1 and the samples are not decreasing.
def needs_a_worker?
        return true if self.workers.empty?

        queue = self.get_message_counting_queue
        return false unless queue

        # The limit is compared against the queue's consumer count rather
        # than the size of this group's own worker list.
        consumers = queue.consumer_count
        if consumers >= self.max_workers
                self.log.debug "%p: Already at max workers (%d)" % [ self.task_class, self.max_workers ]
                return false
        end

        self.log.debug "%p: Not yet at max workers (have %d)" % [ self.task_class, consumers ]
        self.log.debug "Mean jobcount is %0.2f" % [ self.mean_jobcount ]

        return false unless self.mean_jobcount > 1

        !self.sample_values_decreasing?
end
anchor
sample_queue_status()

Add the current number of messages in the queue to the samples.

# File lib/symphony/task_group/longlived.rb, line 63
# Record the queue's current message count as a sample.
#
# Does nothing (and returns nil) when the group has no workers or the
# message-counting queue is unavailable; otherwise returns whatever
# #add_sample returns.
def sample_queue_status
        return if self.workers.empty?

        queue = self.get_message_counting_queue
        return unless queue

        self.add_sample( queue.message_count )
end
anchor
start_worker( exit_on_idle=false )

Overridden to log the pid of each newly started worker.

# File lib/symphony/task_group/longlived.rb, line 74
# Start a worker through the superclass implementation, log the pid it
# returned, and hand that pid back to the caller.
def start_worker( exit_on_idle=false )
        worker_pid = super( exit_on_idle )
        self.log.info "Start a new worker at pid %d" % [ worker_pid ]

        worker_pid
end