package net.jetblack.feedbus.adapters;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jetblack.feedbus.adapters.config.ConnectionConfig;
import net.jetblack.feedbus.messages.ForwardedSubscriptionRequest;
import net.jetblack.feedbus.messages.Message;
import net.jetblack.feedbus.messages.MessageType;
import net.jetblack.feedbus.messages.MonitorRequest;
import net.jetblack.feedbus.messages.MulticastData;
import net.jetblack.feedbus.messages.NotificationRequest;
import net.jetblack.feedbus.messages.SubscriptionRequest;
import net.jetblack.feedbus.messages.UnicastData;

/* loaded from: input_file:net/jetblack/feedbus/adapters/Client.class */
public class Client implements Closeable {
    private static final Logger logger = Logger.getLogger(Client.class.getName());
    private final Socket _socket;
    private final DataInputStream _inputStream;
    private final DataOutputStream _outputStream;
    private final ByteSerializable _byteEncoder;
    private final BlockingQueue<Message> _writeQueue;
    private final List<DataErrorListener> _dataErrorListeners = new ArrayList();
    private final List<DataReceivedListener> _dataReceivedListeners = new ArrayList();
    private final List<ForwardedSubscriptionListener> _forwardedSubscriptionListeners = new ArrayList();
    private final List<ConnectionChangedListener> _connectionChangedListener = new ArrayList();
    private final List<HeartbeatListener> _heartbeatListeners = new ArrayList();
    private Thread _readThread;
    private Thread _writeThread;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: net.jetblack.feedbus.adapters.Client$3, reason: invalid class name */
    /* loaded from: input_file:net/jetblack/feedbus/adapters/Client$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$net$jetblack$feedbus$messages$MessageType = new int[MessageType.values().length];

        static {
            try {
                $SwitchMap$net$jetblack$feedbus$messages$MessageType[MessageType.MulticastData.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$net$jetblack$feedbus$messages$MessageType[MessageType.UnicastData.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$net$jetblack$feedbus$messages$MessageType[MessageType.ForwardedSubscriptionRequest.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public static Client createFromProperties() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException, IOException, InterruptedException, ClassNotFoundException {
        ConnectionConfig createFromProperties = ConnectionConfig.createFromProperties();
        return create(createFromProperties.getAddress(), createFromProperties.getPort(), createFromProperties.getByteSerializerType().getDeclaredConstructor(new Class[0]).newInstance(new Object[0]), createFromProperties.getWriteQueueCapacity());
    }

    public static Client create(ConnectionConfig connectionConfig) throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException, IOException, InterruptedException, ClassNotFoundException {
        return create(connectionConfig.getAddress(), connectionConfig.getPort(), connectionConfig.getByteSerializerType().getDeclaredConstructor(new Class[0]).newInstance(new Object[0]), connectionConfig.getWriteQueueCapacity());
    }

    public static Client create(InetAddress inetAddress, int i, ByteSerializable byteSerializable, int i2) throws IOException, InterruptedException {
        Client client = new Client(new Socket(inetAddress, i), byteSerializable, i2);
        client.start();
        client.addSubscription("__admin__", "heartbeat");
        return client;
    }

    public void addDataReceivedListener(DataReceivedListener dataReceivedListener) {
        synchronized (this._dataReceivedListeners) {
            this._dataReceivedListeners.add(dataReceivedListener);
        }
    }

    public void removeDataReceivedListener(DataReceivedListener dataReceivedListener) {
        synchronized (this._dataReceivedListeners) {
            this._dataReceivedListeners.remove(dataReceivedListener);
        }
    }

    public void addDataErrorListener(DataErrorListener dataErrorListener) {
        synchronized (this._dataErrorListeners) {
            this._dataErrorListeners.add(dataErrorListener);
        }
    }

    public void removeDataErrorListener(DataErrorListener dataErrorListener) {
        synchronized (this._dataErrorListeners) {
            this._dataErrorListeners.remove(dataErrorListener);
        }
    }

    public void addForwardedSubscriptionListener(ForwardedSubscriptionListener forwardedSubscriptionListener) {
        synchronized (this._forwardedSubscriptionListeners) {
            this._forwardedSubscriptionListeners.add(forwardedSubscriptionListener);
        }
    }

    public void removeForwardedSubscriptionListener(ForwardedSubscriptionListener forwardedSubscriptionListener) {
        synchronized (this._forwardedSubscriptionListeners) {
            this._forwardedSubscriptionListeners.remove(forwardedSubscriptionListener);
        }
    }

    public void addConnectionChangedListener(ConnectionChangedListener connectionChangedListener) {
        synchronized (this._connectionChangedListener) {
            this._connectionChangedListener.add(connectionChangedListener);
        }
    }

    public void removeConnectionChangedListener(ConnectionChangedListener connectionChangedListener) {
        synchronized (this._connectionChangedListener) {
            this._connectionChangedListener.remove(connectionChangedListener);
        }
    }

    public void addHeartbeatListener(HeartbeatListener heartbeatListener) {
        synchronized (this._heartbeatListeners) {
            this._heartbeatListeners.add(heartbeatListener);
        }
    }

    public void removeHeartbeatListener(HeartbeatListener heartbeatListener) {
        synchronized (this._heartbeatListeners) {
            this._heartbeatListeners.remove(heartbeatListener);
        }
    }

    private Client(Socket socket, ByteSerializable byteSerializable, int i) throws IOException {
        this._socket = socket;
        this._inputStream = new DataInputStream(socket.getInputStream());
        this._outputStream = new DataOutputStream(socket.getOutputStream());
        this._byteEncoder = byteSerializable;
        this._writeQueue = new ArrayBlockingQueue(i);
    }

    private void start() {
        this._readThread = new Thread(new Runnable() { // from class: net.jetblack.feedbus.adapters.Client.1
            @Override // java.lang.Runnable
            public void run() {
                Client.this.read();
            }
        }, "read");
        this._writeThread = new Thread(new Runnable() { // from class: net.jetblack.feedbus.adapters.Client.2
            @Override // java.lang.Runnable
            public void run() {
                Client.this.write();
            }
        }, "write");
        this._readThread.start();
        this._writeThread.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Failed to find 'out' block for switch in B:5:0x001c. Please report as an issue. */
    public void read() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Message read = Message.read(this._inputStream);
                switch (AnonymousClass3.$SwitchMap$net$jetblack$feedbus$messages$MessageType[read.getType().ordinal()]) {
                    case 1:
                        raiseOnDataOrHeartbeat((MulticastData) read);
                    case 2:
                        raiseOnData((UnicastData) read);
                    case 3:
                        raiseOnForwardedSubscriptionRequest((ForwardedSubscriptionRequest) read);
                    default:
                        throw new Exception("invalid message type");
                }
            } catch (EOFException e) {
                logger.severe("The distrubutor has closed the connection.");
                raiseConnectionStateChanged(ConnectionState.Closed, e);
                return;
            } catch (InterruptedException e2) {
                logger.finest("Th read thread has been interrupted.");
                return;
            } catch (Exception e3) {
                logger.log(Level.SEVERE, "Failed to read from distributor.", (Throwable) e3);
                raiseConnectionStateChanged(ConnectionState.Faulted, e3);
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void write() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                this._writeQueue.take().write(this._outputStream);
            } catch (EOFException e) {
                logger.info("EOF write thread");
                raiseConnectionStateChanged(ConnectionState.Closed, e);
                return;
            } catch (InterruptedException e2) {
                logger.info("Interrupted wriite thread");
                return;
            } catch (Exception e3) {
                logger.info("Error write thread");
                raiseConnectionStateChanged(ConnectionState.Faulted, e3);
                return;
            }
        }
    }

    private void raiseConnectionStateChanged(ConnectionState connectionState, Exception exc) {
        raiseConnectionChangedEvent(connectionState, exc);
    }

    private void raiseConnectionChangedEvent(ConnectionState connectionState, Exception exc) {
        ArrayList arrayList = new ArrayList();
        synchronized (this._connectionChangedListener) {
            if (this._connectionChangedListener.isEmpty()) {
                return;
            }
            Iterator<ConnectionChangedListener> it = this._connectionChangedListener.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            ConnectionChangedEvent connectionChangedEvent = new ConnectionChangedEvent(connectionState, exc);
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((ConnectionChangedListener) it2.next()).onConnectionChanged(connectionChangedEvent);
            }
        }
    }

    public void addSubscription(String str, String str2) throws InterruptedException {
        makeSubscriptionRequest(str, str2, true);
    }

    public void removeSubscription(String str, String str2) throws InterruptedException {
        makeSubscriptionRequest(str, str2, false);
    }

    private void makeSubscriptionRequest(String str, String str2, boolean z) throws InterruptedException {
        if (str == null) {
            throw new IllegalArgumentException("feed");
        }
        if (str2 == null) {
            throw new IllegalArgumentException("topic");
        }
        this._writeQueue.put(new SubscriptionRequest(str, str2, z));
    }

    public void addMonitor(String str) throws InterruptedException {
        makeMonitorRequest(str, true);
    }

    public void removeMonitor(String str) throws InterruptedException {
        makeMonitorRequest(str, false);
    }

    private void makeMonitorRequest(String str, boolean z) throws InterruptedException {
        if (str == null) {
            throw new IllegalArgumentException("feed");
        }
        this._writeQueue.put(new MonitorRequest(str, z));
    }

    public void addNotification(String str) throws InterruptedException {
        makeNotificationRequest(str, true);
    }

    public void removeNotification(String str) throws InterruptedException {
        makeNotificationRequest(str, false);
    }

    private void makeNotificationRequest(String str, boolean z) throws InterruptedException {
        if (str == null) {
            throw new IllegalArgumentException("feed");
        }
        this._writeQueue.put(new NotificationRequest(str, z));
    }

    public void send(String str, String str2, String str3, boolean z, Object obj) {
        if (str2 == null) {
            throw new IllegalArgumentException("feed");
        }
        if (str3 == null) {
            throw new IllegalArgumentException("topic");
        }
        try {
            this._writeQueue.put(new UnicastData(str, str2, str3, z, serialize(obj)));
        } catch (Exception e) {
            raiseDataErrorEvent(true, str2, str3, z, obj, e);
        }
    }

    public void publish(String str, String str2, boolean z, Object obj) {
        if (str == null) {
            throw new IllegalArgumentException("feed");
        }
        if (str2 == null) {
            throw new IllegalArgumentException("topic");
        }
        try {
            this._writeQueue.put(new MulticastData(str, str2, z, serialize(obj)));
        } catch (Exception e) {
            raiseDataErrorEvent(true, str, str2, z, obj, e);
        }
    }

    private void raiseDataErrorEvent(boolean z, String str, String str2, boolean z2, Object obj, Exception exc) {
        ArrayList arrayList = new ArrayList();
        synchronized (this._dataErrorListeners) {
            if (this._dataErrorListeners.isEmpty()) {
                return;
            }
            Iterator<DataErrorListener> it = this._dataErrorListeners.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            DataErrorEvent dataErrorEvent = new DataErrorEvent(true, str, str2, z2, obj, exc);
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((DataErrorListener) it2.next()).onDataErrorEvent(dataErrorEvent);
            }
        }
    }

    private void raiseOnForwardedSubscriptionRequest(ForwardedSubscriptionRequest forwardedSubscriptionRequest) {
        raiseForwardedSubscriptionEvent(forwardedSubscriptionRequest.getClientId(), forwardedSubscriptionRequest.getFeed(), forwardedSubscriptionRequest.getTopic(), forwardedSubscriptionRequest.isAdd());
    }

    private void raiseForwardedSubscriptionEvent(String str, String str2, String str3, boolean z) {
        ArrayList arrayList = new ArrayList();
        synchronized (this._forwardedSubscriptionListeners) {
            if (this._forwardedSubscriptionListeners.isEmpty()) {
                return;
            }
            Iterator<ForwardedSubscriptionListener> it = this._forwardedSubscriptionListeners.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            ForwardedSubscriptionEvent forwardedSubscriptionEvent = new ForwardedSubscriptionEvent(str, str2, str3, z);
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((ForwardedSubscriptionListener) it2.next()).onForwardedSubscription(forwardedSubscriptionEvent);
            }
        }
    }

    private void raiseOnDataOrHeartbeat(MulticastData multicastData) {
        if ("__admin__".equals(multicastData.getFeed()) && "heartbeat".contentEquals(multicastData.getTopic())) {
            raiseHeartbeatEvent();
        } else {
            raiseOnData(multicastData.getFeed(), multicastData.getTopic(), multicastData.getData(), multicastData.isImage());
        }
    }

    private void raiseHeartbeatEvent() {
        ArrayList arrayList = new ArrayList();
        synchronized (this._heartbeatListeners) {
            if (this._heartbeatListeners.isEmpty()) {
                return;
            }
            Iterator<HeartbeatListener> it = this._heartbeatListeners.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((HeartbeatListener) it2.next()).onHeartbeat();
            }
        }
    }

    private void raiseOnData(UnicastData unicastData) {
        raiseOnData(unicastData.getFeed(), unicastData.getTopic(), unicastData.getData(), unicastData.isImage());
    }

    protected byte[] serialize(Object obj) throws Exception {
        if (obj == null) {
            return null;
        }
        return this._byteEncoder.serialize(obj);
    }

    protected Object deserialize(byte[] bArr) throws Exception {
        return this._byteEncoder.deserialize(bArr);
    }

    private void raiseOnData(String str, String str2, byte[] bArr, boolean z) {
        try {
            raiseDataReceivedEvent(str, str2, deserialize(bArr), z);
        } catch (Exception e) {
            raiseDataErrorEvent(false, str, str2, z, bArr, e);
        }
    }

    private void raiseDataReceivedEvent(String str, String str2, Object obj, boolean z) {
        ArrayList arrayList = new ArrayList();
        synchronized (this._dataReceivedListeners) {
            if (this._dataErrorListeners.isEmpty()) {
                return;
            }
            Iterator<DataReceivedListener> it = this._dataReceivedListeners.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            DataReceivedEvent dataReceivedEvent = new DataReceivedEvent(str, str2, obj, z);
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((DataReceivedListener) it2.next()).onDataReceived(dataReceivedEvent);
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this._writeThread.interrupt();
            this._writeThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            this._inputStream.close();
            this._readThread.join();
        } catch (IOException e2) {
            e2.printStackTrace();
        } catch (InterruptedException e3) {
            e3.printStackTrace();
        }
        try {
            this._outputStream.close();
        } catch (IOException e4) {
            e4.printStackTrace();
        }
        try {
            this._socket.close();
        } catch (IOException e5) {
            e5.printStackTrace();
        }
    }
}
