/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.dsl.kafka;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;

public class Kafka09ProducerMessageHandler<K, V>
extends AbstractMessageHandler {
    private static final long DEFAULT_SEND_TIMEOUT = 10000L;
    private final KafkaTemplate<K, V> kafkaTemplate;
    private EvaluationContext evaluationContext;
    private volatile Expression topicExpression;
    private volatile Expression messageKeyExpression;
    private volatile Expression partitionIdExpression;
    private boolean sync;
    private long sendTimeout = 10000L;

    public Kafka09ProducerMessageHandler(KafkaTemplate<K, V> kafkaTemplate) {
        Assert.notNull(kafkaTemplate, (String)"kafkaTemplate cannot be null");
        this.kafkaTemplate = kafkaTemplate;
    }

    public void setTopicExpression(Expression topicExpression) {
        this.topicExpression = topicExpression;
    }

    public void setMessageKeyExpression(Expression messageKeyExpression) {
        this.messageKeyExpression = messageKeyExpression;
    }

    public void setPartitionIdExpression(Expression partitionIdExpression) {
        this.partitionIdExpression = partitionIdExpression;
    }

    public KafkaTemplate<?, ?> getKafkaTemplate() {
        return this.kafkaTemplate;
    }

    public void setSync(boolean sync) {
        this.sync = sync;
    }

    public void setSendTimeout(long sendTimeout) {
        this.sendTimeout = sendTimeout;
    }

    protected void onInit() throws Exception {
        super.onInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext((BeanFactory)this.getBeanFactory());
    }

    protected void handleMessageInternal(Message<?> message) throws Exception {
        String topic = this.topicExpression != null ? (String)this.topicExpression.getValue(this.evaluationContext, message, String.class) : (String)message.getHeaders().get((Object)"kafka_topic", String.class);
        Assert.state((boolean)StringUtils.hasText((String)topic), (String)"The 'topic' can not be empty or null");
        Integer partitionId = this.partitionIdExpression != null ? (Integer)this.partitionIdExpression.getValue(this.evaluationContext, message, Integer.class) : (Integer)message.getHeaders().get((Object)"kafka_partitionId", Integer.class);
        Object messageKey = this.messageKeyExpression != null ? this.messageKeyExpression.getValue(this.evaluationContext, message) : message.getHeaders().get((Object)"kafka_messageKey");
        Object payload = message.getPayload();
        if (payload instanceof KafkaNull) {
            payload = null;
        }
        ListenableFuture future = partitionId == null ? (messageKey == null ? this.kafkaTemplate.send(topic, payload) : this.kafkaTemplate.send(topic, messageKey, payload)) : (messageKey == null ? this.kafkaTemplate.send(topic, partitionId.intValue(), payload) : this.kafkaTemplate.send(topic, partitionId.intValue(), messageKey, payload));
        if (this.sync) {
            if (this.sendTimeout < 0L) {
                future.get();
            } else {
                try {
                    future.get(this.sendTimeout, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException te) {
                    throw new MessageTimeoutException(message, "Timeout waiting for response from KafkaProducer", (Throwable)te);
                }
            }
        }
    }

    public String getComponentType() {
        return "kafka:outbound-channel-adapter";
    }
}

