package org.springframework.cloud.stream.binder.file;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StreamUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/file/MessageController.class */
/**
 * Moves Spring {@link Message}s through plain files located under a common directory prefix.
 *
 * <p>Each named endpoint is served by a {@link FileAdapter} that runs a background task on a
 * cached thread pool: output adapters append serialized messages to their file, input adapters
 * read the file line by line and turn the content back into messages.
 *
 * <p>Text format used by the adapters:
 * <pre>
 * #headers          (only when at least one String-valued header other than id/timestamp exists)
 * key=value
 * #payload          (only when headers were written or the payload contains a newline)
 * payload line(s)
 * #end              (only when #payload was written)
 * </pre>
 * A message without headers and with a single-line payload is written as that single line.
 *
 * <p>The shared {@code running} flag is switched on by {@link #bind}, {@link #receive} and
 * {@link #send}; it is switched off by {@link #close()} and whenever one of the blocking calls in
 * this class is interrupted. Background tasks stop looping once they observe the flag as false.
 *
 * <p>NOTE(review): {@code inputs} and {@code outputs} are plain {@link HashMap}s and this class
 * performs no synchronization around them — confirm that callers do not bind/send/receive
 * concurrently from several threads.
 */
public class MessageController implements Closeable {

    private static final Log logger = LogFactory.getLog(MessageController.class);

    /** How many times a missing file is polled for before the adapter proceeds anyway. */
    private static final int MAX_WAIT_ATTEMPTS = 100;

    /** Pause between two existence checks while waiting for a file, in milliseconds. */
    private static final long WAIT_INTERVAL_MILLIS = 100L;

    /** Pause of the reader after hitting end-of-file, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 20L;

    private final String prefix;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Map<String, FileAdapter> inputs = new HashMap<>();
    private final Map<String, FileAdapter> outputs = new HashMap<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Binds one file under the controller's prefix to a background reader or writer task.
     *
     * <p>Decompiler note carried over from the original source: the access modifiers of this
     * class were widened from package-private.
     */
    public class FileAdapter {
        private final File file;

        /** Hand-off point between the background task and {@code receive}/{@code send}. */
        private final SynchronousQueue<Message<?>> exchange = new SynchronousQueue<>();

        /**
         * Channel that receives inbound messages; when null, inbound messages are offered on
         * {@link #exchange} instead. Volatile because it is assigned by the binding thread and
         * read by the listener task.
         */
        private volatile MessageChannel target;

        /**
         * Creates a reading adapter for the given file name.
         *
         * @param messageController not used; the enclosing instance is what the adapter works with
         * @param str file name relative to the controller's prefix
         */
        public FileAdapter(MessageController messageController, String str) {
            this(str, false);
        }

        /**
         * Creates an adapter and immediately starts its background task.
         *
         * @param str file name relative to the controller's prefix
         * @param z {@code true} to start a writer task, {@code false} to start a reader task
         * @throws IllegalStateException if the thread is interrupted while waiting for the file
         */
        public FileAdapter(String str, boolean z) {
            this(str, z, null);
        }

        /**
         * Same as {@link #FileAdapter(String, boolean)} but installs the target channel before
         * the background task is started, so the reader can never observe a missing target for
         * a bound endpoint.
         */
        private FileAdapter(String str, boolean z, MessageChannel initialTarget) {
            this.target = initialTarget;
            this.file = new File(MessageController.this.prefix + "/" + str);
            awaitFile();
            logger.debug("Starting background processing for: " + this.file + ", writable=" + z);
            startWorker(z);
        }

