package cz.o2.proxima.storage.kafka;

import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.repository.Context;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.AbstractStorage;
import cz.o2.proxima.storage.AttributeWriterBase;
import cz.o2.proxima.storage.DataAccessor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.BulkLogObserver;
import cz.o2.proxima.storage.commitlog.CommitLogReader;
import cz.o2.proxima.storage.commitlog.LogObserver;
import cz.o2.proxima.storage.kafka.partitioner.KeyPartitioner;
import cz.o2.proxima.util.Classpath;
import cz.o2.proxima.view.PartitionedView;
import cz.seznam.euphoria.shadow.com.google.common.base.Strings;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/storage/kafka/KafkaAccessor.class */
public class KafkaAccessor extends AbstractStorage implements DataAccessor {
    /** Configuration key whose value, parsed as a long, overrides the consumer poll interval. */
    public static final String POLL_INTERVAL_CFG = "poll.interval";

    /** Configuration key holding the class name of the {@link Partitioner} to instantiate. */
    public static final String PARTITIONER_CLASS = "partitioner";

    /** Prefix of configuration keys that {@link #createProps()} copies into the resulting properties. */
    public static final String WRITER_CONFIG_PREFIX = "kafka.";

    /** Number of leading characters cut from keys that start with {@link #WRITER_CONFIG_PREFIX}. */
    private static final int PRODUCE_CONFIG_PREFIX_LENGTH = WRITER_CONFIG_PREFIX.length();

    private static final Logger log = LoggerFactory.getLogger(KafkaAccessor.class);

    /** Topic resolved from the storage URI in the constructor. */
    private final String topic;

    /** Configuration map exactly as handed to the constructor (not copied). */
    private final Map<String, Object> cfg;

    /** Partitioner in use; the constructor installs a default that configuration may replace. */
    private Partitioner partitioner;

    /** Consumer poll interval; the constructor installs a default that configuration may replace. */
    private long consumerPollInterval;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/storage/kafka/KafkaAccessor$BulkConsumer.class */
    /**
     * {@link ElementConsumer} that forwards elements to a {@link BulkLogObserver}.
     *
     * <p>When the observer confirms an element successfully, the element's position is handed to
     * the {@link TopicPartitionCommitter}; on failure the reported error is passed to the supplied
     * error consumer. A {@code null} element is skipped without committing anything, which differs
     * from {@link OnlineConsumer}.
     *
     * <p>The decompiled {@code $deserializeLambda$} method was dropped: it is a synthetic member
     * that the compiler generates on its own for serializable lambdas, and its decompiled form
     * was not valid Java.
     */
    public static final class BulkConsumer implements ElementConsumer {
        final BulkLogObserver observer;
        final TopicPartitionCommitter committer;

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * @param bulkLogObserver observer receiving every non-null element
         * @param topicPartitionCommitter committer invoked after a successful confirmation
         */
        public BulkConsumer(BulkLogObserver bulkLogObserver, TopicPartitionCommitter topicPartitionCommitter) {
            this.observer = bulkLogObserver;
            this.committer = topicPartitionCommitter;
        }

