package org.springframework.cloud.gcp.pubsub.integration.inbound;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.Subscriber;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.gcp.pubsub.core.PubSubException;
import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberOperations;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.PubSubHeaderMapper;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.cloud.gcp.pubsub.support.converter.ConvertedBasicAcknowledgeablePubsubMessage;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/cloud/gcp/pubsub/integration/inbound/PubSubInboundChannelAdapter.class */
public class PubSubInboundChannelAdapter extends MessageProducerSupport {
    private static final Log LOGGER = LogFactory.getLog(PubSubInboundChannelAdapter.class);
    private final String subscriptionName;
    private final PubSubSubscriberOperations pubSubSubscriberOperations;
    private Subscriber subscriber;
    private AckMode ackMode = AckMode.AUTO;
    private HeaderMapper<Map<String, String>> headerMapper = new PubSubHeaderMapper();
    private Class payloadType = byte[].class;

    public PubSubInboundChannelAdapter(PubSubSubscriberOperations pubSubSubscriberOperations, String str) {
        Assert.notNull(pubSubSubscriberOperations, "Pub/Sub subscriber template can't be null.");
        Assert.notNull(str, "Pub/Sub subscription name can't be null.");
        this.pubSubSubscriberOperations = pubSubSubscriberOperations;
        this.subscriptionName = str;
    }

    public AckMode getAckMode() {
        return this.ackMode;
    }

    public void setAckMode(AckMode ackMode) {
        Assert.notNull(ackMode, "The acknowledgement mode can't be null.");
        this.ackMode = ackMode;
    }

    public Class getPayloadType() {
        return this.payloadType;
    }

    public void setPayloadType(Class cls) {
        Assert.notNull(cls, "The payload type cannot be null.");
        this.payloadType = cls;
    }

    public void setHeaderMapper(HeaderMapper<Map<String, String>> headerMapper) {
        Assert.notNull(headerMapper, "The header mapper can't be null.");
        this.headerMapper = headerMapper;
    }

    protected void doStart() {
        super.doStart();
        this.subscriber = this.pubSubSubscriberOperations.subscribeAndConvert(this.subscriptionName, this::consumeMessage, this.payloadType);
    }

    protected void doStop() {
        if (this.subscriber != null) {
            this.subscriber.stopAsync();
        }
        super.doStop();
    }

    private void consumeMessage(final ConvertedBasicAcknowledgeablePubsubMessage convertedBasicAcknowledgeablePubsubMessage) {
        Map headers = this.headerMapper.toHeaders(convertedBasicAcknowledgeablePubsubMessage.getPubsubMessage().getAttributesMap());
        if (this.ackMode == AckMode.MANUAL) {
            headers.put(GcpPubSubHeaders.ORIGINAL_MESSAGE, convertedBasicAcknowledgeablePubsubMessage);
            headers.put(GcpPubSubHeaders.ACKNOWLEDGEMENT, new AckReplyConsumer() { // from class: org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter.1
                public void ack() {
                    PubSubInboundChannelAdapter.LOGGER.warn("ACKNOWLEDGEMENT header is deprecated. Please use ORIGINAL_MESSAGE header to ack.");
                    convertedBasicAcknowledgeablePubsubMessage.ack();
                }

                public void nack() {
                    PubSubInboundChannelAdapter.LOGGER.warn("ACKNOWLEDGEMENT header is deprecated. Please use ORIGINAL_MESSAGE header to nack.");
                    convertedBasicAcknowledgeablePubsubMessage.nack();
                }
            });
        }
        try {
            sendMessage(MessageBuilder.withPayload(convertedBasicAcknowledgeablePubsubMessage.getPayload()).copyHeaders(headers).build());
            if (this.ackMode == AckMode.AUTO || this.ackMode == AckMode.AUTO_ACK) {
                convertedBasicAcknowledgeablePubsubMessage.ack();
            }
        } catch (RuntimeException e) {
            if (this.ackMode == AckMode.AUTO) {
                convertedBasicAcknowledgeablePubsubMessage.nack();
            }
            throw new PubSubException("Sending Spring message failed.", e);
        }
    }
}
