package info.bitrich.xchangestream.service.netty;

import info.bitrich.xchangestream.service.exception.NotConnectedException;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/service/netty/NettyStreamingService.class */
public abstract class NettyStreamingService<T> {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(NettyStreamingService.class);
    /** Maximum WebSocket frame payload length handed to the handshaker factory in {@link #connect()}. */
    private final int maxFramePayloadLength;
    /** Endpoint to connect to, parsed from the constructor's string argument. */
    private final URI uri;
    /**
     * Marks a close requested through {@link #disconnect()}; the client handler's
     * {@code channelInactive} checks and clears it to decide whether to reconnect.
     */
    private boolean isManualDisconnect;
    /** Channel of the most recent connection attempt; {@code null} until {@link #connect()} has assigned it. */
    private Channel webSocketChannel;
    /** Active subscriptions, keyed by the value returned from {@link #getSubscriptionUniqueId(String, Object...)}. */
    protected Map<String, NettyStreamingService<T>.Subscription> channels;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:info/bitrich/xchangestream/service/netty/NettyStreamingService$NettyWebSocketClientHandler.class */
    /**
     * Client handler that reacts to the channel going inactive: a close that was
     * requested through {@code disconnect()} only clears the manual-disconnect flag,
     * any other close triggers a blocking reconnect followed by a resubscription of
     * all registered channels.
     */
    public class NettyWebSocketClientHandler extends WebSocketClientHandler {
        NettyWebSocketClientHandler(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketClientHandler.WebSocketMessageHandler webSocketMessageHandler) {
            super(webSocketClientHandshaker, webSocketMessageHandler);
        }

