package org.springframework.integration.ip.tcp.connection;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.integration.Message;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import org.springframework.integration.ip.util.SocketUtils;

/* loaded from: input_file:org/springframework/integration/ip/tcp/connection/TcpNioConnection.class */
public class TcpNioConnection extends AbstractTcpConnection {
    /** Channel that every read and write of this connection goes through. */
    private final SocketChannel socketChannel;

    /** Stream facade over {@link #socketChannel}; passed to the serializer when sending. */
    private OutputStream channelOutputStream;

    /** Write end of the pipe; the reading thread copies raw socket data into it. */
    private PipedOutputStream pipedOutputStream;

    /** Read end of the pipe; handed to the deserializer when a payload is assembled. */
    private PipedInputStream pipedInputStream;

    /** When {@code true}, {@link #allocate(int)} returns direct buffers instead of heap buffers. */
    private boolean usingDirectBuffers;

    /** Executor that runs this connection (a {@link Runnable}) as the message assembler. */
    private Executor taskExecutor;

    /** Reusable buffer for socket reads; allocated on the first read. */
    private ByteBuffer rawBuffer;

    /** Capacity, in bytes, used when allocating {@link #rawBuffer}. */
    private int maxMessageSize;

    /** Value exchanged through {@link #getLastRead()} / {@link #setLastRead(long)}; not otherwise used here. */
    private long lastRead;

    /**
     * Counter of assembler tasks; also used as the lock when deciding whether to start one.
     * Final because it is assigned exactly once (in the constructor) and is synchronized on.
     */
    private final AtomicInteger executionControl;

    /**
     * Set while the reading thread is between starting a socket read and finishing the
     * pipe write. Volatile because it is written by the reading thread and read, without
     * a common lock, by assembler threads when they check for available data.
     */
    private volatile boolean writingToPipe;

    /* loaded from: input_file:org/springframework/integration/ip/tcp/connection/TcpNioConnection$ChannelOutputStream.class */
    /**
     * {@link OutputStream} that writes to the enclosing connection's socket channel.
     *
     * <p>A write first attempts to push all bytes to the channel. If some bytes remain,
     * the stream registers the channel with a private {@link Selector} for
     * write-readiness and keeps writing each time the selector reports the channel as
     * ready, until the buffer is drained or a select call returns without any ready key.
     *
     * <p>The selector is created lazily and reused for later writes; it is released when
     * this stream is closed.
     */
    class ChannelOutputStream extends OutputStream {

        /**
         * Selector used to wait for write-readiness; {@code null} until a write cannot
         * complete immediately. Volatile because {@link #close()} is not synchronized
         * and may run on a different thread than the writer.
         */
        private volatile Selector selector;

        /** Socket timeout captured when the selector is opened; used as the select timeout. */
        private int soTimeout;

        ChannelOutputStream() {
        }

        /**
         * Writes a single byte.
         *
         * @param i the byte to write; only the low-order eight bits are used
         * @throws IOException if the channel write fails or times out
         */
        @Override
        public void write(int i) throws IOException {
            doWrite(ByteBuffer.wrap(new byte[]{(byte) i}));
        }

        /**
         * Closes the enclosing connection and then releases the selector, if one was opened.
         *
         * @throws IOException if closing the selector fails
         */
        @Override
        public void close() throws IOException {
            try {
                TcpNioConnection.this.doClose();
            } finally {
                // The selector holds native resources that are not released by closing
                // the channel, so it must be closed explicitly.
                Selector openSelector = this.selector;
                if (openSelector != null) {
                    openSelector.close();
                }
            }
        }

        /** No-op: bytes are handed to the channel as they are written, nothing is buffered here. */
        @Override
        public void flush() throws IOException {
        }

