# Runs the registered client block (@@client) against the object published
# by the server, looping until an exception propagates out of the loop
# (the loop itself contains no `break`).
#
# Flow:
# 1. Prints a message and exits the process when no client block is set.
# 2. Calls +negotiate+, then reads the tuple matching
#    [:name, uniq.intern, nil, nil] from @ring_server (5 second timeout)
#    and stores its third element in @server_object.
# 3. On every loop iteration, probes the server object with +map_reduce?+.
#    If the object does not respond to it, the client block is called with
#    the server object itself and the result is stored in @called.
# 4. While @called is falsy, the server object is treated as a map/reduce
#    server: a MapReduce::<base type>::Client is built and the client block
#    is called once per object it yields.
#
# Error handling:
# * Timeout::Error reaching the method level triggers +spawn+ and a retry.
# * DRb::DRbConnError triggers +stop+, +negotiate+ and a retry.
# * Both branches share @retry_count; after more than 5 retries the
#   rescued exception is re-raised.
#
# Raises MapReduceError when the server object is not a valid map/reduce
# server, and Timeout::Error / DRb::DRbConnError once retries are exhausted.
def client
  unless @@client
    $stderr.puts "You must specify a client"
    # NOTE(review): a bare `exit` reports success (status 0) even though this
    # is an error path; left unchanged because callers may depend on it.
    exit
  end

  negotiate
  Timeout.timeout(5) { @server_object = @ring_server.read([:name, uniq.intern, nil, nil])[2] }

  i = 0
  loop do
    i += 1
    # Force a garbage collection every 1000 iterations.
    GC.start if (i % 1000).zero?

    # Probe for the map/reduce interface; an object without +map_reduce?+ is
    # handed straight to the client block.
    begin
      @server_object.map_reduce?
    rescue NoMethodError
      @called = @@client.call(@server_object)
    end

    begin
      unless @called
        if @server_object.map_reduce? && @server_object.valid?
          # SECURITY NOTE(review): this evaluates a string built from a value
          # supplied by the server object. If that value can be influenced by
          # an untrusted party, replace the eval with a constant lookup.
          map_reduce_client = eval("MapReduce::#{@server_object.base_type_to_s}::Client").new(@server_object)

          # Expose the server object globally so that the +logger+ and
          # +server+ methods defined on Object below can reach it.
          $server_object = @server_object
          Object.instance_eval do
            define_method(:logger) do |*args|
              $server_object._logger(*args)
            end
            define_method(:server) do
              $server_object
            end
          end

          map_reduce_client.each do |object|
            t = Time.now
            begin
              # An explicit `false` disables the per-object timeout; any other
              # falsy value (nil / missing key) falls back to 600 seconds.
              if @@options[:timeout] != false
                Timeout.timeout(@@options[:timeout] || 600) do
                  @@client.call(object)
                end
              else
                @@client.call(object)
              end
            rescue Timeout::Error
              # Deliberately ignored: a timed-out object is skipped and the
              # iteration moves on to the next one.
            end
            # Report the time spent on this object back to the server.
            @server_object.add_time_spent_processing_objects(Time.now - t)
          end
        else
          raise MapReduceError, "invalid map reduce server (possibly missing type or input)"
        end
      end
    rescue NoMethodError
      # Swallowed on purpose: a server object without the map/reduce
      # interface ends up here when @called is falsy.
      # NOTE(review): this also hides any NoMethodError raised inside the
      # client block itself.
    end
  end
rescue Timeout::Error => m
  spawn
  # `to_i` keeps a still-nil counter from raising NoMethodError here, which
  # would mask the timeout that is actually being handled.
  @retry_count = @retry_count.to_i + 1
  # Re-raise the rescued exception itself so its backtrace is preserved,
  # instead of wrapping it as the message of a new exception.
  raise m if @retry_count > 5
  retry
rescue DRb::DRbConnError => m
  stop
  negotiate
  @retry_count = @retry_count.to_i + 1
  raise m if @retry_count > 5
  retry
end