# File lib/nats/client.rb, line 539
  # Winds the connection down gracefully: sends an UNSUB for every entry in
  # @subs, flushes, and then waits for pending data to reach zero before
  # calling +close+. If the wait outlasts options[:drain_timeout], a
  # NATS::ClientError("Drain Timeout") is handed to +err_cb+ and the
  # connection is closed anyway.
  #
  # Does nothing when a drain is already running or the client is closing.
  #
  # blk - optional block invoked once after the drain finishes or times out.
  def drain(&blk)
    return if draining? || closing?

    @draining = true

    # Drop interest in every subject so that message delivery stops.
    @subs.each do |sid, _sub|
      send_command("UNSUB #{sid} #{CR_LF}")
    end

    # Roundtrip first, to ensure no more messages are received.
    flush do
      # Declared before the timeout handler so that handler's closure can
      # see the periodic timer assigned further down.
      poll_timer = nil

      # Common epilogue for both outcomes: leave the draining state, close
      # unless a close is already under way, then notify the caller.
      finish = lambda do
        @draining = false
        close unless closing?
        blk.call if blk
      end

      # Upper bound on the whole drain: stop polling, report the timeout
      # through the error callback and close regardless.
      timeout_timer = EM.add_timer(options[:drain_timeout]) do
        EM.cancel_timer(poll_timer)
        err_cb.call(NATS::ClientError.new("Drain Timeout"))
        finish.call
      end

      # Poll (interval 0.1) until the pending data is empty.
      poll_timer = EM.add_periodic_timer(0.1) do
        # Keep waiting while @buf still holds data, unless we are closing.
        buffer_settled = closing? || @buf.nil? || @buf.empty?
        next unless buffer_settled

        # Subscriptions have been drained by now, so disallow publishing.
        @drained_subs = true

        if pending_data_size == 0
          EM.cancel_timer(poll_timer)
          EM.cancel_timer(timeout_timer)

          # Nothing left to send: the drain is complete.
          finish.call
        end
      end
    end
  end