# Builds the cluster: initializes internal state, registers every seed as a
# server, publishes the topology opening/changed SDAM events, starts server
# monitoring and, unless monitoring I/O is disabled, launches the periodic
# executor and optionally blocks until an initial scan has completed.
#
# @param seeds [Array] Seed addresses; each one is handed to #add.
#   (Only #map and #size are called on it here.)
# @param monitoring [Object] Monitoring object passed to the topology.
# @param options [Options::Redacted] Cluster options. The object is frozen
#   by this method. Keys read here:
#   - :monitoring_io - pass false to skip the periodic executor and the
#     initial scan wait.
#   - :server_selection_semaphore - required unless :monitoring_io is false;
#     waited on during the initial scan.
#   - :scan - pass false to skip waiting for the initial scan.
#   - :server_selection_timeout - upper bound, in seconds, for the initial
#     scan wait (raised to 3 when smaller).
#
# @raise [ArgumentError] If monitoring I/O is not disabled and no server
#   selection semaphore was supplied.
def initialize(seeds, monitoring, options = Options::Redacted.new)
  if options[:monitoring_io] != false
    raise ArgumentError, 'Need server selection semaphore' unless options[:server_selection_semaphore]
  end

  @servers = []
  @monitoring = monitoring
  @event_listeners = Event::Listeners.new
  @options = options.freeze
  @app_metadata = Server::AppMetadata.new(@options)
  @update_lock = Mutex.new
  @sdam_flow_lock = Mutex.new
  @cluster_time = nil
  @cluster_time_lock = Mutex.new
  @topology = Topology.initial(self, monitoring, options)
  Session::SessionPool.create(self)

  # The topology reported in the opening event is always an Unknown one,
  # constructed before any seed server has been added.
  opening_topology = Topology::Unknown.new(options, monitoring, self)
  opening_event = Monitoring::Event::TopologyOpening.new(opening_topology)
  publish_sdam_event(Monitoring::TOPOLOGY_OPENING, opening_event)

  subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))

  @seeds = seeds
  # Servers are added with monitoring switched off; monitoring is started
  # only after the topology changed event below has been published.
  seed_servers = seeds.map { |seed| add(seed, monitor: false) }

  if seeds.size >= 1
    # Rebuild the topology (same class, options and monitoring) now that
    # the seed servers have been added.
    @topology = topology.class.new(topology.options, topology.monitoring, self)
    changed_event = Monitoring::Event::TopologyChanged.new(opening_topology, @topology)
    publish_sdam_event(Monitoring::TOPOLOGY_CHANGED, changed_event)
  end

  seed_servers.each(&:start_monitoring)

  # With monitoring I/O disabled there is no periodic executor, no finalizer
  # and no initial scan wait; @connecting / @connected are left unset.
  return if options[:monitoring_io] == false

  @cursor_reaper = CursorReaper.new
  @socket_reaper = SocketReaper.new(self)
  @periodic_executor = PeriodicExecutor.new(@cursor_reaper, @socket_reaper)
  @periodic_executor.run!

  # NOTE(review): @session_pool is not assigned in this method; presumably
  # Session::SessionPool.create(self) sets it - confirm.
  finalizer = self.class.finalize({}, @periodic_executor, @session_pool)
  ObjectSpace.define_finalizer(self, finalizer)

  @connecting = false
  @connected = true

  return if options[:scan] == false

  # Allow the initial scan at least 3 seconds, even when the configured
  # server selection timeout is shorter.
  min_scan_timeout = options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT
  min_scan_timeout = 3 if min_scan_timeout < 3

  start_time = Time.now
  deadline = start_time + min_scan_timeout

  # Wait until every currently known server has a description updated at or
  # after start_time, or until the deadline passes. The server list is
  # re-read on every pass, so servers that appear meanwhile are included.
  loop do
    known_servers = servers_list.dup
    break if known_servers.all? { |server| server.description.last_update_time >= start_time }

    time_remaining = deadline - Time.now
    break if time_remaining <= 0

    # Presumably signalled when a server description changes - confirm.
    options[:server_selection_semaphore].wait(time_remaining)
  end
end