package co.cask.cdap.kafka.flow;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.flowlet.FlowletContext;
import com.google.common.base.Charsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.api.FetchRequest;
import kafka.common.OffsetOutOfRangeException;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.twill.common.Threads;
import org.apache.twill.kafka.client.TopicPartition;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/kafka/flow/Kafka07ConsumerFlowlet.class */
public abstract class Kafka07ConsumerFlowlet<PAYLOAD> extends KafkaConsumerFlowlet<ByteBuffer, PAYLOAD, Map<String, Long>> {
    private static final Logger LOG = LoggerFactory.getLogger(Kafka07ConsumerFlowlet.class);
    private ZKClientService zkClient;
    private KafkaBrokerCache kafkaBrokerCache;
    private Cache<KafkaBroker, SimpleConsumer> kafkaConsumers;
    private ExecutorService fetchExecutor;

    /**
     * Outcome of one fetch request issued to a single broker.
     *
     * <p>An instance carries either the fetched message set or the {@link Throwable} that made the
     * fetch fail, together with the broker that was queried and the offset the fetch started from.
     * Iterating a result that has no message set raises {@link IllegalStateException}.
     */
    public static final class FetchResult implements Iterable<MessageAndOffset> {
        private final KafkaBroker broker;
        private final long beginOffset;
        private final ByteBufferMessageSet messageSet;
        private final Throwable failureCause;

        /** Builds a result that carries a message set and no failure cause. */
        private FetchResult(KafkaBroker source, long startOffset, ByteBufferMessageSet messages) {
            this(source, startOffset, messages, null);
        }

        /** Builds a result that carries a failure cause and no message set. */
        private FetchResult(KafkaBroker source, long startOffset, Throwable cause) {
            this(source, startOffset, null, cause);
        }

        /** Canonical constructor; the two public-facing shapes above delegate here. */
        private FetchResult(KafkaBroker source, long startOffset, ByteBufferMessageSet messages, Throwable cause) {
            broker = source;
            beginOffset = startOffset;
            messageSet = messages;
            failureCause = cause;
        }

        /**
         * Returns an iterator over the fetched messages.
         *
         * @throws IllegalStateException if this result holds no message set
         */
        @Override // java.lang.Iterable
        public Iterator<MessageAndOffset> iterator() {
            if (messageSet != null) {
                return messageSet.iterator();
            }
            throw new IllegalStateException("There was error in the fetch.");
        }

        /** Returns the broker the fetch was sent to. */
        KafkaBroker getBroker() {
            return broker;
        }

        /** Returns the offset the fetch request started from. */
        long getBeginOffset() {
            return beginOffset;
        }

        /** Returns {@code true} when no failure cause was recorded. */
        boolean isSuccess() {
            return failureCause == null;
        }

        /** Returns the recorded failure, or {@code null} if there is none. */
        Throwable getFailureCause() {
            return failureCause;
        }
    }

    /**
     * Initializes the parent flowlet, then starts the ZooKeeper client and broker cache and creates
     * the consumer cache and the fetch executor used by this flowlet.
     *
     * @param flowletContext context handed to the parent initializer
     * @throws IllegalStateException if the Kafka configuration has no ZooKeeper quorum string
     * @throws Exception if the parent initializer fails
     */
    @Override // co.cask.cdap.kafka.flow.KafkaConsumerFlowlet
    public void initialize(FlowletContext flowletContext) throws Exception {
        super.initialize(flowletContext);

        String zkQuorum = getKafkaConfig().getZookeeper();
        if (zkQuorum == null) {
            throw new IllegalStateException("Must provide ZooKeeper quorum string to consume from Kafka 0.7 cluster");
        }

        // Wrap the plain client so that operations are retried every 2 seconds and watches are
        // re-registered after session expiry.
        ZKClientService plainClient = ZKClientService.Builder.of(zkQuorum).build();
        this.zkClient = ZKClientServices.delegate(
                ZKClients.reWatchOnExpire(
                        ZKClients.retryOnFailure(plainClient, RetryStrategies.fixDelay(2L, TimeUnit.SECONDS))));
        this.zkClient.startAndWait();

        this.kafkaBrokerCache = new KafkaBrokerCache(this.zkClient);
        this.kafkaBrokerCache.startAndWait();

        // Consumers idle for 60 seconds are evicted; the removal listener closes them.
        this.kafkaConsumers = CacheBuilder.newBuilder()
                .concurrencyLevel(1)
                .expireAfterAccess(60L, TimeUnit.SECONDS)
                .removalListener(createConsumerCacheRemovalListener())
                .build();

        this.fetchExecutor = Executors.newCachedThreadPool(Threads.createDaemonThreadFactory("kafka-consumer-%d"));
    }

