package io.confluent.mqtt.stream.netty;

import io.confluent.mqtt.MqttConfig;
import io.confluent.mqtt.RegexListTopicMapper;
import io.confluent.mqtt.stream.DeliveryGuarantee;
import io.confluent.mqtt.stream.PublishMqttRecord;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/mqtt/stream/netty/NettyKafkaPublisherTest.class */
public class NettyKafkaPublisherTest {
    private static final String MQTT_TOPIC = "/kitchen/fridge/temperature";
    private static final byte[] PAYLOAD = "42F".getBytes(StandardCharsets.UTF_8);
    private static final String TOPIC_MAPPING = "temperature:.*temperature";
    private MqttConfig config;
    private RegexListTopicMapper mapper;
    KafkaProducer<String, byte[]> producer;
    Map<DeliveryGuarantee, Producer<String, byte[]>> producers;
    private final Map<String, String> props = new HashMap();
    private String listeners = "localhost:8883";
    private String kafka = "kafka:9092";

    @Before
    public void setup() {
        this.props.put("listeners", this.listeners);
        this.props.put("bootstrap.servers", this.kafka);
        this.props.put("topic.regex.list", TOPIC_MAPPING);
        this.config = new MqttConfig(this.props);
        this.mapper = new RegexListTopicMapper(this.config);
        this.producer = (KafkaProducer) Mockito.mock(KafkaProducer.class);
        this.producers = new HashMap();
    }

    @Test
    public void testWriteSuccess() {
        this.producers.put(DeliveryGuarantee.AT_MOST_ONCE, this.producer);
        NettyKafkaPublisher nettyKafkaPublisher = new NettyKafkaPublisher(this.producers);
        ChannelHandlerContext channelHandlerContext = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class, Mockito.RETURNS_DEEP_STUBS);
        PublishMqttRecord newRecord = PublishMqttRecord.newRecord((TopicPartition) this.mapper.map(MQTT_TOPIC).get(), newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.AT_MOST_ONCE, 1));
        ChannelPromise channelPromise = (ChannelPromise) Mockito.mock(ChannelPromise.class);
        Mockito.when(channelHandlerContext.channel().id().asShortText()).thenReturn("too_long_id");
        ((KafkaProducer) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArguments()[1]).onCompletion(new RecordMetadata(new TopicPartition("mqtt", 7), 64L, 128L, 626400000L, 3L, 8, 16), (Exception) null);
            return null;
        }).when(this.producer)).send((ProducerRecord) Matchers.isA(ProducerRecord.class), (Callback) Matchers.isA(Callback.class));
        nettyKafkaPublisher.write(channelHandlerContext, newRecord, channelPromise);
        ((ChannelPromise) Mockito.verify(channelPromise)).setSuccess();
    }

    @Test
    public void testWriteFail() {
        this.producers.put(DeliveryGuarantee.AT_MOST_ONCE, this.producer);
        NettyKafkaPublisher nettyKafkaPublisher = new NettyKafkaPublisher(this.producers);
        ChannelHandlerContext channelHandlerContext = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class, Mockito.RETURNS_DEEP_STUBS);
        PublishMqttRecord newRecord = PublishMqttRecord.newRecord((TopicPartition) this.mapper.map(MQTT_TOPIC).get(), newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.AT_MOST_ONCE, 1));
        RuntimeException runtimeException = new RuntimeException("Kafka has been down for a while");
        ChannelPromise channelPromise = (ChannelPromise) Mockito.mock(ChannelPromise.class);
        Mockito.when(channelHandlerContext.channel().id().asShortText()).thenReturn("too_long_id");
        ((KafkaProducer) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArguments()[1]).onCompletion((RecordMetadata) null, runtimeException);
            return null;
        }).when(this.producer)).send((ProducerRecord) Matchers.isA(ProducerRecord.class), (Callback) Matchers.isA(Callback.class));
        nettyKafkaPublisher.write(channelHandlerContext, newRecord, channelPromise);
        ((ChannelPromise) Mockito.verify(channelPromise)).setFailure((Throwable) Matchers.eq(runtimeException));
    }

    @Test
    public void testWriteException() {
        this.producers.put(DeliveryGuarantee.AT_MOST_ONCE, this.producer);
        NettyKafkaPublisher nettyKafkaPublisher = new NettyKafkaPublisher(this.producers);
        ChannelHandlerContext channelHandlerContext = (ChannelHandlerContext) Mockito.mock(ChannelHandlerContext.class, Mockito.RETURNS_DEEP_STUBS);
        PublishMqttRecord newRecord = PublishMqttRecord.newRecord((TopicPartition) this.mapper.map(MQTT_TOPIC).get(), newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.AT_MOST_ONCE, 1));
        RuntimeException runtimeException = new RuntimeException("Invalid record");
        ChannelPromise channelPromise = (ChannelPromise) Mockito.mock(ChannelPromise.class);
        Exception exc = null;
        Mockito.when(channelHandlerContext.channel().id().asShortText()).thenReturn("too_long_id");
        ((KafkaProducer) Mockito.doAnswer(invocationOnMock -> {
            throw runtimeException;
        }).when(this.producer)).send((ProducerRecord) Matchers.isA(ProducerRecord.class), (Callback) Matchers.isA(Callback.class));
        try {
            nettyKafkaPublisher.write(channelHandlerContext, newRecord, channelPromise);
        } catch (Exception e) {
            exc = e;
        }
        Assert.assertEquals(runtimeException, exc);
    }

    private MqttPublishMessage newPublishMessage(String str, byte[] bArr, MqttQoS mqttQoS, Integer num) {
        return MqttMessageBuilders.publish().topicName(str).qos(mqttQoS).payload(Unpooled.wrappedBuffer(bArr)).messageId(num.intValue()).retained(false).build();
    }
}
