package org.springframework.xd.batch.item.hadoop;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.WriteFailedException;
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.hadoop.store.StoreException;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/xd/batch/item/hadoop/HdfsTextItemWriter.class */
/**
 * Item writer that renders each item as one line of text and writes the lines to a file on a
 * Hadoop {@link FileSystem}.
 *
 * <p>Each item is converted to a string by the configured {@link LineAggregator}, followed by the
 * line separator (the platform default), and the whole chunk is encoded with the configured
 * charset (UTF-8 by default) before being written in a single call.
 *
 * <p>File naming, the rollover threshold, the file counter and the bytes-written bookkeeping are
 * provided by {@link AbstractHdfsItemWriter}; this class only decides when to open, roll over,
 * flush and close the underlying {@link FSDataOutputStream}.
 *
 * <p>NOTE(review): instances hold a single mutable output stream with no synchronization, so they
 * are presumably intended for use by one thread at a time — confirm against callers.
 *
 * @param <T> the type of item being written
 */
public class HdfsTextItemWriter<T> extends AbstractHdfsItemWriter<T> implements InitializingBean {
    private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");

    /** Target file system; never {@code null} (enforced by the constructor). */
    private final FileSystem fileSystem;

    /** Stream for the file currently being written; {@code null} whenever no file is open. */
    private FSDataOutputStream fsDataOutputStream;

    /** Converts an item into its single-line textual form. */
    private LineAggregator<T> lineAggregator;

    /** Terminator appended after every aggregated item. */
    private String lineSeparator = DEFAULT_LINE_SEPARATOR;

    /** Name of the charset used to encode the text before it is written. */
    private volatile String charset = "UTF-8";

    /**
     * Creates a writer that targets the given file system.
     *
     * @param fileSystem the Hadoop file system to write to; must not be {@code null}
     * @throws IllegalArgumentException if {@code fileSystem} is {@code null}
     */
    public HdfsTextItemWriter(FileSystem fileSystem) {
        Assert.notNull(fileSystem, "Hadoop FileSystem must not be null.");
        this.fileSystem = fileSystem;
    }

    /**
     * Writes the given items, one per line, to the current output file, first creating or rolling
     * over the file as needed.
     *
     * @param list the items to write
     * @throws Exception if the output stream cannot be prepared or the data cannot be written
     */
    @Override // org.springframework.xd.batch.item.hadoop.AbstractHdfsItemWriter
    public void write(List<? extends T> list) throws Exception {
        initializeCounterIfNecessary();
        prepareOutputStream();
        copy(getItemsAsBytes(list), this.fsDataOutputStream);
    }

    /**
     * Ensures {@link #fsDataOutputStream} points at an open stream for the file that should
     * receive the next chunk.
     *
     * <p>Loops until a usable file is found:
     * <ul>
     * <li>if the current file name does not exist yet, the file is created, the byte count is
     * reset and the file is opened for append;</li>
     * <li>if it exists and the rollover threshold has been reached, the current stream is closed
     * and the counter is advanced so the next iteration tries the next file name;</li>
     * <li>otherwise the current file is kept, and it is opened for append if no stream is open
     * (previously this case left the stream {@code null} and the subsequent write failed with a
     * {@link NullPointerException}).</li>
     * </ul>
     *
     * @throws IOException if the file cannot be created or opened for append
     */
    private void prepareOutputStream() throws IOException {
        boolean ready = false;
        while (!ready) {
            Path path = new Path(getFileName());
            if (getFileSystem().createNewFile(path)) {
                this.logger.debug("Created new HDFS file " + path.getName());
                ready = true;
                resetBytesWritten();
                this.fsDataOutputStream = getFileSystem().append(path);
            } else if (getBytesWritten() >= getRolloverThresholdInBytes()) {
                this.logger.debug("Rolling over new file");
                closeStream();
                incrementCounter();
            } else {
                if (this.fsDataOutputStream == null) {
                    // The file already exists but this writer holds no open stream for it;
                    // attach to the existing file instead of failing on a null stream later.
                    this.logger.debug("Appending to existing HDFS file " + path.getName());
                    this.fsDataOutputStream = getFileSystem().append(path);
                }
                ready = true;
            }
        }
    }

