package io.github.quickmsg.core.mqtt;

import io.github.quickmsg.common.channel.MqttChannel;
import io.github.quickmsg.common.enums.ChannelStatus;
import io.github.quickmsg.common.transport.Transport;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttMessage;
import java.time.Duration;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/* loaded from: input_file:io/github/quickmsg/core/mqtt/MqttReceiveContext.class */
public class MqttReceiveContext extends AbstractReceiveContext<MqttConfiguration> {
    private static final Logger log = LoggerFactory.getLogger(MqttReceiveContext.class);
    private Disposable deferCloseDisposable;

    public MqttReceiveContext(MqttConfiguration mqttConfiguration, Transport<MqttConfiguration> transport) {
        super(mqttConfiguration, transport);
    }

    public void apply(MqttChannel mqttChannel) {
        deferCloseChannel(mqttChannel.getConnection());
        mqttChannel.onClose(onDisposable(mqttChannel)).getConnection().inbound().receiveObject().cast(MqttMessage.class).map(retainMessage()).subscribe(mqttMessage -> {
            accept(mqttChannel, mqttMessage);
        });
    }

    private Function<MqttMessage, MqttMessage> retainMessage() {
        return mqttMessage -> {
            if (mqttMessage.payload() instanceof ByteBuf) {
                ((ByteBuf) mqttMessage.payload()).retain();
            }
            return mqttMessage;
        };
    }

    public void accept(MqttChannel mqttChannel, MqttMessage mqttMessage) {
        log.info("accept channel] {} message {}", mqttChannel.getConnection(), mqttMessage);
        getProtocolAdaptor().chooseProtocol(mqttChannel, mqttMessage, this);
    }

    private MqttReceiveContext deferCloseChannel(Connection connection) {
        this.deferCloseDisposable = Mono.fromRunnable(() -> {
            if (connection.isDisposed()) {
                return;
            }
            connection.dispose();
        }).delaySubscription(Duration.ofSeconds(10L)).subscribe();
        return this;
    }

    private Disposable onDisposable(MqttChannel mqttChannel) {
        return () -> {
            if (mqttChannel.isSessionPersistent()) {
                mqttChannel.setStatus(ChannelStatus.OFFLINE);
                mqttChannel.close().subscribe();
            } else {
                getTopicRegistry().clear(mqttChannel);
                getChannelRegistry().close(mqttChannel);
            }
        };
    }

    public Disposable getDeferCloseDisposable() {
        return this.deferCloseDisposable;
    }
}
