/*
 * Decompiled with CFR 0.152.
 */
package zipkin2.collector.kafka;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Callback;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.kafka.KafkaCollector;

final class KafkaCollectorWorker
implements Runnable {
    static final Logger LOG = LoggerFactory.getLogger(KafkaCollectorWorker.class);
    static final Callback<Void> NOOP = new Callback<Void>(){

        public void onSuccess(Void value) {
        }

        public void onError(Throwable t) {
        }
    };
    final Properties properties;
    final List<String> topics;
    final Collector collector;
    final CollectorMetrics metrics;
    final AtomicReference<List<TopicPartition>> assignedPartitions = new AtomicReference(Collections.emptyList());
    final AtomicBoolean running = new AtomicBoolean(true);

    KafkaCollectorWorker(KafkaCollector.Builder builder) {
        this.properties = builder.properties;
        this.topics = Arrays.asList(builder.topic.split(","));
        this.collector = builder.delegate.build();
        this.metrics = builder.metrics;
    }

    @Override
    public void run() {
        try (KafkaConsumer kafkaConsumer = new KafkaConsumer(this.properties);){
            kafkaConsumer.subscribe(this.topics, new ConsumerRebalanceListener(){

                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    KafkaCollectorWorker.this.assignedPartitions.set(Collections.emptyList());
                }

                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    KafkaCollectorWorker.this.assignedPartitions.set(Collections.unmodifiableList(new ArrayList<TopicPartition>(partitions)));
                }
            });
            LOG.debug("Kafka consumer starting polling loop.");
            while (this.running.get()) {
                ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.of(1000L, ChronoUnit.MILLIS));
                LOG.debug("Kafka polling returned batch of {} messages.", (Object)consumerRecords.count());
                for (ConsumerRecord record : consumerRecords) {
                    byte[] bytes = (byte[])record.value();
                    this.metrics.incrementMessages();
                    this.metrics.incrementBytes(bytes.length);
                    if (bytes.length == 0) continue;
                    if (bytes.length < 2) {
                        this.metrics.incrementMessagesDropped();
                        continue;
                    }
                    if (!KafkaCollectorWorker.protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12) {
                        Span span;
                        try {
                            span = (Span)SpanBytesDecoder.THRIFT.decodeOne(bytes);
                        }
                        catch (RuntimeException e) {
                            this.metrics.incrementMessagesDropped();
                            continue;
                        }
                        this.collector.accept(Collections.singletonList(span), NOOP);
                        continue;
                    }
                    this.collector.acceptSpans(bytes, NOOP);
                }
            }
        }
        catch (Error | RuntimeException e) {
            LOG.warn("Unexpected error in polling loop spans", e);
            throw e;
        }
        finally {
            LOG.debug("Kafka consumer polling loop stopped. Kafka consumer closed.");
        }
    }

    public void stop() {
        this.running.set(false);
    }

    static boolean protobuf3(byte[] bytes) {
        return bytes[0] == 10 && bytes[1] != 0;
    }
}