    /**
     * Writes the bytes to the stream and adds their count to the bytes-written total used for the
     * rollover decision.
     *
     * @param bArr the encoded data to write
     * @param fSDataOutputStream the stream to write to
     * @throws IOException if the write fails
     */
    private void copy(byte[] bArr, FSDataOutputStream fSDataOutputStream) throws IOException {
        fSDataOutputStream.write(bArr);
        incrementBytesWritten(bArr.length);
    }

    /**
     * Returns the file system supplied at construction time.
     *
     * @return the Hadoop file system this writer writes to
     */
    @Override // org.springframework.xd.batch.item.hadoop.AbstractHdfsItemWriter
    public FileSystem getFileSystem() {
        return this.fileSystem;
    }

    /**
     * Aggregates every item into a line, terminates each with the line separator and encodes the
     * concatenated text with the configured charset.
     *
     * @param list the items to render
     * @return the encoded bytes for the whole chunk (empty if {@code list} is empty)
     * @throws WriteFailedException if the configured charset is not supported
     */
    private byte[] getItemsAsBytes(List<? extends T> list) {
        StringBuilder text = new StringBuilder();
        for (T item : list) {
            // Chained appends avoid building a throw-away String per item.
            text.append(this.lineAggregator.aggregate(item)).append(this.lineSeparator);
        }
        try {
            return text.toString().getBytes(this.charset);
        } catch (UnsupportedEncodingException e) {
            throw new WriteFailedException("Could not write data.", e);
        }
    }

    /**
     * Delegates to the superclass and then flushes the open output stream, if any, via
     * {@code hflush()}.
     *
     * @param executionContext the execution context passed through to the superclass
     * @throws StoreException if flushing the stream fails
     */
    @Override
    public void update(ExecutionContext executionContext) {
        super.update(executionContext);
        this.logger.debug("Flushing output stream");
        if (this.fsDataOutputStream != null) {
            try {
                this.fsDataOutputStream.hflush();
            } catch (IOException e) {
                throw new StoreException("Error while flushing stream", e);
            }
        }
    }

    /**
     * Closes the current output stream, if any, and then invokes the inherited {@code reset()}.
     *
     * @throws StoreException if closing the stream fails
     */
    public void close() {
        this.logger.debug("Closing item writer");
        closeStream();
        reset();
    }

    /**
     * Closes the current output stream, if one is open, and clears the reference to it.
     *
     * <p>The field is cleared before the close is attempted so that, whether or not the close
     * succeeds, later calls to {@link #update(ExecutionContext)}, {@link #close()} or
     * {@link #prepareOutputStream()} never operate on an already-closed stream.
     *
     * @throws StoreException if closing the stream fails
     */
    private void closeStream() {
        this.logger.debug("Closing output stream");
        FSDataOutputStream stream = this.fsDataOutputStream;
        if (stream == null) {
            return;
        }
        this.fsDataOutputStream = null;
        try {
            stream.close();
        } catch (IOException e) {
            // Second, quiet close attempt before reporting the failure.
            IOUtils.closeStream(stream);
            throw new StoreException("Error while closing stream", e);
        }
    }

    /**
     * Sets the aggregator used to turn each item into a line of text.
     *
     * @param lineAggregator the aggregator to use; required before the writer is used
     */
    public void setLineAggregator(LineAggregator<T> lineAggregator) {
        this.lineAggregator = lineAggregator;
    }

    /**
     * Verifies that the mandatory {@link LineAggregator} has been configured.
     *
     * @throws IllegalArgumentException if no line aggregator has been set
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.lineAggregator, "A LineAggregator must be provided.");
    }
}
