/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network;

import java.io.File;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory for the Netty-based shuffle service. It creates the {@link NettyShuffleMaster} and
 * wires together all components of a {@link NettyShuffleEnvironment}.
 */
public class NettyShuffleServiceFactory
        implements ShuffleServiceFactory<NettyShuffleDescriptor, ResultPartition, SingleInputGate> {
    private static final Logger LOG = LoggerFactory.getLogger(NettyShuffleServiceFactory.class);

    /** Handed to {@link FileChannelManagerImpl} together with the configured temp directories. */
    private static final String DIR_NAME_PREFIX = "netty-shuffle";

    /**
     * Creates the shuffle master from the configuration carried by the given context.
     *
     * @param shuffleMasterContext context supplying the configuration
     * @return a new {@link NettyShuffleMaster}
     */
    public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
        return new NettyShuffleMaster(shuffleMasterContext.getConfiguration());
    }

    /**
     * Creates the shuffle environment described by the given context.
     *
     * <p>The network configuration is derived from the context's configuration, network memory
     * size, local-communication-only flag and host address; the remaining context values are
     * forwarded unchanged.
     *
     * @param shuffleEnvironmentContext context of the environment to create; checked with
     *     {@link Preconditions#checkNotNull}
     * @return a new {@link NettyShuffleEnvironment}
     */
    public NettyShuffleEnvironment createShuffleEnvironment(ShuffleEnvironmentContext shuffleEnvironmentContext) {
        Preconditions.checkNotNull(shuffleEnvironmentContext);
        NettyShuffleEnvironmentConfiguration networkConfig =
                NettyShuffleEnvironmentConfiguration.fromConfiguration(
                        shuffleEnvironmentContext.getConfiguration(),
                        shuffleEnvironmentContext.getNetworkMemorySize(),
                        shuffleEnvironmentContext.isLocalCommunicationOnly(),
                        shuffleEnvironmentContext.getHostAddress());
        return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
                networkConfig,
                shuffleEnvironmentContext.getTaskExecutorResourceId(),
                shuffleEnvironmentContext.getEventPublisher(),
                shuffleEnvironmentContext.getParentMetricGroup(),
                shuffleEnvironmentContext.getIoExecutor());
    }

    /**
     * Creates a shuffle environment backed by a freshly created {@link ResultPartitionManager}.
     *
     * @see #createNettyShuffleEnvironment(NettyShuffleEnvironmentConfiguration, ResourceID,
     *     TaskEventPublisher, ResultPartitionManager, ConnectionManager, MetricGroup, Executor)
     */
    @VisibleForTesting
    static NettyShuffleEnvironment createNettyShuffleEnvironment(NettyShuffleEnvironmentConfiguration config, ResourceID taskExecutorResourceId, TaskEventPublisher taskEventPublisher, MetricGroup metricGroup, Executor ioExecutor) {
        return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
                config, taskExecutorResourceId, taskEventPublisher, new ResultPartitionManager(), metricGroup, ioExecutor);
    }

    /**
     * Creates a shuffle environment, choosing the connection manager from the configuration: a
     * {@link NettyConnectionManager} when {@code config.nettyConfig()} is non-null, otherwise a
     * {@link LocalConnectionManager}.
     *
     * @see #createNettyShuffleEnvironment(NettyShuffleEnvironmentConfiguration, ResourceID,
     *     TaskEventPublisher, ResultPartitionManager, ConnectionManager, MetricGroup, Executor)
     */
    @VisibleForTesting
    static NettyShuffleEnvironment createNettyShuffleEnvironment(NettyShuffleEnvironmentConfiguration config, ResourceID taskExecutorResourceId, TaskEventPublisher taskEventPublisher, ResultPartitionManager resultPartitionManager, MetricGroup metricGroup, Executor ioExecutor) {
        NettyConfig nettyConfig = config.nettyConfig();
        ConnectionManager connectionManager =
                nettyConfig != null
                        ? new NettyConnectionManager(
                                resultPartitionManager,
                                taskEventPublisher,
                                nettyConfig,
                                config.getMaxNumberOfConnections(),
                                config.isConnectionReuseEnabled())
                        : new LocalConnectionManager();
        return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
                config, taskExecutorResourceId, taskEventPublisher, resultPartitionManager, connectionManager, metricGroup, ioExecutor);
    }

    /**
     * Builds every component of the shuffle environment and assembles them.
     *
     * <p>Components are created in this order: file channel manager, network buffer pool, batch
     * shuffle read buffer pool, batch shuffle read I/O executor, shuffle metrics registration,
     * result partition factory, input gate factory, and finally the environment itself.
     *
     * <p>If any step fails, the components created so far are released again (in reverse order)
     * before the original failure is rethrown; problems hit while releasing are attached to it as
     * suppressed exceptions. Metrics that were already registered are not unregistered, and the
     * caller-supplied {@code connectionManager} is left untouched.
     *
     * @param config network configuration of the environment
     * @param taskExecutorResourceId resource id handed to the input gate factory and environment
     * @param taskEventPublisher publisher handed to the input gate factory
     * @param resultPartitionManager partition manager shared by the created components
     * @param connectionManager connection manager used by the input gates and the environment
     * @param metricGroup group under which the shuffle metrics are registered
     * @param ioExecutor executor forwarded to the environment; not null-checked here
     * @return the assembled {@link NettyShuffleEnvironment}
     */
    @VisibleForTesting
    public static NettyShuffleEnvironment createNettyShuffleEnvironment(NettyShuffleEnvironmentConfiguration config, ResourceID taskExecutorResourceId, TaskEventPublisher taskEventPublisher, ResultPartitionManager resultPartitionManager, ConnectionManager connectionManager, MetricGroup metricGroup, Executor ioExecutor) {
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(taskExecutorResourceId);
        Preconditions.checkNotNull(taskEventPublisher);
        Preconditions.checkNotNull(resultPartitionManager);
        Preconditions.checkNotNull(metricGroup);
        Preconditions.checkNotNull(connectionManager);

        // Declared outside the try block so the failure path can see what was already created.
        FileChannelManager fileChannelManager = null;
        NetworkBufferPool networkBufferPool = null;
        BatchShuffleReadBufferPool batchShuffleReadBufferPool = null;
        ExecutorService batchShuffleReadIOExecutor = null;
        try {
            fileChannelManager = new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
            // Guarded because building the directory list is only worthwhile when it gets logged.
            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Created a new {} for storing result partitions of BLOCKING shuffles. Used directories:\n\t{}",
                        FileChannelManager.class.getSimpleName(),
                        Arrays.stream(fileChannelManager.getPaths())
                                .map(File::getAbsolutePath)
                                .collect(Collectors.joining("\n\t")));
            }

            networkBufferPool =
                    new NetworkBufferPool(
                            config.numNetworkBuffers(),
                            config.networkBufferSize(),
                            config.getRequestSegmentsTimeout());
            batchShuffleReadBufferPool =
                    new BatchShuffleReadBufferPool(
                            config.batchShuffleReadMemoryBytes(), config.networkBufferSize());

            // At least one thread; at most the pool's concurrent-request limit or 4x the CPU cores,
            // whichever is smaller.
            int ioThreads =
                    Math.max(
                            1,
                            Math.min(
                                    batchShuffleReadBufferPool.getMaxConcurrentRequests(),
                                    4 * Hardware.getNumberCPUCores()));
            batchShuffleReadIOExecutor =
                    Executors.newFixedThreadPool(ioThreads, new ExecutorThreadFactory("blocking-shuffle-io"));

            NettyShuffleMetricFactory.registerShuffleMetrics(metricGroup, networkBufferPool);

            ResultPartitionFactory resultPartitionFactory =
                    new ResultPartitionFactory(
                            resultPartitionManager,
                            fileChannelManager,
                            networkBufferPool,
                            batchShuffleReadBufferPool,
                            batchShuffleReadIOExecutor,
                            config.getBlockingSubpartitionType(),
                            config.networkBuffersPerChannel(),
                            config.floatingNetworkBuffersPerGate(),
                            config.networkBufferSize(),
                            config.isBlockingShuffleCompressionEnabled(),
                            config.getCompressionCodec(),
                            config.getMaxBuffersPerChannel(),
                            config.sortShuffleMinBuffers(),
                            config.sortShuffleMinParallelism(),
                            config.isSSLEnabled());
            SingleInputGateFactory singleInputGateFactory =
                    new SingleInputGateFactory(
                            taskExecutorResourceId,
                            config,
                            connectionManager,
                            resultPartitionManager,
                            taskEventPublisher,
                            networkBufferPool);

            return new NettyShuffleEnvironment(
                    taskExecutorResourceId,
                    config,
                    networkBufferPool,
                    connectionManager,
                    resultPartitionManager,
                    fileChannelManager,
                    resultPartitionFactory,
                    singleInputGateFactory,
                    ioExecutor,
                    batchShuffleReadBufferPool,
                    batchShuffleReadIOExecutor);
        } catch (Throwable failure) {
            // Nobody else holds references to the partially built components, so release them
            // here. The original failure is rethrown unchanged (precise rethrow: the try block
            // declares no checked exceptions).
            releaseAfterFailedSetup(
                    failure, fileChannelManager, networkBufferPool, batchShuffleReadBufferPool, batchShuffleReadIOExecutor);
            throw failure;
        }
    }

    /**
     * Releases the components of a partially built environment, newest first. Arguments that are
     * {@code null} were never created and are skipped. Anything thrown while releasing is added
     * to {@code failure} as a suppressed exception so the root cause stays the primary error.
     */
    private static void releaseAfterFailedSetup(Throwable failure, FileChannelManager fileChannelManager, NetworkBufferPool networkBufferPool, BatchShuffleReadBufferPool batchShuffleReadBufferPool, ExecutorService batchShuffleReadIOExecutor) {
        if (batchShuffleReadIOExecutor != null) {
            try {
                batchShuffleReadIOExecutor.shutdown();
            } catch (Throwable t) {
                failure.addSuppressed(t);
            }
        }
        if (batchShuffleReadBufferPool != null) {
            try {
                batchShuffleReadBufferPool.destroy();
            } catch (Throwable t) {
                failure.addSuppressed(t);
            }
        }
        if (networkBufferPool != null) {
            try {
                networkBufferPool.destroy();
            } catch (Throwable t) {
                failure.addSuppressed(t);
            }
        }
        if (fileChannelManager != null) {
            try {
                fileChannelManager.close();
            } catch (Throwable t) {
                failure.addSuppressed(t);
            }
        }
    }
}

