The Prefork class provides a pre-forking worker pool for executing tasks in parallel using multiple processes.
A pre-forking worker pool is a technique for executing code in parallel in a UNIX environment. Each worker in the pool forks a child process and then executes user supplied code in that child process. The child process can pull jobs from a queue (beanstalkd for example) or listen on a socket for network requests.
The code to execute in the child processes is passed as a block to the Prefork initialize method. Each child process executes this code in a loop; your code block does not need to keep itself alive, because the library handles that.
If your code raises an exception, it will be captured by the library code and marshalled back to the parent process. This will halt the child process. The Prefork worker pool does not restart dead workers. A method is provided to iterate over workers that have errors, and it is up to the user to handle errors as they please.
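The general technique of marshalling an exception from a child back to its parent can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not the Servolux implementation: the helper name `run_in_child` is hypothetical, and the sketch relies only on `fork`, `IO.pipe`, and `Marshal` from the standard runtime.

```ruby
# Sketch only: the fork + pipe + Marshal technique in isolation.
# `run_in_child` is a hypothetical helper, not part of Servolux.

# Runs the block in a forked child process. Returns nil if the block
# completed normally, or the exception object the child raised.
def run_in_child
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode

  pid = fork do
    reader.close
    begin
      yield
    rescue Exception => err
      # Serialize the exception and send it to the parent.
      writer.write(Marshal.dump(err))
    ensure
      writer.close
      exit!(0) # leave without running the parent's at_exit handlers
    end
  end

  writer.close
  payload = reader.read # returns once the child closes its end of the pipe
  reader.close
  Process.wait(pid)
  payload.empty? ? nil : Marshal.load(payload)
end

error = run_in_child { raise ArgumentError, 'bad job' }
puts "child failed: #{error.class}: #{error.message}" if error
```

The parent ends up holding a real exception object, so it can inspect the class, message, and backtrace and decide how to react.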
Instead of passing a block to the initialize method, you can provide a Ruby module that defines an “execute” method. This method will be executed in the child process’ run loop. When using a module, you also have the option of defining a “before_executing” method and an “after_executing” method. These methods will be called before the child starts the execute loop and after the execute loop finishes. Each method will be called exactly once. Both methods are optional.
Sending a SIGHUP to a child process will cause that child to stop and restart. The child will send a signal to the parent asking to be shut down. The parent will gracefully halt the child and then start a new child process to replace it. If you define a “hup” method in your worker module, it will be executed when SIGHUP is received by the child. Your “hup” method will be the last method executed in the signal handler.
Restarting a child this way has the advantage of calling your before/after_executing methods again and reloading any code or resources your worker code uses. The SIGHUP handler calls Thread#wakeup on the main thread of the child process; write your code to respond to this wakeup call (note that a thread waiting on Queue#pop will not return when wakeup is called on it).
A pre-forking echo server: github.com/TwP/servolux/blob/master/examples/echo.rb
Pulling jobs from a beanstalkd work queue: github.com/TwP/servolux/blob/master/examples/beanstalk.rb
In this example, we are creating 42 worker processes that will log the process ID and the current time to a file. Each worker will do this every 2 seconds. The before/after_executing methods are used to open the file before the run loop starts and to close the file after the run loop completes. The execute method uses the stored file descriptor when logging the message.
    module RunMe
      def before_executing
        @fd = File.open("#{Process.pid}.txt", 'w')
      end

      def after_executing
        @fd.close
      end

      def execute
        @fd.puts "Process #{Process.pid} @ #{Time.now}"
        sleep 2
      end
    end

    pool = Servolux::Prefork.new(:module => RunMe)
    pool.start 42
When a :timeout is supplied to the constructor, a “heartbeat” is set up between the parent and the child worker. Each loop through the child’s execute code must return before :timeout seconds have elapsed. If one iteration through the loop takes longer than :timeout seconds, then the parent process will halt the child worker. An error will be raised in the parent process.
    pool = Servolux::Prefork.new(:timeout => 2) {
      puts "Process #{Process.pid} is running."
      sleep(rand * 5)
    }
    pool.start 42
Eventually all 42 child processes will be killed by the parent process: sooner or later the random number generator makes each child sleep longer than two seconds.
Each time a child process executes the block of code, the Servolux library code sends a “heartbeat” message to the parent. The parent uses a Kernel#select call on the communications pipe to wait for this message. The timeout is passed to the select call; if no heartbeat arrives within that time, select returns nil. That nil return is the error condition, and a timely heartbeat prevents it.
Use the heartbeat with caution – allow margins for timing issues and processor load spikes.
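The heartbeat mechanism described above can be sketched with a pipe and a select call that carries a timeout. This is an illustration of the idea only, not the Servolux source: `watch_child` is a hypothetical helper, and the byte-per-iteration protocol is an assumption made to keep the example short.

```ruby
# Sketch only: a heartbeat over a pipe with IO.select as the watchdog.
# `watch_child` is a hypothetical helper, not part of Servolux.

# Forks a child that calls the block `iterations` times and writes one
# byte to the pipe after each call. Returns :finished once the child
# exits on its own, or :timeout after halting a child whose heartbeat
# did not arrive within `timeout` seconds.
def watch_child(timeout, iterations)
  reader, writer = IO.pipe

  pid = fork do
    begin
      reader.close
      iterations.times do |i|
        yield i
        writer.syswrite('.') # unbuffered heartbeat
      end
    ensure
      exit!(0) # leave without running the parent's at_exit handlers
    end
  end
  writer.close

  loop do
    # IO.select returns nil when nothing is readable within `timeout`.
    ready = IO.select([reader], nil, nil, timeout)
    if ready.nil?
      Process.kill('TERM', pid) # heartbeat missed: halt the child
      return :timeout
    end
    # A nil read means EOF: the child has closed its end of the pipe.
    return :finished if reader.read_nonblock(64, exception: false).nil?
  end
ensure
  reader.close if reader
  Process.wait(pid) if pid
end

puts watch_child(0.5, 3) { sleep 0.05 } # quick iterations
puts watch_child(0.5, 3) { sleep 2 }    # first iteration overruns
```

The timeout applies to each iteration separately, which is why a single slow pass through the loop is enough to get a child halted.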
Forked child processes are configured to respond to two signals: SIGHUP and SIGTERM. The SIGHUP signal when sent to a child process is used to restart just that one child. The SIGTERM signal when sent to a child process is used to forcibly kill the child; it will not be restarted. The parent process uses SIGTERM to halt all the children when it is stopping.
SIGHUP
Child processes are restarted by sending a SIGHUP signal to the child. This will shut down the child worker and then start up a new one to replace it. For the child to shut down gracefully, it needs to return from the “execute” method when it receives the signal. Define a “hup” method that will wake the execute thread from any pending operations (listening on a socket, reading a file, polling a queue, etc.). When the execute method returns, the child will exit.
SIGTERM
Child processes are stopped by the prefork parent by sending a SIGTERM signal to the child. For the child to shut down gracefully, it needs to return from the “execute” method when it receives the signal. Define a “term” method that will wake the execute thread from any pending operations (listening on a socket, reading a file, polling a queue, etc.). When the execute method returns, the child will exit.
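One way to make a blocking “execute” method wake up on request is the self-pipe technique: the worker waits on its real input and on a private pipe at the same time, and the “hup” and “term” methods write a byte to that pipe. The module below is a sketch under assumptions. The method names `before_executing`, `execute`, `after_executing`, `hup`, and `term` come from this documentation; everything else (`InterruptibleWorker`, `jobs_in`, `handled`, `wake`, and the job pipe standing in for a socket or queue connection) is hypothetical.

```ruby
# Sketch only: a worker module whose blocking execute call can be woken.
module InterruptibleWorker
  attr_reader :jobs_in # write end of the stand-in job pipe
  attr_reader :handled # jobs processed so far

  def before_executing
    @wake_r, @wake_w = IO.pipe
    @jobs_r, @jobs_in = IO.pipe # stand-in for a socket or queue connection
    @jobs_in.sync = true
    @handled = []
  end

  # One pass of the run loop: block until a job or a wake-up arrives.
  def execute
    ready, = IO.select([@jobs_r, @wake_r])
    # The wake byte is left unread so later calls also return at once.
    return if ready.include?(@wake_r)
    line = @jobs_r.gets
    @handled << line.chomp if line
  end

  def hup
    wake
  end

  def term
    wake
  end

  def after_executing
    [@wake_r, @wake_w, @jobs_r, @jobs_in].each do |io|
      io.close unless io.closed?
    end
  end

  private

  # A non-blocking write of one byte is enough to make the wake pipe
  # readable, which makes the pending IO.select call return.
  def wake
    @wake_w.write_nonblock('!')
  rescue IO::WaitWritable, IOError
    # Pipe is full or already closed: a wake-up is pending or moot.
  end
end

# With the servolux gem installed, the module would be handed to the pool:
#   pool = Servolux::Prefork.new(:module => InterruptibleWorker)
```

A pipe is used rather than Thread#wakeup because a thread blocked in select or Queue#pop does not necessarily return when wakeup is called on it.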
Adds additional workers to the pool. It will not add more workers than the number set in :max_workers.
    # File lib/servolux/prefork.rb, line 247
    def add_workers( number = 1 )
      number.times do
        break if at_max_workers?
        worker = Worker.new(self, @config)
        worker.extend @module
        worker.start
        @workers << worker
        pause
      end
    end
Returns true if the pool is currently at or above the maximum number of workers allowed, and false otherwise. Always returns false when :max_workers is not set.
    # File lib/servolux/prefork.rb, line 300
    def at_max_workers?
      return false unless @max_workers
      return @workers.size >= @max_workers
    end
Reports whether the number of workers is below the minimum threshold. Always returns false when :min_workers is not set.
    # File lib/servolux/prefork.rb, line 289
    def below_minimum_workers?
      return false unless @min_workers
      return @workers.size < @min_workers
    end
Returns the number of dead workers in the pool
    # File lib/servolux/prefork.rb, line 341
    def dead_worker_count
      worker_counts[:dead]
    end
Iterates over all the workers and yields each, in turn, to the given block.
    # File lib/servolux/prefork.rb, line 236
    def each_worker( &block )
      @workers.each(&block)
      self
    end
Makes sure that the worker pool has at least the minimum number of workers and no more than the maximum number of workers.
Generally, this means pruning dead workers and then spawning workers up to the :min_workers level. If :min_workers is not set, dead workers are pruned and nothing is spawned.
    # File lib/servolux/prefork.rb, line 277
    def ensure_worker_pool_size
      prune_workers
      while below_minimum_workers? do
        add_workers
      end
    end
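Since the pool does not restart dead workers on its own, a caller can combine the documented reap, errors, ensure_worker_pool_size, and worker_counts methods into a periodic check. The helper below is a sketch, and `supervise_once` is a hypothetical name. It takes any object that responds to those four methods, and it reads errors before resizing because prune_workers discards dead workers. Replacement workers are only spawned when :min_workers was given to the constructor.

```ruby
# Sketch only: one pass of a supervision loop for a Prefork-like pool.
# `supervise_once` is a hypothetical helper, not part of Servolux.
def supervise_once(pool, log = $stderr)
  pool.reap # clear the exit status of dead or restarted children

  # Report failures first: pruning removes dead workers from the pool.
  pool.errors do |worker|
    log.puts "worker failed: #{worker.error.inspect}"
  end

  pool.ensure_worker_pool_size # prune, then refill up to :min_workers
  pool.worker_counts
end

# With the servolux gem installed, usage might look like this:
#   pool = Servolux::Prefork.new(:min_workers => 4, :module => RunMe)
#   pool.start 4
#   loop { supervise_once(pool); sleep 5 }
```

Running the check on a timer keeps zombie children out of the process table and holds the pool at its minimum size.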
Iterates over all the workers and yields the worker to the given block only if the worker has an error condition.
    # File lib/servolux/prefork.rb, line 311
    def errors
      @workers.each { |worker| yield worker unless worker.error.nil? }
      self
    end
Returns the number of live workers in the pool
    # File lib/servolux/prefork.rb, line 333
    def live_worker_count
      worker_counts[:alive]
    end
Remove workers that are no longer alive from the worker pool
    # File lib/servolux/prefork.rb, line 263
    def prune_workers
      new_workers = @workers.find_all { |w| w.reap.alive? }
      @workers = new_workers
    end
This method should be called periodically in order to clear the return status from child processes that have either died or been restarted (via a HUP signal). This will remove zombie children from the process table.
@return [Prefork] self
    # File lib/servolux/prefork.rb, line 211
    def reap
      @workers.each { |worker| worker.reap }
      self
    end
Sends the given signal to all child processes. The default signal is ‘TERM’. The method pauses briefly after the signal is sent to each child, so that the children are not all signalled at once and the resulting burst of work does not overwhelm the CPU.
@param [String, Integer] signal The signal to send to child processes.
@return [Prefork] self
    # File lib/servolux/prefork.rb, line 224
    def signal( signal = 'TERM' )
      @workers.each { |worker| worker.signal(signal); pause }
      self
    end
Start up the given number of workers. Each worker will create a child process and run the user supplied code in that child process.
@param [Integer] number The number of workers to prefork
@return [Prefork] self
    # File lib/servolux/prefork.rb, line 188
    def start( number )
      @workers.clear
      add_workers( number )
      self
    end
Stop all workers. The current process will wait for each child process to
exit before this method will return. The worker instances are not destroyed
by this method; this means that the each_worker and the
errors methods will still function correctly after stopping
the workers.
    # File lib/servolux/prefork.rb, line 199
    def stop
      @workers.each { |worker| worker.stop; pause }
      reap
      self
    end
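Because worker instances survive a call to stop, a shutdown routine can still collect failures afterwards. A minimal sketch, assuming only the documented stop and errors methods and the worker's error reader; `stop_and_collect_errors` is a hypothetical helper name.

```ruby
# Sketch only: stop a Prefork-like pool, then gather what went wrong.
# `stop_and_collect_errors` is a hypothetical helper, not part of Servolux.
def stop_and_collect_errors(pool)
  pool.stop # waits for every child process to exit

  failures = []
  pool.errors { |worker| failures << worker.error }
  failures
end

# With the servolux gem installed, usage might look like this:
#   failures = stop_and_collect_errors(pool)
#   failures.each { |err| warn "#{err.class}: #{err.message}" }
```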
Returns a hash containing the counts of alive and dead workers
    # File lib/servolux/prefork.rb, line 320
    def worker_counts
      counts = { :alive => 0, :dead => 0 }
      each_worker do |worker|
        state = worker.alive? ? :alive : :dead
        counts[state] += 1
      end
      return counts
    end
Create a new pre-forking worker pool. You must provide a block of code for the workers to execute in their child processes. This code block can be passed either as a block to this method or as a module via the :module option.
If a :timeout is given, then each worker will set up a “heartbeat” between the parent process and the child process. If the child does not respond to the parent within :timeout seconds, then the child process will be halted. If you do not want to use the heartbeat, leave :timeout unset or set it to nil explicitly.
Additionally, :min_workers and :max_workers options are available. If :min_workers is given, the method ensure_worker_pool_size will guarantee that at least :min_workers workers are up and running. If :max_workers is given, then add_workers will NOT allow you to spawn more workers than :max_workers.
The pre-forking worker pool makes no effort to restart dead workers. It is left to the user to implement this functionality.
    # File lib/servolux/prefork.rb, line 170
    def initialize( opts = {}, &block )
      @timeout = opts.fetch(:timeout, nil)
      @module = opts.fetch(:module, nil)
      @max_workers = opts.fetch(:max_workers, nil)
      @min_workers = opts.fetch(:min_workers, nil)
      @config = opts.fetch(:config, {})
      @module = Module.new { define_method :execute, &block } if block
      @workers = []

      raise ArgumentError, 'No code was given to execute by the workers.' unless @module
    end
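The constructor wraps a block in an anonymous module that defines execute, so block-based and module-based workers can be treated the same way from then on. The same trick can be tried in isolation; in this sketch `module_from_block` is a hypothetical helper, and a plain Object stands in for the library's Worker.

```ruby
# Sketch only: turning a block into a module that defines `execute`.
# `module_from_block` is a hypothetical helper, not part of Servolux.
def module_from_block(&block)
  Module.new { define_method(:execute, &block) }
end

worker_code = module_from_block { "running in #{self.class}" }

worker = Object.new # stand-in for the library's Worker
worker.extend(worker_code)
puts worker.execute
```

Because the block becomes a method body, self inside it is the object that was extended, so a block-based worker sees the same receiver that a module-based worker would.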