/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A fixed-size pool of off-heap {@link MemorySegment}s that hands buffers out in batches of
 * {@link #getNumBuffersPerRequest()} and takes them back through {@link #recycle}.
 *
 * <p>All mutable state (the free-buffer queue, the last-operation timestamp and the
 * {@code initialized}/{@code destroyed} flags) is guarded by the monitor of the internal buffer
 * queue. The set of registered requesters is a concurrent set and is accessed without that lock.
 */
public class BatchShuffleReadBufferPool {

    private static final Logger LOG = LoggerFactory.getLogger(BatchShuffleReadBufferPool.class);

    /** Target number of bytes handed out by a single {@link #requestBuffers()} call (8 MiB). */
    private static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;

    /** Maximum time a single {@link #requestBuffers()} call blocks waiting for free buffers. */
    private static final Duration WAITING_TIME = Duration.ofSeconds(2L);

    /** Total number of bytes this pool was configured with. */
    private final long totalBytes;

    /** Number of buffers the pool allocates on initialization. */
    private final int numTotalBuffers;

    /** Size in bytes of every buffer in the pool. */
    private final int bufferSize;

    /** Number of buffers returned by one successful {@link #requestBuffers()} call. */
    private final int numBuffersPerRequest;

    /** Objects currently registered as users of this pool. */
    private final Set<Object> bufferRequesters = ConcurrentHashMap.newKeySet();

    /** Buffers currently available for requests; also serves as the pool's lock object. */
    @GuardedBy("buffers")
    private final Queue<MemorySegment> buffers = new ArrayDeque<>();

    /** {@link System#nanoTime()} of construction or of the last successful request/recycle. */
    @GuardedBy("buffers")
    private long lastBufferOperationTimestamp = System.nanoTime();

    /** Set once by {@link #destroy()}; never reset. */
    @GuardedBy("buffers")
    private boolean destroyed;

    /** Set by the first {@link #initialize()} call; never reset. */
    @GuardedBy("buffers")
    private boolean initialized;

    /**
     * Creates a pool description; no memory is allocated until {@link #initialize()} or the
     * first {@link #requestBuffers()} call.
     *
     * @param totalBytes total bytes available to the pool; must be positive and at least
     *     {@code bufferSize}
     * @param bufferSize size in bytes of each buffer; must be positive
     * @throws IllegalArgumentException if any of the constraints above is violated
     */
    public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
        Preconditions.checkArgument(totalBytes > 0L, "Total memory size must be positive.");
        Preconditions.checkArgument(bufferSize > 0, "Size of buffer must be positive.");
        Preconditions.checkArgument(
                totalBytes >= (long) bufferSize,
                String.format(
                        "Illegal configuration, config value for '%s' must be no smaller than '%s', please increase '%s' to at least %d bytes.",
                        TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
                        TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
                        TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
                        bufferSize));

        this.totalBytes = totalBytes;
        this.bufferSize = bufferSize;
        // Clamp to int range: the buffer count is exposed and used as an int everywhere.
        this.numTotalBuffers = (int) Math.min(totalBytes / (long) bufferSize, Integer.MAX_VALUE);
        // Hand out roughly NUM_BYTES_PER_REQUEST per request, but at least one buffer and never
        // more than the pool holds.
        this.numBuffersPerRequest =
                Math.min(numTotalBuffers, Math.max(1, NUM_BYTES_PER_REQUEST / bufferSize));
    }

    /** Returns the total number of bytes this pool was configured with. */
    @VisibleForTesting
    long getTotalBytes() {
        return totalBytes;
    }

    /** Returns the number of buffers the pool allocates on initialization. */
    @VisibleForTesting
    public int getNumTotalBuffers() {
        return numTotalBuffers;
    }

    /** Returns the number of buffers currently sitting in the pool. */
    @VisibleForTesting
    public int getAvailableBuffers() {
        synchronized (buffers) {
            return buffers.size();
        }
    }

    /** Returns the number of buffers handed out by one successful {@link #requestBuffers()}. */
    public int getNumBuffersPerRequest() {
        return numBuffersPerRequest;
    }

    /** Returns how many full requests fit into the pool at once, or 0 if a request is empty. */
    public int getMaxConcurrentRequests() {
        return numBuffersPerRequest > 0 ? numTotalBuffers / numBuffersPerRequest : 0;
    }

    /** Returns the size in bytes of each buffer. */
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * Allocates all buffers of the pool. Calls after the first one are no-ops.
     *
     * <p>NOTE(review): the {@code initialized} flag is set before allocation, so a failed
     * attempt is not retried by later calls; this matches the pre-existing behaviour.
     *
     * @throws IllegalStateException if the pool has already been destroyed
     * @throws OutOfMemoryError if the buffers cannot be allocated; buffers allocated so far are
     *     freed before the error is thrown
     */
    public void initialize() {
        synchronized (buffers) {
            Preconditions.checkState(!destroyed, "Buffer pool is already destroyed.");
            if (initialized) {
                return;
            }
            initialized = true;

            try {
                for (int i = 0; i < numTotalBuffers; ++i) {
                    buffers.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize));
                }
            } catch (OutOfMemoryError outOfMemoryError) {
                int allocated = buffers.size();
                // Roll back the partial allocation before reporting the failure.
                buffers.forEach(MemorySegment::free);
                buffers.clear();

                // Widen to long before multiplying: buffer count * buffer size can exceed
                // Integer.MAX_VALUE for pools of 2 GiB and more.
                long bytesAllocated = (long) allocated * bufferSize;
                long bytesStillNeeded = (long) (numTotalBuffers - allocated) * bufferSize;

                OutOfMemoryError error =
                        new OutOfMemoryError(
                                String.format(
                                        "Can't allocate enough direct buffer for batch shuffle read buffer "
                                                + "pool (bytes allocated: %d, bytes still needed: %d). To "
                                                + "avoid the exception, you need to do one of the following"
                                                + " adjustments: 1) If you have ever decreased %s, you need"
                                                + " to undo the decrement; 2) If you ever increased %s, you"
                                                + " should also increase %s; 3) If neither the above cases,"
                                                + " it usually means some other parts of your application "
                                                + "have consumed too many direct memory and the value of %s"
                                                + " should be increased.",
                                        bytesAllocated,
                                        bytesStillNeeded,
                                        TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key(),
                                        TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(),
                                        TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key(),
                                        TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
                // Keep the original failure (and its stack trace) reachable for diagnosis.
                error.initCause(outOfMemoryError);
                throw error;
            }
        }

        LOG.info(
                "Batch shuffle IO buffer pool initialized: numBuffers={}, bufferSize={}.",
                numTotalBuffers,
                bufferSize);
    }

    /** Registers {@code requester} as a user of this pool. */
    public void registerRequester(Object requester) {
        bufferRequesters.add(requester);
    }

    /** Removes {@code requester} from the set of registered users. */
    public void unregisterRequester(Object requester) {
        bufferRequesters.remove(requester);
    }

    /**
     * Returns the total buffer count divided by the number of registered requesters. Both the
     * divisor and the result are clamped to a minimum of 1.
     */
    public int getAverageBuffersPerRequester() {
        return Math.max(1, numTotalBuffers / Math.max(1, bufferRequesters.size()));
    }

    /**
     * Requests one batch of {@link #getNumBuffersPerRequest()} buffers, initializing the pool
     * first if necessary. Blocks for at most {@code WAITING_TIME} in total while fewer buffers
     * than one batch are available.
     *
     * @return a full batch of buffers, or an empty list if the wait timed out
     * @throws IllegalStateException if the pool is destroyed before a batch becomes available
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws Exception declared for backward compatibility of the signature
     */
    public List<MemorySegment> requestBuffers() throws Exception {
        List<MemorySegment> allocated = new ArrayList<>(numBuffersPerRequest);
        synchronized (buffers) {
            Preconditions.checkState(!destroyed, "Buffer pool is already destroyed.");
            if (!initialized) {
                initialize();
            }

            Deadline deadline = Deadline.fromNow(WAITING_TIME);
            while (buffers.size() < numBuffersPerRequest) {
                Preconditions.checkState(!destroyed, "Buffer pool is already destroyed.");

                // Only wait for what is left of the deadline so that repeated wake-ups cannot
                // stretch the total blocking time beyond WAITING_TIME. A non-positive value must
                // never reach wait(): wait(0) would block indefinitely.
                long remainingMillis = deadline.timeLeft().toMillis();
                if (remainingMillis <= 0L) {
                    // Timed out: signal "nothing available" with the (still empty) list.
                    return allocated;
                }
                buffers.wait(remainingMillis);
            }

            for (int i = 0; i < numBuffersPerRequest; ++i) {
                allocated.add(buffers.poll());
            }
            lastBufferOperationTimestamp = System.nanoTime();
        }
        return allocated;
    }

    /**
     * Returns a single buffer to the pool.
     *
     * @param segment the buffer to recycle; must not be {@code null}
     * @throws IllegalArgumentException if {@code segment} is {@code null}
     * @throws IllegalStateException if the pool has not been initialized
     */
    public void recycle(MemorySegment segment) {
        Preconditions.checkArgument(segment != null, "Buffer must be not null.");
        recycle(Collections.singletonList(segment));
    }

    /**
     * Returns buffers to the pool and wakes up waiting requesters once at least one full batch
     * is available. If the pool has been destroyed, the buffers are freed instead.
     *
     * @param segments the buffers to recycle; must not be {@code null}, may be empty
     * @throws IllegalArgumentException if {@code segments} is {@code null}
     * @throws IllegalStateException if the pool has not been initialized
     */
    public void recycle(Collection<MemorySegment> segments) {
        Preconditions.checkArgument(segments != null, "Buffer list must be not null.");
        if (segments.isEmpty()) {
            return;
        }

        synchronized (buffers) {
            Preconditions.checkState(initialized, "Recycling a buffer before initialization.");

            if (destroyed) {
                segments.forEach(MemorySegment::free);
                return;
            }

            buffers.addAll(segments);
            lastBufferOperationTimestamp = System.nanoTime();
            // Waiters can only make progress once a full batch is available.
            if (buffers.size() >= numBuffersPerRequest) {
                buffers.notifyAll();
            }
        }
    }

    /**
     * Returns the {@link System#nanoTime()} value recorded at construction or at the most recent
     * successful request or recycle.
     */
    public long getLastBufferOperationTimestamp() {
        synchronized (buffers) {
            return lastBufferOperationTimestamp;
        }
    }

    /**
     * Marks the pool as destroyed, frees and drops all pooled buffers and wakes up every thread
     * blocked in {@link #requestBuffers()}. Buffers recycled afterwards are freed on return.
     */
    public void destroy() {
        synchronized (buffers) {
            destroyed = true;

            // Free explicitly, consistent with the allocation-failure rollback and with
            // recycling after destruction, instead of merely dropping the references.
            buffers.forEach(MemorySegment::free);
            buffers.clear();
            buffers.notifyAll();
        }
    }

    /** Returns whether {@link #destroy()} has been called. */
    public boolean isDestroyed() {
        synchronized (buffers) {
            return destroyed;
        }
    }
}