        /**
         * Polls until the file exists or the attempt budget is used up. A file that is still
         * missing afterwards is not treated as an error here; the background task will fail
         * (and log) when it tries to open it.
         */
        private void awaitFile() {
            for (int attempt = 0; !this.file.exists() && attempt < MAX_WAIT_ATTEMPTS; attempt++) {
                // Log on the 1st, 11th, 21st ... wait only, to keep debug output readable.
                if (logger.isDebugEnabled() && attempt % 10 == 0) {
                    logger.debug("Waiting for: " + this.file);
                }
                try {
                    Thread.sleep(WAIT_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Cannot find file: " + this.file, e);
                }
            }
        }

        /** Submits the writer or reader loop to the controller's executor. */
        private void startWorker(boolean writable) {
            if (writable) {
                executor.submit(() -> {
                    try {
                        write();
                    } catch (IOException | RuntimeException e) {
                        // Runtime failures are logged too: the Future returned by submit() is
                        // discarded, so they would otherwise vanish without a trace.
                        logger.error("Failed to write: " + this.file, e);
                    }
                });
            } else {
                executor.submit(() -> {
                    try {
                        listen();
                    } catch (IOException | RuntimeException e) {
                        logger.error("Failed to read: " + this.file, e);
                    }
                });
            }
        }

        /**
         * Writer loop: takes messages from {@link #exchange} and appends them to the file until
         * the controller stops running. A failure while writing one message is logged and the
         * loop carries on with the next message.
         *
         * @throws IOException declared for the benefit of the submitting task; write failures
         *     are currently handled inside the loop
         */
        private void write() throws IOException {
            while (running.get()) {
                // Scoped per outer iteration so a closed stream is never reused.
                FileOutputStream output = null;
                try {
                    while (running.get()) {
                        Message<?> message;
                        try {
                            message = this.exchange.take();
                        } catch (InterruptedException e) {
                            running.set(false);
                            Thread.currentThread().interrupt();
                            continue;
                        }
                        try {
                            if (output == null) {
                                // Opened lazily, in append mode, on the first message.
                                output = new FileOutputStream(this.file, true);
                            }
                            logger.debug("Serializing to " + this.file + ": " + message);
                            String text = serialize(message);
                            logger.debug("Sending to " + this.file + ": " + text);
                            StreamUtils.copy(text, StandardCharsets.UTF_8, output);
                            output.flush();
                        } catch (IOException | RuntimeException e) {
                            logger.error("Failed to write message to " + this.file, e);
                        }
                    }
                } finally {
                    closeQuietly(output);
                }
            }
        }

        /**
         * Renders a message in the text format described on the enclosing class.
         *
         * @param message message to render
         * @return the text to append to the file, always terminated by a newline
         */
        private String serialize(Message<?> message) {
            StringBuilder text = new StringBuilder();
            for (Map.Entry<String, Object> header : message.getHeaders().entrySet()) {
                String key = header.getKey();
                boolean skipped = "id".equals(key) || "timestamp".equals(key)
                        || !(header.getValue() instanceof String);
                if (skipped) {
                    continue;
                }
                if (text.length() == 0) {
                    text.append("#headers\n");
                }
                text.append(key).append("=").append(header.getValue()).append("\n");
            }
            String payload = message.getPayload().toString();
            // The payload needs explicit framing when it follows headers or spans several lines.
            boolean framed = payload.contains("\n") || text.length() > 0;
            if (framed) {
                text.append("#payload\n");
            }
            text.append(payload).append("\n");
            if (framed) {
                text.append("#end\n");
            }
            return text.toString();
        }

        /**
         * Reader loop: opens the file as UTF-8 text (the encoding the writer uses) and keeps
         * converting its content into messages until the controller stops running. The reader is
         * closed even when reading or dispatching fails.
         *
         * @throws IOException if the file cannot be opened or read
         */
        private void listen() throws IOException {
            while (running.get()) {
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(new FileInputStream(this.file), StandardCharsets.UTF_8));
                try {
                    logger.debug("Receiving from " + this.file);
                    while (running.get()) {
                        readAndDispatch(reader);
                    }
                } finally {
                    closeQuietly(reader);
                }
            }
        }

        /**
         * Reads at most one message from the reader and dispatches it. When the end of the file
         * is reached the method pauses briefly so the caller's loop does not spin.
         *
         * @param reader source positioned at the start of the next message
         * @throws IOException if reading fails
         */
        private void readAndDispatch(BufferedReader reader) throws IOException {
            String line = reader.readLine();

            MessageHeaders headers = null;
            if ("#headers".equals(line)) {
                Map<String, Object> parsed = new LinkedHashMap<>();
                while (running.get() && line != null) {
                    line = reader.readLine();
                    logger.debug("Header line from " + this.file + ": " + line);
                    // Any line starting with '#' (normally "#payload") ends the header section.
                    if (line == null || line.startsWith("#")) {
                        break;
                    }
                    int separator = line.indexOf("=");
                    if (separator >= 0) {
                        parsed.put(line.substring(0, separator), line.substring(separator + 1));
                    } else {
                        parsed.put(line, null);
                    }
                }
                headers = parsed.isEmpty() ? null : new MessageHeaders(parsed);
            }

            StringBuilder payload = new StringBuilder();
            boolean framed = false;
            while (running.get() && line != null) {
                logger.debug("Line from " + this.file + ": " + line);
                if (line.equals("#payload")) {
                    framed = true;
                    line = reader.readLine();
                    continue;
                }
                if (line.equals("#end")) {
                    break;
                }
                payload.append(line);
                if (!framed) {
                    // An unframed payload is exactly one line.
                    break;
                }
                line = reader.readLine();
                // A null line means the file ended inside a framed payload; stop with what has
                // been read so far instead of failing on a null dereference.
                if (line == null || line.equals("#end")) {
                    break;
                }
                payload.append(System.getProperty("line.separator"));
            }

            if (payload.length() > 0 || headers != null) {
                MessageBuilder<String> builder = MessageBuilder.withPayload(payload.toString());
                if (headers != null) {
                    builder.copyHeadersIfAbsent(headers);
                }
                Message<String> message = builder.build();
                logger.debug("Assembled from " + this.file + ": " + message);
                dispatch(message);
            }

            if (line == null) {
                idle();
            }
        }

