package co.cask.cdap.file.dropzone.polling;

import co.cask.cdap.client.StreamClient;
import co.cask.cdap.client.StreamWriter;
import co.cask.cdap.file.dropzone.config.ObserverConfiguration;
import co.cask.cdap.filetailer.Pipe;
import co.cask.cdap.filetailer.PipeListener;
import co.cask.cdap.filetailer.config.PipeConfiguration;
import co.cask.cdap.filetailer.metrics.FileTailerMetricsProcessor;
import co.cask.cdap.filetailer.queue.FileTailerQueue;
import co.cask.cdap.filetailer.sink.FileTailerSink;
import co.cask.cdap.filetailer.sink.SinkStrategy;
import co.cask.cdap.filetailer.state.FileTailerStateProcessorImpl;
import co.cask.cdap.filetailer.tailer.LogTailer;
import java.io.File;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/file/dropzone/polling/PollingListenerImpl.class */
/**
 * {@link PollingListener} implementation that, for every file reported through
 * {@link #onFileCreate(File)}, assembles a {@link Pipe} (a {@link LogTailer} feeding a
 * {@link FileTailerSink}) and starts it asynchronously.
 *
 * <p>A single {@link FileTailerMetricsProcessor} is created and started in the constructor and
 * is shared by all pipes created by this listener.
 */
public class PollingListenerImpl implements PollingListener {
    private static final Logger LOG = LoggerFactory.getLogger(PollingListenerImpl.class);

    /** Polling service that is told to remove a file once it has been ingested. */
    private final PollingService monitor;

    /** Observer configuration used to derive the per-file pipe configuration. */
    private final ObserverConfiguration observerConf;

    /** Metrics processor shared by every pipe built by this listener. */
    private final FileTailerMetricsProcessor metricsProcessor;

    /**
     * Per-file {@link PipeListener}: records that the file was read and, once it was ingested,
     * stops the pipe, deletes the state file and removes the file from the polling service.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public class PipeListenerImpl implements PipeListener {
        // NOTE(review): if onRead() and isRead() are invoked from different threads (tailer vs.
        // sink), this flag may need to be volatile - confirm against the Pipe implementation.
        private boolean isRead = false;
        private final File directory;
        private final String filePath;
        private final String stateFilePath;
        private Pipe pipe;

        /**
         * @param workDir       directory passed to {@code PollingService.removeFile} on ingest
         * @param filePath      path of the file handled by the pipe
         * @param stateFilePath path of the state file deleted on ingest
         */
        public PipeListenerImpl(File workDir, String filePath, String stateFilePath) {
            this.directory = workDir;
            this.filePath = filePath;
            this.stateFilePath = stateFilePath;
        }

        /** Marks the file as read. */
        public void onRead() {
            LOG.info("File {} already read", this.filePath);
            this.isRead = true;
        }

        /** @return {@code true} once {@link #onRead()} has been called */
        public boolean isRead() {
            return this.isRead;
        }

        /**
         * Requests the pipe to stop, deletes the state file and removes the file from the
         * polling service.
         *
         * <p>If the state file cannot be deleted, {@code removeStateFile} throws and the file is
         * not removed from the polling service.
         */
        public void onIngest() {
            LOG.info("File {} already processed", this.filePath);
            this.pipe.stopAsync();
            PollingListenerImpl.this.removeStateFile(this.stateFilePath);
            PollingListenerImpl.this.monitor.removeFile(this.directory, new File(this.filePath));
        }

