/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.spark;

import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.spark.SparkFilterConverter;
import org.apache.flink.table.store.spark.SparkInputPartition;
import org.apache.flink.table.store.spark.SparkTypeUtils;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
import org.apache.spark.sql.types.StructType;

public class SparkDataSourceReader
implements SupportsPushDownFilters,
SupportsPushDownRequiredColumns,
SupportsReportStatistics {
    private final FileStoreTable table;
    private List<Predicate> predicates = new ArrayList<Predicate>();
    private Filter[] pushedFilters;
    private int[] projectedFields;
    private List<Split> splits;

    public SparkDataSourceReader(FileStoreTable table) {
        this.table = table;
    }

    public Filter[] pushFilters(Filter[] filters) {
        SparkFilterConverter converter = new SparkFilterConverter(this.table.schema().logicalRowType());
        ArrayList<Predicate> predicates = new ArrayList<Predicate>();
        ArrayList<Filter> pushed = new ArrayList<Filter>();
        for (Filter filter : filters) {
            try {
                predicates.add(converter.convert(filter));
                pushed.add(filter);
            }
            catch (UnsupportedOperationException unsupportedOperationException) {
                // empty catch block
            }
        }
        this.predicates = predicates;
        this.pushedFilters = pushed.toArray(new Filter[0]);
        return filters;
    }

    public Filter[] pushedFilters() {
        return this.pushedFilters;
    }

    public void pruneColumns(StructType requiredSchema) {
        String[] pruneFields = requiredSchema.fieldNames();
        List<String> fieldNames = this.table.schema().fieldNames();
        int[] projected = new int[pruneFields.length];
        for (int i = 0; i < projected.length; ++i) {
            projected[i] = fieldNames.indexOf(pruneFields[i]);
        }
        this.projectedFields = projected;
    }

    public Statistics estimateStatistics() {
        long rowCount = 0L;
        for (Split split : this.splits()) {
            for (DataFileMeta file : split.files()) {
                rowCount += file.rowCount();
            }
        }
        final long numRows = rowCount;
        final long sizeInBytes = (long)this.readSchema().defaultSize() * numRows;
        return new Statistics(){

            public OptionalLong sizeInBytes() {
                return OptionalLong.of(sizeInBytes);
            }

            public OptionalLong numRows() {
                return OptionalLong.of(numRows);
            }
        };
    }

    public StructType readSchema() {
        RowType rowType = this.table.schema().logicalRowType();
        return SparkTypeUtils.fromFlinkRowType(this.projectedFields == null ? rowType : TypeUtils.project(rowType, this.projectedFields));
    }

    public List<InputPartition<InternalRow>> planInputPartitions() {
        return this.splits().stream().map(split -> new SparkInputPartition(this.table, this.projectedFields, this.predicates, (Split)split)).collect(Collectors.toList());
    }

    protected List<Split> splits() {
        if (this.splits == null) {
            this.splits = this.table.newScan().withFilter(this.predicates).plan().splits;
        }
        return this.splits;
    }
}

