/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager;
import org.apache.flink.util.Preconditions;

/**
 * Holds the {@link SubtaskCommittableManager}s that belong to one checkpoint, keyed by the
 * subtask id carried in the received {@link CommittableSummary}, and offers aggregate operations
 * (summary, commit, merge, copy) across all of them.
 *
 * @param <CommT> type of the committables
 */
class CheckpointCommittableManagerImpl<CommT>
implements CheckpointCommittableManager<CommT> {
    /** Mapping of subtask id to the manager tracking that subtask's committables. */
    private final Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers;
    /** Checkpoint this manager belongs to; may be {@code null}. */
    @Nullable
    private final Long checkpointId;
    /** Subtask id reported in {@link #getSummary()} and handed to newly created subtask managers. */
    private final int subtaskId;
    /** Number of subtasks reported in {@link #getSummary()}. */
    private final int numberOfSubtasks;

    /**
     * Creates an empty manager.
     *
     * @param subtaskId subtask id reported by {@link #getSummary()}
     * @param numberOfSubtasks number of subtasks reported by {@link #getSummary()}
     * @param checkpointId checkpoint id, or {@code null} if there is none
     */
    CheckpointCommittableManagerImpl(int subtaskId, int numberOfSubtasks, @Nullable Long checkpointId) {
        this(new HashMap<>(), subtaskId, numberOfSubtasks, checkpointId);
    }

    /**
     * Creates a manager around an existing map of subtask managers. The map is used as-is (it is
     * not copied). The subtask id defaults to {@code 0} and the number of subtasks to {@code 1}.
     *
     * @param subtasksCommittableManagers subtask managers keyed by subtask id; must not be null
     * @param checkpointId checkpoint id, or {@code null} if there is none
     */
    CheckpointCommittableManagerImpl(Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers, @Nullable Long checkpointId) {
        this(subtasksCommittableManagers, 0, 1, checkpointId);
    }

    /** Canonical constructor that sets every field; the other constructors delegate to it. */
    private CheckpointCommittableManagerImpl(
            Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers,
            int subtaskId,
            int numberOfSubtasks,
            @Nullable Long checkpointId) {
        this.subtasksCommittableManagers = Preconditions.checkNotNull(subtasksCommittableManagers);
        this.subtaskId = subtaskId;
        this.numberOfSubtasks = numberOfSubtasks;
        this.checkpointId = checkpointId;
    }

    /**
     * Returns the checkpoint id.
     *
     * @throws NullPointerException if this manager was created without a checkpoint id
     */
    @Override
    public long getCheckpointId() {
        Preconditions.checkNotNull(this.checkpointId, "No checkpoint id is set for this manager.");
        return this.checkpointId;
    }

    /** Returns a live view of all subtask managers held by this manager. */
    Collection<SubtaskCommittableManager<CommT>> getSubtaskCommittableManagers() {
        return this.subtasksCommittableManagers.values();
    }

    /**
     * Registers a new subtask manager for the subtask named in {@code summary}.
     *
     * @param summary summary announcing the number of committables of one subtask
     * @throws UnsupportedOperationException if a manager is already registered for that subtask
     */
    void upsertSummary(CommittableSummary<CommT> summary) {
        Long summaryCheckpointId = summary.getCheckpointId().isPresent()
                ? Long.valueOf(summary.getCheckpointId().getAsLong())
                : null;
        SubtaskCommittableManager<CommT> existing = this.subtasksCommittableManagers.putIfAbsent(
                summary.getSubtaskId(),
                new SubtaskCommittableManager<>(summary.getNumberOfCommittables(), this.subtaskId, summaryCheckpointId));
        if (existing != null) {
            throw new UnsupportedOperationException("Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920");
        }
    }

    /**
     * Adds a committable to the manager of the subtask it originates from.
     *
     * @throws NullPointerException if no manager is registered for the committable's subtask
     */
    void addCommittable(CommittableWithLineage<CommT> committable) {
        this.getSubtaskCommittableManager(committable.getSubtaskId()).add(committable);
    }

    /**
     * Looks up the manager registered for {@code subtaskId}.
     *
     * @throws NullPointerException if no manager is registered for that subtask
     */
    SubtaskCommittableManager<CommT> getSubtaskCommittableManager(int subtaskId) {
        SubtaskCommittableManager<CommT> committables = this.subtasksCommittableManagers.get(subtaskId);
        return Preconditions.checkNotNull(committables, "Unknown subtask for %s", subtaskId);
    }

    /**
     * Builds a summary whose committable, pending and failed counts are the sums over all
     * subtask managers.
     */
    @Override
    public CommittableSummary<CommT> getSummary() {
        int numCommittables = 0;
        int numPending = 0;
        int numFailed = 0;
        // Single pass instead of three separate streams over the same values.
        for (SubtaskCommittableManager<CommT> subtask : this.subtasksCommittableManagers.values()) {
            numCommittables += subtask.getNumCommittables();
            numPending += subtask.getNumPending();
            numFailed += subtask.getNumFailed();
        }
        return new CommittableSummary<>(this.subtaskId, this.numberOfSubtasks, this.checkpointId, numCommittables, numPending, numFailed);
    }

    /** Returns {@code true} if every subtask manager reports being finished (also when empty). */
    boolean isFinished() {
        return this.subtasksCommittableManagers.values().stream().allMatch(SubtaskCommittableManager::isFinished);
    }

    /**
     * Hands all pending requests to {@code committer} and returns what the subtask managers
     * drain afterwards.
     *
     * @param fullyReceived if {@code true}, only subtasks that have received all their
     *     committables take part
     * @param committer committer that receives the pending requests
     * @return the committables drained from the subtask managers after the commit call
     */
    @Override
    public Collection<CommittableWithLineage<CommT>> commit(boolean fullyReceived, Committer<CommT> committer) throws IOException, InterruptedException {
        Collection<CommitRequestImpl<CommT>> requests = this.getPendingRequests(fullyReceived);
        requests.forEach(CommitRequestImpl::setSelected);
        // The committer gets its own list; the requests themselves are shared so that their
        // state can be inspected after the call.
        committer.commit(new ArrayList<>(requests));
        requests.forEach(CommitRequestImpl::setCommittedIfNoError);
        return this.drainFinished();
    }

    /**
     * Collects the pending requests of the subtask managers.
     *
     * @param fullyReceived if {@code true}, subtasks for which {@code hasReceivedAll()} is
     *     {@code false} are skipped
     */
    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean fullyReceived) {
        return this.subtasksCommittableManagers.values().stream()
                .filter(subtask -> !fullyReceived || subtask.hasReceivedAll())
                .flatMap(SubtaskCommittableManager::getPendingRequests)
                .collect(Collectors.toList());
    }

    /** Drains the committed committables from every subtask manager into one collection. */
    Collection<CommittableWithLineage<CommT>> drainFinished() {
        return this.subtasksCommittableManagers.values().stream()
                .flatMap(subtask -> subtask.drainCommitted().stream())
                .collect(Collectors.toList());
    }

    /**
     * Merges the subtask managers of {@code other} into this manager; managers registered for
     * the same subtask id are combined via {@link SubtaskCommittableManager#merge}.
     *
     * @param other manager with the same checkpoint id
     * @return this manager
     * @throws IllegalArgumentException if the checkpoint ids differ
     */
    CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> other) {
        Preconditions.checkArgument(
                Objects.equals(other.checkpointId, this.checkpointId),
                "Cannot merge managers of different checkpoints: %s vs. %s",
                this.checkpointId,
                other.checkpointId);
        for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> subtaskEntry : other.subtasksCommittableManagers.entrySet()) {
            this.subtasksCommittableManagers.merge(subtaskEntry.getKey(), subtaskEntry.getValue(), SubtaskCommittableManager::merge);
        }
        return this;
    }

    /**
     * Creates a copy of this manager in which every subtask manager is copied as well. The
     * subtask id, number of subtasks and checkpoint id are carried over unchanged.
     */
    CheckpointCommittableManagerImpl<CommT> copy() {
        Map<Integer, SubtaskCommittableManager<CommT>> copiedSubtasks = this.subtasksCommittableManagers.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().copy()));
        return new CheckpointCommittableManagerImpl<>(copiedSubtasks, this.subtaskId, this.numberOfSubtasks, this.checkpointId);
    }
}