        /**
         * Passes the element to the bulk observer together with a confirmation callback.
         *
         * @param streamElement element to process; {@code null} is ignored
         * @param topicPartition partition the element belongs to
         * @param j value passed to the committer on success (presumably the record offset)
         * @param consumer receives the error when the observer reports a failure
         */
        @Override // cz.o2.proxima.storage.kafka.KafkaAccessor.ElementConsumer
        public void consumeWithConfirm(@Nullable StreamElement streamElement, TopicPartition topicPartition, long j, Consumer<Throwable> consumer) {
            if (streamElement == null) {
                // Nothing to hand over and, unlike OnlineConsumer, nothing is committed here.
                return;
            }
            // The bound method reference null-checks topicPartition when it is evaluated,
            // so no separate explicit check is needed.
            this.observer.onNext(streamElement, topicPartition::partition, (success, error) -> {
                if (success) {
                    this.committer.commit(topicPartition, j);
                } else {
                    consumer.accept(error);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/storage/kafka/KafkaAccessor$ElementConsumer.class */
    /**
     * Receiver of a single, possibly absent, element read from a topic partition.
     *
     * <p>The implementations in this file ({@link BulkConsumer}, {@link OnlineConsumer}) forward
     * the element to an observer and either commit through a {@link TopicPartitionCommitter} or
     * report the failure to the supplied error consumer.
     */
    public interface ElementConsumer {
        /**
         * Processes one element.
         *
         * @param streamElement the element, or {@code null} when there is none to process
         * @param topicPartition partition the element was read from
         * @param j value the implementations pass on to the committer (presumably the record
         *     offset - confirm against the caller)
         * @param consumer receives the error if processing is reported as failed
         */
        void consumeWithConfirm(@Nullable StreamElement streamElement, TopicPartition topicPartition, long j, Consumer<Throwable> consumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/storage/kafka/KafkaAccessor$OnlineConsumer.class */
    /**
     * {@link ElementConsumer} that forwards elements to a {@link LogObserver}.
     *
     * <p>A {@code null} element is committed immediately. For any other element the commit happens
     * only after the observer confirms success; a reported failure is passed to the supplied error
     * consumer instead.
     *
     * <p>The decompiled {@code $deserializeLambda$} method was dropped: it is a synthetic member
     * that the compiler generates on its own for serializable lambdas, and its decompiled form
     * was not valid Java (it referenced {@code this} from a static method, among other problems).
     */
    public static final class OnlineConsumer implements ElementConsumer {
        final LogObserver observer;
        final TopicPartitionCommitter committer;

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * @param logObserver observer receiving every non-null element
         * @param topicPartitionCommitter committer invoked for null elements and after successful
         *     confirmations
         */
        public OnlineConsumer(LogObserver logObserver, TopicPartitionCommitter topicPartitionCommitter) {
            this.observer = logObserver;
            this.committer = topicPartitionCommitter;
        }

        /**
         * Commits right away when there is no element, otherwise passes the element to the
         * observer together with a confirmation callback.
         *
         * @param streamElement element to process, or {@code null} to only commit
         * @param topicPartition partition the element belongs to
         * @param j value passed to the committer (presumably the record offset)
         * @param consumer receives the error when the observer reports a failure
         */
        @Override // cz.o2.proxima.storage.kafka.KafkaAccessor.ElementConsumer
        public void consumeWithConfirm(@Nullable StreamElement streamElement, TopicPartition topicPartition, long j, Consumer<Throwable> consumer) {
            if (streamElement == null) {
                // No element to observe: commit straight away.
                this.committer.commit(topicPartition, j);
                return;
            }
            // The bound method reference null-checks topicPartition when it is evaluated,
            // so no separate explicit check is needed.
            this.observer.onNext(streamElement, topicPartition::partition, (success, error) -> {
                if (success) {
                    this.committer.commit(topicPartition, j);
                } else {
                    consumer.accept(error);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/storage/kafka/KafkaAccessor$TopicPartitionCommitter.class */
    /**
     * Callback used by the consumers in this file to commit a position within a topic partition.
     */
    public interface TopicPartitionCommitter {
        /**
         * Commits the given position for the given partition.
         *
         * @param topicPartition partition being committed
         * @param j position to commit (presumably the record offset - confirm against the caller)
         */
        void commit(TopicPartition topicPartition, long j);
    }

    public KafkaAccessor(EntityDescriptor entityDescriptor, URI uri, Map<String, Object> map) {
        super(entityDescriptor, uri);
        this.partitioner = new KeyPartitioner();
        this.consumerPollInterval = 100L;
        if (uri.getPath().length() <= 1) {
            throw new IllegalArgumentException("Specify topic by path in URI");
        }
        if (Strings.isNullOrEmpty(uri.getAuthority())) {
            throw new IllegalArgumentException("Specify brokers by authority in URI");
        }
        this.cfg = map;
        this.topic = Utils.topic(uri);
        configure(map);
    }

    /**
     * Applies optional settings from the configuration map on top of the current defaults.
     *
     * <p>Reads {@link #POLL_INTERVAL_CFG} (parsed from its string form as a long) and
     * {@link #PARTITIONER_CLASS} (a class name resolved through {@link Classpath#findClass} and
     * instantiated via its no-argument constructor). Settings that are absent leave the current
     * values untouched. The resulting values are logged.
     *
     * @param map configuration to read
     * @throws RuntimeException if the partitioner class cannot be instantiated
     */
    private void configure(Map<String, Object> map) {
        final Object pollIntervalValue = map.get(POLL_INTERVAL_CFG);
        if (pollIntervalValue != null) {
            this.consumerPollInterval = Long.parseLong(pollIntervalValue.toString());
        }

        final String partitionerClassName = (String) map.get(PARTITIONER_CLASS);
        if (partitionerClassName != null) {
            final Class<?> partitionerClass = Classpath.findClass(partitionerClassName, Partitioner.class);
            // An unresolved class keeps the current partitioner.
            if (partitionerClass != null) {
                try {
                    this.partitioner = (Partitioner) partitionerClass.newInstance();
                } catch (IllegalAccessException | InstantiationException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        log.info("Using consumerPollInterval {} and partitionerClass {} for URI {}", this.consumerPollInterval, this.partitioner.getClass(), getURI());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds properties from the configuration entries whose key starts with
     * {@link #WRITER_CONFIG_PREFIX}.
     *
     * <p>The prefix is stripped from each key and each value is stored in its string form.
     * Entries without the prefix are ignored.
     *
     * @return newly created properties holding the selected entries
     */
    public Properties createProps() {
        final Properties kafkaProps = new Properties();
        this.cfg.forEach((key, value) -> {
            if (key.startsWith(WRITER_CONFIG_PREFIX)) {
                final String strippedKey = key.substring(PRODUCE_CONFIG_PREFIX_LENGTH);
                kafkaProps.setProperty(strippedKey, value.toString());
            }
        });
        return kafkaProps;
    }

    /**
     * Creates a consumer factory for this accessor's URI, configured with the properties from
     * {@link #createProps()}.
     *
     * @return a new {@link KafkaConsumerFactory}
     */
    public KafkaConsumerFactory createConsumerFactory() {
        final URI storageUri = getURI();
        final Properties consumerProps = createProps();
        return new KafkaConsumerFactory(storageUri, consumerProps);
    }

    /**
     * Provides a writer backed by this accessor.
     *
     * @param context not used by this implementation
     * @return an always-present optional holding a new {@link KafkaWriter}
     */
    public Optional<AttributeWriterBase> getWriter(Context context) {
        final AttributeWriterBase writer = new KafkaWriter(this);
        return Optional.of(writer);
    }

    /**
     * Provides a commit-log reader backed by this accessor.
     *
     * @param context context handed to the created reader
     * @return an always-present optional holding a new {@link KafkaLogReader}
     */
    public Optional<CommitLogReader> getCommitLogReader(Context context) {
        final CommitLogReader reader = new KafkaLogReader(this, context);
        return Optional.of(reader);
    }

    /**
     * Provides a partitioned view backed by this accessor.
     *
     * @param context context handed to the created view
     * @return an always-present optional holding a new {@link KafkaLogReader}
     */
    public Optional<PartitionedView> getPartitionedView(Context context) {
        final PartitionedView view = new KafkaLogReader(this, context);
        return Optional.of(view);
    }

    /**
     * @return the topic resolved from the storage URI at construction time
     */
    public String getTopic() {
        return topic;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the partitioner currently in use (the default one unless configuration replaced it)
     */
    public Partitioner getPartitioner() {
        return partitioner;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the consumer poll interval (the default unless configuration replaced it)
     */
    public long getConsumerPollInterval() {
        return consumerPollInterval;
    }
}