        /**
         * Sets the pipe that {@link #onIngest()} stops; must be called before the pipe is started.
         *
         * @param pipe pipe this listener belongs to
         */
        public void setPipe(Pipe pipe) {
            this.pipe = pipe;
        }
    }

    /**
     * Creates the listener and starts the shared metrics processor, returning only after
     * {@code awaitRunning()} on it completes.
     *
     * @param pollingService        polling service notified when an ingested file can be removed
     * @param observerConfiguration configuration of the observer this listener serves
     */
    public PollingListenerImpl(PollingService pollingService, ObserverConfiguration observerConfiguration) {
        this.monitor = pollingService;
        this.observerConf = observerConfiguration;
        this.metricsProcessor = new FileTailerMetricsProcessor(
            observerConfiguration.getDaemonDir(),
            observerConfiguration.getPipeConf().getStatisticsFile(),
            observerConfiguration.getPipeConf().getStatisticsSleepInterval(),
            observerConfiguration.getPipeConf().getPipeName(),
            observerConfiguration.getPipeConf().getSourceConfiguration().getWorkDir().getName());
        this.metricsProcessor.startAsync().awaitRunning();
    }

    /**
     * Builds a pipe for the newly detected file and starts it asynchronously.
     *
     * @param file file reported by the polling service
     * @throws IOException if the stream writer for the pipe cannot be created
     */
    @Override // co.cask.cdap.file.dropzone.polling.PollingListener
    public void onFileCreate(File file) throws IOException {
        String absolutePath = file.getAbsolutePath();
        LOG.info("File Added: {}", absolutePath);
        LOG.debug("Start configure pipe for file: {}", absolutePath);
        Pipe pipe = setupPipe(file);
        LOG.debug("Pipe for file {} successfully configured", absolutePath);
        LOG.info("Start processing file: {}", absolutePath);
        pipe.startAsync();
    }

    /**
     * Logs the reported exception at error level.
     *
     * @param exc exception reported by the polling service
     */
    @Override // co.cask.cdap.file.dropzone.polling.PollingListener
    public void onException(Exception exc) {
        LOG.error("Error", exc);
    }

    /**
     * Wires together the queue, stream writer, state processor, tailer and sink for one file.
     *
     * @param file file to build the pipe for; its name selects the pipe configuration
     * @return the configured, not yet started, pipe
     * @throws IOException if the stream writer cannot be created
     */
    private Pipe setupPipe(File file) throws IOException {
        PipeConfiguration pipeConfiguration = this.observerConf.getPipeConfiguration(file.getName());
        FileTailerQueue queue = new FileTailerQueue(pipeConfiguration.getQueueSize());
        StreamWriter writer = getStreamWriterForPipe(pipeConfiguration);
        FileTailerStateProcessorImpl stateProcessor =
            new FileTailerStateProcessorImpl(this.observerConf.getDaemonDir(), pipeConfiguration.getStateFile());
        PipeListenerImpl pipeListener = new PipeListenerImpl(
            pipeConfiguration.getSourceConfiguration().getWorkDir(),
            file.getAbsolutePath(),
            this.observerConf.getDaemonDir() + "/" + pipeConfiguration.getStateFile());
        LogTailer tailer =
            new LogTailer(pipeConfiguration, queue, stateProcessor, this.metricsProcessor, pipeListener);
        FileTailerSink sink = new FileTailerSink(
            queue,
            writer,
            SinkStrategy.LOADBALANCE,
            stateProcessor,
            this.metricsProcessor,
            pipeListener,
            pipeConfiguration.getSinkConfiguration().getPackSize());
        Pipe pipe = new Pipe(tailer, sink);
        // The listener needs the pipe reference so that onIngest() can stop it.
        pipeListener.setPipe(pipe);
        return pipe;
    }

    /**
     * Calls {@code create} on the configured stream client for the sink's stream name and
     * returns a writer for that stream.
     *
     * @param pipeConfiguration configuration providing the stream client and stream name
     * @return writer for the configured stream
     * @throws IOException if creating the stream or the writer fails; the original exception is
     *                     attached as the cause
     */
    private StreamWriter getStreamWriterForPipe(PipeConfiguration pipeConfiguration) throws IOException {
        StreamClient streamClient = pipeConfiguration.getSinkConfiguration().getStreamClient();
        String streamName = pipeConfiguration.getSinkConfiguration().getStreamName();
        try {
            streamClient.create(streamName);
            return streamClient.createWriter(streamName);
        } catch (IOException e) {
            // Keep the original message text, but also chain the cause so the stack trace of the
            // underlying failure is not lost.
            throw new IOException(
                String.format("Cannot create/get client stream by name: %s: %s", streamName, e), e);
        }
    }

    /**
     * Deletes the state file at the given path.
     *
     * @param str path of the state file to delete
     * @throws IllegalArgumentException if the file could not be deleted
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void removeStateFile(String str) {
        File stateFile = new File(str);
        if (!stateFile.delete()) {
            throw new IllegalArgumentException(
                String.format("Cannot remove specified file %s.", stateFile.getAbsolutePath()));
        }
        LOG.info("State file successfully deleted {}.", stateFile);
    }
}
