package io.confluent.mqtt.protocol.netty;

import com.sun.security.auth.login.ConfigFile;
import io.confluent.mqtt.MqttConfig;
import io.confluent.mqtt.PipelineFactory;
import io.confluent.mqtt.RegexListTopicMapper;
import io.confluent.mqtt.protocol.security.SecurityProtocol;
import io.confluent.mqtt.stream.PublishMqttRecord;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.Configuration;
import org.apache.kafka.common.TopicPartition;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for {@link NettyMqttHandler}, driven through Netty {@link EmbeddedChannel}s.
 *
 * <p>Every channel built here ends with a Mockito mock of {@link ChannelOutboundHandler}
 * ({@code kafkaPublisherMock}). Whatever the handler under test writes outbound reaches that
 * mock, so the tests verify and capture the mock's {@code write}/{@code close} calls to
 * observe both the records forwarded for publishing and the MQTT replies.
 *
 * <p>The SASL tests replace the process-wide JAAS {@link Configuration}; each such test
 * installs the configuration it needs itself rather than relying on test execution order.
 */
public class NettyMqttHandlerTest {
    private static final String MQTT_TOPIC = "/kitchen/fridge/temperature";
    private static final byte[] PAYLOAD = "42F".getBytes(StandardCharsets.UTF_8);
    private static final String TOPIC_MAPPING = "temperature:.*temperature";
    private static final String USERNAME = "donnie";
    private static final String PASSWORD = "darko";
    private static final String WRONG_PASSWORD = "bunny";
    private static final String LISTENERS = "localhost:8883";
    private static final String KAFKA_BOOTSTRAP = "kafka:9092";
    /** Message id used by the QoS 2 tests. */
    private static final int QOS2_MESSAGE_ID = 65535;
    /**
     * Payload size shared by the "too large" and "large" publish tests: dropped under the
     * default configuration, forwarded once {@code mqtt.message.max.bytes} is raised.
     */
    private static final int LARGE_PAYLOAD_BYTES = 8092 - 28;
    /** Value written to {@code mqtt.message.max.bytes} by the "large message succeeds" test. */
    private static final int RAISED_MAX_MESSAGE_BYTES = 8388608;

    private final Map<String, String> props = new HashMap<>();
    private ChannelOutboundHandler kafkaPublisherMock;
    private MqttConfig config;
    private RegexListTopicMapper mapper;

    /** Builds the base configuration, the topic mapper and a fresh outbound mock. */
    @Before
    public void setup() {
        this.props.put("listeners", LISTENERS);
        this.props.put("bootstrap.servers", KAFKA_BOOTSTRAP);
        this.props.put("topic.regex.list", TOPIC_MAPPING);
        this.config = new MqttConfig(this.props);
        this.mapper = new RegexListTopicMapper(this.config);
        this.kafkaPublisherMock = Mockito.mock(ChannelOutboundHandler.class);
    }

    /** Resets the handler's static instance state between tests. */
    @After
    public void teardown() {
        NettyMqttHandler.resetInstance();
    }

    /** A PINGREQ must be answered with a PINGRESP carrying the expected fixed header. */
    @Test
    public void testPing() throws Exception {
        ArgumentCaptor<MqttMessage> reply = ArgumentCaptor.forClass(MqttMessage.class);
        MqttMessage pingReq = new MqttMessage(
                new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_LEAST_ONCE, false, 0));

        newEmbeddedChannel(SecurityProtocol.PLAINTEXT).writeInbound(pingReq);

