package io.confluent.connect.hdfs;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.storage.schema.StorageSchemaCompatibility;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/hdfs/HdfsSinkTask.class */
public class HdfsSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(HdfsSinkTask.class);
    private DataWriter hdfsWriter;
    private AvroData avroData;

    public String version() {
        return Version.getVersion();
    }

    /* JADX WARN: Finally extract failed */
    public void start(Map<String, String> map) {
        Set<TopicPartition> assignment = this.context.assignment();
        try {
            HdfsSinkConnectorConfig hdfsSinkConnectorConfig = new HdfsSinkConnectorConfig(map);
            boolean booleanValue = hdfsSinkConnectorConfig.getBoolean("hive.integration").booleanValue();
            if (booleanValue && StorageSchemaCompatibility.getCompatibility(hdfsSinkConnectorConfig.getString("schema.compatibility")) == StorageSchemaCompatibility.NONE) {
                throw new ConfigException("Hive Integration requires schema compatibility to be BACKWARD, FORWARD or FULL");
            }
            if (hdfsSinkConnectorConfig.getLong("rotate.schedule.interval.ms").longValue() > 0) {
                String string = hdfsSinkConnectorConfig.getString("timezone");
                if (string.equals("")) {
                    throw new ConfigException("timezone", string, "Timezone cannot be empty when using scheduled file rotation.");
                }
                DateTimeZone.forID(string);
            }
            this.avroData = new AvroData(hdfsSinkConnectorConfig.avroDataConfig());
            this.hdfsWriter = new DataWriter(hdfsSinkConnectorConfig, this.context, this.avroData);
            recover(assignment);
            if (booleanValue) {
                syncWithHive();
            }
            log.info("The connector relies on offsets in HDFS filenames, but does commit these offsets to Connect to enable monitoring progress of the HDFS connector. Upon startup, the HDFS Connector restores offsets from filenames in HDFS. In the absence of files in HDFS, the connector will attempt to find offsets for its consumer group in the '__consumer_offsets' topic. If offsets are not found, the consumer will rely on the reset policy specified in the 'consumer.auto.offset.reset' property to start exporting data to HDFS.");
        } catch (ConnectException e) {
            log.info("Couldn't start HdfsSinkConnector:", e);
            log.info("Shutting down HdfsSinkConnector.");
            try {
                if (this.hdfsWriter != null) {
                    try {
                        log.debug("Closing data writer due to task start failure.");
                        this.hdfsWriter.close();
                        log.debug("Stopping data writer due to task start failure.");
                        this.hdfsWriter.stop();
                    } catch (Throwable th) {
                        log.debug("Stopping data writer due to task start failure.");
                        this.hdfsWriter.stop();
                        throw th;
                    }
                }
            } catch (Throwable th2) {
                log.debug("Error closing and stopping data writer: {}", th2.getMessage(), th2);
            }
            throw e;
        } catch (ConfigException e2) {
            throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e2);
        }
    }

    public void put(Collection<SinkRecord> collection) throws ConnectException {
        if (log.isDebugEnabled()) {
            log.debug("Read {} records from Kafka", Integer.valueOf(collection.size()));
        }
        try {
            this.hdfsWriter.write(collection);
        } catch (ConnectException e) {
            throw new ConnectException(e);
        }
    }

    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : this.hdfsWriter.getCommittedOffsets().entrySet()) {
            log.debug("Found last committed offset {} for topic partition {}", entry.getValue(), entry.getKey());
            hashMap.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().longValue()));
        }
        log.debug("Returning committed offsets {}", hashMap);
        return hashMap;
    }

    public void open(Collection<TopicPartition> collection) {
        this.hdfsWriter.open(collection);
    }

    public void close(Collection<TopicPartition> collection) {
        if (this.hdfsWriter != null) {
            this.hdfsWriter.close();
        }
    }

    public void stop() throws ConnectException {
        if (this.hdfsWriter != null) {
            this.hdfsWriter.stop();
        }
    }

    private void recover(Set<TopicPartition> set) {
        Iterator<TopicPartition> it = set.iterator();
        while (it.hasNext()) {
            this.hdfsWriter.recover(it.next());
        }
    }

    private void syncWithHive() throws ConnectException {
        this.hdfsWriter.syncWithHive();
    }

    public AvroData getAvroData() {
        return this.avroData;
    }
}