        /**
         * Writes {@code i2} bytes of {@code bArr} starting at offset {@code i}.
         *
         * @param bArr source array
         * @param i    offset of the first byte to write
         * @param i2   number of bytes to write
         * @throws IOException if the channel write fails or times out
         */
        @Override
        public void write(byte[] bArr, int i, int i2) throws IOException {
            doWrite(ByteBuffer.wrap(bArr, i, i2));
        }

        /**
         * Writes the whole array.
         *
         * @param bArr bytes to write
         * @throws IOException if the channel write fails or times out
         */
        @Override
        public void write(byte[] bArr) throws IOException {
            doWrite(ByteBuffer.wrap(bArr));
        }

        /**
         * Writes the buffer's remaining bytes to the channel, waiting for write-readiness
         * as often as needed. Synchronized so that concurrent writers do not interleave.
         *
         * @param byteBuffer data to write; fully drained on normal return
         * @throws SocketTimeoutException if a select call returns with no ready key
         * @throws IOException            if the channel or selector fails
         */
        private synchronized void doWrite(ByteBuffer byteBuffer) throws IOException {
            SocketChannel channel = TcpNioConnection.this.socketChannel;
            channel.write(byteBuffer);
            if (!byteBuffer.hasRemaining()) {
                // Common case: everything went out in one call, no selector needed.
                return;
            }
            Selector writeSelector = this.selector;
            if (writeSelector == null) {
                writeSelector = Selector.open();
                this.selector = writeSelector;
                this.soTimeout = channel.socket().getSoTimeout();
            }
            channel.register(writeSelector, SelectionKey.OP_WRITE);
            while (byteBuffer.hasRemaining()) {
                // A timeout of 0 makes select() block until a key is ready.
                if (writeSelector.select(this.soTimeout) == 0) {
                    throw new SocketTimeoutException("Timeout on write");
                }
                // Clear the ready set so the next select() reports readiness afresh.
                writeSelector.selectedKeys().clear();
                channel.write(byteBuffer);
            }
        }
    }

    public TcpNioConnection(SocketChannel socketChannel, boolean z) throws Exception {
        super(socketChannel.socket(), z);
        this.maxMessageSize = 61440;
        this.executionControl = new AtomicInteger();
        this.socketChannel = socketChannel;
        this.pipedInputStream = new PipedInputStream();
        this.pipedOutputStream = new PipedOutputStream(this.pipedInputStream);
        this.channelOutputStream = new ChannelOutputStream();
        getConnectionId();
        if (this.connectionId == null) {
            throw new Exception("Null id");
        }
    }

    /**
     * Closes this connection by delegating to {@link #doClose()}.
     */
    @Override // org.springframework.integration.ip.tcp.connection.AbstractTcpConnection, org.springframework.integration.ip.tcp.connection.TcpConnection
    public void close() {
        doClose();
    }

    /**
     * Releases the pipe's write end and the socket channel, then runs the superclass close.
     *
     * <p>Both close calls are best effort: a failure in either one is deliberately
     * ignored so that the remaining steps still run.
     *
     * <p>Note: the decompiler reports this method was {@code private} in the original
     * source; it is public here so the inner output stream class can call it.
     */
    public void doClose() {
        final PipedOutputStream pipe = this.pipedOutputStream;
        if (pipe != null) {
            try {
                pipe.close();
            } catch (IOException ignored) {
                // Best effort: keep going and close the channel regardless.
            }
        }
        try {
            this.socketChannel.close();
        } catch (Exception ignored) {
            // Best effort: the superclass close below must still run.
        }
        super.close();
    }

    /**
     * Reports whether the underlying socket channel is open.
     *
     * @return {@code true} while the channel has not been closed
     */
    @Override
    public boolean isOpen() {
        final boolean channelOpen = this.socketChannel.isOpen();
        return channelOpen;
    }

