/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.io.Serializable;
import java.util.UUID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.connector.sink.CommittableStateManager;
import org.apache.flink.table.store.connector.sink.CommittableTypeInfo;
import org.apache.flink.table.store.connector.sink.Committer;
import org.apache.flink.table.store.connector.sink.CommitterOperator;
import org.apache.flink.table.store.connector.sink.FullChangelogStoreSinkWrite;
import org.apache.flink.table.store.connector.sink.StoreSinkWrite;
import org.apache.flink.table.store.connector.sink.StoreSinkWriteImpl;
import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;

public abstract class FlinkSink
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final String WRITER_NAME = "Writer";
    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
    protected final FileStoreTable table;
    private final boolean isOverwrite;

    public FlinkSink(FileStoreTable table, boolean isOverwrite) {
        this.table = table;
        this.isOverwrite = isOverwrite;
    }

    protected StoreSinkWrite.Provider createWriteProvider(String initialCommitUser) {
        if (this.table.options().changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION && !this.table.options().writeOnly()) {
            long fullCompactionThresholdMs = this.table.options().changelogProducerFullCompactionTriggerInterval().toMillis();
            return (table, context, ioManager) -> new FullChangelogStoreSinkWrite(table, context, initialCommitUser, ioManager, this.isOverwrite, fullCompactionThresholdMs);
        }
        return (table, context, ioManager) -> new StoreSinkWriteImpl(table, context, initialCommitUser, ioManager, this.isOverwrite);
    }

    public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
        boolean streamingCheckpointEnabled;
        String initialCommitUser = UUID.randomUUID().toString();
        StreamExecutionEnvironment env = input.getExecutionEnvironment();
        ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        boolean bl = streamingCheckpointEnabled = isStreaming && checkpointConfig.isCheckpointingEnabled();
        if (streamingCheckpointEnabled) {
            this.assertCheckpointConfiguration(env);
        }
        CommittableTypeInfo typeInfo = new CommittableTypeInfo();
        SingleOutputStreamOperator written = input.transform(WRITER_NAME, (TypeInformation)typeInfo, this.createWriteOperator(this.createWriteProvider(initialCommitUser), isStreaming)).setParallelism(input.getParallelism());
        SingleOutputStreamOperator committed = written.transform(GLOBAL_COMMITTER_NAME, (TypeInformation)typeInfo, (OneInputStreamOperator)new CommitterOperator(streamingCheckpointEnabled, initialCommitUser, this.createCommitterFactory(streamingCheckpointEnabled), this.createCommittableStateManager())).setParallelism(1).setMaxParallelism(1);
        return committed.addSink((SinkFunction)new DiscardingSink()).name("end").setParallelism(1);
    }

    private void assertCheckpointConfiguration(StreamExecutionEnvironment env) {
        Preconditions.checkArgument((!env.getCheckpointConfig().isUnalignedCheckpointsEnabled() ? 1 : 0) != 0, (Object)("Table Store sink currently does not support unaligned checkpoints. Please set " + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key() + " to false."));
        Preconditions.checkArgument((env.getCheckpointConfig().getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE ? 1 : 0) != 0, (Object)("Table Store sink currently only supports EXACTLY_ONCE checkpoint mode. Please set " + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key() + " to exactly-once"));
    }

    protected abstract OneInputStreamOperator<RowData, Committable> createWriteOperator(StoreSinkWrite.Provider var1, boolean var2);

    protected abstract SerializableFunction<String, Committer> createCommitterFactory(boolean var1);

    protected abstract CommittableStateManager createCommittableStateManager();
}

