A base class for managing a group of workers that all run the same task class.
last_child_started: The Time that the last child started
max_workers: The maximum number of workers the group will keep running
task_class: The Class of the task the worker runs
workers: The set of worker PIDs
Set up a new task group.
# File lib/symphony/task_group.rb, line 23
def initialize( task_class, max_workers )
  @task_class = task_class
  @max_workers = max_workers
  @workers = Set.new
  @last_child_started = Time.now
  @throttle = 0
  @queue = nil
end
Add the specified adjustment to the throttle value, ensuring that it doesn't go below zero or above Symphony.throttle_max.
# File lib/symphony/task_group.rb, line 134
def adjust_throttle( adjustment=1 )
  self.log.debug "Adjusting worker throttle by %d" % [ adjustment ]
  @throttle += adjustment
  @throttle = 0 if @throttle < 0
  @throttle = Symphony.throttle_max if @throttle > Symphony.throttle_max
end
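The clamping can be tried in isolation. The following is a minimal sketch rather than the library's code: THROTTLE_MAX is a hypothetical stand-in for Symphony.throttle_max, its value of 16 is an assumption chosen for the example, and adjusted_throttle is an illustrative helper name.

```ruby
# Hypothetical stand-in for Symphony.throttle_max; the value is an assumption.
THROTTLE_MAX = 16

# Return +current+ plus +adjustment+, clamped to the range 0..THROTTLE_MAX.
def adjusted_throttle( current, adjustment=1 )
  return ( current + adjustment ).clamp( 0, THROTTLE_MAX )
end

adjusted_throttle( 0, -1 )   # => 0  (never drops below zero)
adjusted_throttle( 3, 1 )    # => 4
adjusted_throttle( 16, 1 )   # => 16 (never exceeds the maximum)
```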
Scale workers up or down based on the task group's work model. Subclasses must implement this method; it should return an Array of the PIDs that were started, or nil if none were.
# File lib/symphony/task_group.rb, line 96
def adjust_workers
  raise NotImplementedError,
    "%p needs to provide an implementation of #adjust_workers" % [ self.class ]
end
Handle the exit of the child with the specified pid. The status is the Process::Status returned by waitpid.
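As an illustration of that contract, here is a sketch of a subclass that tops the group up to max_workers. Both class names are hypothetical, and StandInTaskGroup is only a stand-in for Symphony::TaskGroup so that the example runs without the library: its start_worker hands out fake PIDs instead of forking, and its throttled? always returns false.

```ruby
require 'set'

# Stand-in for Symphony::TaskGroup (an assumption for this sketch). It mimics
# only the parts used below and never forks a real process.
class StandInTaskGroup
  attr_reader :workers, :max_workers

  def initialize( max_workers )
    @max_workers = max_workers
    @workers = Set.new
    @next_pid = 1000
  end

  # The real method consults the throttle; the stand-in never throttles.
  def throttled?
    return false
  end

  # Record a fake PID and return it.
  def start_worker
    pid = ( @next_pid += 1 )
    @workers << pid
    return pid
  end
end

# Hypothetical subclass: start as many workers as are missing.
class TopUpTaskGroup < StandInTaskGroup
  def adjust_workers
    return nil if self.throttled?

    missing = self.max_workers - self.workers.size
    return nil unless missing.positive?

    return Array.new( missing ) { self.start_worker }
  end
end

group = TopUpTaskGroup.new( 3 )
group.adjust_workers   # an Array of three fake PIDs
group.adjust_workers   # nil, because the group is already full
```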
# File lib/symphony/task_group.rb, line 105
def on_child_exit( pid, status )
  self.workers.delete( pid )
  self.adjust_throttle( status.success? ? -1 : 1 )
end
Send a SIGHUP to all the group's workers.
# File lib/symphony/task_group.rb, line 88
def restart_workers
  self.signal_processes( :HUP, *self.workers )
end
Start a new Symphony::Task and return its PID.
# File lib/symphony/task_group.rb, line 55
def start_worker( exit_on_idle=false )
  self.log.debug "Starting a %p." % [ task_class ]
  task_class.before_fork

  pid = Process.fork do
    task_class.after_fork
    task_class.run( exit_on_idle )
  end or raise "No PID from forked %p worker?" % [ task_class ]

  Process.setpgid( pid, 0 )

  self.log.info "Adding %p worker %p" % [ task_class, pid ]
  self.workers << pid
  @last_child_started = Time.now

  return pid
end
Stop all of the task group's workers.
# File lib/symphony/task_group.rb, line 82
def stop_all_workers
  self.signal_processes( :TERM, *self.workers )
end
Stop one worker from the task group by sending SIGTERM to the first PID in the worker set.
# File lib/symphony/task_group.rb, line 75
def stop_worker
  pid = self.workers.first
  self.signal_processes( :TERM, pid )
end
Return the number of seconds between child startup times.
# File lib/symphony/task_group.rb, line 126
def throttle_seconds
  return 0 unless @throttle.nonzero?
  return Math.log( @throttle ) * Symphony.throttle_factor
end
Returns true if the group of tasks is throttled (i.e., should wait to start any more children).
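To see how the delay grows with the throttle value, the formula can be evaluated on its own. This is a sketch under stated assumptions: THROTTLE_FACTOR is a hypothetical stand-in for Symphony.throttle_factor, and the value 1.0 is chosen only for illustration, not taken from the library.

```ruby
# Hypothetical stand-in for Symphony.throttle_factor; the value is an assumption.
THROTTLE_FACTOR = 1.0

# The same formula as the method above, as a function of the throttle value.
def delay_for( throttle )
  return 0 if throttle.zero?
  return Math.log( throttle ) * THROTTLE_FACTOR
end

# Print the delay for the first few throttle values.
(0..4).each do |throttle|
  puts "throttle=%d -> %0.2f seconds" % [ throttle, delay_for(throttle) ]
end
```

Because Math.log( 1 ) is 0, a throttle value of 1 still produces no delay; the delay only becomes positive from a throttle value of 2 upward, and it grows logarithmically after that.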
# File lib/symphony/task_group.rb, line 113
def throttled?
  # Return unless the throttle period has lapsed
  unless self.throttle_seconds < (Time.now - self.last_child_started)
    self.log.warn "Not starting children: throttled for %0.2f seconds" %
      [ self.throttle_seconds ]
    return true
  end

  return false
end
Send the specified signal to each of the processes identified by pids, handling harmless errors.
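The comparison at the heart of this check can be exercised with fixed times instead of Time.now. This is only a sketch: still_throttled? is a hypothetical helper, and passing the delay and timestamps as arguments is an assumption made to keep the example deterministic.

```ruby
# Return true unless more than +delay+ seconds have passed between
# +last_started+ and +now+.
def still_throttled?( delay, last_started, now=Time.now )
  elapsed = now - last_started   # Time - Time gives seconds as a Float
  return !( delay < elapsed )
end

started = Time.at( 1_000 )
still_throttled?( 2.0, started, started + 1 )   # => true  (only 1 second elapsed)
still_throttled?( 2.0, started, started + 5 )   # => false (5 seconds elapsed)
```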
# File lib/symphony/task_group.rb, line 148
def signal_processes( signal, *pids )
  self.log.debug "Signalling processes: %p" % [ pids ]

  # Do them one at a time, as Process.kill will abort on the first error if you
  # pass more than one pid.
  pids.each do |pid|
    begin
      Process.kill( signal, pid )
    rescue Errno::ESRCH => err
      self.log.error "%p when trying to %s worker %d: %s" %
        [ err.class, signal, pid, err.message ]
    end
  end
end
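The per-PID rescue can be demonstrated without the library. The sketch below assumes a POSIX platform where Process.fork is available; signal_each is a hypothetical helper that collects the missing PIDs instead of logging them, and it uses signal 0, which checks that a process exists without delivering anything to it.

```ruby
# Send +signal+ to each PID in turn and return the PIDs that no longer exist.
def signal_each( signal, *pids )
  missing = []
  pids.each do |pid|
    begin
      Process.kill( signal, pid )
    rescue Errno::ESRCH
      # The process is already gone; note it and carry on with the rest.
      missing << pid
    end
  end
  return missing
end

# Fork a child that exits at once, then reap it so its PID is no longer valid.
gone = Process.fork { exit!(0) }
Process.wait( gone )

# The reaped child is reported as missing; the current process is not.
signal_each( 0, gone, Process.pid )
```

Signalling one PID at a time means a single stale PID does not prevent the remaining workers from being signalled.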