# Drains the connection: removes interest in every subscription, waits for
# pending data to empty out, then closes.
#
# Does nothing when a drain or a close is already in progress. Otherwise an
# UNSUB command is sent for each entry in @subs, and once the flush callback
# fires two EM timers are armed:
#
# * a one-shot timer of options[:drain_timeout] seconds that reports a
#   "Drain Timeout" NATS::ClientError through err_cb and closes anyway;
# * a periodic timer (every 0.1s) that closes as soon as the buffer is
#   empty and pending_data_size has reached zero.
#
# Whichever timer finishes the drain cancels the other one.
#
# @yield invoked (without arguments) after the drain finished or timed out
# @return whatever flush returns, or nil when the call was a no-op
def drain(&blk)
  return if draining? || closing?

  @draining = true

  # Ask to be unsubscribed from every known subscription id.
  @subs.each { |sid, _sub| send_command("UNSUB #{sid} #{CR_LF}") }

  flush do
    # Declared up front so that each timer callback can see the other timer.
    timeout_timer = nil
    poll_timer = nil

    # Common tail of both callbacks: leave draining state, close the
    # connection unless a close is already under way, notify the caller.
    wrap_up = lambda do
      @draining = false
      close unless closing?
      blk.call if blk
    end

    timeout_timer = EM.add_timer(options[:drain_timeout]) do
      EM.cancel_timer(poll_timer)

      # Surface the timeout through the error callback before closing.
      err_cb.call(NATS::ClientError.new("Drain Timeout"))
      wrap_up.call
    end

    poll_timer = EM.add_periodic_timer(0.1) do
      buffer_clear = closing? || @buf.nil? || @buf.empty?
      next unless buffer_clear

      # Flag that the subscription side of the drain is complete.
      @drained_subs = true
      next unless pending_data_size == 0

      EM.cancel_timer(poll_timer)
      EM.cancel_timer(timeout_timer)
      wrap_up.call
    end
  end
end