    /**
     * Releases everything created by {@code initialize}: the fetch executor, the cached Kafka
     * consumers, the broker cache and the ZooKeeper client.
     *
     * <p>Every resource is null-checked so that this method is safe to call even when
     * initialization failed part-way through.
     */
    @Override
    public void destroy() {
        super.destroy();
        if (this.fetchExecutor != null) {
            this.fetchExecutor.shutdownNow();
        }
        if (this.kafkaConsumers != null) {
            // Invalidation fires the removal listener, which closes each cached SimpleConsumer;
            // without it the consumers would stay open after the flowlet is destroyed.
            this.kafkaConsumers.invalidateAll();
            this.kafkaConsumers.cleanUp();
        }
        if (this.kafkaBrokerCache != null) {
            stopService(this.kafkaBrokerCache);
        }
        if (this.zkClient != null) {
            stopService(this.zkClient);
        }
    }

    /**
     * Reads the next batch of messages for the partition described by {@code kafkaConsumerInfo}.
     *
     * <p>With no known broker the result is empty; with several brokers the fetches are delegated
     * to {@code multiFetch}; with exactly one broker the fetch is done inline on the calling thread.
     *
     * @param kafkaConsumerInfo topic partition, fetch size and current per-broker read offsets
     * @return iterator over the fetched messages, possibly empty
     */
    @Override // co.cask.cdap.kafka.flow.KafkaConsumerFlowlet
    protected Iterator<KafkaMessage<Map<String, Long>>> readMessages(KafkaConsumerInfo<Map<String, Long>> kafkaConsumerInfo) {
        TopicPartition partition = kafkaConsumerInfo.getTopicPartition();
        List<KafkaBroker> candidates = this.kafkaBrokerCache.getBrokers(partition.getTopic(), partition.getPartition());

        int brokerCount = candidates.size();
        if (brokerCount == 0) {
            return Iterators.emptyIterator();
        }
        if (brokerCount != 1) {
            return multiFetch(kafkaConsumerInfo, candidates, this.fetchExecutor);
        }

        // Single broker: work on a private copy of the read offsets and fetch synchronously.
        Map<String, Long> offsets = Maps.newHashMap(kafkaConsumerInfo.getReadOffset());
        KafkaBroker onlyBroker = candidates.get(0);
        SimpleConsumer consumer = getConsumer(onlyBroker, kafkaConsumerInfo.getFetchSize());
        long startOffset = getBrokerOffset(onlyBroker, kafkaConsumerInfo.getTopicPartition(), offsets, consumer);
        FetchResult fetched = fetchMessages(onlyBroker, consumer, partition, startOffset, kafkaConsumerInfo.getFetchSize());
        return handleFetch(kafkaConsumerInfo, offsets, fetched);
    }

    /**
     * Key decoder for this flowlet; it ignores its argument and always returns {@code null}.
     *
     * <p>NOTE(review): the message iterator in this class also builds every message with a
     * {@code null} third constructor argument, which presumably is the key — confirm that Kafka 0.7
     * messages are expected to be key-less.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected} by JADX.
     *
     * @param byteBuffer ignored
     * @return always {@code null}
     */
    @Override // co.cask.cdap.kafka.flow.KafkaConsumerFlowlet
    public final ByteBuffer decodeKey(ByteBuffer byteBuffer) {
        return null;
    }