        /**
         * Invoked when the channel becomes inactive.
         *
         * @param channelHandlerContext context of the channel that was closed
         */
        @Override // info.bitrich.xchangestream.service.netty.WebSocketClientHandler
        public void channelInactive(ChannelHandlerContext channelHandlerContext) {
            final NettyStreamingService<T> service = NettyStreamingService.this;
            if (service.isManualDisconnect) {
                // Expected close: consume the flag and do not reconnect.
                service.isManualDisconnect = false;
            } else {
                super.channelInactive(channelHandlerContext);
                NettyStreamingService.LOG.info("Reopening websocket because it was closed by the host");
                // Blocks the calling thread until the new connection attempt has finished.
                service.connect().blockingAwait();
                NettyStreamingService.LOG.info("Resubscribing channels");
                service.resubscribeChannels();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:info/bitrich/xchangestream/service/netty/NettyStreamingService$Subscription.class */
    /**
     * Bookkeeping record for one subscribed channel: the arguments used to build
     * the subscribe message and the emitter that receives the channel's messages.
     */
    public class Subscription {
        /** Arguments passed to {@code subscribeChannel}; reused when resubscribing. */
        Object[] args;
        /** Downstream emitter that messages and errors for the channel are pushed to. */
        ObservableEmitter<T> emitter;

        /** Only the enclosing service creates subscriptions. */
        private Subscription() {
        }
    }

    /**
     * Creates a service for the given endpoint with a maximum frame payload
     * length of 65536 bytes.
     *
     * @param str WebSocket URI as a string
     * @throws IllegalArgumentException if {@code str} is not a valid URI
     */
    public NettyStreamingService(String str) {
        this(str, 64 * 1024);
    }

    /**
     * Creates a service for the given endpoint.
     *
     * @param str WebSocket URI as a string
     * @param i   maximum frame payload length passed to the WebSocket handshaker
     * @throws IllegalArgumentException if {@code str} is not a valid URI
     */
    public NettyStreamingService(String str, int i) {
        this.isManualDisconnect = false;
        this.channels = new ConcurrentHashMap<>();
        this.maxFramePayloadLength = i;
        try {
            this.uri = new URI(str);
        } catch (URISyntaxException e) {
            // Surface a malformed endpoint as an unchecked error, keeping the cause.
            throw new IllegalArgumentException("Error parsing URI " + str, e);
        }
    }

    /**
     * Opens the WebSocket connection to {@code uri} and performs the handshake.
     *
     * <p>The returned {@link Completable} is cold: nothing happens until it is
     * subscribed. It completes once the WebSocket handshake has succeeded and
     * signals an error when the URI is unusable (missing host, scheme other than
     * {@code ws}/{@code wss}), when the TCP connection cannot be established, or
     * when the handshake fails.
     *
     * <p>Every subscription creates its own {@link NioEventLoopGroup}; it is shut
     * down here only when the TCP connect fails.
     *
     * @return a completable that tracks the connection attempt
     */
    public Completable connect() {
        return Completable.create(completableEmitter -> {
            try {
                LOG.info("Connecting to {}://{}:{}{}", this.uri.getScheme(), this.uri.getHost(), this.uri.getPort(), this.uri.getPath());
                final String scheme = this.uri.getScheme() == null ? "ws" : this.uri.getScheme();
                final String host = this.uri.getHost();
                if (host == null) {
                    throw new IllegalArgumentException("Host cannot be null.");
                }
                final boolean secure = "wss".equalsIgnoreCase(scheme);
                if (!secure && !"ws".equalsIgnoreCase(scheme)) {
                    throw new IllegalArgumentException("Only WS(S) is supported.");
                }
                // Fall back to the scheme's default port when the URI does not name one.
                final int port = this.uri.getPort() != -1 ? this.uri.getPort() : (secure ? 443 : 80);
                final SslContext sslContext = secure ? SslContextBuilder.forClient().build() : null;
                final WebSocketClientHandler webSocketClientHandler = getWebSocketClientHandler(
                        WebSocketClientHandshakerFactory.newHandshaker(this.uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(), this.maxFramePayloadLength),
                        this::messageHandler);
                // Created only after everything that can throw synchronously, so a
                // configuration error does not leave event loop threads behind.
                final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
                final Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel socketChannel) {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        if (sslContext != null) {
                            pipeline.addLast(sslContext.newHandler(socketChannel.alloc(), host, port));
                        }
                        WebSocketClientExtensionHandler extensionHandler = NettyStreamingService.this.getWebSocketClientExtensionHandler();
                        ArrayList<ChannelHandler> handlers = new ArrayList<>(4);
                        handlers.add(new HttpClientCodec());
                        handlers.add(new HttpObjectAggregator(8192));
                        handlers.add(webSocketClientHandler);
                        if (extensionHandler != null) {
                            handlers.add(extensionHandler);
                        }
                        pipeline.addLast(handlers.toArray(new ChannelHandler[0]));
                    }
                });
                final ChannelFuture connectFuture = bootstrap.connect(host, port);
                connectFuture.addListener(connectResult -> {
                    if (!connectResult.isSuccess()) {
                        // The channel never became usable: release the threads and report the cause.
                        eventLoopGroup.shutdownGracefully();
                        completableEmitter.onError(connectResult.cause());
                        return;
                    }
                    this.webSocketChannel = connectFuture.channel();
                    webSocketClientHandler.handshakeFuture().addListener(handshakeResult -> {
                        if (handshakeResult.isSuccess()) {
                            completableEmitter.onComplete();
                        } else {
                            completableEmitter.onError(handshakeResult.cause());
                        }
                    });
                });
            } catch (Exception e) {
                completableEmitter.onError(e);
            }
        });
    }

    /**
     * Closes the WebSocket by sending a close frame and forgets all registered
     * channel subscriptions.
     *
     * <p>The returned {@link Completable} is cold: the manual-disconnect flag is
     * raised and the close frame is written only on subscription. If the socket
     * was never opened or is already closed, the completable completes
     * immediately and the flag is left untouched, so a later host-initiated close
     * still triggers the automatic reconnect.
     *
     * @return a completable that completes once the close frame write has finished
     */
    public Completable disconnect() {
        return Completable.create(completableEmitter -> {
            final Channel channel = this.webSocketChannel;
            if (channel == null || !channel.isOpen()) {
                LOG.warn("Disconnect called but the WebSocket is not open.");
                completableEmitter.onComplete();
                return;
            }
            // Tell the client handler that the upcoming channelInactive is expected.
            this.isManualDisconnect = true;
            channel.writeAndFlush(new CloseWebSocketFrame()).addListener(future -> {
                this.channels = new ConcurrentHashMap<>();
                completableEmitter.onComplete();
            });
        });
    }

    /**
     * Extracts the channel name from a message. {@link #getChannel(Object)} calls
     * this and uses the result as the lookup key into {@code channels}.
     *
     * @param t the message to inspect
     * @return the name of the channel the message belongs to
     * @throws IOException if the channel name cannot be read from the message
     */
    protected abstract String getChannelNameFromMessage(T t) throws IOException;

