/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.over.frame;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.operators.over.frame.OverWindowFrame;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.ResettableExternalBuffer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/**
 * An {@link OverWindowFrame} that aggregates every row of the buffer handed to
 * {@link #prepare(ResettableExternalBuffer)} exactly once and then returns that same
 * aggregated value from every {@link #process(int, RowData)} call.
 */
public class UnboundedOverWindowFrame
implements OverWindowFrame {
    /** Generated aggregation code; set to {@code null} once {@link #open(ExecutionContext)} has instantiated it. */
    private GeneratedAggsHandleFunction aggsHandleFunction;
    /** Row type whose child types are used to build {@link #valueSer}. */
    private final RowType valueType;
    /** Instantiated aggregation handler; created in {@link #open(ExecutionContext)}. */
    private AggsHandleFunction processor;
    /** Copy of the aggregated value computed by the most recent {@link #prepare(ResettableExternalBuffer)} call. */
    private RowData accValue;
    /** Serializer used to copy the aggregated value; created in {@link #open(ExecutionContext)}. */
    private RowDataSerializer valueSer;

    /**
     * Creates the frame. The generated function is not instantiated here; that happens in
     * {@link #open(ExecutionContext)}.
     *
     * @param aggsHandleFunction generated aggregation function to instantiate on open
     * @param valueType row type whose child types describe the aggregated value row
     */
    public UnboundedOverWindowFrame(GeneratedAggsHandleFunction aggsHandleFunction, RowType valueType) {
        this.aggsHandleFunction = aggsHandleFunction;
        this.valueType = valueType;
    }

    /**
     * Instantiates and opens the aggregation handler with the user-code class loader of the
     * runtime context, and builds the serializer used to copy aggregated values.
     *
     * @param ctx execution context providing the runtime context
     * @throws Exception if instantiating or opening the aggregation handler fails
     */
    @Override
    public void open(ExecutionContext ctx) throws Exception {
        ClassLoader cl = ctx.getRuntimeContext().getUserCodeClassLoader();
        this.processor = (AggsHandleFunction)this.aggsHandleFunction.newInstance(cl);
        this.processor.open(new PerKeyStateDataViewStore(ctx.getRuntimeContext()));
        // The generated wrapper is not referenced again after instantiation
        // (presumably dropped so it can be garbage collected -- confirm).
        this.aggsHandleFunction = null;
        this.valueSer = new RowDataSerializer(this.valueType.getChildren().toArray(new LogicalType[0]));
    }

    /**
     * Accumulates all rows of {@code rows} into fresh accumulators and stores a copy of the
     * resulting value, which {@link #process(int, RowData)} returns afterwards.
     *
     * @param rows buffer holding the rows to aggregate
     * @throws Exception if iterating the buffer or accumulating a row fails; the buffer
     *     iterator is closed in that case as well
     */
    @Override
    public void prepare(ResettableExternalBuffer rows) throws Exception {
        this.processor.setWindowSize(rows.size());
        // Start from fresh accumulators so nothing from an earlier prepare() call is carried over.
        this.processor.setAccumulators(this.processor.createAccumulators());
        ResettableExternalBuffer.BufferIterator iterator = rows.newIterator();
        try {
            while (iterator.advanceNext()) {
                this.processor.accumulate(iterator.getRow());
            }
            // Store a copy rather than the handler's returned row
            // (presumably because the handler may reuse that object -- confirm).
            this.accValue = this.valueSer.copy(this.processor.getValue());
        } finally {
            // Close on the failure path too, so the iterator never leaks when accumulation throws.
            iterator.close();
        }
    }

    /**
     * Returns the value computed by the last {@link #prepare(ResettableExternalBuffer)} call.
     * Neither argument is used.
     *
     * @param index ignored
     * @param current ignored
     * @return the stored aggregated value, or {@code null} if {@code prepare} has not yet
     *     completed successfully
     */
    @Override
    public RowData process(int index, RowData current) throws Exception {
        return this.accValue;
    }
}