    /**
     * Key-aware message handler that discards the key and forwards only the payload to the
     * single-argument {@code processMessage}.
     *
     * <p>The decompiled body cast the payload to {@code Kafka07ConsumerFlowlet<PAYLOAD>}, which is a
     * decompiler artifact: the payload is not a flowlet, so the cast could never succeed. The
     * payload is now passed through unchanged.
     *
     * @param byteBuffer message key; unused
     * @param payload decoded message payload
     * @throws Exception whatever the payload handler throws
     */
    /* renamed from: processMessage, reason: avoid collision after fix types in other method */
    protected final void processMessage2(ByteBuffer byteBuffer, PAYLOAD payload) throws Exception {
        processMessage(payload);
    }

    /**
     * Persists per-broker read offsets into the offset store, if one is available.
     *
     * <p>Each offset is written under the row key {@code <storeKey(partition)>:<brokerId>} as an
     * encoded long. Nothing is written when {@code getOffsetStore()} returns {@code null}.
     *
     * @param map read offsets keyed by topic partition, then by broker id
     */
    @Override // co.cask.cdap.kafka.flow.KafkaConsumerFlowlet
    protected void saveReadOffsets(Map<TopicPartition, Map<String, Long>> map) {
        KeyValueTable store = getOffsetStore();
        if (store != null) {
            for (Map.Entry<TopicPartition, Map<String, Long>> partitionEntry : map.entrySet()) {
                TopicPartition partition = partitionEntry.getKey();
                Map<String, Long> brokerOffsets = partitionEntry.getValue();
                for (Map.Entry<String, Long> brokerEntry : brokerOffsets.entrySet()) {
                    String rowKey = getStoreKey(partition) + ":" + brokerEntry.getKey();
                    byte[] encodedOffset = Bytes.toBytes(brokerEntry.getValue().longValue());
                    store.write(rowKey, encodedOffset);
                }
            }
        }
    }

    /**
     * Loads the persisted per-broker offsets for a topic partition.
     *
     * <p>Rows are looked up by the prefix {@code <storeKey(partition)>:}; the remainder of each row
     * key is decoded as UTF-8 and used as the map key, and the row value is decoded as a long
     * offset. The scan iterator is always closed, including when decoding a row throws.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected} by JADX.
     *
     * @param topicPartition partition whose stored offsets are wanted
     * @return immutable map of stored offsets; empty when there is no offset store or no rows match
     */
    @Override // co.cask.cdap.kafka.flow.KafkaConsumerFlowlet
    public Map<String, Long> getBeginOffset(TopicPartition topicPartition) {
        KeyValueTable offsetStore = getOffsetStore();
        if (offsetStore == null) {
            return ImmutableMap.of();
        }
        ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
        byte[] prefix = Bytes.toBytes(getStoreKey(topicPartition) + ":");
        CloseableIterator<KeyValue<byte[], byte[]>> scan = offsetStore.scan(prefix, Bytes.stopKeyForPrefix(prefix));
        try {
            while (scan.hasNext()) {
                KeyValue<byte[], byte[]> row = scan.next();
                byte[] rowKey = row.getKey();
                // Everything after the prefix is the map key written by saveReadOffsets.
                String brokerId = new String(rowKey, prefix.length, rowKey.length - prefix.length, Charsets.UTF_8);
                builder.put(brokerId, Long.valueOf(Bytes.toLong(row.getValue())));
            }
        } finally {
            // Release the scanner even if decoding a row fails.
            scan.close();
        }
        return builder.build();
    }

    /**
     * Returns the offset to start from when no offset is recorded for a broker.
     *
     * <p>This implementation ignores both arguments and always returns {@code -2L}. Negative values
     * are resolved to a concrete offset by {@code getBrokerOffset} through
     * {@code SimpleConsumer.getOffsetsBefore}. The method is protected and non-final, so subclasses
     * may override it.
     *
     * <p>NOTE(review): {@code -2L} is presumably Kafka's "earliest available offset" time sentinel
     * (with {@code -1L} meaning "latest") — confirm against {@code kafka.api.OffsetRequest}.
     *
     * @param kafkaBroker broker being read from; unused here
     * @param topicPartition partition being read; unused here
     * @return {@code -2L}
     */
    protected long getDefaultOffset(KafkaBroker kafkaBroker, TopicPartition topicPartition) {
        return -2L;
    }

