package org.springframework.data.hadoop.batch.item;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.core.serializer.Serializer;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:lib/spring-data-hadoop-batch-2.0.0.RC2.jar:org/springframework/data/hadoop/batch/item/HdfsItemWriter.class */
public class HdfsItemWriter<T> implements ItemStreamWriter<T> {
    private static final String BUFFER_KEY_PREFIX = HdfsItemWriter.class.getName() + ".BUFFER_KEY";
    private final String bufferKey;
    private String fileName;
    private FileSystem fileSystem;
    private FSDataOutputStream fsDataOutputStream;
    private Serializer<T> itemSerializer;

    public HdfsItemWriter(FileSystem fileSystem, Serializer<T> serializer, String str) {
        Assert.notNull(fileSystem, "Hadoop FileSystem is required.");
        Assert.notNull(serializer, "A Serializer implementation is required");
        Assert.isTrue(StringUtils.hasText(str), "A non-empty fileName is required.");
        this.fileSystem = fileSystem;
        this.bufferKey = BUFFER_KEY_PREFIX + "." + hashCode();
        this.itemSerializer = serializer;
        this.fileName = str;
    }

    private List<? extends T> getCurrentBuffer() {
        if (!TransactionSynchronizationManager.hasResource(this.bufferKey)) {
            TransactionSynchronizationManager.bindResource(this.bufferKey, new ArrayList());
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { // from class: org.springframework.data.hadoop.batch.item.HdfsItemWriter.1
                @Override // org.springframework.transaction.support.TransactionSynchronizationAdapter, org.springframework.transaction.support.TransactionSynchronization
                public void beforeCommit(boolean z) {
                    List<? extends T> list = (List) TransactionSynchronizationManager.getResource(HdfsItemWriter.this.bufferKey);
                    if (CollectionUtils.isEmpty(list) || z) {
                        return;
                    }
                    HdfsItemWriter.this.doWrite(list);
                }

                @Override // org.springframework.transaction.support.TransactionSynchronizationAdapter, org.springframework.transaction.support.TransactionSynchronization
                public void afterCompletion(int i) {
                    if (TransactionSynchronizationManager.hasResource(HdfsItemWriter.this.bufferKey)) {
                        TransactionSynchronizationManager.unbindResource(HdfsItemWriter.this.bufferKey);
                    }
                }
            });
        }
        return (List) TransactionSynchronizationManager.getResource(this.bufferKey);
    }

    protected void doWrite(List<? extends T> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        try {
            this.fsDataOutputStream.write(getPayloadAsBytes(list));
        } catch (IOException e) {
            throw new RuntimeException("Error writing to HDFS", e);
        }
    }

    @Override // org.springframework.batch.item.ItemStream
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        try {
            Path path = new Path(this.fileName);
            this.fileSystem.createNewFile(path);
            this.fsDataOutputStream = this.fileSystem.create(path);
        } catch (IOException e) {
            throw new RuntimeException("Unable to open file to write to", e);
        }
    }

    @Override // org.springframework.batch.item.ItemStream
    public void update(ExecutionContext executionContext) throws ItemStreamException {
    }

    @Override // org.springframework.batch.item.ItemWriter
    public void write(List<? extends T> list) throws Exception {
        if (transactionActive()) {
            getCurrentBuffer().addAll(list);
        } else {
            doWrite(list);
        }
    }

    @Override // org.springframework.batch.item.ItemStream
    public void close() {
        if (this.fsDataOutputStream != null) {
            IOUtils.closeStream(this.fsDataOutputStream);
        }
    }

    private byte[] getPayloadAsBytes(List<? extends T> list) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Iterator<? extends T> it = list.iterator();
        while (it.hasNext()) {
            this.itemSerializer.serialize(it.next(), byteArrayOutputStream);
        }
        return byteArrayOutputStream.toByteArray();
    }

    private boolean transactionActive() {
        return TransactionSynchronizationManager.isActualTransactionActive();
    }
}
