class Servolux::Prefork

Synopsis

The Prefork class provides a pre-forking worker pool for executing tasks in parallel using multiple processes.

Details

A pre-forking worker pool is a technique for executing code in parallel in a UNIX environment. Each worker in the pool forks a child process and then executes user-supplied code in that child process. The child process can pull jobs from a queue (beanstalkd, for example) or listen on a socket for network requests.

The code to execute in the child processes is passed as a block to the Prefork initialize method. Each child process executes this code in a loop, so your code block does not need to keep itself alive; the library handles that.
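The run-loop idea can be illustrated with nothing but Ruby's standard library. This is not Servolux's implementation: it is a minimal sketch that assumes a Unix platform where `Process.fork` is available, and the helper name `mini_prefork` and its `iterations:` parameter are hypothetical. Each child runs the user block repeatedly (the block itself never loops) and reports through a shared pipe that the parent drains.

```ruby
# Minimal pre-fork sketch; a hypothetical helper, not the Servolux API.
# Requires a Unix platform where Process.fork is implemented.

# Forks +count+ children. Each child runs +block+ +iterations+ times in a
# loop and writes one line per iteration to a shared pipe. Returns every
# line collected by the parent.
def mini_prefork(count, iterations: 3, &block)
  reader, writer = IO.pipe
  pids = Array.new(count) do |index|
    Process.fork do
      reader.close
      # The run loop lives here, so the user block never loops itself.
      # One write per line keeps small lines from interleaving in the pipe.
      iterations.times { |i| writer.write("#{block.call(index, i)}\n") }
      writer.close
      exit!(0) # skip at_exit handlers inherited from the parent
    end
  end
  writer.close                           # parent keeps only the read end
  lines = reader.each_line.map(&:chomp)  # EOF once every child has closed
  reader.close
  pids.each { |pid| Process.wait(pid) }  # reap the finished children
  lines
end

results = mini_prefork(4) { |worker, i| "worker #{worker}, iteration #{i}" }
puts results.size # => 12
```

A real pool would keep the children running indefinitely and stream work to them; the bounded `iterations:` count only keeps the sketch finite.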

If your code raises an exception, the library captures it and marshals it back to the parent process. This halts the child process. The Prefork worker pool does not restart dead workers. The errors method iterates over workers that have errors, and it is up to the user to handle those errors as they see fit.
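Marshalling an exception from a child back to its parent can be sketched with `Marshal` over a pipe. This assumes a Unix `fork`, uses a hypothetical helper named `run_in_child`, and does not reproduce Servolux's own wire format, which this page does not show.

```ruby
# Runs +block+ in a forked child (Unix only). If the block raises, the child
# marshals the exception back through a pipe. Returns the rebuilt exception,
# or nil when the block finished without raising.
def run_in_child(&block)
  reader, writer = IO.pipe
  reader.binmode # Marshal data is binary
  writer.binmode
  pid = Process.fork do
    reader.close
    begin
      block.call
    rescue StandardError => e
      writer.write(Marshal.dump(e)) # serialise the exception object
    ensure
      writer.close
      exit!(0) # the child stops here either way
    end
  end
  writer.close
  payload = reader.read # "" if the child sent nothing
  reader.close
  Process.wait(pid)
  payload.empty? ? nil : Marshal.load(payload)
end

error = run_in_child { raise ArgumentError, "bad job" }
puts error.class   # => ArgumentError
puts error.message # => bad job
```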

Instead of passing a block to the initialize method, you can provide a Ruby module that defines an “execute” method. This method will be executed in the child process’ run loop. When using a module, you also have the option of defining a “before_executing” method and an “after_executing” method. These methods will be called before the child starts the execute loop and after the execute loop finishes. Each method will be called exactly once. Both methods are optional.
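The module contract can be illustrated without forking. The `SketchWorker` class below is hypothetical: it mixes a worker module into itself with `extend` (as add_workers does with `worker.extend @module` in the source listed under Public Instance Methods), calls the optional `before_executing` and `after_executing` hooks exactly once each, and runs `execute` a fixed number of times in place of the real, open-ended run loop. Running `after_executing` from an `ensure` clause is a design choice of this sketch only.

```ruby
# Hypothetical stand-in for a Servolux worker; nothing is forked here.
class SketchWorker
  def initialize(mod)
    extend(mod) # mix the user's module into this one object
  end

  # Optional setup, a bounded execute loop, then optional teardown.
  def run(iterations)
    before_executing if respond_to?(:before_executing)
    iterations.times { execute }
  ensure
    after_executing if respond_to?(:after_executing)
  end
end

module Counter
  attr_reader :log

  def before_executing
    @log = [:before]
  end

  def execute
    @log << :execute
  end

  def after_executing
    @log << :after
  end
end

worker = SketchWorker.new(Counter)
worker.run(3)
p worker.log # => [:before, :execute, :execute, :execute, :after]
```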

Sending a SIGHUP to a child process causes that child to stop and restart. The child sends a signal to the parent asking to be shut down. The parent gracefully halts the child and then starts a new child process to replace it. If you define a “hup” method in your worker module, it is executed when the child receives SIGHUP. Your “hup” method is the last method executed in the signal handler.

This has the advantage of calling your before/after_executing methods again and reloading any code or resources your worker code will use. The SIGHUP will call Thread#wakeup on the main child process thread; please write your code to respond accordingly to this wakeup call (a thread waiting on a Queue#pop will not return when wakeup is called on the thread).
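The wakeup behavior can be seen with core Ruby threads alone: `Thread#wakeup` cuts a `sleep` short, so an execute body that sleeps between polls notices a shutdown request promptly. The `stop` flag below stands in for whatever state a real “hup” method would set; it is an assumption of this sketch, not part of Servolux.

```ruby
stop = false
started = Time.now

worker = Thread.new do
  sleep 10 until stop # long idle wait between polls
  Time.now - started  # how long the loop actually ran
end

Thread.pass until worker.status == "sleep" # let the thread reach sleep
stop = true    # what a "hup" method might record
worker.wakeup  # cut the sleep short so the loop re-checks the flag
elapsed = worker.value
puts elapsed < 5 # => true
```

A thread blocked in `Queue#pop` would not return on `wakeup`, which is why the text above asks worker code to be written with this wakeup call in mind.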

Examples

A pre-forking echo server: github.com/TwP/servolux/blob/master/examples/echo.rb

Pulling jobs from a beanstalkd work queue: github.com/TwP/servolux/blob/master/examples/beanstalk.rb

Before / After Executing

In this example, we are creating 42 worker processes that will log the process ID and the current time to a file. Each worker will do this every 2 seconds. The before/after_executing methods are used to open the file before the run loop starts and to close the file after the run loop completes. The execute method uses the stored File object when logging the message.

require 'servolux'

module RunMe
  def before_executing
    @fd = File.open("#{Process.pid}.txt", 'w')
  end

  def after_executing
    @fd.close
  end

  def execute
    @fd.puts "Process #{Process.pid} @ #{Time.now}"
    sleep 2
  end
end

pool = Servolux::Prefork.new(:module => RunMe)
pool.start 42

Heartbeat

When a :timeout is supplied to the constructor, a “heartbeat” is set up between the parent and the child worker. Each loop through the child’s execute code must return before :timeout seconds have elapsed. If one iteration through the loop takes longer than :timeout seconds, then the parent process will halt the child worker. An error will be raised in the parent process.

require 'servolux'

pool = Servolux::Prefork.new(:timeout => 2) {
  puts "Process #{Process.pid} is running."
  sleep(rand * 5)
}
pool.start 42

Eventually all 42 child processes will be halted by the parent, because the random number generator will eventually cause each child to sleep longer than two seconds.

What is happening here is that each time a child process executes the block of code, the Servolux library code sends a “heartbeat” message to the parent. The parent uses a Kernel#select call on the communications pipe to wait for this message, passing the timeout to select. If no heartbeat arrives within the timeout, select returns nil; that is the error condition the heartbeat is designed to detect.
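The parent side of the heartbeat can be sketched with `IO.select` on a pipe (Kernel#select delegates to the same call). This illustrates the mechanism described above rather than Servolux's code, and the helper name `heartbeat?` is hypothetical. `IO.select` returns `nil` when the timeout expires with nothing readable, which is the failure case.

```ruby
# Waits up to +timeout+ seconds for one heartbeat byte on +reader+.
# Returns true if a heartbeat arrived, false if IO.select timed out.
def heartbeat?(reader, timeout)
  ready = IO.select([reader], nil, nil, timeout)
  return false if ready.nil? # timeout elapsed: the child is late
  reader.read_nonblock(1, exception: false) # consume the heartbeat byte
  true
end

reader, writer = IO.pipe
writer.sync = true # write heartbeats straight through, no buffering

writer.write(".")              # a child would do this once per iteration
puts heartbeat?(reader, 0.5)   # => true
puts heartbeat?(reader, 0.2)   # => false (nothing written this time)
```

On a `false` result, a pool would then halt the late child, for example with `Process.kill('TERM', pid)`.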

Use the heartbeat with caution: allow margins for timing issues and processor load spikes.

Signals

Forked child processes are configured to respond to two signals: SIGHUP and SIGTERM. When sent to a child process, SIGHUP restarts just that one child. When sent to a child process, SIGTERM forcibly kills the child, and the child will not be restarted. The parent process uses SIGTERM to halt all the children when it is stopping.

SIGHUP: Child processes are restarted by sending a SIGHUP signal to the child. This shuts down the child worker and then starts a new one to replace it. For the child to shut down gracefully, it needs to return from the “execute” method when it receives the signal. Define a “hup” method that wakes the execute thread from any pending operation (listening on a socket, reading a file, polling a queue, etc.). When the execute method returns, the child will exit.

SIGTERM: The prefork parent stops child processes by sending them a SIGTERM signal. For the child to shut down gracefully, it needs to return from the “execute” method when it receives the signal. Define a “term” method that wakes the execute thread from any pending operation (listening on a socket, reading a file, polling a queue, etc.). When the execute method returns, the child will exit.
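One common way to write a “term” (or “hup”) method that wakes a blocked execute is the self-pipe trick: execute waits in `IO.select` on both its real input and a private wake-up pipe, and term writes a byte to that pipe. The `PipeWorker` module below is a hypothetical sketch of that pattern; `setup`, `@input`, `@wake_r`/`@wake_w`, and `@stopping` are names invented for illustration, and a thread stands in for the signal handler so the example runs without sending real signals.

```ruby
# Hypothetical worker module illustrating the self-pipe trick.
module PipeWorker
  def setup(input)
    @input = input
    @wake_r, @wake_w = IO.pipe
    @stopping = false
  end

  # Blocks until a job line arrives or term wakes it; returns :job or :stopped.
  def execute
    readable, = IO.select([@input, @wake_r]) # no timeout: wait indefinitely
    return :stopped if @stopping || readable.include?(@wake_r)
    @input.gets # consume one job line
    :job
  end

  # Called from the TERM handler: flag shutdown and wake the select above.
  def term
    @stopping = true
    @wake_w.write_nonblock(".", exception: false)
  end
end

worker = Object.new.extend(PipeWorker)
jobs_r, jobs_w = IO.pipe
jobs_w.sync = true
worker.setup(jobs_r)

jobs_w.puts "job 1"
p worker.execute                       # => :job

waiter = Thread.new { worker.execute } # blocks: no job is pending
sleep 0.1
worker.term                            # a TERM handler would call this
p waiter.value                         # => :stopped
```

Because the wake-up byte is never drained, every later call to execute also returns immediately, which suits a worker that is shutting down.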

Constants

CommunicationError
UnknownResponse
UnknownSignal

Attributes

config[RW]
max_workers[RW]
min_workers[RW]
timeout[RW]

Public Class Methods

new { block }
new( :module → Module )

Create a new pre-forking worker pool. You must provide a block of code for the workers to execute in their child processes. This code block can be passed either as a block to this method or as a module via the :module option.

If a :timeout is given, then each worker will set up a “heartbeat” between the parent process and the child process. If the child does not respond to the parent within :timeout seconds, then the child process will be halted. If you do not want to use the heartbeat, leave :timeout unset or explicitly set it to nil.

Additionally, :min_workers and :max_workers options are available. If :min_workers is given, the method ensure_worker_pool_size will guarantee that at least :min_workers workers are up and running. If :max_workers is given, then add_workers will NOT allow you to spawn more workers than :max_workers.

The pre-forking worker pool makes no effort to restart dead workers. It is left to the user to implement this functionality.

# File lib/servolux/prefork.rb, line 170
def initialize( opts = {}, &block )
  @timeout = opts.fetch(:timeout, nil)
  @module = opts.fetch(:module, nil)
  @max_workers = opts.fetch(:max_workers, nil)
  @min_workers = opts.fetch(:min_workers, nil)
  @config = opts.fetch(:config, {})
  @module = Module.new { define_method :execute, &block } if block
  @workers = []

  raise ArgumentError, 'No code was given to execute by the workers.' unless @module
end
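The last lines of initialize turn a block into an anonymous module with a single `execute` method, so blocks and modules are treated the same way from then on. The snippet below reproduces just that conversion in isolation (the variable names are illustrative) and mixes the result into a plain object with `extend`, as add_workers does with each worker.

```ruby
block = proc { "running in #{self.class}" }

# Same conversion as in initialize: wrap the block in an anonymous module.
mod = Module.new { define_method(:execute, &block) }

worker = Object.new
worker.extend(mod)                # add_workers extends each Worker with @module
puts worker.execute               # => running in Object
puts mod.instance_methods.inspect # => [:execute]
```

Because `define_method` evaluates the block with the receiver as `self`, the block runs in the context of whichever worker object the module is mixed into.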

Public Instance Methods

add_workers( number = 1 )

Adds additional workers to the pool. It will not add more workers than the number set in :max_workers.

# File lib/servolux/prefork.rb, line 247
def add_workers( number = 1 )
  number.times do
    break if at_max_workers?
    worker = Worker.new(self, @config)
    worker.extend @module
    worker.start
    @workers << worker
    pause
  end
end

at_max_workers?

Returns true if the pool currently holds at least :max_workers workers, and false otherwise. Always returns false when :max_workers is not set.

# File lib/servolux/prefork.rb, line 300
def at_max_workers?
  return false unless @max_workers
  return @workers.size >= @max_workers
end

below_minimum_workers?

Returns true if the number of workers is below the :min_workers threshold. Always returns false when :min_workers is not set.

# File lib/servolux/prefork.rb, line 289
def below_minimum_workers?
  return false unless @min_workers
  return @workers.size < @min_workers
end

dead_worker_count → Integer

Returns the number of dead workers in the pool.

# File lib/servolux/prefork.rb, line 341
def dead_worker_count
  worker_counts[:dead]
end

each_worker { |worker| block }

Iterates over all the workers and yields each, in turn, to the given block.

# File lib/servolux/prefork.rb, line 236
def each_worker( &block )
  @workers.each(&block)
  self
end

ensure_worker_pool_size()

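Makes sure that the worker pool has at least :min_workers workers, without spawning beyond :max_workers.

In practice, this prunes dead workers and then spawns new workers until the :min_workers level is reached. If :min_workers is not set, the method only prunes. Because add_workers will not grow the pool past :max_workers, :min_workers should not be set higher than :max_workers; otherwise the while loop in this method never terminates.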

# File lib/servolux/prefork.rb, line 277
def ensure_worker_pool_size
  prune_workers
  while below_minimum_workers? do
    add_workers
  end
end
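To see how :min_workers, :max_workers, prune_workers, and ensure_worker_pool_size interact, here is a process-free model of the same logic. `FakeWorker` and `PoolModel` are hypothetical stand-ins and nothing is forked. The extra `!at_max?` guard in `ensure_size` is an addition of this sketch: it stops the loop when the pool is capped by the maximum, a condition the method shown above does not check.

```ruby
# Hypothetical worker stand-in with a settable alive flag.
FakeWorker = Struct.new(:alive) do
  def alive?
    alive
  end
end

# Process-free model of the pool-size policy.
class PoolModel
  attr_reader :workers

  def initialize(min: nil, max: nil)
    @min, @max, @workers = min, max, []
  end

  def at_max?
    return false unless @max
    @workers.size >= @max
  end

  def below_min?
    return false unless @min
    @workers.size < @min
  end

  def add(number = 1)
    number.times do
      break if at_max?
      @workers << FakeWorker.new(true)
    end
  end

  # Drop dead workers, then top up to the minimum (never past the maximum).
  def ensure_size
    @workers.select!(&:alive?)
    add while below_min? && !at_max?
  end
end

pool = PoolModel.new(min: 2, max: 4)
pool.add(10)                         # capped at max
puts pool.workers.size               # => 4
pool.workers.each { |w| w.alive = false }
pool.ensure_size                     # prune all, then refill to min
puts pool.workers.size               # => 2
```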

errors { |worker| block }

Iterates over all the workers and yields the worker to the given block only if the worker has an error condition.

# File lib/servolux/prefork.rb, line 311
def errors
  @workers.each { |worker| yield worker unless worker.error.nil? }
  self
end

kill( signal = 'TERM' )
Alias for: signal

live_worker_count → Integer

Returns the number of live workers in the pool.

# File lib/servolux/prefork.rb, line 333
def live_worker_count
  worker_counts[:alive]
end

prune_workers()

Removes workers that are no longer alive from the worker pool.

# File lib/servolux/prefork.rb, line 263
def prune_workers
  new_workers = @workers.find_all { |w| w.reap.alive? }
  @workers = new_workers
end

reap()

This method should be called periodically in order to clear the return status from child processes that have either died or been restarted (via a HUP signal). This will remove zombie children from the process table.

@return [Prefork] self

# File lib/servolux/prefork.rb, line 211
def reap
  @workers.each { |worker| worker.reap }
  self
end
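Reaping is done with the `wait` family of `Process` calls. The sketch below is a standalone illustration, not the Worker#reap implementation (which this page does not show), of a non-blocking reap: `Process.waitpid2` with `Process::WNOHANG` returns `nil` while the child is still running and its exit status once it has finished, which also removes the zombie entry from the process table. The helper name `try_reap` is hypothetical, and a pipe keeps the child alive until the parent releases it so the output is deterministic.

```ruby
# Non-blocking reap: returns the Process::Status if +pid+ has exited, else nil.
def try_reap(pid)
  _, status = Process.waitpid2(pid, Process::WNOHANG)
  status
end

reader, writer = IO.pipe
pid = Process.fork do # Unix only
  writer.close
  reader.read         # block until the parent closes its write end
  exit!(3)
end
reader.close

p try_reap(pid)       # => nil (child still running)
writer.close          # child sees EOF and exits
status = nil
sleep 0.01 until (status = try_reap(pid)) # poll, as a periodic reap would
p status.exitstatus   # => 3
```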

signal( signal = 'TERM' )

Sends the given signal to all child processes. The default signal is ‘TERM’. The method waits for a short period of time after the signal is sent to each child; this avoids a flood of signals being sent simultaneously and overwhelming the CPU.

@param [String, Integer] signal The signal to send to child processes.
@return [Prefork] self

# File lib/servolux/prefork.rb, line 224
def signal( signal = 'TERM' )
  @workers.each { |worker| worker.signal(signal); pause }
  self
end
Also aliased as: kill
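The parent-side pattern used by signal and stop can be sketched on its own: send a signal to each child with a short pause in between, then wait for every child to exit. The children here are throwaway forks (Unix only) that trap TERM and report readiness over a pipe so no signal arrives before its handler is installed. The library's `pause` helper is not shown on this page; the fixed 0.05-second sleep is a stand-in.

```ruby
ready_r, ready_w = IO.pipe

pids = Array.new(3) do
  Process.fork do
    ready_r.close
    Signal.trap("TERM") { exit!(0) } # graceful exit on TERM
    ready_w.write(".")               # tell the parent the trap is installed
    ready_w.close
    sleep                            # idle until signalled
  end
end
ready_w.close
ready_r.read(3) # wait until all three traps are set

pids.each do |pid|
  Process.kill("TERM", pid)
  sleep 0.05 # stand-in for the library's pause between signals
end

statuses = pids.map { |pid| Process.wait2(pid).last }
p statuses.map(&:exitstatus) # => [0, 0, 0]
```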

start( number )

Starts up the given number of workers. Each worker will create a child process and run the user-supplied code in that child process.

@param [Integer] number The number of workers to prefork
@return [Prefork] self

# File lib/servolux/prefork.rb, line 188
def start( number )
  @workers.clear
  add_workers( number )
  self
end

stop()

Stop all workers. The current process will wait for each child process to exit before this method will return. The worker instances are not destroyed by this method; this means that the each_worker and the errors methods will still function correctly after stopping the workers.

# File lib/servolux/prefork.rb, line 199
def stop
  @workers.each { |worker| worker.stop; pause }
  reap
  self
end

worker_counts → { :alive => 2, :dead => 1 }

Returns a hash containing the counts of alive and dead workers.

# File lib/servolux/prefork.rb, line 320
def worker_counts
  counts = { :alive => 0, :dead => 0 }
  each_worker do |worker|
    state = worker.alive? ? :alive : :dead
    counts[state] += 1
  end
  return counts
end
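The same tally can be written with `Enumerable#partition`. This is only a stylistic alternative, sketched as a standalone method against a hypothetical `StubWorker` struct rather than as a change to the library.

```ruby
# Hypothetical worker stand-in exposing alive?.
StubWorker = Struct.new(:alive) do
  def alive?
    alive
  end
end

# Equivalent of worker_counts using Enumerable#partition.
def worker_counts(workers)
  alive, dead = workers.partition(&:alive?)
  { :alive => alive.size, :dead => dead.size }
end

workers = [StubWorker.new(true), StubWorker.new(true), StubWorker.new(false)]
counts = worker_counts(workers)
puts counts[:alive] # => 2
puts counts[:dead]  # => 1
```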