package net.mguenther.kafka.junit.provider;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import net.mguenther.kafka.junit.KeyValue;
import net.mguenther.kafka.junit.RecordProducer;
import net.mguenther.kafka.junit.SendKeyValues;
import net.mguenther.kafka.junit.SendKeyValuesTransactional;
import net.mguenther.kafka.junit.SendValues;
import net.mguenther.kafka.junit.SendValuesTransactional;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

/* loaded from: input_file:net/mguenther/kafka/junit/provider/DefaultRecordProducer.class */
public class DefaultRecordProducer implements RecordProducer {
    private final String bootstrapServers;

    @Override // net.mguenther.kafka.junit.RecordProducer
    public <V> List<RecordMetadata> send(SendValues<V> sendValues) throws InterruptedException {
        return send(SendKeyValues.to(sendValues.getTopic(), (Collection) sendValues.getValues().stream().map(obj -> {
            return new KeyValue((String) null, obj);
        }).collect(Collectors.toList())).withAll(sendValues.getProducerProps()).build());
    }

    @Override // net.mguenther.kafka.junit.RecordProducer
    public <V> List<RecordMetadata> send(SendValuesTransactional<V> sendValuesTransactional) throws InterruptedException {
        HashMap hashMap = new HashMap();
        for (String str : sendValuesTransactional.getValuesPerTopic().keySet()) {
            hashMap.put(str, (Collection) sendValuesTransactional.getValuesPerTopic().get(str).stream().map(obj -> {
                return new KeyValue((String) null, obj);
            }).collect(Collectors.toList()));
        }
        return send(SendKeyValuesTransactional.inTransaction(hashMap).withAll(sendValuesTransactional.getProducerProps()).withFailTransaction(sendValuesTransactional.shouldFailTransaction()).build());
    }

    @Override // net.mguenther.kafka.junit.RecordProducer
    public <K, V> List<RecordMetadata> send(SendKeyValues<K, V> sendKeyValues) throws InterruptedException {
        KafkaProducer kafkaProducer = new KafkaProducer(effectiveProducerProps(sendKeyValues.getProducerProps()));
        ArrayList arrayList = new ArrayList(sendKeyValues.getRecords().size());
        try {
            for (KeyValue<K, V> keyValue : sendKeyValues.getRecords()) {
                try {
                    arrayList.add((RecordMetadata) kafkaProducer.send(new ProducerRecord(sendKeyValues.getTopic(), (Integer) null, keyValue.getKey(), keyValue.getValue(), keyValue.getHeaders())).get());
                } catch (ExecutionException e) {
                    if (RuntimeException.class.isAssignableFrom(e.getCause().getClass())) {
                        throw ((RuntimeException) e.getCause());
                    }
                    throw new RuntimeException(e.getCause());
                }
            }
            return Collections.unmodifiableList(arrayList);
        } finally {
            kafkaProducer.flush();
            kafkaProducer.close();
        }
    }

    @Override // net.mguenther.kafka.junit.RecordProducer
    public <K, V> List<RecordMetadata> send(SendKeyValuesTransactional<K, V> sendKeyValuesTransactional) throws InterruptedException {
        KafkaProducer kafkaProducer = new KafkaProducer(effectiveProducerProps(sendKeyValuesTransactional.getProducerProps()));
        ArrayList arrayList = new ArrayList();
        try {
            try {
                try {
                    kafkaProducer.initTransactions();
                    kafkaProducer.beginTransaction();
                    for (String str : sendKeyValuesTransactional.getRecordsPerTopic().keySet()) {
                        for (KeyValue<K, V> keyValue : sendKeyValuesTransactional.getRecordsPerTopic().get(str)) {
                            try {
                                arrayList.add((RecordMetadata) kafkaProducer.send(new ProducerRecord(str, (Integer) null, keyValue.getKey(), keyValue.getValue(), keyValue.getHeaders())).get());
                            } catch (ExecutionException e) {
                                if (RuntimeException.class.isAssignableFrom(e.getCause().getClass())) {
                                    throw ((RuntimeException) e.getCause());
                                }
                                throw new RuntimeException(e.getCause());
                            }
                        }
                    }
                    if (sendKeyValuesTransactional.shouldFailTransaction()) {
                        kafkaProducer.abortTransaction();
                    } else {
                        kafkaProducer.commitTransaction();
                    }
                    return Collections.unmodifiableList(arrayList);
                } catch (ProducerFencedException e2) {
                    kafkaProducer.abortTransaction();
                    throw new RuntimeException(String.format("There happens to be another producer that shares the transactional ID '%s'with this producer, but that has a newer epoch assigned to it. This producer has been fenced off, it can no longer write to the log transactionally. Hence, the ongoing transaction is aborted and the producer closed.", sendKeyValuesTransactional.getProducerProps().get("transactional.id")), e2);
                }
            } catch (OutOfOrderSequenceException e3) {
                kafkaProducer.abortTransaction();
                throw new RuntimeException("This producer has received out-of-band sequence numbers. This is a fatal condition and thus, the producer is no longer able to log transactionally and reliably. Hence, the ongoing transaction is aborted and the producer closed.", e3);
            }
        } finally {
            kafkaProducer.flush();
            kafkaProducer.close();
        }
    }

    private Properties effectiveProducerProps(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.put("bootstrap.servers", this.bootstrapServers);
        return properties2;
    }

    public DefaultRecordProducer(String str) {
        this.bootstrapServers = str;
    }
}
