# Builds the connection options and opens an EventMachine connection.
#
# Option resolution order (later steps win):
#   1. values supplied by the caller,
#   2. built-in defaults for anything the caller left nil,
#   3. NATS_* environment variables, which override both of the above
#      (except :uri, :name and :no_echo, which only fill in missing values).
#
# @param uri [String, Hash, nil] a server URI string, or the options hash
#   itself when no separate URI is given
# @param opts [Hash] connection options; this hash is modified in place
# @param blk [Proc] optional block registered through +on_connect+ on the
#   object returned by EM.connect
# @return the object returned by EM.connect
# @raise [Error] when TLS peer verification is requested but the CA file is
#   missing or unreadable
def connect(uri=nil, opts={}, &blk)
  case uri
  when String
    opts[:uri] = process_uri(uri)
  when Hash
    opts = uri
  end

  # Built-in defaults, applied only where the caller supplied nothing (nil),
  # so an explicit `false` from the caller is respected.
  {
    :verbose => false,
    :pedantic => false,
    :reconnect => true,
    :ssl => false,
    :max_reconnect_attempts => MAX_RECONNECT_ATTEMPTS,
    :reconnect_time_wait => RECONNECT_TIME_WAIT,
    :ping_interval => DEFAULT_PING_INTERVAL,
    :max_outstanding_pings => DEFAULT_PING_MAX,
    :drain_timeout => DEFAULT_DRAIN_TIMEOUT
  }.each do |key, default|
    opts[key] = default if opts[key].nil?
  end
  opts[:uri] ||= ENV['NATS_URI'] || DEFAULT_URI

  # Boolean environment overrides: only the (case-insensitive) string "true"
  # enables the flag; any other value disables it.
  {
    'NATS_VERBOSE' => :verbose,
    'NATS_PEDANTIC' => :pedantic,
    'NATS_DEBUG' => :debug,
    'NATS_RECONNECT' => :reconnect,
    'NATS_FAST_PRODUCER' => :fast_producer_error,
    'NATS_SSL' => :ssl
  }.each do |var, key|
    value = ENV[var]
    opts[key] = value.downcase == 'true' unless value.nil?
  end

  # Integer environment overrides. :drain_timeout must use plain assignment
  # like the others: it already holds its default at this point, so a
  # conditional assignment would never let NATS_DRAIN_TIMEOUT take effect.
  {
    'NATS_MAX_RECONNECT_ATTEMPTS' => :max_reconnect_attempts,
    'NATS_RECONNECT_TIME_WAIT' => :reconnect_time_wait,
    'NATS_PING_INTERVAL' => :ping_interval,
    'NATS_MAX_OUTSTANDING_PINGS' => :max_outstanding_pings,
    'NATS_DRAIN_TIMEOUT' => :drain_timeout
  }.each do |var, key|
    value = ENV[var]
    opts[key] = value.to_i unless value.nil?
  end

  opts[:name] ||= ENV['NATS_CONNECTION_NAME']
  # Parse NATS_NO_ECHO like the other boolean flags; storing the raw string
  # would make NATS_NO_ECHO=false truthy and switch no-echo on.
  opts[:no_echo] ||= ENV['NATS_NO_ECHO'].to_s.downcase == 'true'

  uri = opts[:uris] || opts[:servers] || opts[:uri]

  tls = opts[:tls]
  if tls
    if tls[:ca_file]
      unless File.readable?(tls[:ca_file])
        raise(Error, "TLS Verification is enabled but ca_file %s is not readable" % tls[:ca_file])
      end
      # A CA file turns peer verification on.
      tls[:verify_peer] ||= true
    elsif tls[:verify_peer]
      raise(Error, "TLS Verification is enabled but ca_file is not set")
    else
      tls[:verify_peer] = false
    end
    tls[:ssl_version] ||= %w(tlsv1 tlsv1_1 tlsv1_2)
    tls[:protocols] ||= %w(tlsv1 tlsv1_1 tlsv1_2)
  end

  unless uri.nil?
    uris = uri.kind_of?(Array) ? uri : [uri]
    # Shuffled in place, so the array stored in opts sees the same order.
    uris.shuffle! unless opts[:dont_randomize_servers]
    u = uris.first
    @uri = u.is_a?(URI) ? u.dup : URI.parse(u)
  end

  # Install fallback callbacks when none have been registered yet.
  @err_cb = proc { |e| raise e } unless err_cb
  @close_cb = proc { } unless close_cb
  @disconnect_cb = proc { } unless disconnect_cb

  client = EM.connect(@uri.host, @uri.port, self, opts)
  client.on_connect(&blk) if blk
  return client
end