    /**
     * Builds the text that is sent over the socket to subscribe to a channel.
     * Used by {@link #subscribeChannel(String, Object...)} and
     * {@link #resubscribeChannels()}.
     *
     * @param str    name of the channel to subscribe to
     * @param objArr additional subscription arguments supplied by the subscriber
     * @return the subscribe message to send
     * @throws IOException if the message cannot be built
     */
    public abstract String getSubscribeMessage(String str, Object... objArr) throws IOException;

    /**
     * Builds the text that is sent over the socket to unsubscribe from a channel.
     * {@link #subscribeChannel(String, Object...)} calls this from its dispose
     * hook, passing the subscription's unique id.
     *
     * @param str identifier of the channel to unsubscribe from
     * @return the unsubscribe message to send
     * @throws IOException if the message cannot be built
     */
    public abstract String getUnsubscribeMessage(String str) throws IOException;

    /**
     * Returns the key under which a subscription is stored in {@code channels}.
     * This implementation ignores the arguments and uses the channel name itself.
     *
     * @param str    name of the channel
     * @param objArr subscription arguments (unused here)
     * @return the unique id of the subscription
     */
    public String getSubscriptionUniqueId(String str, Object... objArr) {
        final String uniqueId = str;
        return uniqueId;
    }

    /**
     * Callback for incoming text; {@link #connect()} passes it as
     * {@code this::messageHandler} to the WebSocket client handler.
     * NOTE(review): presumably invoked once per received text message - confirm
     * against {@code WebSocketClientHandler}.
     *
     * @param str the raw message text
     */
    public abstract void messageHandler(String str);

    /**
     * Sends a text frame over the WebSocket. The message is dropped with a
     * warning when the socket is not open or not writable, and silently when
     * {@code str} is {@code null}.
     *
     * @param str the text to send
     */
    public void sendMessage(String str) {
        LOG.debug("Sending message: {}", str);
        final boolean open = this.webSocketChannel != null && this.webSocketChannel.isOpen();
        if (!open) {
            LOG.warn("WebSocket is not open! Call connect first.");
            return;
        }
        if (!this.webSocketChannel.isWritable()) {
            LOG.warn("Cannot send data to WebSocket as it is not writable.");
            return;
        }
        if (str == null) {
            return;
        }
        this.webSocketChannel.writeAndFlush(new TextWebSocketFrame(str));
    }

    /**
     * Subscribes to a channel and returns a shared stream of its messages.
     *
     * <p>On subscription the channel is registered in {@code channels} under its
     * unique id and the subscribe message is sent. If the socket is not open the
     * stream fails with {@link NotConnectedException} and nothing is registered.
     * If a subscription with the same id is already registered, no second
     * subscribe message is sent. When the stream is disposed, the registration is
     * removed and the unsubscribe message is sent.
     *
     * @param str    name of the channel
     * @param objArr additional arguments forwarded to {@link #getSubscribeMessage(String, Object...)}
     * @return a shared observable of the channel's messages
     */
    public Observable<T> subscribeChannel(String str, Object... objArr) {
        final String subscriptionUniqueId = getSubscriptionUniqueId(str, objArr);
        LOG.info("Subscribing to channel {}", subscriptionUniqueId);
        return Observable.<T>create(observableEmitter -> {
            if (this.webSocketChannel == null || !this.webSocketChannel.isOpen()) {
                // Stop here: registering a subscription whose emitter has already
                // failed would block every later attempt for the same id.
                observableEmitter.onError(new NotConnectedException());
                return;
            }
            final Subscription subscription = new Subscription();
            subscription.args = objArr;
            subscription.emitter = observableEmitter;
            // Atomic check-and-register; an existing registration wins.
            if (this.channels.putIfAbsent(subscriptionUniqueId, subscription) != null) {
                return;
            }
            try {
                sendMessage(getSubscribeMessage(str, objArr));
            } catch (IOException e) {
                // Do not keep a registration for a subscribe message that was never sent.
                this.channels.remove(subscriptionUniqueId);
                observableEmitter.onError(e);
            }
        }).doOnDispose(() -> {
            // Unsubscribe only when this id is actually registered.
            if (this.channels.remove(subscriptionUniqueId) != null) {
                sendMessage(getUnsubscribeMessage(subscriptionUniqueId));
            }
        }).share();
    }

