package co.cask.cdap.template.etl.realtime.kafka;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.template.etl.api.Emitter;
import co.cask.cdap.template.etl.api.realtime.RealtimeContext;
import co.cask.cdap.template.etl.api.realtime.SourceState;
import co.cask.cdap.template.etl.realtime.source.KafkaSource;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Service;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.twill.kafka.client.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/template/etl/realtime/kafka/KafkaSimpleApiConsumer.class */
public abstract class KafkaSimpleApiConsumer<KEY, PAYLOAD, OFFSET> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSimpleApiConsumer.class);
    protected static final int SO_TIMEOUT = 5000;
    protected final KafkaSource kafkaSource;
    private final Function<KafkaConsumerInfo<OFFSET>, OFFSET> consumerToOffset = new Function<KafkaConsumerInfo<OFFSET>, OFFSET>() { // from class: co.cask.cdap.template.etl.realtime.kafka.KafkaSimpleApiConsumer.1
        public OFFSET apply(KafkaConsumerInfo<OFFSET> kafkaConsumerInfo) {
            return kafkaConsumerInfo.getReadOffset();
        }
    };
    private Function<ByteBuffer, KEY> keyDecoder;
    private Function<ByteBuffer, PAYLOAD> payloadDecoder;
    private KafkaConfig kafkaConfig;
    private Map<TopicPartition, KafkaConsumerInfo<OFFSET>> consumerInfos;
    private Map<String, byte[]> offsetStore;
    private RealtimeContext sourceContext;

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaSimpleApiConsumer(KafkaSource kafkaSource) {
        this.kafkaSource = kafkaSource;
    }

    public void initialize(RealtimeContext realtimeContext) throws Exception {
        this.sourceContext = realtimeContext;
        Type type = TypeToken.of(getClass()).getSupertype(KafkaSimpleApiConsumer.class).getType();
        if (type instanceof ParameterizedType) {
            Type[] actualTypeArguments = ((ParameterizedType) type).getActualTypeArguments();
            this.keyDecoder = createKeyDecoder(actualTypeArguments[0]);
            this.payloadDecoder = createPayloadDecoder(actualTypeArguments[1]);
        }
        DefaultKafkaConfigurer defaultKafkaConfigurer = new DefaultKafkaConfigurer();
        configureKafka(defaultKafkaConfigurer);
        if (defaultKafkaConfigurer.getZookeeper() == null && defaultKafkaConfigurer.getBrokers() == null) {
            throw new IllegalStateException("Kafka not configured. Must provide either zookeeper or broker list.");
        }
        this.kafkaConfig = new KafkaConfig(defaultKafkaConfigurer.getZookeeper(), defaultKafkaConfigurer.getBrokers());
        this.consumerInfos = createConsumerInfos(defaultKafkaConfigurer.getTopicPartitions());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RealtimeContext getContext() {
        return this.sourceContext;
    }

    public String getName() {
        return getClass().getSimpleName();
    }

    public Map<String, byte[]> getSavedState() {
        return Maps.newHashMap(this.offsetStore);
    }

    public void saveState(SourceState sourceState) {
        this.offsetStore = sourceState.getState();
    }

    public void pollMessages(Emitter<StructuredRecord> emitter) {
        boolean z = false;
        for (KafkaConsumerInfo<OFFSET> kafkaConsumerInfo : this.consumerInfos.values()) {
            Iterator<KafkaMessage<OFFSET>> readMessages = readMessages(kafkaConsumerInfo);
            while (readMessages.hasNext()) {
                KafkaMessage<OFFSET> next = readMessages.next();
                processMessage((KafkaMessage) next, emitter);
                kafkaConsumerInfo.setReadOffset(next.getNextOffset());
            }
            if (kafkaConsumerInfo.hasPendingChanges()) {
                z = true;
            }
        }
        if (z) {
            saveReadOffsets(Maps.transformValues(this.consumerInfos, this.consumerToOffset));
        }
    }

    protected abstract void configureKafka(KafkaConfigurer kafkaConfigurer);

    protected abstract Iterator<KafkaMessage<OFFSET>> readMessages(KafkaConsumerInfo<OFFSET> kafkaConsumerInfo);

    protected abstract OFFSET getBeginOffset(TopicPartition topicPartition);

    protected abstract void saveReadOffsets(Map<TopicPartition, OFFSET> map);

    public void destroy() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final KafkaConfig getKafkaConfig() {
        return this.kafkaConfig;
    }

    protected void processMessage(KafkaMessage<OFFSET> kafkaMessage, Emitter<StructuredRecord> emitter) {
        int partition = kafkaMessage.getTopicPartition().getPartition();
        Preconditions.checkArgument(partition % getContext().getInstanceCount() == getContext().getInstanceId(), "Received unexpected partition " + partition);
        if (kafkaMessage.getKey() == null) {
            processMessage((KafkaSimpleApiConsumer<KEY, PAYLOAD, OFFSET>) decodePayload(kafkaMessage.getPayload()), emitter);
        } else {
            processMessage(decodeKey(kafkaMessage.getKey()), decodePayload(kafkaMessage.getPayload()), emitter);
        }
    }

    protected void processMessage(KEY key, PAYLOAD payload, Emitter<StructuredRecord> emitter) {
        processMessage((KafkaSimpleApiConsumer<KEY, PAYLOAD, OFFSET>) payload, emitter);
    }

    protected abstract void processMessage(PAYLOAD payload, Emitter<StructuredRecord> emitter);

    protected abstract long getDefaultOffset(TopicPartition topicPartition);

    @Nullable
    protected KEY decodeKey(ByteBuffer byteBuffer) {
        if (this.keyDecoder != null) {
            return (KEY) this.keyDecoder.apply(byteBuffer);
        }
        return null;
    }

    @Nullable
    protected PAYLOAD decodePayload(ByteBuffer byteBuffer) {
        if (this.payloadDecoder != null) {
            return (PAYLOAD) this.payloadDecoder.apply(byteBuffer);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void stopService(Service service) {
        try {
            service.stopAndWait();
        } catch (Throwable th) {
            LOG.error("Failed when stopping service {}", service, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getStoreKey(TopicPartition topicPartition) {
        return topicPartition.getTopic() + ":" + topicPartition.getPartition();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<String, byte[]> getOffsetStore() {
        return this.offsetStore;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StructuredRecord byteBufferToStructuredRecord(@Nullable String str, ByteBuffer byteBuffer) {
        return this.kafkaSource.byteBufferToStructuredRecord(str, byteBuffer);
    }

    private Function<ByteBuffer, KEY> createKeyDecoder(Type type) {
        return (Function<ByteBuffer, KEY>) createDecoder(type, "No decoder for decoding message key");
    }

    private Function<ByteBuffer, PAYLOAD> createPayloadDecoder(Type type) {
        return (Function<ByteBuffer, PAYLOAD>) createDecoder(type, "No decoder for decoding message payload");
    }

    private <T> Function<ByteBuffer, T> createDecoder(Type type, String str) {
        return String.class.equals(type) ? (Function<ByteBuffer, T>) createStringDecoder() : ByteBuffer.class.equals(type) ? (Function<ByteBuffer, T>) createByteBufferDecoder() : (byte[].class.equals(type) || ((type instanceof GenericArrayType) && Byte.TYPE.equals(((GenericArrayType) type).getGenericComponentType()))) ? (Function<ByteBuffer, T>) createBytesDecoder() : createFailureDecoder(str);
    }

    private Function<ByteBuffer, String> createStringDecoder() {
        return new Function<ByteBuffer, String>() { // from class: co.cask.cdap.template.etl.realtime.kafka.KafkaSimpleApiConsumer.2
            public String apply(ByteBuffer byteBuffer) {
                byteBuffer.mark();
                String bytes = Bytes.toString(byteBuffer, Charsets.UTF_8);
                byteBuffer.reset();
                return bytes;
            }
        };
    }

    private Function<ByteBuffer, ByteBuffer> createByteBufferDecoder() {
        return new Function<ByteBuffer, ByteBuffer>() { // from class: co.cask.cdap.template.etl.realtime.kafka.KafkaSimpleApiConsumer.3
            public ByteBuffer apply(ByteBuffer byteBuffer) {
                return byteBuffer;
            }
        };
    }

    private Function<ByteBuffer, byte[]> createBytesDecoder() {
        return new Function<ByteBuffer, byte[]>() { // from class: co.cask.cdap.template.etl.realtime.kafka.KafkaSimpleApiConsumer.4
            public byte[] apply(ByteBuffer byteBuffer) {
                byte[] bArr = new byte[byteBuffer.remaining()];
                byteBuffer.mark();
                byteBuffer.get(bArr);
                byteBuffer.reset();
                return bArr;
            }
        };
    }

    private <T> Function<ByteBuffer, T> createFailureDecoder(final String str) {
        return new Function<ByteBuffer, T>() { // from class: co.cask.cdap.template.etl.realtime.kafka.KafkaSimpleApiConsumer.5
            public T apply(ByteBuffer byteBuffer) {
                throw new IllegalStateException(str);
            }
        };
    }

    private Map<TopicPartition, KafkaConsumerInfo<OFFSET>> createConsumerInfos(Map<TopicPartition, Integer> map) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (Map.Entry<TopicPartition, Integer> entry : map.entrySet()) {
            builder.put(entry.getKey(), new KafkaConsumerInfo(entry.getKey(), entry.getValue().intValue(), getBeginOffset(entry.getKey())));
        }
        return builder.build();
    }
}
