/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.kafka;

import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.kafka.KafkaLogSerializationSchema;
import org.apache.flink.table.store.kafka.KafkaSinkFunction;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.shaded.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.store.table.sink.LogSinkFunction;

public class KafkaLogSinkProvider
implements LogSinkProvider {
    private static final long serialVersionUID = 1L;
    private final String topic;
    private final Properties properties;
    @Nullable
    private final SerializationSchema<RowData> primaryKeySerializer;
    private final SerializationSchema<RowData> valueSerializer;
    private final CoreOptions.LogConsistency consistency;
    private final CoreOptions.LogChangelogMode changelogMode;

    public KafkaLogSinkProvider(String topic, Properties properties, @Nullable SerializationSchema<RowData> primaryKeySerializer, SerializationSchema<RowData> valueSerializer, CoreOptions.LogConsistency consistency, CoreOptions.LogChangelogMode changelogMode) {
        this.topic = topic;
        this.properties = properties;
        this.primaryKeySerializer = primaryKeySerializer;
        this.valueSerializer = valueSerializer;
        this.consistency = consistency;
        this.changelogMode = changelogMode;
    }

    @Override
    public LogSinkFunction createSink() {
        FlinkKafkaProducer.Semantic semantic;
        switch (this.consistency) {
            case TRANSACTIONAL: {
                this.properties.put("transactional.id", "log-store-" + this.topic);
                semantic = FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
                break;
            }
            case EVENTUAL: {
                if (this.primaryKeySerializer == null) {
                    throw new IllegalArgumentException("Can not use EVENTUAL consistency mode for non-pk table.");
                }
                semantic = FlinkKafkaProducer.Semantic.AT_LEAST_ONCE;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported: " + (Object)((Object)this.consistency));
            }
        }
        return new KafkaSinkFunction(this.topic, this.createSerializationSchema(), this.properties, semantic);
    }

    @VisibleForTesting
    KafkaLogSerializationSchema createSerializationSchema() {
        return new KafkaLogSerializationSchema(this.topic, this.primaryKeySerializer, this.valueSerializer, this.changelogMode);
    }
}