    /**
     * Sends a subscribe message for every registered channel, using the
     * arguments stored with each subscription. A channel whose subscribe message
     * cannot be built is logged and skipped; the remaining channels are still
     * processed.
     */
    public void resubscribeChannels() {
        // Iterate entries so key and value stay paired even if the map is
        // modified concurrently (a separate get() could return null).
        for (Map.Entry<String, NettyStreamingService<T>.Subscription> entry : this.channels.entrySet()) {
            final String channelId = entry.getKey();
            try {
                sendMessage(getSubscribeMessage(channelId, entry.getValue().args));
            } catch (IOException e) {
                LOG.error("Failed to reconnect channel: {}", channelId, e);
            }
        }
    }

    /**
     * Resolves the channel name of a message via
     * {@link #getChannelNameFromMessage(Object)}.
     *
     * @param t the message to inspect
     * @return the channel name, or an empty string when it cannot be parsed
     */
    protected String getChannel(T t) {
        try {
            return getChannelNameFromMessage(t);
        } catch (IOException e) {
            // Pass the exception as the last argument so the cause is logged too.
            LOG.error("Cannot parse channel from message: {}", t, e);
            return "";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Routes a message to the subscriber of the channel it belongs to.
     *
     * @param t the message to dispatch
     */
    public void handleMessage(T t) {
        final String channelName = getChannel(t);
        handleChannelMessage(channelName, t);
    }

    /**
     * Routes an error to the subscriber of the channel the given message belongs to.
     *
     * @param t  the message used to determine the channel
     * @param th the error to deliver
     */
    protected void handleError(T t, Throwable th) {
        final String channelName = getChannel(t);
        handleChannelError(channelName, th);
    }

    /**
     * Delivers a message to the emitter registered for the given channel. When
     * the channel name is {@code null} or no subscription (or emitter) is
     * registered for it, the message is dropped with a debug log entry.
     *
     * @param str name of the channel
     * @param t   the message to deliver
     */
    protected void handleChannelMessage(String str, T t) {
        // ConcurrentHashMap rejects null keys, and an unknown channel yields null.
        final Subscription subscription = str == null ? null : this.channels.get(str);
        final ObservableEmitter<T> observableEmitter = subscription == null ? null : subscription.emitter;
        if (observableEmitter == null) {
            LOG.debug("No subscriber for channel {}.", str);
            return;
        }
        observableEmitter.onNext(t);
    }

    /**
     * Delivers an error to the emitter registered for the given channel. When
     * the channel name is {@code null} or no subscription (or emitter) is
     * registered for it, the error is dropped with a debug log entry.
     *
     * @param str name of the channel
     * @param th  the error to deliver
     */
    protected void handleChannelError(String str, Throwable th) {
        // ConcurrentHashMap rejects null keys, and an unknown channel yields null.
        final Subscription subscription = str == null ? null : this.channels.get(str);
        final ObservableEmitter<T> observableEmitter = subscription == null ? null : subscription.emitter;
        if (observableEmitter == null) {
            LOG.debug("No subscriber for channel {}.", str);
            return;
        }
        observableEmitter.onError(th);
    }

    /**
     * Supplies the extension handler that {@code connect()} adds to the channel
     * pipeline; a {@code null} return means no extension handler is added.
     *
     * @return the shared {@link WebSocketClientCompressionHandler} instance
     */
    protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
        final WebSocketClientExtensionHandler compressionHandler = WebSocketClientCompressionHandler.INSTANCE;
        return compressionHandler;
    }

    /**
     * Creates the client handler installed in the channel pipeline by
     * {@code connect()}.
     *
     * @param webSocketClientHandshaker handshaker for the connection
     * @param webSocketMessageHandler   callback handed to the handler for incoming messages
     * @return a new {@link NettyWebSocketClientHandler}
     */
    protected WebSocketClientHandler getWebSocketClientHandler(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketClientHandler.WebSocketMessageHandler webSocketMessageHandler) {
        final WebSocketClientHandler clientHandler = new NettyWebSocketClientHandler(webSocketClientHandshaker, webSocketMessageHandler);
        return clientHandler;
    }

    /**
     * Reports whether the WebSocket channel is currently open.
     *
     * @return {@code true} if a channel exists and is open; {@code false} if no
     *         channel has been assigned yet or it is closed
     */
    public boolean isSocketOpen() {
        final Channel channel = this.webSocketChannel;
        return channel != null && channel.isOpen();
    }
}
