# Validates the server's final reply and builds the follow-up "continue"
# message carrying the empty client payload and this conversation's id.
#
# A Protocol::Msg is built when a connection is supplied and its features
# report op_msg support; otherwise a Protocol::Query command is built.
#
# @param reply [Object] The server reply, handed to validate_final_message!.
# @param connection [Object, nil] Optional connection; consulted for
#   features.op_msg_enabled?, mongos? and cluster_time.
#
# @return [Protocol::Msg, Protocol::Query] The message to send next.
def finalize(reply, connection = nil)
  validate_final_message!(reply)

  # Legacy path: no connection given, or the connection lacks op_msg support.
  unless connection && connection.features.op_msg_enabled?
    return Protocol::Query.new(
      user.auth_source,
      Database::COMMAND,
      CLIENT_CONTINUE_MESSAGE.merge(payload: client_empty_message, conversationId: id),
      limit: -1
    )
  end

  command = CLIENT_CONTINUE_MESSAGE.merge(payload: client_empty_message, conversationId: id)
  command[Protocol::Msg::DATABASE_IDENTIFIER] = user.auth_source

  # The cluster time is only read (and attached) when the connection
  # reports mongos? and actually has a cluster time value.
  if connection.mongos?
    known_cluster_time = connection.cluster_time
    command[Operation::CLUSTER_TIME] = known_cluster_time if known_cluster_time
  end

  Protocol::Msg.new([], {}, command)
end