package org.springframework.cloud.stream.binder.reactorkafka;

import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

/* loaded from: input_file:org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinder.class */
public class ReactorKafkaBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner> implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
    /** Commons-logging logger shared by all instances of this binder. */
    private static final Log log = LogFactory.getLog(ReactorKafkaBinder.class);
    /** Binder-wide Kafka settings; supplies the merged client configuration, required acks and connection string. */
    private final KafkaBinderConfigurationProperties configurationProperties;
    /** Per-binding extended properties; initialised in the constructor and replaceable via {@code setExtendedBindingProperties}. */
    private KafkaExtendedBindingProperties extendedBindingProperties;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinder$ReactorMessageHandler.class */
    public static class ReactorMessageHandler extends AbstractMessageHandler implements Lifecycle {
        private final RecordMessageConverter converter;
        private final String topic;
        private final SenderOptions<Object, Object> senderOptions;
        private volatile KafkaSender<Object, Object> sender;
        private volatile boolean running;

        ReactorMessageHandler(SenderOptions<Object, Object> senderOptions, RecordMessageConverter recordMessageConverter, String str) {
            this.senderOptions = senderOptions;
            this.converter = recordMessageConverter;
            this.topic = str;
        }

        protected void handleMessageInternal(Message<?> message) {
            Object obj = message.getHeaders().get("sendResult");
            Sinks.One one = Sinks.one();
            if (obj instanceof AtomicReference) {
                ((AtomicReference) obj).set(one.asMono());
            }
            if (this.sender == null) {
                one.emitError(new IllegalStateException("Handler is not running"), (Sinks.EmitFailureHandler) null);
                return;
            }
            this.sender.send(Flux.just(SenderRecord.create(this.converter.fromMessage(message, this.topic), UUID.randomUUID()))).subscribe(senderResult -> {
                one.emitValue(senderResult.recordMetadata(), (Sinks.EmitFailureHandler) null);
            });
        }

        public synchronized void start() {
            if (this.running) {
                return;
            }
            this.sender = KafkaSender.create(this.senderOptions);
            this.running = true;
        }

        public synchronized void stop() {
            if (this.running) {
                KafkaSender<Object, Object> kafkaSender = this.sender;
                this.sender = null;
                kafkaSender.close();
                this.running = false;
            }
        }

        public boolean isRunning() {
            return this.running;
        }
    }

    /**
     * Creates the binder with no embedded headers and no container or message-source customizers.
     *
     * @param kafkaBinderConfigurationProperties binder-wide Kafka configuration
     * @param kafkaTopicProvisioner provisioner handed to the superclass
     */
    public ReactorKafkaBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaTopicProvisioner kafkaTopicProvisioner) {
        super(new String[0], kafkaTopicProvisioner, (ListenerContainerCustomizer) null, (MessageSourceCustomizer) null);
        this.configurationProperties = kafkaBinderConfigurationProperties;
        // Start with default extended properties until a caller injects its own.
        this.extendedBindingProperties = new KafkaExtendedBindingProperties();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the outbound handler for a producer binding.
     *
     * @param producerDestination destination whose name becomes the target topic
     * @param extendedProducerProperties producer properties used to build the sender configuration
     * @param messageChannel not used by this implementation
     * @return a new {@link ReactorMessageHandler} for the destination
     */
    public MessageHandler createProducerMessageHandler(ProducerDestination producerDestination, ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties, MessageChannel messageChannel) throws Exception {
        Map<String, Object> producerConfigs = createProducerConfigs(extendedProducerProperties);
        SenderOptions<Object, Object> senderOptions = SenderOptions.create(producerConfigs);
        RecordMessageConverter recordConverter = new MessagingMessageConverter();
        String topicName = producerDestination.getName();
        return new ReactorMessageHandler(senderOptions, recordConverter, topicName);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the inbound endpoint for a consumer binding.
     *
     * <p>When {@code str} has no text the consumer is anonymous and joins a generated group
     * named {@code anonymous.<uuid>}. The endpoint subscribes to the destination's topic on
     * start and cancels the subscription on stop.
     *
     * @param consumerDestination destination whose name is the topic to subscribe to
     * @param str consumer group name; blank or {@code null} for an anonymous consumer
     * @param extendedConsumerProperties consumer properties for configuration and converter selection
     * @return a message producer backed by a reactor-kafka {@link KafkaReceiver}
     */
    public MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) throws Exception {
        boolean anonymous = !StringUtils.hasText(str);
        String groupId = anonymous ? "anonymous." + UUID.randomUUID() : str;
        Map<String, Object> consumerConfigs = createConsumerConfigs(anonymous, groupId, extendedConsumerProperties);
        final RecordMessageConverter messageConverter = getMessageConverter(extendedConsumerProperties);
        // Named receiverOptions (not "subscription") so it cannot be shadowed by the
        // anonymous class's Subscription field below.
        final ReceiverOptions<Object, Object> receiverOptions = ReceiverOptions.<Object, Object>create(consumerConfigs)
                .addAssignListener(partitions -> {
                    if (log.isInfoEnabled()) {
                        log.info("Assigned: " + partitions);
                    }
                })
                .subscription(Collections.singletonList(consumerDestination.getName()));
        return new MessageProducerSupport() {
            private final KafkaReceiver<Object, Object> receiver = KafkaReceiver.create(receiverOptions);
            private volatile Subscription subscription;

            protected void doStart() {
                subscribeToPublisher(this.receiver.receive()
                        // Keep the subscription so doStop() can cancel it.
                        .doOnSubscribe(active -> this.subscription = active)
                        .<Message<?>>map(record -> messageConverter.toMessage(record, null, null, null)));
            }

            protected synchronized void doStop() {
                Subscription active = this.subscription;
                if (active != null) {
                    this.subscription = null;
                    active.cancel();
                }
            }
        };
    }

    /**
     * Resolves the record converter for a consumer binding.
     *
     * <p>If a converter bean name is configured, that bean is looked up in the application
     * context. Otherwise a {@link MessagingMessageConverter} is created whose id and timestamp
     * generation follow the binding's {@code standardHeaders} setting.
     *
     * @param extendedConsumerProperties consumer properties carrying the Kafka extension
     * @return the converter to use for inbound records
     * @throws IllegalStateException if the configured converter bean does not exist
     */
    private RecordMessageConverter getMessageConverter(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaConsumerProperties extension = extendedConsumerProperties.getExtension();
        String converterBeanName = extension.getConverterBeanName();
        if (converterBeanName != null) {
            try {
                // Returned as RecordMessageConverter: a user bean need not be a MessagingMessageConverter.
                return getApplicationContext().getBean(converterBeanName, RecordMessageConverter.class);
            } catch (NoSuchBeanDefinitionException e) {
                throw new IllegalStateException("Converter bean not present in application context", e);
            }
        }
        KafkaConsumerProperties.StandardHeaders standardHeaders = extension.getStandardHeaders();
        boolean both = standardHeaders == KafkaConsumerProperties.StandardHeaders.both;
        MessagingMessageConverter defaultConverter = new MessagingMessageConverter();
        defaultConverter.setGenerateMessageId(both || standardHeaders == KafkaConsumerProperties.StandardHeaders.id);
        defaultConverter.setGenerateTimestamp(both || standardHeaders == KafkaConsumerProperties.StandardHeaders.timestamp);
        defaultConverter.setHeaderMapper(new DefaultKafkaHeaderMapper());
        return defaultConverter;
    }

    /**
     * Assembles the Kafka consumer configuration for a binding.
     *
     * <p>Later layers override earlier ones: built-in defaults, then the binder-wide merged
     * consumer configuration, then the binding-level configuration, then the binding's
     * {@code startOffset} if one is set. {@code bootstrap.servers} falls back to the binder
     * connection string and may not be set at the binding level.
     *
     * @param z whether the consumer is anonymous; selects {@code latest} rather than
     *     {@code earliest} as the default offset reset
     * @param str consumer group id
     * @param extendedConsumerProperties consumer properties carrying the Kafka extension
     * @return the consumer configuration map
     */
    private Map<String, Object> createConsumerConfigs(boolean z, String str, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        KafkaConsumerProperties extension = extendedConsumerProperties.getExtension();
        Map<String, Object> props = new HashMap<>();

        // Built-in defaults.
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("enable.auto.commit", false);
        props.put("auto.commit.interval.ms", 100);
        String defaultReset;
        if (z) {
            defaultReset = "latest";
        } else {
            defaultReset = "earliest";
        }
        props.put("auto.offset.reset", defaultReset);
        props.put("group.id", str);

        // Binder-wide overrides.
        Map<String, ?> binderLevel = this.configurationProperties.mergedConsumerConfiguration();
        if (!ObjectUtils.isEmpty(binderLevel)) {
            props.putAll(binderLevel);
        }
        if (ObjectUtils.isEmpty(props.get("bootstrap.servers"))) {
            props.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        }

        // Binding-level overrides.
        Map<String, ?> bindingLevel = extension.getConfiguration();
        if (!ObjectUtils.isEmpty(bindingLevel)) {
            boolean overridesServers = bindingLevel.containsKey("bootstrap.servers");
            Assert.state(!overridesServers, "bootstrap.servers cannot be overridden at the binding level; use multiple binders instead");
            props.putAll(bindingLevel);
        }

        // An explicit start offset wins over everything above.
        if (!ObjectUtils.isEmpty(extension.getStartOffset())) {
            props.put("auto.offset.reset", extension.getStartOffset().name());
        }
        return props;
    }

    /**
     * Assembles the Kafka producer configuration for a binding.
     *
     * <p>Later layers override earlier ones: built-in defaults, then the binder-wide merged
     * producer configuration, then the binding-level configuration. {@code batch.size},
     * {@code linger.ms} and {@code compression.type} are taken from the binding's producer
     * properties only when the binder-wide configuration left them empty.
     * {@code bootstrap.servers} falls back to the binder connection string and may not be set
     * at the binding level.
     *
     * @param extendedProducerProperties producer properties carrying the Kafka extension
     * @return the producer configuration map
     * @throws IllegalStateException if the binding-level configuration sets {@code bootstrap.servers}
     */
    private Map<String, Object> createProducerConfigs(ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties) {
        KafkaProducerProperties extension = extendedProducerProperties.getExtension();
        Map<String, Object> props = new HashMap<>();
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        props.put("acks", String.valueOf(this.configurationProperties.getRequiredAcks()));

        Map<String, ?> binderLevel = this.configurationProperties.mergedProducerConfiguration();
        if (!ObjectUtils.isEmpty(binderLevel)) {
            props.putAll(binderLevel);
        }
        if (ObjectUtils.isEmpty(props.get("bootstrap.servers"))) {
            props.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        }
        if (ObjectUtils.isEmpty(props.get("batch.size"))) {
            props.put("batch.size", String.valueOf(extension.getBufferSize()));
        }
        if (ObjectUtils.isEmpty(props.get("linger.ms"))) {
            props.put("linger.ms", String.valueOf(extension.getBatchTimeout()));
        }
        if (ObjectUtils.isEmpty(props.get("compression.type"))) {
            props.put("compression.type", extension.getCompressionType().toString());
        }

        // Guard before inspecting the map so a null or empty binding configuration is
        // tolerated, matching the consumer-side handling. The map is applied once.
        Map<String, ?> bindingLevel = extension.getConfiguration();
        if (!ObjectUtils.isEmpty(bindingLevel)) {
            Assert.state(!bindingLevel.containsKey("bootstrap.servers"), "bootstrap.servers cannot be overridden at the binding level; use multiple binders instead");
            props.putAll(bindingLevel);
        }
        return props;
    }

    /* renamed from: getExtendedConsumerProperties, reason: merged with bridge method [inline-methods] */
    /**
     * Looks up the Kafka consumer properties for a binding.
     *
     * @param str name passed through to the extended binding properties lookup
     * @return the consumer properties returned by the extended binding properties
     */
    public KafkaConsumerProperties m2getExtendedConsumerProperties(String str) {
        KafkaConsumerProperties consumerProperties = (KafkaConsumerProperties) this.extendedBindingProperties.getExtendedConsumerProperties(str);
        return consumerProperties;
    }

    /* renamed from: getExtendedProducerProperties, reason: merged with bridge method [inline-methods] */
    /**
     * Looks up the Kafka producer properties for a binding.
     *
     * @param str name passed through to the extended binding properties lookup
     * @return the producer properties returned by the extended binding properties
     */
    public KafkaProducerProperties m1getExtendedProducerProperties(String str) {
        KafkaProducerProperties producerProperties = (KafkaProducerProperties) this.extendedBindingProperties.getExtendedProducerProperties(str);
        return producerProperties;
    }

    /**
     * @return the defaults prefix reported by the current extended binding properties
     */
    public String getDefaultsPrefix() {
        final String defaultsPrefix = this.extendedBindingProperties.getDefaultsPrefix();
        return defaultsPrefix;
    }

    /**
     * @return the entry class reported by the current extended binding properties
     */
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        final Class<? extends BinderSpecificPropertiesProvider> entryClass = this.extendedBindingProperties.getExtendedPropertiesEntryClass();
        return entryClass;
    }

    /**
     * Replaces the extended binding properties used by the property accessors of this binder.
     *
     * @param kafkaExtendedBindingProperties the properties to use from now on
     */
    public void setExtendedBindingProperties(KafkaExtendedBindingProperties kafkaExtendedBindingProperties) {
        final KafkaExtendedBindingProperties replacement = kafkaExtendedBindingProperties;
        this.extendedBindingProperties = replacement;
    }
}
