/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManagerSlot;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerSlotId;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of the task executors registered with the slot manager and of the slots belonging
 * to workers that have been requested but have not registered yet ("pending" slots).
 *
 * <p>On construction a recurring check is scheduled which, when run on the main thread executor,
 * either requests additional workers to keep a configured amount of free slots available or
 * releases task executors that have been idle for at least the configured timeout. The recurring
 * check is cancelled by {@link #close()}.
 */
class TaskExecutorManager
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorManager.class);
    /** Slot profile derived in the constructor from the default worker spec and slots per worker. */
    private final ResourceProfile defaultSlotResourceProfile;
    /** Worker spec passed to {@link ResourceActions#allocateResource} when a worker is requested. */
    private final WorkerResourceSpec defaultWorkerResourceSpec;
    /** Number of pending slots recorded for every requested worker. */
    private final int numSlotsPerWorker;
    /** Upper bound for the sum of registered and pending slots. */
    private final int maxSlotNum;
    /** If set, an idle task executor is first asked whether it can be released before releasing it. */
    private final boolean waitResultConsumedBeforeRelease;
    /** Multiplied with {@link #numSlotsPerWorker} to obtain the free-slot target of the periodic check. */
    private final int redundantTaskManagerNum;
    /** Idle duration after which a task executor becomes a release candidate; also the check interval. */
    private final Time taskManagerTimeout;
    /** Callback used to allocate new workers and to release task executors. */
    private final ResourceActions resourceActions;
    /** Registered task executors, keyed by instance id. */
    private final Map<InstanceID, TaskManagerRegistration> taskManagerRegistrations = new HashMap<InstanceID, TaskManagerRegistration>();
    /** Slots of requested but not yet registered workers, keyed by their slot id. */
    private final Map<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots = new HashMap<TaskManagerSlotId, PendingTaskManagerSlot>();
    /** Executor on which the periodic check and the asynchronous release callbacks run. */
    private final Executor mainThreadExecutor;
    /** Handle of the recurring timeout/redundancy check; cancelled in {@link #close()}. */
    private final ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;

    /**
     * Creates the manager and schedules the recurring idle-timeout/redundancy check. The first
     * run is scheduled without initial delay; subsequent runs are separated by
     * {@code taskManagerTimeout}.
     *
     * @param defaultWorkerResourceSpec spec handed to {@link ResourceActions#allocateResource}
     *     whenever a new worker is requested
     * @param numSlotsPerWorker number of pending slots recorded per requested worker
     * @param maxNumSlots upper bound for the sum of registered and pending slots
     * @param waitResultConsumedBeforeRelease whether an idle task executor is asked if it can be
     *     released before it is actually released
     * @param redundantTaskManagerNum multiplied with {@code numSlotsPerWorker} to obtain the
     *     number of free slots the periodic check tries to keep available
     * @param taskManagerTimeout idle duration after which a task executor may be released; also
     *     used as the delay between two checks
     * @param scheduledExecutor executor that triggers the recurring check
     * @param mainThreadExecutor executor on which the check itself (and asynchronous release
     *     callbacks) are run
     * @param resourceActions callback used to allocate and release resources
     * @throws NullPointerException if {@code taskManagerTimeout}, {@code scheduledExecutor},
     *     {@code mainThreadExecutor} or {@code resourceActions} is {@code null}
     */
    TaskExecutorManager(WorkerResourceSpec defaultWorkerResourceSpec, int numSlotsPerWorker, int maxNumSlots, boolean waitResultConsumedBeforeRelease, int redundantTaskManagerNum, Time taskManagerTimeout, ScheduledExecutor scheduledExecutor, Executor mainThreadExecutor, ResourceActions resourceActions) {
        this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
        this.numSlotsPerWorker = numSlotsPerWorker;
        this.maxSlotNum = maxNumSlots;
        this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
        this.redundantTaskManagerNum = redundantTaskManagerNum;
        this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout, "taskManagerTimeout");
        this.defaultSlotResourceProfile = SlotManagerUtils.generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
        this.resourceActions = Preconditions.checkNotNull(resourceActions);
        // Fail fast: a null main thread executor would otherwise only surface later, inside the
        // scheduled task, far away from the construction site.
        this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor, "mainThreadExecutor");
        Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor");
        // The scheduler only hands the check over to the main thread executor; the check itself
        // runs there.
        this.taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(() -> mainThreadExecutor.execute(this::checkTaskManagerTimeoutsAndRedundancy), 0L, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the recurring timeout/redundancy check. The cancellation is requested without
     * interrupting a run that may currently be in progress.
     */
    @Override
    public void close() {
        final boolean mayInterruptIfRunning = false;
        this.taskManagerTimeoutsAndRedundancyCheck.cancel(mayInterruptIfRunning);
    }

    /**
     * Tells whether a task executor is currently registered under the given instance id.
     *
     * @param instanceId instance id to look up
     * @return {@code true} if a registration exists for {@code instanceId}
     */
    public boolean isTaskManagerRegistered(InstanceID instanceId) {
        final boolean registered = this.taskManagerRegistrations.containsKey(instanceId);
        return registered;
    }

    /**
     * Registers a task executor together with the slots listed in its initial slot report.
     *
     * <p>If accepting the reported slots would push the slot count over the configured maximum,
     * the task executor is released through the {@link ResourceActions} and nothing is recorded.
     * Otherwise a registration is stored, and for every reported slot without a job id one
     * pending slot with an equal resource profile (if there is one) is dropped.
     *
     * @param taskExecutorConnection connection to the registering task executor
     * @param initialSlotReport slots reported by the task executor
     * @param totalResourceProfile total resources of the task executor
     * @param defaultSlotResourceProfile default slot profile of the task executor
     * @return {@code true} if the task executor was registered, {@code false} if it was rejected
     */
    public boolean registerTaskManager(TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport, ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile) {
        if (this.isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
            LOG.info("The total number of slots exceeds the max limitation {}, releasing the excess task executor.", this.maxSlotNum);
            InstanceID rejectedInstance = taskExecutorConnection.getInstanceID();
            FlinkException reason = new FlinkException("The total number of slots exceeds the max limitation.");
            this.resourceActions.releaseResource(rejectedInstance, reason);
            return false;
        }

        // Collect the ids of all reported slots for the registration record.
        ArrayList<SlotID> reportedSlotIds = new ArrayList<SlotID>(initialSlotReport.getNumSlotStatus());
        for (SlotStatus reported : initialSlotReport) {
            reportedSlotIds.add(reported.getSlotID());
        }
        TaskManagerRegistration registration = new TaskManagerRegistration(taskExecutorConnection, reportedSlotIds, totalResourceProfile, defaultSlotResourceProfile);
        this.taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), registration);

        // Every reported slot that is not assigned to a job fulfils one matching pending slot.
        for (SlotStatus reported : initialSlotReport) {
            if (reported.getJobID() == null) {
                this.findAndRemoveExactlyMatchingPendingTaskManagerSlot(reported.getResourceProfile());
            }
        }
        return true;
    }

    /**
     * Decides whether registering the slots of the given report would exceed the slot limit.
     *
     * <p>The cheap check (all reported slots counted as new) is evaluated first; only if that one
     * already exceeds the limit is the more precise count, which discounts reported slots that
     * match pending slots, evaluated.
     *
     * @param initialSlotReport slot report of the registering task executor
     * @return {@code true} if the limit would be exceeded
     */
    private boolean isMaxSlotNumExceededAfterRegistration(SlotReport initialSlotReport) {
        return this.isMaxSlotNumExceededAfterAdding(initialSlotReport.getNumSlotStatus())
                && this.isMaxSlotNumExceededAfterAdding(this.getNumNonPendingReportedNewSlots(initialSlotReport));
    }

    /**
     * Counts the reported slots that cannot be paired with a pending slot.
     *
     * <p>Only reported slots without an allocation id are considered for pairing. Each such slot
     * claims at most one pending slot with an equal resource profile, and every pending slot can
     * be claimed only once.
     *
     * @param slotReport slot report to inspect
     * @return number of reported slots minus the number of pending slots that were paired
     */
    private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
        final HashSet<TaskManagerSlotId> claimedPendingIds = new HashSet<TaskManagerSlotId>();
        for (SlotStatus reported : slotReport) {
            if (reported.getAllocationID() == null) {
                for (PendingTaskManagerSlot candidate : this.pendingSlots.values()) {
                    boolean unclaimed = !claimedPendingIds.contains(candidate.getTaskManagerSlotId());
                    if (unclaimed && this.isPendingSlotExactlyMatchingResourceProfile(candidate, reported.getResourceProfile())) {
                        claimedPendingIds.add(candidate.getTaskManagerSlotId());
                        // One pending slot per reported slot: move on to the next reported slot.
                        break;
                    }
                }
            }
        }
        return slotReport.getNumSlotStatus() - claimedPendingIds.size();
    }

    /**
     * Drops the first pending slot (in map iteration order) whose resource profile equals the
     * given one. Does nothing if no pending slot matches.
     *
     * @param resourceProfile profile a pending slot has to equal in order to be removed
     */
    private void findAndRemoveExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
        PendingTaskManagerSlot match = null;
        for (PendingTaskManagerSlot candidate : this.pendingSlots.values()) {
            if (this.isPendingSlotExactlyMatchingResourceProfile(candidate, resourceProfile)) {
                match = candidate;
                break;
            }
        }
        // The removal happens only after the iteration over the map's values has ended.
        if (match != null) {
            this.pendingSlots.remove(match.getTaskManagerSlotId());
        }
    }

    /**
     * Checks whether a pending slot's resource profile equals the given profile.
     *
     * @param pendingTaskManagerSlot pending slot to test
     * @param resourceProfile profile to compare against
     * @return {@code true} if both profiles are equal according to {@code equals}
     */
    private boolean isPendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile resourceProfile) {
        final ResourceProfile pendingProfile = pendingTaskManagerSlot.getResourceProfile();
        return pendingProfile.equals(resourceProfile);
    }

    /**
     * Forgets the registration stored under the given instance id. Has no effect if no such
     * registration exists.
     *
     * @param instanceId instance id of the task executor to unregister
     */
    public void unregisterTaskExecutor(InstanceID instanceId) {
        // The removed registration (if any) is intentionally not used any further.
        this.taskManagerRegistrations.remove(instanceId);
    }

    /**
     * Returns the instance ids of all currently registered task executors.
     *
     * @return a new list that is independent of later registrations and unregistrations
     */
    public Collection<InstanceID> getTaskExecutors() {
        final ArrayList<InstanceID> snapshot = new ArrayList<InstanceID>(this.taskManagerRegistrations.size());
        snapshot.addAll(this.taskManagerRegistrations.keySet());
        return snapshot;
    }

    /**
     * Tries to request one additional worker able to host a slot of the requested profile.
     *
     * <p>No worker is requested if the slots of a further worker would exceed the slot limit, if
     * the default slot profile does not match the requested profile, or if the
     * {@link ResourceActions} decline the allocation. On success, {@code numSlotsPerWorker}
     * pending slots with the default slot profile are recorded.
     *
     * @param requestedSlotResourceProfile profile of the slot that is needed
     * @return the resources the new worker is expected to provide, or an empty optional if no
     *     worker was requested
     */
    public Optional<ResourceRequirement> allocateWorker(ResourceProfile requestedSlotResourceProfile) {
        final int registered = this.getNumberRegisteredSlots();
        final int pending = this.getNumberPendingTaskManagerSlots();

        final boolean limitExceeded = registered + pending + this.numSlotsPerWorker > this.maxSlotNum;
        if (limitExceeded) {
            LOG.warn("Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.", this.numSlotsPerWorker, pending + registered, this.maxSlotNum);
            return Optional.empty();
        }

        // The profile check comes first so that no resource is requested for an unfulfillable profile.
        if (!this.defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)
                || !this.resourceActions.allocateResource(this.defaultWorkerResourceSpec)) {
            return Optional.empty();
        }

        for (int created = 0; created < this.numSlotsPerWorker; created++) {
            PendingTaskManagerSlot slot = new PendingTaskManagerSlot(this.defaultSlotResourceProfile);
            this.pendingSlots.put(slot.getTaskManagerSlotId(), slot);
        }
        ResourceRequirement provided = ResourceRequirement.create(this.defaultSlotResourceProfile, this.numSlotsPerWorker);
        return Optional.of(provided);
    }

    /**
     * Checks whether registered slots plus pending slots plus the given number of new slots
     * would be greater than the configured maximum.
     *
     * @param numNewSlot number of slots that are about to be added
     * @return {@code true} if the resulting total exceeds {@code maxSlotNum}
     */
    private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
        final int registered = this.getNumberRegisteredSlots();
        final int pending = this.getNumberPendingTaskManagerSlots();
        return registered + pending + numNewSlot > this.maxSlotNum;
    }

    /**
     * Reports how many workers are still outstanding, derived from the number of pending slots
     * divided by the slots per worker and rounded up.
     *
     * @return a single-entry map from the default worker spec to the outstanding worker count,
     *     or an empty map if that count is not positive
     */
    public Map<WorkerResourceSpec, Integer> getRequiredWorkers() {
        final int outstandingWorkers = MathUtils.divideRoundUp(this.getNumberPendingTaskManagerSlots(), this.numSlotsPerWorker);
        if (outstandingWorkers > 0) {
            return Collections.singletonMap(this.defaultWorkerResourceSpec, outstandingWorkers);
        }
        return Collections.emptyMap();
    }

    /**
     * Returns the number of pending slots currently recorded.
     *
     * @return size of the pending-slot map
     */
    @VisibleForTesting
    int getNumberPendingTaskManagerSlots() {
        final int pendingCount = this.pendingSlots.size();
        return pendingCount;
    }

    /**
     * Periodic maintenance step; does nothing while no task executor is registered.
     *
     * <p>First, all registrations whose idle time has reached the configured timeout are
     * snapshotted into a separate list. Then the free-slot target
     * ({@code redundantTaskManagerNum * numSlotsPerWorker}) is compared with the current number of
     * free slots: if free slots are missing, enough workers to cover the gap are requested;
     * otherwise idle task executors are released, limited to the number of whole workers by which
     * the free slots exceed the target.
     */
    private void checkTaskManagerTimeoutsAndRedundancy() {
        if (this.taskManagerRegistrations.isEmpty()) {
            return;
        }

        final long now = System.currentTimeMillis();
        final long idleThresholdMillis = this.taskManagerTimeout.toMilliseconds();
        final ArrayList<TaskManagerRegistration> idleTooLong = new ArrayList<TaskManagerRegistration>(this.taskManagerRegistrations.size());
        for (TaskManagerRegistration registration : this.taskManagerRegistrations.values()) {
            if (now - registration.getIdleSince() >= idleThresholdMillis) {
                idleTooLong.add(registration);
            }
        }

        final int missingFreeSlots = this.redundantTaskManagerNum * this.numSlotsPerWorker - this.getNumberFreeSlots();
        if (missingFreeSlots > 0) {
            // Not enough spare capacity: request as many workers as needed to close the gap.
            this.allocateRedundantTaskManagers(MathUtils.divideRoundUp(missingFreeSlots, this.numSlotsPerWorker));
            return;
        }

        // Spare capacity available: release at most as many idle executors as whole workers are in surplus.
        final int surplusWorkers = -missingFreeSlots / this.numSlotsPerWorker;
        this.releaseIdleTaskExecutors(idleTooLong, Math.min(surplusWorkers, idleTooLong.size()));
    }

    /**
     * Requests the given number of workers for redundancy and logs a warning if fewer workers
     * than requested could be allocated.
     *
     * @param number number of workers to request
     */
    private void allocateRedundantTaskManagers(int number) {
        LOG.debug("Allocating {} task executors for redundancy.", number);
        final int granted = this.allocateWorkers(number);
        final boolean allGranted = number == granted;
        if (!allGranted) {
            LOG.warn("Expect to allocate {} taskManagers. Actually allocate {} taskManagers.", number, granted);
        }
    }

    /**
     * Requests up to {@code workerNum} workers with the default slot profile, stopping at the
     * first request that is not granted.
     *
     * @param workerNum number of workers wanted
     * @return number of workers that were actually requested successfully
     */
    private int allocateWorkers(int workerNum) {
        int granted = 0;
        while (granted < workerNum) {
            if (!this.allocateWorker(this.defaultSlotResourceProfile).isPresent()) {
                break;
            }
            granted++;
        }
        return granted;
    }

    /**
     * Releases the first {@code releaseNum} entries of the given list of timed-out task
     * executors. Depending on {@code waitResultConsumedBeforeRelease}, each one is either
     * released right away or only after it confirmed that it can be released.
     *
     * @param timedOutTaskManagers release candidates
     * @param releaseNum number of leading candidates to process
     */
    private void releaseIdleTaskExecutors(ArrayList<TaskManagerRegistration> timedOutTaskManagers, int releaseNum) {
        for (int position = 0; position < releaseNum; position++) {
            final TaskManagerRegistration candidate = timedOutTaskManagers.get(position);
            if (this.waitResultConsumedBeforeRelease) {
                this.releaseIdleTaskExecutorIfPossible(candidate);
            } else {
                this.releaseIdleTaskExecutor(candidate.getInstanceId());
            }
        }
    }

    /**
     * Asks the task executor whether it can be released and, once the answer arrives, releases
     * it on the main thread executor if the answer is positive and the registration's idle-since
     * value is still the one observed when the question was sent.
     *
     * @param taskManagerRegistration registration of the idle task executor
     */
    private void releaseIdleTaskExecutorIfPossible(TaskManagerRegistration taskManagerRegistration) {
        final long idleSinceAtRequest = taskManagerRegistration.getIdleSince();
        taskManagerRegistration
                .getTaskManagerConnection()
                .getTaskExecutorGateway()
                .canBeReleased()
                .thenAcceptAsync(canBeReleased -> {
                    InstanceID idleInstance = taskManagerRegistration.getInstanceId();
                    // A changed idle-since value means the idle state changed while the request was in flight.
                    boolean idleStateUnchanged = idleSinceAtRequest == taskManagerRegistration.getIdleSince();
                    if (idleStateUnchanged && canBeReleased) {
                        this.releaseIdleTaskExecutor(idleInstance);
                    }
                }, this.mainThreadExecutor);
    }

    /**
     * Releases the given task executor through the {@link ResourceActions}, stating the idle
     * timeout as the cause.
     *
     * @param timedOutTaskManagerId instance id of the task executor to release
     */
    private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
        final FlinkException idleTimeout = new FlinkException("TaskExecutor exceeded the idle timeout.");
        LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
        this.resourceActions.releaseResource(timedOutTaskManagerId, idleTimeout);
    }

    /**
     * Sums up the total resources of all registered task executors.
     *
     * @return the merged profile, or {@link ResourceProfile#ZERO} if nothing is registered
     */
    public ResourceProfile getTotalRegisteredResources() {
        ResourceProfile total = ResourceProfile.ZERO;
        for (TaskManagerRegistration registration : this.taskManagerRegistrations.values()) {
            total = total.merge(registration.getTotalResource());
        }
        return total;
    }

    /**
     * Returns the total resources of a single task executor.
     *
     * @param instanceID instance id of the task executor
     * @return its total resource profile, or {@link ResourceProfile#ZERO} if it is not registered
     */
    public ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceID) {
        final Optional<TaskManagerRegistration> registration = Optional.ofNullable(this.taskManagerRegistrations.get(instanceID));
        final Optional<ResourceProfile> totalResource = registration.map(TaskManagerRegistration::getTotalResource);
        return totalResource.orElse(ResourceProfile.ZERO);
    }

    /**
     * Sums up the free resources of all registered task executors, where the free resources of
     * one task executor are its default slot profile multiplied by its number of free slots.
     *
     * @return the merged profile, or {@link ResourceProfile#ZERO} if nothing is registered
     */
    public ResourceProfile getTotalFreeResources() {
        ResourceProfile totalFree = ResourceProfile.ZERO;
        for (TaskManagerRegistration registration : this.taskManagerRegistrations.values()) {
            ResourceProfile freeOfExecutor = registration.getDefaultSlotResourceProfile().multiply(registration.getNumberFreeSlots());
            totalFree = totalFree.merge(freeOfExecutor);
        }
        return totalFree;
    }

    /**
     * Returns the free resources of a single task executor, computed as its default slot profile
     * multiplied by its number of free slots.
     *
     * @param instanceID instance id of the task executor
     * @return its free resources, or {@link ResourceProfile#ZERO} if it is not registered
     */
    public ResourceProfile getTotalFreeResourcesOf(InstanceID instanceID) {
        final Optional<TaskManagerRegistration> registration = Optional.ofNullable(this.taskManagerRegistrations.get(instanceID));
        final Optional<ResourceProfile> freeResources = registration.map(found -> found.getDefaultSlotResourceProfile().multiply(found.getNumberFreeSlots()));
        return freeResources.orElse(ResourceProfile.ZERO);
    }

    /**
     * Returns the slots of a registered task executor.
     *
     * @param instanceId instance id of the task executor
     * @return the slot ids stored in its registration
     * @throws NullPointerException if no task executor is registered under {@code instanceId};
     *     the message names the offending instance id
     */
    public Iterable<SlotID> getSlotsOf(InstanceID instanceId) {
        // Same exception type as a plain dereference would raise, but with a message that tells
        // which instance id was unknown.
        TaskManagerRegistration registration = Preconditions.checkNotNull(this.taskManagerRegistrations.get(instanceId), "There is no task executor registered with instance id %s.", instanceId);
        return registration.getSlots();
    }

    /**
     * Counts the registered slots across all registered task executors.
     *
     * @return sum of the per-executor registered slot counts; {@code 0} if nothing is registered
     */
    public int getNumberRegisteredSlots() {
        return this.taskManagerRegistrations.values().stream()
                .mapToInt(TaskManagerRegistration::getNumberRegisteredSlots)
                .sum();
    }

    /**
     * Returns the number of slots registered for a single task executor.
     *
     * @param instanceId instance id of the task executor
     * @return its registered slot count, or {@code 0} if it is not registered
     */
    public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(instanceId);
        return registration == null ? 0 : registration.getNumberRegisteredSlots();
    }

    /**
     * Counts the free slots across all registered task executors.
     *
     * @return sum of the per-executor free slot counts; {@code 0} if nothing is registered
     */
    public int getNumberFreeSlots() {
        int freeSlots = 0;
        for (TaskManagerRegistration registration : this.taskManagerRegistrations.values()) {
            freeSlots += registration.getNumberFreeSlots();
        }
        return freeSlots;
    }

    /**
     * Returns the number of free slots of a single task executor.
     *
     * @param instanceId instance id of the task executor
     * @return its free slot count, or {@code 0} if it is not registered
     */
    public int getNumberFreeSlotsOf(InstanceID instanceId) {
        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(instanceId);
        if (registration == null) {
            return 0;
        }
        return registration.getNumberFreeSlots();
    }

    /**
     * Returns the pending slots currently recorded.
     *
     * <p>The result is a read-only view: it reflects later changes to the pending slots, but any
     * attempt to modify it throws {@link UnsupportedOperationException}. This keeps callers from
     * altering the manager's internal bookkeeping behind its back.
     *
     * @return unmodifiable live view of the pending slots
     */
    public Collection<PendingTaskManagerSlot> getPendingTaskManagerSlots() {
        return Collections.unmodifiableCollection(this.pendingSlots.values());
    }

    /**
     * Records on the task executor's registration that one of its slots has been occupied.
     *
     * @param instanceId instance id of the task executor
     * @throws NullPointerException if no task executor is registered under {@code instanceId};
     *     the message names the offending instance id
     */
    public void occupySlot(InstanceID instanceId) {
        // Same exception type as a plain dereference would raise, but with a useful message.
        TaskManagerRegistration registration = Preconditions.checkNotNull(this.taskManagerRegistrations.get(instanceId), "Cannot occupy a slot: there is no task executor registered with instance id %s.", instanceId);
        registration.occupySlot();
    }

    /**
     * Records on the task executor's registration that one of its slots has been freed.
     *
     * @param instanceId instance id of the task executor
     * @throws NullPointerException if no task executor is registered under {@code instanceId};
     *     the message names the offending instance id
     */
    public void freeSlot(InstanceID instanceId) {
        // Same exception type as a plain dereference would raise, but with a useful message.
        TaskManagerRegistration registration = Preconditions.checkNotNull(this.taskManagerRegistrations.get(instanceId), "Cannot free a slot: there is no task executor registered with instance id %s.", instanceId);
        registration.freeSlot();
    }

    /**
     * Marks the registration of the given task executor as used.
     *
     * @param instanceID instance id of the task executor
     * @throws NullPointerException if no task executor is registered under {@code instanceID};
     *     the message names the offending instance id
     */
    public void markUsed(InstanceID instanceID) {
        // Same exception type as a plain dereference would raise, but with a useful message.
        TaskManagerRegistration registration = Preconditions.checkNotNull(this.taskManagerRegistrations.get(instanceID), "Cannot mark as used: there is no task executor registered with instance id %s.", instanceID);
        registration.markUsed();
    }
}

