/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.sink.BucketStreamPartitioner;
import org.apache.flink.table.store.connector.sink.StoreSink;
import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;

public class FlinkSinkBuilder {
    private final ObjectIdentifier tableIdentifier;
    private final FileStoreTable table;
    private final Configuration conf;
    private DataStream<RowData> input;
    @Nullable
    private CatalogLock.Factory lockFactory;
    @Nullable
    private Map<String, String> overwritePartition;
    @Nullable
    private LogSinkFunction logSinkFunction;
    @Nullable
    private Integer parallelism;

    public FlinkSinkBuilder(ObjectIdentifier tableIdentifier, FileStoreTable table) {
        this.tableIdentifier = tableIdentifier;
        this.table = table;
        this.conf = Configuration.fromMap(table.schema().options());
    }

    public FlinkSinkBuilder withInput(DataStream<RowData> input) {
        this.input = input;
        return this;
    }

    public FlinkSinkBuilder withLockFactory(CatalogLock.Factory lockFactory) {
        this.lockFactory = lockFactory;
        return this;
    }

    public FlinkSinkBuilder withOverwritePartition(Map<String, String> overwritePartition) {
        this.overwritePartition = overwritePartition;
        return this;
    }

    public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) {
        this.logSinkFunction = logSinkFunction;
        return this;
    }

    public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    @Nullable
    private Map<String, String> getCompactPartSpec() {
        String json = (String)this.conf.get(FlinkConnectorOptions.COMPACTION_PARTITION_SPEC);
        if (json == null) {
            return null;
        }
        return JsonSerdeUtil.fromJson(json, Map.class);
    }

    public DataStreamSink<?> build() {
        int numBucket = (Integer)this.conf.get(CoreOptions.BUCKET);
        BucketStreamPartitioner partitioner = new BucketStreamPartitioner(numBucket, this.table.schema());
        PartitionTransformation partitioned = new PartitionTransformation(this.input.getTransformation(), (StreamPartitioner)partitioner);
        if (this.parallelism != null) {
            partitioned.setParallelism(this.parallelism.intValue());
        }
        StreamExecutionEnvironment env = this.input.getExecutionEnvironment();
        StoreSink sink = new StoreSink(this.tableIdentifier, this.table, (Boolean)this.conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED), this.getCompactPartSpec(), this.lockFactory, this.overwritePartition, this.logSinkFunction);
        return sink.sinkTo((DataStream<RowData>)new DataStream(env, (Transformation)partitioned));
    }
}