        Mockito.verify(this.kafkaPublisherMock).write(anyContext(), reply.capture(), anyPromise());
        assertFixedHeaderEquals(newPingResp().fixedHeader(), reply.getValue().fixedHeader());
    }

    /** A QoS 0 publish is forwarded downstream as exactly one matching record. */
    @Test
    public void testPublishQoS0Success() throws Exception {
        PublishMqttRecord expected = expectedRecord(PAYLOAD, MqttQoS.AT_MOST_ONCE, 1);

        doPublish(newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.AT_MOST_ONCE, 1));

        Mockito.verify(this.kafkaPublisherMock).write(anyContext(), Matchers.eq(expected), anyPromise());
    }

    /**
     * With the default size limit, an encoded publish carrying {@link #LARGE_PAYLOAD_BYTES}
     * random bytes never reaches the outbound mock, while a small publish sent afterwards
     * through a fresh pipeline does.
     */
    @Test
    public void testPublishTooLargeMqttMessageFails() throws Exception {
        byte[] oversizedPayload = randomPayload(LARGE_PAYLOAD_BYTES);
        // A stand-alone encoder channel turns publish messages into wire bytes.
        EmbeddedChannel encoder = new EmbeddedChannel(MqttEncoder.INSTANCE);
        encoder.writeOutbound(newPublishMessage(MQTT_TOPIC, oversizedPayload, MqttQoS.AT_MOST_ONCE, -1));
        ByteBuf oversizedPacket = (ByteBuf) encoder.readOutbound();

        newExtendedEmbeddedChannel(SecurityProtocol.PLAINTEXT).writeInbound(oversizedPacket);

        Mockito.verify(this.kafkaPublisherMock, Mockito.never())
                .write(Matchers.any(), Matchers.any(), Matchers.any());

        byte[] smallPayload = "some message".getBytes(StandardCharsets.UTF_8);
        PublishMqttRecord expected = expectedRecord(smallPayload, MqttQoS.AT_MOST_ONCE, -1);
        encoder.writeOutbound(newPublishMessage(MQTT_TOPIC, smallPayload, MqttQoS.AT_MOST_ONCE, 1));
        ByteBuf smallPacket = (ByteBuf) encoder.readOutbound();

        newExtendedEmbeddedChannel(SecurityProtocol.PLAINTEXT).writeInbound(smallPacket);

        ArgumentCaptor<PublishMqttRecord> forwarded = ArgumentCaptor.forClass(PublishMqttRecord.class);
        Mockito.verify(this.kafkaPublisherMock).write(anyContext(), forwarded.capture(), anyPromise());
        Assert.assertEquals(expected, forwarded.getValue());
    }

    /**
     * The same payload size that is dropped by default is forwarded once
     * {@code mqtt.message.max.bytes} is raised.
     */
    @Test
    public void testPublishLargeMqttMessageSucceeds() throws Exception {
        byte[] largePayload = randomPayload(LARGE_PAYLOAD_BYTES);
        EmbeddedChannel encoder = new EmbeddedChannel(MqttEncoder.INSTANCE);
        encoder.writeOutbound(newPublishMessage(MQTT_TOPIC, largePayload, MqttQoS.AT_MOST_ONCE, -1));
        ByteBuf packet = (ByteBuf) encoder.readOutbound();

        // Must be set before the channel is built: the channel factory re-reads props.
        this.props.put("mqtt.message.max.bytes", String.valueOf(RAISED_MAX_MESSAGE_BYTES));
        newExtendedEmbeddedChannel(SecurityProtocol.PLAINTEXT).writeInbound(packet);

        PublishMqttRecord expected = expectedRecord(largePayload, MqttQoS.AT_MOST_ONCE, -1);
        ArgumentCaptor<PublishMqttRecord> forwarded = ArgumentCaptor.forClass(PublishMqttRecord.class);
        Mockito.verify(this.kafkaPublisherMock).write(anyContext(), forwarded.capture(), anyPromise());
        Assert.assertEquals(expected, forwarded.getValue());
    }

    @Test
    public void testPublishQoS0Fail() throws Exception {
        assertPublishFailed(
                newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.AT_MOST_ONCE, 1),
                expectedRecord(PAYLOAD, MqttQoS.AT_MOST_ONCE, 1));
    }

    /** A QoS 1 publish is forwarded as a record and then acknowledged with a PUBACK. */
    @Test
    public void testPublishQoS1Success() throws Exception {
        PublishMqttRecord expected = expectedRecord(PAYLOAD, MqttQoS.AT_LEAST_ONCE, 1);
        MqttPubAckMessage expectedAck = newPubAckMessage(1);

        doPublish(newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.AT_LEAST_ONCE, 1));

        Mockito.verify(this.kafkaPublisherMock).write(anyContext(), Matchers.eq(expected), anyPromise());
        ArgumentCaptor<Object> writes = ArgumentCaptor.forClass(Object.class);
        Mockito.verify(this.kafkaPublisherMock, Mockito.times(2))
                .write(anyContext(), writes.capture(), anyPromise());

        // The second outbound write is the acknowledgement.
        MqttPubAckMessage actualAck = (MqttPubAckMessage) writes.getAllValues().get(1);
        assertFixedHeaderEquals(expectedAck.fixedHeader(), actualAck.fixedHeader());
        Assert.assertEquals(
                expectedAck.variableHeader().messageId(), actualAck.variableHeader().messageId());
    }

    /** Compares two fixed headers field by field. */
    private void assertFixedHeaderEquals(MqttFixedHeader mqttFixedHeader, MqttFixedHeader mqttFixedHeader2) {
        Assert.assertEquals(mqttFixedHeader.messageType(), mqttFixedHeader2.messageType());
        Assert.assertEquals(mqttFixedHeader.isDup(), mqttFixedHeader2.isDup());
        Assert.assertEquals(mqttFixedHeader.qosLevel(), mqttFixedHeader2.qosLevel());
        Assert.assertEquals(mqttFixedHeader.isRetain(), mqttFixedHeader2.isRetain());
        Assert.assertEquals(mqttFixedHeader.remainingLength(), mqttFixedHeader2.remainingLength());
    }

    /**
     * Sends {@code mqttConnectMessage} through a channel configured for {@code securityProtocol}
     * and asserts that the single outbound reply matches {@code mqttConnAckMessage}.
     */
    private void assertConnectReply(MqttConnectMessage mqttConnectMessage, SecurityProtocol securityProtocol, MqttConnAckMessage mqttConnAckMessage) throws Exception {
        newEmbeddedChannel(securityProtocol).writeInbound(mqttConnectMessage);

        ArgumentCaptor<MqttMessage> reply = ArgumentCaptor.forClass(MqttMessage.class);
        Mockito.verify(this.kafkaPublisherMock).write(anyContext(), reply.capture(), anyPromise());
        MqttMessage actual = reply.getValue();
        assertFixedHeaderEquals(mqttConnAckMessage.fixedHeader(), actual.fixedHeader());
        assertVariableHeaderEquals(mqttConnAckMessage.variableHeader(), actual.variableHeader());
    }

    /** Asserts that {@code obj} is a CONNACK variable header equal to the expected one. */
    private void assertVariableHeaderEquals(MqttConnAckVariableHeader mqttConnAckVariableHeader, Object obj) {
        Assert.assertTrue("Actual header's type is different from MqttConnAckVariableHeader", obj instanceof MqttConnAckVariableHeader);
        MqttConnAckVariableHeader actual = (MqttConnAckVariableHeader) obj;
        Assert.assertEquals(mqttConnAckVariableHeader.connectReturnCode(), actual.connectReturnCode());
        Assert.assertEquals(mqttConnAckVariableHeader.isSessionPresent(), actual.isSessionPresent());
    }

    @Test
    public void testPublishQoS1Fail() throws Exception {
        assertPublishFailed(
                newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.AT_LEAST_ONCE, 1),
                expectedRecord(PAYLOAD, MqttQoS.AT_LEAST_ONCE, 1));
    }

    /**
     * QoS 2 happy path: the publish produces a record followed by a PUBREC, and the
     * subsequent PUBREL is answered with a PUBCOMP carrying the same message id.
     */
    @Test
    public void testPublishQoS2Success() throws Exception {
        PublishMqttRecord expected = expectedRecord(PAYLOAD, MqttQoS.EXACTLY_ONCE, QOS2_MESSAGE_ID);
        // Only the fixed header of the PUBREC is compared below.
        MqttMessage expectedPubRec = newPubRecMessage(QOS2_MESSAGE_ID);
        ArgumentCaptor<Object> publishWrites = ArgumentCaptor.forClass(Object.class);

        doPublish(newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.EXACTLY_ONCE, QOS2_MESSAGE_ID));

        Mockito.verify(this.kafkaPublisherMock, Mockito.times(2))
                .write(anyContext(), publishWrites.capture(), anyPromise());
        Assert.assertEquals(2, publishWrites.getAllValues().size());
        Assert.assertEquals(expected, publishWrites.getAllValues().get(0));
        assertFixedHeaderEquals(
                expectedPubRec.fixedHeader(),
                ((MqttMessage) publishWrites.getAllValues().get(1)).fixedHeader());

        // Second phase: forget the recorded writes, then deliver the PUBREL.
        Mockito.reset(this.kafkaPublisherMock);
        Mockito.doAnswer(completePromise()).when(this.kafkaPublisherMock)
                .write(Matchers.any(), Matchers.any(), Matchers.any());
        newEmbeddedChannel(SecurityProtocol.PLAINTEXT).writeInbound(newPubRelMessage(QOS2_MESSAGE_ID));

        ArgumentCaptor<Object> pubRelWrites = ArgumentCaptor.forClass(Object.class);
        Mockito.verify(this.kafkaPublisherMock, Mockito.times(1))
                .write(anyContext(), pubRelWrites.capture(), anyPromise());
        Assert.assertEquals(1, pubRelWrites.getAllValues().size());
        MqttMessage pubComp = (MqttMessage) pubRelWrites.getAllValues().get(0);
        assertFixedHeaderEquals(newPubCompMessage(QOS2_MESSAGE_ID).fixedHeader(), pubComp.fixedHeader());
        Assert.assertEquals(
                QOS2_MESSAGE_ID, ((MqttMessageIdVariableHeader) pubComp.variableHeader()).messageId());
    }

    @Test
    public void testPublishQoS2PublishFail() throws Exception {
        assertPublishFailed(
                newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.EXACTLY_ONCE, QOS2_MESSAGE_ID),
                expectedRecord(PAYLOAD, MqttQoS.EXACTLY_ONCE, QOS2_MESSAGE_ID));
    }

    /**
     * QoS 2 publish where the record write succeeds: the handler must still emit a PUBREC
     * with the original message id as its second outbound write.
     */
    @Test
    public void testPublishQoS2PubRecFail() throws Exception {
        PublishMqttRecord expected = expectedRecord(PAYLOAD, MqttQoS.EXACTLY_ONCE, QOS2_MESSAGE_ID);
        Mockito.doAnswer(completePromise()).when(this.kafkaPublisherMock)
                .write(anyContext(), Matchers.eq(expected), anyPromise());
        // NOTE(review): eq(MqttMessage.class) matches the Class object itself, not MqttMessage
        // instances, so this stub presumably never fires; isA(MqttMessage.class) may have been
        // intended. Left unchanged because switching matchers could alter the test outcome.
        Mockito.doAnswer(new Answer<Object>() {
            private int invocationCounter = 0;

            @Override
            public Object answer(InvocationOnMock invocation) {
                ChannelPromise promise = (ChannelPromise) invocation.getArguments()[2];
                // First matching write succeeds; every later one fails.
                if (this.invocationCounter++ == 0) {
                    promise.setSuccess();
                } else {
                    promise.setFailure(new RuntimeException("No response from client"));
                }
                return null;
            }
        }).when(this.kafkaPublisherMock)
                .write(anyContext(), Matchers.eq(MqttMessage.class), anyPromise());

        newEmbeddedChannel(SecurityProtocol.PLAINTEXT)
                .writeInbound(newPublishMessage(MQTT_TOPIC, PAYLOAD, MqttQoS.EXACTLY_ONCE, QOS2_MESSAGE_ID));

        // Captured as Object: the first write is a PublishMqttRecord, the second an MqttMessage.
        ArgumentCaptor<Object> writes = ArgumentCaptor.forClass(Object.class);
        Mockito.verify(this.kafkaPublisherMock, Mockito.times(2))
                .write(anyContext(), writes.capture(), anyPromise());
        Assert.assertEquals(expected, writes.getAllValues().get(0));
        MqttMessage pubRec = (MqttMessage) writes.getAllValues().get(1);
        Assert.assertEquals(MqttMessageType.PUBREC, pubRec.fixedHeader().messageType());
        Assert.assertEquals(
                QOS2_MESSAGE_ID, ((MqttMessageIdVariableHeader) pubRec.variableHeader()).messageId());
    }

    /**
     * Under PLAINTEXT a connect is accepted even with the wrong password. A JAAS
     * configuration that throws when queried for {@code ConfluentKafkaMqtt} is installed first.
     */
    @Test
    public void testConnectPlaintextSuccess() throws Exception {
        loadForbiddenJaasConfiguration();
        assertConnectReply(
                newConnectMessage(USERNAME, WRONG_PASSWORD),
                SecurityProtocol.PLAINTEXT,
                newConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED));
    }

    /** A DISCONNECT must result in a close call reaching the outbound mock. */
    @Test
    public void testDisconnect() throws Exception {
        newEmbeddedChannel(SecurityProtocol.PLAINTEXT).writeInbound(newDisconnectMessage());
        Mockito.verify(this.kafkaPublisherMock).close(anyContext(), Matchers.any(ChannelPromise.class));
    }

    @Test
    public void testConnectSaslPlaintextSuccess() throws Exception {
        // Installs the JAAS configuration itself, like the other SASL tests, so the result
        // does not depend on which test ran before this one.
        assertConnectEncryptedSasl(PASSWORD, MqttConnectReturnCode.CONNECTION_ACCEPTED, SecurityProtocol.SASL_PLAINTEXT);
    }

    @Test
    public void testConnectSaslPlaintextFailed() throws Exception {
        assertConnectEncryptedSasl(WRONG_PASSWORD, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, SecurityProtocol.SASL_PLAINTEXT);
    }

    @Test
    public void testConnectSaslSSLSuccess() throws Exception {
        assertConnectEncryptedSasl(PASSWORD, MqttConnectReturnCode.CONNECTION_ACCEPTED, SecurityProtocol.SASL_SSL);
    }

    @Test
    public void testConnectSaslTLSSuccess() throws Exception {
        assertConnectEncryptedSasl(PASSWORD, MqttConnectReturnCode.CONNECTION_ACCEPTED, SecurityProtocol.SASL_TLS);
    }

    @Test
    public void testConnectSaslTLSFailed() throws Exception {
        assertConnectEncryptedSasl(WRONG_PASSWORD, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, SecurityProtocol.SASL_TLS);
    }

    @Test
    public void testConnectSaslSSLFailed() throws Exception {
        assertConnectEncryptedSasl(WRONG_PASSWORD, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, SecurityProtocol.SASL_SSL);
    }

    /**
     * Installs the local JAAS configuration, connects as {@link #USERNAME} with password
     * {@code str} over {@code securityProtocol}, and expects a CONNACK with the given code.
     */
    private void assertConnectEncryptedSasl(String str, MqttConnectReturnCode mqttConnectReturnCode, SecurityProtocol securityProtocol) throws Exception {
        MqttConnectMessage connect = newConnectMessage(USERNAME, str);
        MqttConnAckMessage expectedAck = newConnAckMessage(mqttConnectReturnCode);
        loadLocalJaasConfiguration();
        assertConnectReply(connect, securityProtocol, expectedAck);
    }

    /** Installs the JAAS configuration found at {@code kafka-mqtt-jaas.conf} on the classpath. */
    private void loadLocalJaasConfiguration() throws URISyntaxException {
        java.net.URL jaasConfig = getClass().getClassLoader().getResource("kafka-mqtt-jaas.conf");
        // Fail with a readable message instead of a bare NullPointerException.
        Assert.assertNotNull("kafka-mqtt-jaas.conf not found on the test classpath", jaasConfig);
        Configuration.setConfiguration(new ConfigFile(jaasConfig.toURI()));
    }

    /** Installs a JAAS configuration mock that throws when asked for {@code ConfluentKafkaMqtt}. */
    private void loadForbiddenJaasConfiguration() {
        Configuration forbidden = Mockito.mock(Configuration.class);
        Mockito.doThrow(new RuntimeException("Should not be called")).when(forbidden)
                .getAppConfigurationEntry(Matchers.eq("ConfluentKafkaMqtt"));
        Configuration.setConfiguration(forbidden);
    }

    /**
     * Stubs the outbound mock so every write completes its promise successfully, then feeds
     * {@code obj} into a PLAINTEXT channel.
     */
    private void doPublish(Object obj) throws Exception {
        Mockito.doAnswer(completePromise()).when(this.kafkaPublisherMock)
                .write(anyContext(), Matchers.any(), anyPromise());
        newEmbeddedChannel(SecurityProtocol.PLAINTEXT).writeInbound(obj);
    }

    /**
     * Makes record writes throw, publishes {@code obj}, and asserts that the record write was
     * attempted while no PUBACK was written.
     */
    private void assertPublishFailed(Object obj, PublishMqttRecord publishMqttRecord) throws Exception {
        Mockito.doThrow(new RuntimeException("Kafka is not happy")).when(this.kafkaPublisherMock)
                .write(Matchers.any(ChannelHandlerContext.class), Matchers.isA(PublishMqttRecord.class), anyPromise());

        newEmbeddedChannel(SecurityProtocol.PLAINTEXT).writeInbound(obj);

        Mockito.verify(this.kafkaPublisherMock)
                .write(anyContext(), Matchers.eq(publishMqttRecord), Matchers.any(ChannelPromise.class));
        Mockito.verify(this.kafkaPublisherMock, Mockito.never())
                .write(anyContext(), Matchers.isA(MqttPubAckMessage.class), anyPromise());
    }

    /** Channel containing only the handler under test followed by the outbound mock. */
    private EmbeddedChannel newEmbeddedChannel(SecurityProtocol securityProtocol) {
        applySecurityProtocol(securityProtocol);
        return new EmbeddedChannel(new NettyMqttHandler(this.config), this.kafkaPublisherMock);
    }

    /**
     * Channel containing the handlers returned by
     * {@link PipelineFactory#newPublishPipelineHandlers} followed by the outbound mock.
     */
    private EmbeddedChannel newExtendedEmbeddedChannel(SecurityProtocol securityProtocol) {
        applySecurityProtocol(securityProtocol);
        ArrayList<ChannelHandler> handlers = new ArrayList<ChannelHandler>(Arrays.asList(
                new PipelineFactory(this.config).newPublishPipelineHandlers(Mockito.mock(SocketChannel.class))));
        handlers.add(this.kafkaPublisherMock);
        return new EmbeddedChannel(handlers.toArray(new ChannelHandler[0]));
    }

    /** Records the listener security protocol and rebuilds config and mapper from props. */
    private void applySecurityProtocol(SecurityProtocol securityProtocol) {
        this.props.put("listeners.security.protocol", securityProtocol.toString());
        this.config = new MqttConfig(this.props);
        this.mapper = new RegexListTopicMapper(this.config);
    }

    /** Record the handler is expected to forward for a publish on {@link #MQTT_TOPIC}. */
    private PublishMqttRecord expectedRecord(byte[] payload, MqttQoS qos, int messageId) {
        TopicPartition partition = (TopicPartition) this.mapper.map(MQTT_TOPIC).get();
        return PublishMqttRecord.newRecord(partition, newPublishMessage(MQTT_TOPIC, payload, qos, messageId));
    }

    /** Answer that marks the promise passed as the third argument as succeeded. */
    private static Answer<Object> completePromise() {
        return invocation -> {
            ((ChannelPromise) invocation.getArguments()[2]).setSuccess();
            return null;
        };
    }

    /** Registers an {@code isA(ChannelHandlerContext)} matcher; call in argument position only. */
    private static ChannelHandlerContext anyContext() {
        return Matchers.isA(ChannelHandlerContext.class);
    }

    /** Registers an {@code isA(ChannelPromise)} matcher; call in argument position only. */
    private static ChannelPromise anyPromise() {
        return Matchers.isA(ChannelPromise.class);
    }

    /** Returns {@code size} bytes drawn from {@link SecureRandom}. */
    private static byte[] randomPayload(int size) {
        byte[] bytes = new byte[size];
        new SecureRandom().nextBytes(bytes);
        return bytes;
    }

    private MqttConnectMessage newConnectMessage(String str, String str2) {
        return MqttMessageBuilders.connect()
                .hasUser(true)
                .username(str)
                .hasPassword(true)
                .password(str2.getBytes(StandardCharsets.UTF_8))
                .build();
    }

    private MqttMessage newDisconnectMessage() {
        return MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0),
                (Object) null,
                (Object) null);
    }

    private MqttConnAckMessage newConnAckMessage(MqttConnectReturnCode mqttConnectReturnCode) {
        return MqttMessageBuilders.connAck().returnCode(mqttConnectReturnCode).sessionPresent(false).build();
    }

    private MqttMessage newPubRelMessage(int messageId) {
        return newMessageIdMessage(MqttMessageType.PUBREL, MqttQoS.EXACTLY_ONCE, messageId);
    }

    /** Builds a non-retained publish; QoS 0 messages always get message id {@code -1}. */
    private MqttPublishMessage newPublishMessage(String str, byte[] bArr, MqttQoS mqttQoS, int messageId) {
        int effectiveId = mqttQoS == MqttQoS.AT_MOST_ONCE ? -1 : messageId;
        return MqttMessageBuilders.publish()
                .topicName(str)
                .qos(mqttQoS)
                .payload(Unpooled.wrappedBuffer(bArr))
                .messageId(effectiveId)
                .retained(false)
                .build();
    }

    private MqttPubAckMessage newPubAckMessage(int messageId) {
        // The factory is declared to return MqttMessage, so the PUBACK subtype needs a cast.
        return (MqttPubAckMessage) newMessageIdMessage(MqttMessageType.PUBACK, MqttQoS.AT_MOST_ONCE, messageId);
    }

    private MqttMessage newPubRecMessage(int messageId) {
        return newMessageIdMessage(MqttMessageType.PUBREC, MqttQoS.AT_MOST_ONCE, messageId);
    }

    private MqttMessage newPingResp() {
        return new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
    }

    private MqttMessage newPubCompMessage(int i) {
        return newMessageIdMessage(MqttMessageType.PUBCOMP, MqttQoS.AT_MOST_ONCE, i);
    }

    /**
     * Builds a payload-less message of {@code type} whose variable header is just a message
     * id; the fixed header uses remaining length 2, no dup and no retain.
     */
    private static MqttMessage newMessageIdMessage(MqttMessageType type, MqttQoS qos, int messageId) {
        return MqttMessageFactory.newMessage(
                new MqttFixedHeader(type, false, qos, false, 2),
                MqttMessageIdVariableHeader.from(messageId),
                (Object) null);
    }
}