        /**
         * Sends the message to the bound target channel, or, when no target is set, blocks until
         * a {@code receive} call picks it up from {@link #exchange}.
         */
        private void dispatch(Message<?> message) {
            MessageChannel channel = this.target;
            if (channel != null) {
                channel.send(message);
                return;
            }
            try {
                this.exchange.put(message);
            } catch (InterruptedException e) {
                running.set(false);
                Thread.currentThread().interrupt();
            }
        }

        /** Sleeps briefly after end-of-file; an interrupt stops the controller. */
        private void idle() {
            try {
                Thread.sleep(IDLE_SLEEP_MILLIS);
            } catch (InterruptedException e) {
                running.set(false);
                Thread.currentThread().interrupt();
            }
        }

        /** Closes the resource if present, logging instead of propagating any failure. */
        private void closeQuietly(Closeable resource) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (Exception e) {
                logger.error("Failed to close: " + this.file, e);
            }
        }
    }

    /**
     * Creates a controller rooted at the given directory, creating the directory (and missing
     * parents) if necessary. The result of the directory creation is not checked.
     *
     * @param str directory under which all adapter files are resolved
     */
    public MessageController(String str) {
        this.prefix = str;
        new File(str).mkdirs();
    }

    /**
     * Stops all adapters: clears the running flag and interrupts the background tasks by
     * shutting the executor down.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.running.set(false);
        this.executor.shutdownNow();
    }

    /**
     * Routes messages read from the named file to the given channel, creating the reading
     * adapter on first use. An adapter that already exists is re-pointed at the new channel.
     *
     * @param str file name relative to the controller's prefix
     * @param str2 not used by this implementation
     * @param messageChannel channel that receives every message read from the file
     */
    public void bind(String str, String str2, MessageChannel messageChannel) {
        this.running.set(true);
        // The target is handed to the constructor so it is in place before the listener starts.
        FileAdapter adapter = this.inputs.computeIfAbsent(
                str, name -> new FileAdapter(name, false, messageChannel));
        adapter.target = messageChannel;
    }

    /**
     * Waits for the next message read from the named file, creating the reading adapter on
     * first use.
     *
     * @param str file name relative to the controller's prefix
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the next message, or {@code null} if none arrived in time or the wait was
     *     interrupted (in which case the controller is also flagged as not running)
     */
    public Message<?> receive(String str, long j, TimeUnit timeUnit) {
        this.running.set(true);
        FileAdapter adapter = this.inputs.computeIfAbsent(str, name -> new FileAdapter(this, name));
        try {
            return adapter.exchange.poll(j, timeUnit);
        } catch (InterruptedException e) {
            this.running.set(false);
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Forwards every message published on the channel to {@link #send(String, Message)}.
     *
     * @param str file name relative to the controller's prefix
     * @param subscribableChannel channel to subscribe to
     */
    public void subscribe(String str, SubscribableChannel subscribableChannel) {
        subscribableChannel.subscribe(message -> send(str, message));
    }

    /**
     * Hands a message to the writer of the named file, creating the writing adapter on first
     * use. Blocks until the writer task has taken the message; an interrupt while waiting flags
     * the controller as not running and the message is dropped.
     *
     * @param str file name relative to the controller's prefix
     * @param message message to append to the file
     */
    public void send(String str, Message<?> message) {
        this.running.set(true);
        FileAdapter adapter = this.outputs.computeIfAbsent(str, name -> new FileAdapter(name, true));
        try {
            adapter.exchange.put(message);
        } catch (InterruptedException e) {
            this.running.set(false);
            Thread.currentThread().interrupt();
        }
    }
}
