/*
 * Decompiled with CFR 0.152.
 */
package zipkin2.collector.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Callback;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.kafka.KafkaCollector;

final class KafkaCollectorWorker
implements Runnable {
    static final Logger LOG = LoggerFactory.getLogger(KafkaCollectorWorker.class);
    static final Callback<Void> NOOP = new Callback<Void>(){

        public void onSuccess(Void value) {
        }

        public void onError(Throwable t) {
        }
    };
    final Properties properties;
    final List<String> topics;
    final Collector collector;
    final CollectorMetrics metrics;
    final AtomicReference<List<TopicPartition>> assignedPartitions = new AtomicReference(Collections.emptyList());

    KafkaCollectorWorker(KafkaCollector.Builder builder) {
        this.properties = builder.properties;
        this.topics = Arrays.asList(builder.topic.split(","));
        this.collector = builder.delegate.build();
        this.metrics = builder.metrics;
    }

    @Override
    public void run() {
        try {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(this.properties);
            Throwable throwable = null;
            try {
                try {
                    kafkaConsumer.subscribe(this.topics, new ConsumerRebalanceListener(){

                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            KafkaCollectorWorker.this.assignedPartitions.set(Collections.emptyList());
                        }

                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            KafkaCollectorWorker.this.assignedPartitions.set(Collections.unmodifiableList(new ArrayList<TopicPartition>(partitions)));
                        }
                    });
                    LOG.info("Kafka consumer starting polling loop.");
                    block14: while (true) {
                        ConsumerRecords consumerRecords = kafkaConsumer.poll(1000L);
                        LOG.debug("Kafka polling returned batch of {} messages.", (Object)consumerRecords.count());
                        Iterator iterator = consumerRecords.iterator();
                        while (true) {
                            if (!iterator.hasNext()) continue block14;
                            ConsumerRecord record = (ConsumerRecord)iterator.next();
                            this.metrics.incrementMessages();
                            byte[] bytes = (byte[])record.value();
                            if (bytes.length < 2) {
                                this.metrics.incrementMessagesDropped();
                                continue;
                            }
                            if (!KafkaCollectorWorker.protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12) {
                                this.metrics.incrementBytes(bytes.length);
                                try {
                                    Span span = (Span)SpanBytesDecoder.THRIFT.decodeOne(bytes);
                                    this.collector.accept(Collections.singletonList(span), NOOP);
                                }
                                catch (RuntimeException e) {
                                    this.metrics.incrementMessagesDropped();
                                }
                                continue;
                            }
                            this.collector.acceptSpans(bytes, NOOP);
                        }
                        break;
                    }
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
            }
            catch (Throwable throwable3) {
                if (throwable != null) {
                    try {
                        kafkaConsumer.close();
                    }
                    catch (Throwable throwable4) {
                        throwable.addSuppressed(throwable4);
                    }
                } else {
                    kafkaConsumer.close();
                }
                throw throwable3;
            }
        }
        catch (InterruptException kafkaConsumer) {
            LOG.info("Kafka consumer polling loop stopped.");
            LOG.info("Closing Kafka consumer...");
            LOG.info("Kafka consumer closed.");
        }
        catch (Error | RuntimeException e) {
            try {
                LOG.warn("Unexpected error in polling loop spans", e);
                throw e;
            }
            catch (Throwable throwable) {
                LOG.info("Kafka consumer polling loop stopped.");
                LOG.info("Closing Kafka consumer...");
                LOG.info("Kafka consumer closed.");
                throw throwable;
            }
        }
    }

    static boolean protobuf3(byte[] bytes) {
        return bytes[0] == 10 && bytes[1] != 0;
    }
}

