/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table.stream;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connector.file.table.FileSystemFactory;
import org.apache.flink.connector.file.table.TableMetaStoreFactory;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.PartitionCommitter;
import org.apache.flink.connector.file.table.stream.StreamingFileWriter;
import org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter;
import org.apache.flink.connector.file.table.stream.compact.CompactCoordinator;
import org.apache.flink.connector.file.table.stream.compact.CompactFileWriter;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.connector.file.table.stream.compact.CompactOperator;
import org.apache.flink.connector.file.table.stream.compact.CompactReader;
import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.util.function.SupplierWithException;

@Internal
public class StreamingSink {
    private StreamingSink() {
    }

    public static <T> DataStream<PartitionCommitInfo> writer(ProviderContext providerContext, DataStream<T> inputStream, long bucketCheckInterval, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, int parallelism, List<String> partitionKeys, Configuration conf) {
        StreamingFileWriter<T> fileWriter = new StreamingFileWriter<T>(bucketCheckInterval, bucketsBuilder, partitionKeys, conf);
        SingleOutputStreamOperator<PartitionCommitInfo> writerStream = inputStream.transform(StreamingFileWriter.class.getSimpleName(), TypeInformation.of(PartitionCommitInfo.class), fileWriter).setParallelism(parallelism);
        providerContext.generateUid("streaming-writer").ifPresent(writerStream::uid);
        return writerStream;
    }

    public static <T> DataStream<PartitionCommitInfo> compactionWriter(ProviderContext providerContext, DataStream<T> inputStream, long bucketCheckInterval, StreamingFileSink.BucketsBuilder<T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder, FileSystemFactory fsFactory, Path path, CompactReader.Factory<T> readFactory, long targetFileSize, int parallelism) {
        CompactFileWriter<T> writer = new CompactFileWriter<T>(bucketCheckInterval, bucketsBuilder);
        SupplierWithException fsSupplier = (SupplierWithException<FileSystem, E> & Serializable)() -> fsFactory.create(path.toUri());
        CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);
        SingleOutputStreamOperator<CompactMessages.CoordinatorInput> writerStream = inputStream.transform("streaming-writer", TypeInformation.of(CompactMessages.CoordinatorInput.class), writer).setParallelism(parallelism);
        providerContext.generateUid("streaming-writer").ifPresent(writerStream::uid);
        SingleOutputStreamOperator<CompactMessages.CoordinatorOutput> coordinatorStream = writerStream.transform("compact-coordinator", TypeInformation.of(CompactMessages.CoordinatorOutput.class), coordinator).setParallelism(1).setMaxParallelism(1);
        providerContext.generateUid("compact-coordinator").ifPresent(coordinatorStream::uid);
        CompactWriter.Factory writerFactory = CompactBucketWriter.factory(bucketsBuilder::createBucketWriter);
        CompactOperator<T> compacter = new CompactOperator<T>(fsSupplier, readFactory, writerFactory);
        SingleOutputStreamOperator<PartitionCommitInfo> operatorStream = coordinatorStream.broadcast().transform("compact-operator", TypeInformation.of(PartitionCommitInfo.class), compacter).setParallelism(parallelism);
        providerContext.generateUid("compact-operator").ifPresent(operatorStream::uid);
        return operatorStream;
    }

    public static DataStreamSink<?> sink(ProviderContext providerContext, DataStream<PartitionCommitInfo> writer, Path locationPath, ObjectIdentifier identifier, List<String> partitionKeys, TableMetaStoreFactory msFactory, FileSystemFactory fsFactory, Configuration options) {
        DataStream<PartitionCommitInfo> stream = writer;
        if (partitionKeys.size() > 0 && options.contains(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND)) {
            PartitionCommitter committer = new PartitionCommitter(locationPath, identifier, partitionKeys, msFactory, fsFactory, options);
            SingleOutputStreamOperator<Void> committerStream = writer.transform(PartitionCommitter.class.getSimpleName(), Types.VOID, committer).setParallelism(1).setMaxParallelism(1);
            providerContext.generateUid("partition-committer").ifPresent(committerStream::uid);
            stream = committerStream;
        }
        DataStreamSink<PartitionCommitInfo> discardingSink = stream.addSink(new DiscardingSink()).name("end").setParallelism(1);
        providerContext.generateUid("discarding-sink").ifPresent(discardingSink::uid);
        return discardingSink;
    }
}