    /**
     * Maps the message to a payload, serializes it to the channel output stream and
     * then invokes {@code afterSend}.
     *
     * <p>The whole sequence runs while holding the mapper's monitor, so sends on this
     * connection that share the mapper do not interleave.
     *
     * @param message the message to send
     * @throws Exception if mapping, serialization or the post-send hook fails
     */
    @Override
    public void send(Message<?> message) throws Exception {
        synchronized (this.mapper) {
            final OutputStream target = this.channelOutputStream;
            this.serializer.serialize(this.mapper.fromMessage(message), target);
            afterSend(message);
        }
    }

    /**
     * Returns the textual IP address of the socket's remote endpoint.
     *
     * @return the host address reported by the socket's {@code InetAddress}
     */
    @Override
    public String getHostAddress() {
        final String remoteAddress = this.socketChannel.socket().getInetAddress().getHostAddress();
        return remoteAddress;
    }

    /**
     * Returns the host name of the socket's remote endpoint.
     *
     * @return the host name reported by the socket's {@code InetAddress}
     */
    @Override
    public String getHostName() {
        final String remoteName = this.socketChannel.socket().getInetAddress().getHostName();
        return remoteName;
    }

    /**
     * Deserializes the next payload from the read end of the internal pipe.
     *
     * @return whatever the configured deserializer produces
     * @throws Exception if deserialization fails
     */
    @Override
    public Object getPayload() throws Exception {
        final Object payload = this.deserializer.deserialize(this.pipedInputStream);
        return payload;
    }

    /**
     * Returns the remote port the socket is connected to.
     *
     * @return the port reported by the underlying socket
     */
    @Override
    public int getPort() {
        final int remotePort = this.socketChannel.socket().getPort();
        return remotePort;
    }

    /**
     * Allocates a byte buffer of the requested capacity.
     *
     * @param i capacity of the buffer in bytes
     * @return a direct buffer when direct buffers are enabled, otherwise a heap buffer
     */
    protected ByteBuffer allocate(int i) {
        if (this.usingDirectBuffers) {
            return ByteBuffer.allocateDirect(i);
        }
        return ByteBuffer.allocate(i);
    }

    /**
     * Message assembler task: converts the data currently in the pipe into one message
     * and dispatches it.
     *
     * <p>If more data is available after the message has been assembled, another
     * assembler is scheduled before this one dispatches its message, so the next message
     * can be assembled while this one is being handled.
     *
     * <p>Whatever way the task ends (early exit, normal completion, or an exception), it
     * makes exactly one final check for available data and, if there is any, asks
     * {@link #checkForAssembler()} to start an assembler should none be running.
     */
    @Override
    public void run() {
        this.logger.trace("Nio message assembler running...");
        try {
            if (this.listener == null && !this.singleUse) {
                this.logger.debug("TcpListener exiting - no listener and not single use");
                return;
            }
            if (dataAvailable()) {
                Message<?> message = convert();
                if (dataAvailable()) {
                    // More data is waiting: account for and start the next assembler
                    // before this one gets busy dispatching.
                    this.executionControl.incrementAndGet();
                    this.taskExecutor.execute(this);
                }
                // This assembler is done assembling; release its slot before dispatch.
                this.executionControl.decrementAndGet();
                if (message != null) {
                    sendToChannel(message);
                }
            } else {
                this.executionControl.decrementAndGet();
            }
        } catch (IOException e) {
            this.logger.error("Unexpected exception, exiting...", e);
        } finally {
            this.logger.trace("Nio message assembler exiting...");
            // Final check: data may have arrived after the last availability test, at a
            // moment when no new assembler was started for it.
            try {
                if (dataAvailable()) {
                    checkForAssembler();
                }
            } catch (IOException e) {
                this.logger.error("Exception when checking for assembler", e);
            }
        }
    }

    /**
     * Tells whether an assembler has (or is about to have) something to read.
     *
     * @return {@code false} if the channel is closed; otherwise {@code true} when the
     *         pipe holds unread bytes or a pipe write is in progress
     * @throws IOException if the pipe cannot report its available byte count
     */
    private boolean dataAvailable() throws IOException {
        if (!this.socketChannel.isOpen()) {
            return false;
        }
        if (this.pipedInputStream.available() > 0) {
            return true;
        }
        return this.writingToPipe;
    }

