package org.springframework.integration.kafka.inbound;

import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;
import kafka.utils.VerifiableProperties;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.kafka.core.KafkaMessageMetadata;
import org.springframework.integration.kafka.listener.AbstractDecodingAcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.AbstractDecodingMessageListener;
import org.springframework.integration.kafka.listener.Acknowledgment;
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MutableMessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.class */
public class KafkaMessageDrivenChannelAdapter extends MessageProducerSupport implements OrderlyShutdownCapable {
    private final KafkaMessageListenerContainer messageListenerContainer;
    private Decoder<?> keyDecoder = new DefaultDecoder((VerifiableProperties) null);
    private Decoder<?> payloadDecoder = new DefaultDecoder((VerifiableProperties) null);
    private boolean generateMessageId = false;
    private boolean generateTimestamp = false;
    private boolean useMessageBuilderFactory = false;
    private boolean autoCommitOffset = true;

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter$AcknowledgingChannelForwardingMessageListener.class */
    private class AcknowledgingChannelForwardingMessageListener extends AbstractDecodingAcknowledgingMessageListener {
        public AcknowledgingChannelForwardingMessageListener() {
            super(KafkaMessageDrivenChannelAdapter.this.keyDecoder, KafkaMessageDrivenChannelAdapter.this.payloadDecoder);
        }

        @Override // org.springframework.integration.kafka.listener.AbstractDecodingAcknowledgingMessageListener
        public void doOnMessage(Object obj, Object obj2, KafkaMessageMetadata kafkaMessageMetadata, Acknowledgment acknowledgment) {
            KafkaMessageDrivenChannelAdapter.this.sendMessage(KafkaMessageDrivenChannelAdapter.this.toMessage(obj, obj2, kafkaMessageMetadata, acknowledgment));
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.class */
    private class AutoAcknowledgingChannelForwardingMessageListener extends AbstractDecodingMessageListener {
        public AutoAcknowledgingChannelForwardingMessageListener() {
            super(KafkaMessageDrivenChannelAdapter.this.keyDecoder, KafkaMessageDrivenChannelAdapter.this.payloadDecoder);
        }

        @Override // org.springframework.integration.kafka.listener.AbstractDecodingMessageListener
        public void doOnMessage(Object obj, Object obj2, KafkaMessageMetadata kafkaMessageMetadata) {
            KafkaMessageDrivenChannelAdapter.this.sendMessage(KafkaMessageDrivenChannelAdapter.this.toMessage(obj, obj2, kafkaMessageMetadata, null));
        }
    }

    public KafkaMessageDrivenChannelAdapter(KafkaMessageListenerContainer kafkaMessageListenerContainer) {
        Assert.notNull(kafkaMessageListenerContainer);
        Assert.isNull(kafkaMessageListenerContainer.getMessageListener());
        this.messageListenerContainer = kafkaMessageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
    }

    public void setKeyDecoder(Decoder<?> decoder) {
        this.keyDecoder = decoder;
    }

    public void setPayloadDecoder(Decoder<?> decoder) {
        this.payloadDecoder = decoder;
    }

    public void setAutoCommitOffset(boolean z) {
        this.autoCommitOffset = z;
    }

    public void setGenerateMessageId(boolean z) {
        this.generateMessageId = z;
    }

    public void setGenerateTimestamp(boolean z) {
        this.generateTimestamp = z;
    }

    public void setUseMessageBuilderFactory(boolean z) {
        this.useMessageBuilderFactory = z;
    }

    protected void onInit() {
        this.messageListenerContainer.setMessageListener(this.autoCommitOffset ? new AutoAcknowledgingChannelForwardingMessageListener() : new AcknowledgingChannelForwardingMessageListener());
        if (!this.generateMessageId && !this.generateTimestamp && (getMessageBuilderFactory() instanceof DefaultMessageBuilderFactory)) {
            setMessageBuilderFactory(new MutableMessageBuilderFactory());
        }
        super.onInit();
    }

    protected void doStart() {
        this.messageListenerContainer.start();
    }

    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    public String getComponentType() {
        return "kafka:message-driven-channel-adapter";
    }

    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return getPhase();
    }

    public int afterShutdown() {
        return getPhase();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Message<Object> toMessage(Object obj, Object obj2, KafkaMessageMetadata kafkaMessageMetadata, Acknowledgment acknowledgment) {
        MessageHeaderAccessor messageHeaderAccessor = new MessageHeaderAccessor();
        messageHeaderAccessor.setHeader(KafkaHeaders.MESSAGE_KEY, obj);
        messageHeaderAccessor.setHeader(KafkaHeaders.TOPIC, kafkaMessageMetadata.getPartition().getTopic());
        messageHeaderAccessor.setHeader(KafkaHeaders.PARTITION_ID, Integer.valueOf(kafkaMessageMetadata.getPartition().getId()));
        messageHeaderAccessor.setHeader(KafkaHeaders.OFFSET, Long.valueOf(kafkaMessageMetadata.getOffset()));
        messageHeaderAccessor.setHeader(KafkaHeaders.NEXT_OFFSET, Long.valueOf(kafkaMessageMetadata.getNextOffset()));
        messageHeaderAccessor.setLeaveMutable((this.generateMessageId || this.generateTimestamp) ? false : true);
        if (!this.autoCommitOffset) {
            messageHeaderAccessor.setHeader(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
        }
        return this.useMessageBuilderFactory ? getMessageBuilderFactory().withPayload(obj2).copyHeaders(messageHeaderAccessor.toMessageHeaders()).build() : MessageBuilder.createMessage(obj2, messageHeaderAccessor.getMessageHeaders());
    }
}
