/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.connector.sink.LogOffsetCommittable;
import org.apache.flink.table.store.connector.sink.PrepareCommitOperator;
import org.apache.flink.table.store.connector.sink.StoreSinkWrite;
import org.apache.flink.table.store.log.LogWriteCallback;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
import org.apache.flink.table.store.table.sink.SinkRecord;

/**
 * Operator that writes each incoming {@link RowData} through a {@link StoreSinkWrite} and, when a
 * {@link LogSinkFunction} is configured, additionally forwards the written record to that log
 * sink function.
 *
 * <p>The log sink function is optional; every lifecycle method below only touches it when it is
 * non-null.
 */
public class StoreWriteOperator
extends PrepareCommitOperator {
    private static final long serialVersionUID = 2L;
    /** Table handed to the {@link StoreSinkWrite.Provider} when the writer is created. */
    protected final FileStoreTable table;
    /** Optional log sink; {@code null} means records are written to the table only. */
    @Nullable
    private final LogSinkFunction logSinkFunction;
    /** Factory used in {@link #initializeState} to create {@link #write}. */
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    /** Table writer; created in {@link #initializeState}, so it is null before that call. */
    private transient StoreSinkWrite write;
    /** Context passed to the log sink function on every invoke; created in {@link #open}. */
    private transient SimpleContext sinkContext;
    /** Timestamp of the last watermark seen by {@link #processWatermark}. */
    private long currentWatermark = Long.MIN_VALUE;
    /** Callback registered on the log sink function in {@link #open}; null without a log sink. */
    @Nullable
    private transient LogWriteCallback logCallback;

    /**
     * @param table table to write to
     * @param logSinkFunction optional log sink function, may be {@code null}
     * @param storeSinkWriteProvider factory for the table writer
     */
    public StoreWriteOperator(FileStoreTable table, @Nullable LogSinkFunction logSinkFunction, StoreSinkWrite.Provider storeSinkWriteProvider) {
        this.table = table;
        this.logSinkFunction = logSinkFunction;
        this.storeSinkWriteProvider = storeSinkWriteProvider;
    }

    /** Delegates to the superclass and hands this operator's runtime context to the log sink function. */
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Committable>> output) {
        super.setup(containingTask, config, output);
        if (this.logSinkFunction != null) {
            FunctionUtils.setFunctionRuntimeContext((Function)this.logSinkFunction, (RuntimeContext)this.getRuntimeContext());
        }
    }

    /**
     * Creates the table writer from the provider and restores the log sink function's state.
     *
     * @throws Exception if the superclass, the provider or the state restore fails
     */
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.write = this.storeSinkWriteProvider.provide(this.table, context, this.getContainingTask().getEnvironment().getIOManager());
        if (this.logSinkFunction != null) {
            StreamingFunctionUtils.restoreFunctionState(context, (Function)this.logSinkFunction);
        }
    }

    /**
     * Creates the sink context and, if a log sink function is present, opens it and registers a
     * fresh {@link LogWriteCallback} on it.
     */
    public void open() throws Exception {
        super.open();
        this.sinkContext = new SimpleContext(this.getProcessingTimeService());
        if (this.logSinkFunction != null) {
            FunctionUtils.openFunction((Function)this.logSinkFunction, new Configuration());
            this.logCallback = new LogWriteCallback();
            this.logSinkFunction.setWriteCallback(this.logCallback);
        }
    }

    /** Records the watermark timestamp and passes an equivalent watermark to the log sink function. */
    public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
        if (this.logSinkFunction != null) {
            // The log sink takes the eventtime Watermark type, not the streaming one received here.
            this.logSinkFunction.writeWatermark(new Watermark(mark.getTimestamp()));
        }
    }

    /**
     * Writes the element's value to the table and, if a log sink function is present, invokes it
     * with the log form of the written record.
     *
     * @throws IOException wrapping any exception thrown by the table write
     */
    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        SinkRecord record;
        // Expose the element timestamp (or null when absent) to the log sink via the shared context.
        this.sinkContext.timestamp = element.hasTimestamp() ? Long.valueOf(element.getTimestamp()) : null;
        try {
            record = this.write.write(element.getValue());
        }
        catch (Exception e) {
            throw new IOException(e);
        }
        if (this.logSinkFunction != null) {
            SinkRecord logRecord = this.write.toLogRecord(record);
            this.logSinkFunction.invoke(logRecord, this.sinkContext);
        }
    }

    /** Snapshots the superclass, the table writer and, if present, the log sink function. */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.write.snapshotState(context);
        if (this.logSinkFunction != null) {
            StreamingFunctionUtils.snapshotFunctionState(context, (OperatorStateBackend)this.getOperatorStateBackend(), (Function)this.logSinkFunction);
        }
    }

    /** Finishes the superclass and then the log sink function, if present. */
    public void finish() throws Exception {
        super.finish();
        if (this.logSinkFunction != null) {
            this.logSinkFunction.finish();
        }
    }

    /**
     * Closes the superclass, the table writer and the log sink function.
     *
     * <p>Every step is attempted even when an earlier one fails, so that a failure in one resource
     * does not leak the others. The writer is skipped when it was never created. The first
     * failure is rethrown with any later failures attached as suppressed exceptions.
     *
     * @throws Exception the first exception raised by any of the close steps
     */
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        }
        catch (Exception e) {
            failure = e;
        }
        // write is only assigned in initializeState, so it can still be null here.
        if (this.write != null) {
            try {
                this.write.close();
            }
            catch (Exception e) {
                failure = StoreWriteOperator.mergeFailure(failure, e);
            }
        }
        if (this.logSinkFunction != null) {
            try {
                FunctionUtils.closeFunction((Function)this.logSinkFunction);
            }
            catch (Exception e) {
                failure = StoreWriteOperator.mergeFailure(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} attached as suppressed, or {@code next} if there is no first. */
    private static Exception mergeFailure(@Nullable Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        // addSuppressed rejects self-suppression, so skip it if the same instance shows up twice.
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }

    /** Forwards the notification to the log sink function when it is a {@link CheckpointListener}. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (this.logSinkFunction instanceof CheckpointListener) {
            ((CheckpointListener)this.logSinkFunction).notifyCheckpointComplete(checkpointId);
        }
    }

    /** Forwards the notification to the log sink function when it is a {@link CheckpointListener}. */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        super.notifyCheckpointAborted(checkpointId);
        if (this.logSinkFunction instanceof CheckpointListener) {
            ((CheckpointListener)this.logSinkFunction).notifyCheckpointAborted(checkpointId);
        }
    }

    /**
     * Collects the committables of the table writer and, when a log callback is registered,
     * flushes the log sink function and appends one {@code LOG_OFFSET} committable per entry
     * reported by the callback.
     *
     * @param doCompaction passed through to the table writer
     * @param checkpointId checkpoint id stamped on every added log-offset committable
     * @return the writer's committables plus any log-offset committables
     * @throws IOException wrapping any exception thrown while flushing the log sink function
     */
    @Override
    protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId) throws IOException {
        List<Committable> committables = this.write.prepareCommit(doCompaction, checkpointId);
        // logCallback is only set together with a non-null logSinkFunction (see open()).
        if (this.logCallback != null) {
            try {
                this.logSinkFunction.flush();
            }
            catch (Exception e) {
                throw new IOException(e);
            }
            // NOTE(review): key/value are presumably bucket id and log offset; confirm against LogWriteCallback.
            this.logCallback.offsets().forEach((k, v) -> committables.add(new Committable(checkpointId, Committable.Kind.LOG_OFFSET, new LogOffsetCommittable((int)k, (long)v))));
        }
        return committables;
    }

    /**
     * {@link SinkFunction.Context} backed by the enclosing operator: the watermark comes from the
     * operator, the timestamp is set per element by {@code processElement}.
     */
    private class SimpleContext
    implements SinkFunction.Context {
        /** Timestamp of the element currently being processed, or null if it has none. */
        @Nullable
        private Long timestamp;
        private final ProcessingTimeService processingTimeService;

        public SimpleContext(ProcessingTimeService processingTimeService) {
            this.processingTimeService = processingTimeService;
        }

        public long currentProcessingTime() {
            return this.processingTimeService.getCurrentProcessingTime();
        }

        public long currentWatermark() {
            return StoreWriteOperator.this.currentWatermark;
        }

        public Long timestamp() {
            return this.timestamp;
        }
    }
}

