A group for managing groups of tasks.
The Time that the last child started
The maximum number of workers the group will keep running
The Class of the task the worker runs
The set of worker PIDs
Set up a new task group.
# File lib/symphony/task_group.rb, line 23
def initialize( task_class, max_workers )
@task_class = task_class
@max_workers = max_workers
@workers = Set.new
@last_child_started = Time.now
@throttle = 0
@queue = nil
end
Add adjustment
to the throttle value, ensuring that it doesn't go below zero.
# File lib/symphony/task_group.rb, line 134
def adjust_throttle( adjustment=1 )
self.log.debug "Adjusting worker throttle by %d" % [ adjustment ]
@throttle += adjustment
@throttle = 0 if @throttle < 0
@throttle = Symphony.throttle_max if @throttle > Symphony.throttle_max
end
Scale workers up or down based on the task group's work model. This method needs to return an array of pids that were started, otherwise nil.
# File lib/symphony/task_group.rb, line 96
def adjust_workers
raise NotImplementedError, "%p needs to provide an implementation of #adjust_workers" % [
self.class
]
end
Handle the exit of the child with the specified pid
. The status
is the Process::Status returned by waitpid.
# File lib/symphony/task_group.rb, line 105
def on_child_exit( pid, status )
self.workers.delete( pid )
self.adjust_throttle( status.success? ? -1 : 1 )
end
Send a SIGHUP to all the group's workers.
# File lib/symphony/task_group.rb, line 88
def restart_workers
self.signal_processes( :HUP, *self.workers )
end
Start a new Symphony::Task
and return its PID.
# File lib/symphony/task_group.rb, line 55
def start_worker( exit_on_idle=false )
self.log.debug "Starting a %p." % [ task_class ]
task_class.before_fork
pid = Process.fork do
task_class.after_fork
task_class.run( exit_on_idle )
end or raise "No PID from forked %p worker?" % [ task_class ]
Process.setpgid( pid, 0 )
self.log.info "Adding %p worker %p" % [ task_class, pid ]
self.workers << pid
@last_child_started = Time.now
return pid
end
Stop all of the task group's workers.
# File lib/symphony/task_group.rb, line 82
def stop_all_workers
self.signal_processes( :TERM, *self.workers )
end
Stop a worker from the task group.
# File lib/symphony/task_group.rb, line 75
def stop_worker
pid = self.workers.first
self.signal_processes( :TERM, pid )
end
Return the number of seconds between child startup times.
# File lib/symphony/task_group.rb, line 126
def throttle_seconds
return 0 unless @throttle.nonzero?
return Math.log( @throttle ) * Symphony.throttle_factor
end
Returns true
if the group of tasks is throttled (i.e., should wait to start any more children).
# File lib/symphony/task_group.rb, line 113
def throttled?
# Return unless the throttle period has lapsed
unless self.throttle_seconds < (Time.now - self.last_child_started)
self.log.warn "Not starting children: throttled for %0.2f seconds" %
[ self.throttle_seconds ]
return true
end
return false
end
Send the specified signal
to the process associated with pid
, handling harmless errors.
# File lib/symphony/task_group.rb, line 148
def signal_processes( signal, *pids )
self.log.debug "Signalling processes: %p" % [ pids ]
# Do them one at a time, as Process.kill will abort on the first error if you
# pass more than one pid.
pids.each do |pid|
begin
Process.kill( signal, pid )
rescue Errno::ESRCH => err
self.log.error "%p when trying to %s worker %d: %s" %
[ err.class, signal, pid, err.message ]
end
end
end