    /**
     * Returns the cached consumer for a broker, creating and caching one on a miss.
     *
     * <p>A new consumer is built with a socket timeout of 5000 and {@code i} as its buffer size.
     * The cache is keyed by broker only, so {@code i} has no effect when a consumer is already cached.
     *
     * @param kafkaBroker broker to connect to
     * @param i buffer size used when a new consumer has to be created
     * @return consumer bound to {@code kafkaBroker}
     */
    private SimpleConsumer getConsumer(KafkaBroker kafkaBroker, int i) {
        SimpleConsumer consumer = this.kafkaConsumers.getIfPresent(kafkaBroker);
        if (consumer == null) {
            consumer = new SimpleConsumer(kafkaBroker.getHost(), kafkaBroker.getPort(), 5000, i);
            this.kafkaConsumers.put(kafkaBroker, consumer);
        }
        return consumer;
    }

    /**
     * Issues one fetch request and wraps the outcome in a {@link FetchResult}.
     *
     * <p>This method never throws: any {@link Throwable} raised while building or sending the
     * request is captured in the returned result.
     *
     * <p>Decompiler note: the access modifier was changed from {@code private} by JADX.
     *
     * @param kafkaBroker broker the request is attributed to
     * @param simpleConsumer consumer used to send the request
     * @param topicPartition topic and partition to fetch from
     * @param j offset to start fetching at
     * @param i maximum fetch size
     * @return result holding either the message set or the failure cause
     */
    public FetchResult fetchMessages(KafkaBroker kafkaBroker, SimpleConsumer simpleConsumer, TopicPartition topicPartition, long j, int i) {
        FetchResult outcome;
        try {
            FetchRequest request = new FetchRequest(topicPartition.getTopic(), topicPartition.getPartition(), j, i);
            ByteBufferMessageSet messages = simpleConsumer.fetch(request);
            outcome = new FetchResult(kafkaBroker, j, messages);
        } catch (Throwable failure) {
            outcome = new FetchResult(kafkaBroker, j, failure);
        }
        return outcome;
    }

    /**
     * Fetches from several brokers in parallel and concatenates their messages.
     *
     * <p>Consumers and start offsets are resolved on the calling thread; only the network fetch
     * runs on {@code executor}. Results are handled in completion order. If waiting for a result
     * fails, the failure is logged and an empty iterator is returned. On interruption the thread's
     * interrupt status is restored so that callers can observe it.
     *
     * @param kafkaConsumerInfo topic partition, fetch size and current per-broker read offsets
     * @param list brokers to fetch from
     * @param executor executor that runs the individual fetches
     * @return iterator over the messages from all brokers, or an empty iterator on failure
     */
    private Iterator<KafkaMessage<Map<String, Long>>> multiFetch(final KafkaConsumerInfo<Map<String, Long>> kafkaConsumerInfo, List<KafkaBroker> list, Executor executor) {
        final TopicPartition topicPartition = kafkaConsumerInfo.getTopicPartition();
        // Read once on the calling thread rather than from every worker thread.
        final int fetchSize = kafkaConsumerInfo.getFetchSize();
        Map<String, Long> offsets = Maps.newHashMap(kafkaConsumerInfo.getReadOffset());
        ExecutorCompletionService<FetchResult> completionService = new ExecutorCompletionService<FetchResult>(executor);
        for (final KafkaBroker kafkaBroker : list) {
            final SimpleConsumer consumer = getConsumer(kafkaBroker, fetchSize);
            final long brokerOffset = getBrokerOffset(kafkaBroker, topicPartition, offsets, consumer);
            completionService.submit(new Callable<FetchResult>() {
                @Override // java.util.concurrent.Callable
                public FetchResult call() throws Exception {
                    return Kafka07ConsumerFlowlet.this.fetchMessages(kafkaBroker, consumer, topicPartition, brokerOffset, fetchSize);
                }
            });
        }
        try {
            List<Iterator<KafkaMessage<Map<String, Long>>>> iterators = Lists.newArrayList();
            for (int i = 0; i < list.size(); i++) {
                iterators.add(handleFetch(kafkaConsumerInfo, offsets, completionService.take().get()));
            }
            return Iterators.concat(iterators.iterator());
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while fetching from Kafka for {}", topicPartition, e);
            return Iterators.emptyIterator();
        } catch (Exception e) {
            LOG.error("Failed to fetch from Kafka for {}", topicPartition, e);
            return Iterators.emptyIterator();
        }
    }

