TaskGroup

class
Superclass
Object

A group for managing groups of tasks.

Attributes

last_child_started[R]

The Time that the last child started

max_workers[RW]

The maximum number of workers the group will keep running

task_class[R]

The Class of the task the worker runs

workers[R]

The set of worker PIDs

Public Class Methods

anchor
new( task_class, max_workers )

Set up a new task group.

# File lib/symphony/task_group.rb, line 23
def initialize( task_class, max_workers )
        @task_class         = task_class
        @max_workers        = max_workers
        @workers            = Set.new
        @last_child_started = Time.now
        @throttle           = 0
        @queue              = nil
end

Public Instance Methods

anchor
adjust_throttle( adjustment=1 )

Add adjustment to the throttle value, ensuring that it doesn't go below zero.

# File lib/symphony/task_group.rb, line 134
def adjust_throttle( adjustment=1 )
        self.log.debug "Adjusting worker throttle by %d" % [ adjustment ]
        @throttle += adjustment
        @throttle = 0 if @throttle < 0
        @throttle = Symphony.throttle_max if @throttle > Symphony.throttle_max
end
anchor
adjust_workers()

Scale workers up or down based on the task group's work model. This method needs to return an array of pids that were started, otherwise nil.

# File lib/symphony/task_group.rb, line 96
def adjust_workers
        raise NotImplementedError, "%p needs to provide an implementation of #adjust_workers" % [
                self.class
        ]
end
anchor
on_child_exit( pid, status )

Handle the exit of the child with the specified pid. The status is the Process::Status returned by waitpid.

# File lib/symphony/task_group.rb, line 105
def on_child_exit( pid, status )
        self.workers.delete( pid )
        self.adjust_throttle( status.success? ? -1 : 1 )
end
anchor
restart_workers()

Send a SIGHUP to all the group's workers.

# File lib/symphony/task_group.rb, line 88
def restart_workers
        self.signal_processes( :HUP, *self.workers )
end
anchor
start_worker( exit_on_idle=false )

Start a new Symphony::Task and return its PID.

# File lib/symphony/task_group.rb, line 55
def start_worker( exit_on_idle=false )
        self.log.debug "Starting a %p." % [ task_class ]
        task_class.before_fork

        pid = Process.fork do
                task_class.after_fork
                task_class.run( exit_on_idle )
        end or raise "No PID from forked %p worker?" % [ task_class ]

        Process.setpgid( pid, 0 )

        self.log.info "Adding %p worker %p" % [ task_class, pid ]
        self.workers << pid
        @last_child_started = Time.now

        return pid
end
anchor
stop_all_workers()

Stop all of the task group's workers.

# File lib/symphony/task_group.rb, line 82
def stop_all_workers
        self.signal_processes( :TERM, *self.workers )
end
anchor
stop_worker()

Stop a worker from the task group.

# File lib/symphony/task_group.rb, line 75
def stop_worker
        pid = self.workers.first
        self.signal_processes( :TERM, pid )
end
anchor
throttle_seconds()

Return the number of seconds between child startup times.

# File lib/symphony/task_group.rb, line 126
def throttle_seconds
        return 0 unless @throttle.nonzero?
        return Math.log( @throttle ) * Symphony.throttle_factor
end
anchor
throttled?()

Returns true if the group of tasks is throttled (i.e., should wait to start any more children).

# File lib/symphony/task_group.rb, line 113
def throttled?
        # Return unless the throttle period has lapsed
        unless self.throttle_seconds < (Time.now - self.last_child_started)
                self.log.warn "Not starting children: throttled for %0.2f seconds" %
                        [ self.throttle_seconds ]
                return true
        end

        return false
end

Protected Instance Methods

anchor
signal_processes( signal, *pids )

Send the specified signal to the process associated with pid, handling harmless errors.

# File lib/symphony/task_group.rb, line 148
def signal_processes( signal, *pids )
        self.log.debug "Signalling processes: %p" % [ pids ]

        # Do them one at a time, as Process.kill will abort on the first error if you
        # pass more than one pid.
        pids.each do |pid|
                begin
                        Process.kill( signal, pid )
                rescue Errno::ESRCH => err
                        self.log.error "%p when trying to %s worker %d: %s" %
                                [ err.class, signal, pid, err.message ]
                end
        end
end