    /**
     * Asks the mapper to build a message from this connection.
     *
     * <p>Any exception from the mapper closes the connection and results in
     * {@code null}. A socket timeout on a single-use connection is only noted at debug
     * level, a {@link SoftEndOfStreamException} is silent, and everything else is
     * logged as an error (with the stack trace only when trace logging is enabled).
     *
     * @return the assembled message, or {@code null} if no data is available or the
     *         mapper failed
     * @throws IOException if the data-availability check fails
     */
    private synchronized Message<?> convert() throws IOException {
        if (!dataAvailable()) {
            return null;
        }
        Message<?> assembled = null;
        try {
            assembled = this.mapper.toMessage((TcpConnection) this);
        } catch (Exception e) {
            closeConnection();
            final boolean singleUseTimeout = (e instanceof SocketTimeoutException) && this.singleUse;
            if (singleUseTimeout) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Closing single use socket after timeout " + this.connectionId);
                }
            } else if (!(e instanceof SoftEndOfStreamException)) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.error("Read exception " + getConnectionId(), e);
                } else {
                    this.logger.error("Read exception " + getConnectionId() + " " + e.getClass().getSimpleName() + ":" + e.getCause() + ":" + e.getMessage());
                }
            }
        }
        return assembled;
    }

    /**
     * Delivers an assembled message to the listener and then applies the close policy
     * for single-use connections.
     *
     * <p>Listener failures never propagate: a {@code NoListenerException} closes the
     * connection when it is single-use, any other exception is logged as an error.
     *
     * <p>After delivery, a single-use connection is closed when either
     * <ul>
     *   <li>this is not the server side and the listener returned {@code false}
     *       (also the case when delivery failed or {@code message} is {@code null}), or</li>
     *   <li>this is the server side and no sender is registered.</li>
     * </ul>
     *
     * @param message the message to deliver; {@code null} skips delivery but still
     *                applies the single-use close policy
     */
    private void sendToChannel(Message<?> message) {
        boolean listenerResult = false;
        if (message != null) {
            try {
                listenerResult = this.listener.onMessage(message);
            } catch (Exception e) {
                if (e instanceof NoListenerException) {
                    if (this.singleUse) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Closing single use channel after inbound message " + this.connectionId);
                        }
                        closeConnection();
                    }
                } else {
                    this.logger.error("Exception sending message: " + message, e);
                }
            }
        }
        if (!this.singleUse) {
            return;
        }
        final boolean clientSideUnhandled = !this.server && !listenerResult;
        final boolean serverSideWithoutSender = this.server && this.sender == null;
        if (clientSideUnhandled || serverSideWithoutSender) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Closing single use channel after inbound message " + this.connectionId);
            }
            closeConnection();
        }
    }

    /**
     * Reads whatever the socket channel currently offers and copies it into the pipe
     * for the assembler.
     *
     * <p>The raw buffer and a default single-thread executor are created on first use.
     * An assembler is started (if none is running) before the read so that it can
     * consume the pipe while this thread is writing to it.
     *
     * <p>Works with both heap and direct buffers: a direct buffer has no backing array,
     * so its content is copied into a temporary array before it is written to the pipe.
     *
     * @throws IOException if the channel has reached end-of-stream ("Channel closed"),
     *                     or the read or the pipe write fails
     * @throws Exception   declared for callers; other failures propagate unchanged
     */
    private void doRead() throws Exception {
        if (this.rawBuffer == null) {
            this.rawBuffer = allocate(this.maxMessageSize);
        }
        // Tell assemblers that data is on its way even if the pipe is still empty.
        this.writingToPipe = true;
        try {
            if (this.taskExecutor == null) {
                this.taskExecutor = Executors.newSingleThreadExecutor();
            }
            checkForAssembler();
            this.rawBuffer.clear();
            if (this.socketChannel.read(this.rawBuffer) < 0) {
                closeConnection();
                throw new IOException("Channel closed");
            }
            this.rawBuffer.flip();
            final int length = this.rawBuffer.remaining();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Read " + this.rawBuffer.limit() + " into raw buffer");
            }
            if (this.rawBuffer.hasArray()) {
                this.pipedOutputStream.write(this.rawBuffer.array(),
                        this.rawBuffer.arrayOffset() + this.rawBuffer.position(), length);
            } else {
                // Direct buffer: array() would throw UnsupportedOperationException.
                final byte[] chunk = new byte[length];
                this.rawBuffer.get(chunk);
                this.pipedOutputStream.write(chunk, 0, length);
            }
            this.pipedOutputStream.flush();
        } finally {
            // Always clear the flag, including on failure, so it never claims that a
            // pipe write is in progress when this method is no longer running.
            this.writingToPipe = false;
        }
    }

    /**
     * Starts an assembler task unless the execution counter shows one is already active.
     *
     * <p>The counter is incremented tentatively while holding its monitor. If the result
     * is greater than one, the increment is undone and nothing is started; otherwise
     * the counter is pinned to one and this connection is submitted to the executor.
     */
    private void checkForAssembler() {
        synchronized (this.executionControl) {
            final int active = this.executionControl.incrementAndGet();
            if (active > 1) {
                // Another assembler is already accounted for; undo the tentative increment.
                this.executionControl.decrementAndGet();
                return;
            }
            this.executionControl.set(1);
            this.taskExecutor.execute(this);
        }
    }

    /**
     * Performs one read from the socket channel into the pipe.
     *
     * <p>Never throws: any failure is logged as an error (connection id and exception
     * message only) and the connection is closed.
     */
    public void readPacket() {
        try {
            doRead();
        } catch (Exception e) {
            final String description = "Exception on Read " + getConnectionId() + " " + e.getMessage();
            this.logger.error(description);
            closeConnection();
        }
    }

    /**
     * Closes the connection by delegating to {@code closeConnection()}.
     *
     * <p>NOTE(review): presumably invoked by whoever monitors this connection for
     * inactivity; the caller is not visible in this file, so confirm there.
     *
     * <p>The decompiler reports this method was package-private in the original source.
     */
    public void timeout() {
        closeConnection();
    }

    /**
     * Sets the executor that runs this connection's message assembler task.
     *
     * <p>If none is set by the time of the first read, a single-thread executor is
     * created for the connection.
     *
     * @param executor the executor to submit assembler runs to
     */
    public void setTaskExecutor(Executor executor) {
        this.taskExecutor = executor;
    }

    /**
     * Chooses the kind of buffer {@link #allocate(int)} returns.
     *
     * @param z {@code true} for direct buffers, {@code false} for heap buffers
     */
    public void setUsingDirectBuffers(boolean z) {
        this.usingDirectBuffers = z;
    }

    /**
     * Returns this connection's id, computing it from the socket on first use.
     *
     * @return the cached id, or the value obtained from {@code SocketUtils.getSocketId}
     *         for the channel's socket if none was cached yet
     */
    @Override
    public String getConnectionId() {
        String id = this.connectionId;
        if (id == null) {
            id = SocketUtils.getSocketId(this.socketChannel.socket());
            this.connectionId = id;
        }
        return id;
    }

    /**
     * Returns the value most recently stored with {@link #setLastRead(long)}.
     *
     * <p>NOTE(review): presumably a timestamp of the last read; this class never
     * updates it itself, so confirm the unit and meaning against the caller.
     *
     * @return the stored last-read value
     */
    public long getLastRead() {
        return this.lastRead;
    }

    /**
     * Stores the last-read value returned by {@link #getLastRead()}.
     *
     * @param j the value to store (presumably a timestamp; confirm against the caller)
     */
    public void setLastRead(long j) {
        this.lastRead = j;
    }
}