    /**
     * Creates the listener attached to the consumer cache. It closes a consumer when its entry is
     * removed; failures while closing are logged and not rethrown.
     *
     * @return removal listener that closes removed consumers
     */
    private RemovalListener<KafkaBroker, SimpleConsumer> createConsumerCacheRemovalListener() {
        RemovalListener<KafkaBroker, SimpleConsumer> closingListener = new RemovalListener<KafkaBroker, SimpleConsumer>() {
            @Override
            public void onRemoval(RemovalNotification<KafkaBroker, SimpleConsumer> removalNotification) {
                SimpleConsumer removed = removalNotification.getValue();
                if (removed != null) {
                    try {
                        removed.close();
                    } catch (Throwable closeFailure) {
                        Kafka07ConsumerFlowlet.LOG.error("Exception when closing Kafka consumer.", closeFailure);
                    }
                }
            }
        };
        return closingListener;
    }

    /**
     * Resolves the offset to start fetching from for one broker and records it in {@code map}.
     *
     * <p>If {@code map} has no entry for the broker, the value of {@code getDefaultOffset} is
     * stored first. A negative offset is then resolved through
     * {@code SimpleConsumer.getOffsetsBefore}, falling back to {@code 0} when the broker returns
     * no offsets, and the resolved value replaces the entry in {@code map}.
     *
     * @param kafkaBroker broker whose offset is wanted
     * @param topicPartition topic and partition being read
     * @param map mutable per-broker offsets, keyed by broker id; updated in place
     * @param simpleConsumer consumer used to resolve negative offsets
     * @return non-sentinel offset to fetch from
     */
    private long getBrokerOffset(KafkaBroker kafkaBroker, TopicPartition topicPartition, Map<String, Long> map, SimpleConsumer simpleConsumer) {
        String brokerId = kafkaBroker.getId();

        long offset;
        Long recorded = map.get(brokerId);
        if (recorded != null) {
            offset = recorded.longValue();
        } else {
            offset = getDefaultOffset(kafkaBroker, topicPartition);
            map.put(brokerId, Long.valueOf(offset));
        }

        if (offset >= 0) {
            return offset;
        }

        // Negative values are sentinels that the broker has to translate into a real offset.
        long[] resolved = simpleConsumer.getOffsetsBefore(topicPartition.getTopic(), topicPartition.getPartition(), offset, 1);
        long actual = resolved.length == 0 ? 0L : resolved[0];
        map.put(brokerId, Long.valueOf(actual));
        return actual;
    }

    /**
     * Wraps a successful fetch in an iterator of {@link KafkaMessage}s.
     *
     * <p>Messages whose offset is below the fetch's begin offset are skipped. For every message
     * returned, the broker's entry in {@code map} is updated to that message's offset before the
     * message is handed out; the same {@code map} instance is passed to every message.
     *
     * <p>The decompiled source named the iterator callback {@code m1computeNext} and used Scala's
     * synthetic {@code copy$default$N()} accessors. Both are decompiler artifacts: the callback
     * must be {@code computeNext} to implement {@link AbstractIterator}, and the regular
     * {@code message()} / {@code offset()} accessors are used instead.
     *
     * @param topicPartition partition the messages belong to
     * @param map mutable per-broker offsets, updated as the iterator advances
     * @param fetchResult successful fetch to iterate
     * @return lazily evaluated message iterator
     * @throws IllegalStateException if {@code fetchResult} holds no message set
     */
    private Iterator<KafkaMessage<Map<String, Long>>> createMessageIterator(final TopicPartition topicPartition, final Map<String, Long> map, final FetchResult fetchResult) {
        final Iterator<MessageAndOffset> it = fetchResult.iterator();
        return new AbstractIterator<KafkaMessage<Map<String, Long>>>() {
            @Override
            protected KafkaMessage<Map<String, Long>> computeNext() {
                while (it.hasNext()) {
                    MessageAndOffset messageAndOffset = it.next();
                    long offset = messageAndOffset.offset();
                    if (offset >= fetchResult.getBeginOffset()) {
                        map.put(fetchResult.getBroker().getId(), Long.valueOf(offset));
                        return new KafkaMessage<Map<String, Long>>(topicPartition, map, null, messageAndOffset.message().payload());
                    }
                }
                return endOfData();
            }
        };
    }

