/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.compactor.operator;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
import org.apache.flink.util.Preconditions;

/**
 * Stream operator that forwards every incoming committable message unchanged and, when its state
 * is initialized, drains the committables stored under
 * {@link CompactCoordinator#REMAINING_COMMITTABLE_RAW_STATES_DESC}.
 *
 * <p>Incoming messages are emitted as {@code Either.Left}. Each committable restored from state is
 * wrapped in its own {@link CompactorRequest} and emitted as {@code Either.Right}; afterwards the
 * state is cleared.
 *
 * <p>NOTE(review): presumably the drained state was written by a {@link CompactCoordinator} in a
 * previous run of the job - confirm against the code that wires this operator into the topology.
 */
public class CompactCoordinatorStateHandler
extends AbstractStreamOperator<Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>>
implements OneInputStreamOperator<CommittableMessage<FileSinkCommittable>, Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>>,
BoundedOneInput,
CheckpointListener {
    /** Serializer used to read the committables held in the remaining-committable list state. */
    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;

    /**
     * Creates the handler.
     *
     * @param committableSerializer serializer for the committables stored in state; must not be
     *     {@code null}
     * @throws NullPointerException if {@code committableSerializer} is {@code null}
     */
    public CompactCoordinatorStateHandler(SimpleVersionedSerializer<FileSinkCommittable> committableSerializer) {
        this.committableSerializer = Preconditions.checkNotNull(committableSerializer);
    }

    /**
     * Forwards the message carried by {@code element} downstream as the left side of the output
     * {@link Either}.
     *
     * @param element record holding the committable message to forward
     * @throws Exception declared by the operator interface
     */
    @Override
    public void processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element) throws Exception {
        this.output.collect(new StreamRecord<>(Either.Left(element.getValue())));
    }

    /**
     * Intentionally a no-op: this operator keeps no buffered elements, so there is nothing to
     * flush when the input ends.
     */
    @Override
    public void endInput() throws Exception {
    }

    /**
     * Initializes operator state, then emits one {@link CompactorRequest} per committable found in
     * the remaining-committable list state and clears that state.
     *
     * <p>A committable is added to the request's "to compact" list when it has a pending file and
     * the trigger does not answer {@code PASS_THROUGH}; otherwise it is added to the request's
     * pass-through list.
     *
     * @param context state initialization context providing the operator state store
     * @throws Exception if the superclass initialization or any state access fails
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        SimpleVersionedListState<FileSinkCommittable> remainingCommittableState = new SimpleVersionedListState<>(context.getOperatorStateStore().getListState(CompactCoordinator.REMAINING_COMMITTABLE_RAW_STATES_DESC), this.committableSerializer);
        // Trigger built from a strategy with size threshold 0; it is only consulted to classify
        // each restored committable. NOTE(review): the exact meaning of a 0 threshold is defined
        // by CompactCoordinator.CompactTrigger / FileCompactStrategy - confirm there.
        CompactCoordinator.CompactTrigger trigger = new CompactCoordinator.CompactTrigger(FileCompactStrategy.Builder.newBuilder().setSizeThreshold(0L).build());
        // Keep the element type: a raw Iterable would yield Object and break the typed loop below.
        Iterable<FileSinkCommittable> stateRemaining = remainingCommittableState.get();
        if (stateRemaining != null) {
            for (FileSinkCommittable committable : stateRemaining) {
                String bucketId = committable.getBucketId();
                // One request per restored committable, keyed by the committable's bucket.
                CompactorRequest request = new CompactorRequest(bucketId);
                if (committable.hasPendingFile() && trigger.onElement(committable) != CompactCoordinator.CompactTriggerResult.PASS_THROUGH) {
                    request.addToCompact(committable);
                } else {
                    request.addToPassthrough(committable);
                }
                this.output.collect(new StreamRecord<>(Either.Right(request)));
            }
        }
        // Everything has been handed downstream; drop it so it is not emitted again on a later restore.
        remainingCommittableState.clear();
    }
}