    /**
     * Turns a {@link FetchResult} into a message iterator, recovering from failures where possible.
     *
     * <ul>
     *   <li>Success: returns an iterator over the fetched messages.</li>
     *   <li>{@link OffsetOutOfRangeException}: resets the broker's offset and stores it on
     *       {@code kafkaConsumerInfo}. The offset obtained with time {@code -2L} is used unless it
     *       is below the failed fetch's begin offset, in which case the offset obtained with time
     *       {@code -1L} is used. Returns an empty iterator.</li>
     *   <li>Any other failure: logs it, evicts the broker's cached consumer so that a fresh one is
     *       created next time, and returns an empty iterator.</li>
     * </ul>
     *
     * <p>An empty answer from {@code getOffsetsBefore} is treated as offset {@code 0}, matching
     * {@code getBrokerOffset}, instead of failing with an {@link ArrayIndexOutOfBoundsException}.
     *
     * @param kafkaConsumerInfo consumer state for the partition; its read offset may be updated
     * @param map mutable per-broker offsets, keyed by broker id
     * @param fetchResult outcome of the fetch to handle
     * @return iterator over fetched messages; empty when the fetch failed
     */
    private Iterator<KafkaMessage<Map<String, Long>>> handleFetch(KafkaConsumerInfo<Map<String, Long>> kafkaConsumerInfo, Map<String, Long> map, FetchResult fetchResult) {
        TopicPartition topicPartition = kafkaConsumerInfo.getTopicPartition();
        if (fetchResult.isSuccess()) {
            return createMessageIterator(topicPartition, map, fetchResult);
        }
        if (fetchResult.getFailureCause() instanceof OffsetOutOfRangeException) {
            String topic = topicPartition.getTopic();
            int partition = topicPartition.getPartition();
            SimpleConsumer consumer = getConsumer(fetchResult.getBroker(), kafkaConsumerInfo.getFetchSize());
            long[] lowerBound = consumer.getOffsetsBefore(topic, partition, -2L, 1);
            long newOffset = lowerBound.length > 0 ? lowerBound[0] : 0L;
            if (newOffset < fetchResult.getBeginOffset()) {
                // The requested offset was past the end of the log rather than before its start.
                long[] upperBound = consumer.getOffsetsBefore(topic, partition, -1L, 1);
                newOffset = upperBound.length > 0 ? upperBound[0] : 0L;
            }
            map.put(fetchResult.getBroker().getId(), Long.valueOf(newOffset));
            kafkaConsumerInfo.setReadOffset(map);
        } else {
            LOG.warn("Fetch from Kafka broker {} failed for {}; discarding cached consumer.",
                    fetchResult.getBroker().getId(), topicPartition, fetchResult.getFailureCause());
            this.kafkaConsumers.invalidate(fetchResult.getBroker());
        }
        return Iterators.emptyIterator();
    }

    /**
     * Compiler-generated bridge recovered by the decompiler: forwards the erased
     * {@code (key, payload)} call to the typed {@code processMessage2}.
     *
     * <p>The decompiled body cast the payload to {@code ByteBuffer}, which would throw
     * {@link ClassCastException} for any other payload type. The payload is now cast to
     * {@code PAYLOAD}; that cast is unchecked because of erasure and cannot fail at this point.
     *
     * @param byteBuffer message key
     * @param obj message payload, expected to be a {@code PAYLOAD}
     * @throws Exception whatever the payload handler throws
     */
    /* JADX WARN: Multi-variable type inference failed */
    @Override // co.cask.cdap.kafka.flow.KafkaConsumerFlowlet
    @SuppressWarnings("unchecked")
    protected /* bridge */ /* synthetic */ void processMessage(ByteBuffer byteBuffer, Object obj) throws Exception {
        processMessage2(byteBuffer, (PAYLOAD) obj);
    }
}
