package io.confluent.kafkarest.controllers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.confluent.kafkarest.controllers.CloudUIInternalModule;
import io.confluent.kafkarest.entities.v3.PartitionConsumeData;
import io.confluent.kafkarest.entities.v3.PartitionConsumeRecord;
import io.confluent.kafkarest.entities.v3.PartitionOffsetData;
import io.confluent.kafkarest.entities.v3.PartitionsOffsetsData;
import io.confluent.kafkarest.resources.v3.SimpleConsumeAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.BadRequestException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafkarest/controllers/SimpleConsumeManagerImpl.class */
public final class SimpleConsumeManagerImpl implements SimpleConsumeManager {
    // Upper bound on the number of poll() calls made by doConsumeMultiGuaranteeProgress.
    private static final int MAX_POLLS = 5;
    // 20 MiB; the guaranteed-progress poll loop stops once the collected key/value bytes exceed this.
    private static final int MAX_RESPONSE_BYTES = 20 * 1024 * 1024;
    // Timeout handed to every KafkaConsumer.poll call in this class.
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumeManagerImpl.class);

    // Source of the consumers used by each request; every consumer obtained is handed back
    // through releaseConsumer in handleConsumeResult.
    private final KafkaConsumerProvider consumerProvider;
    // Executor on which the blocking Kafka calls are run.
    private final Executor simpleConsumeExecutor;

    /**
     * Creates a manager that obtains consumers from {@code kafkaConsumerProvider} and runs its
     * Kafka calls on {@code executorService}.
     *
     * @throws NullPointerException if either argument is {@code null}
     */
    @Inject
    public SimpleConsumeManagerImpl(
            KafkaConsumerProvider kafkaConsumerProvider,
            @CloudUIInternalModule.SimpleConsumeExecutor ExecutorService executorService) {
        // requireNonNull already returns its argument with the right static type; no cast needed.
        this.consumerProvider = Objects.requireNonNull(kafkaConsumerProvider);
        this.simpleConsumeExecutor = Objects.requireNonNull(executorService);
    }

    /**
     * Asynchronously reads one batch of records from a single partition.
     *
     * <p>The start position is resolved in this order:
     * <ol>
     *   <li>{@code bool} is {@code TRUE}: the partition's beginning offset;</li>
     *   <li>{@code bool} is {@code FALSE}: the partition's end offset;</li>
     *   <li>{@code l} (a timestamp passed to {@code offsetsForTimes}) is set: the offset found for
     *       that timestamp, or the end offset when none is found;</li>
     *   <li>otherwise: the larger of the beginning offset and the explicit offset {@code l2}.</li>
     * </ol>
     *
     * <p>The consumer is obtained on the calling thread and released when the returned future
     * completes, whether normally or exceptionally.
     *
     * @param topicPartition partition to read from
     * @param bool start from the beginning ({@code TRUE}) or the end ({@code FALSE}); may be null
     * @param l timestamp used to look up the start offset; may be null
     * @param l2 explicit start offset, required when {@code bool} and {@code l} are both null
     * @param num value for the consumer's {@code max.poll.records}; may be null
     * @param num2 value for the consumer's {@code fetch.max.bytes}; may be null
     * @param num3 forwarded to {@code PartitionConsumeRecord.listFromConsumerRecordList}
     * @return a future holding the consumed data; it fails with {@link BadRequestException} when
     *     no start position was supplied
     * @throws BadRequestException if the consumer configuration overrides are rejected
     */
    @Override // io.confluent.kafkarest.controllers.SimpleConsumeManager
    public CompletableFuture<PartitionConsumeData> consumeFromPartition(@Nonnull TopicPartition topicPartition, Boolean bool, Long l, Long l2, Integer num, Integer num2, Integer num3) {
        log.debug("consumeFromPartition(start): {}", Long.valueOf(System.nanoTime()));
        KafkaConsumer<byte[], byte[]> consumer = getConsumerWithConfigs(num, num2);
        Set<TopicPartition> partitionSet = Collections.singleton(topicPartition);
        return CompletableFuture.supplyAsync(() -> {
            long startOffset;
            if (Boolean.TRUE.equals(bool)) {
                startOffset = consumer.beginningOffsets(partitionSet).get(topicPartition).longValue();
            } else if (Boolean.FALSE.equals(bool)) {
                startOffset = consumer.endOffsets(partitionSet).get(topicPartition).longValue();
            } else if (l != null) {
                log.debug("offsetsForTimes(start): {}", Long.valueOf(System.nanoTime()));
                Map<TopicPartition, OffsetAndTimestamp> found =
                        consumer.offsetsForTimes(Collections.singletonMap(topicPartition, l));
                log.debug("offsetsForTimes(end): {}", Long.valueOf(System.nanoTime()));
                OffsetAndTimestamp match = found.get(topicPartition);
                if (match != null) {
                    startOffset = match.offset();
                } else {
                    // No offset found for the timestamp: start at the end of the log.
                    startOffset = consumer.endOffsets(partitionSet).get(topicPartition).longValue();
                }
            } else {
                if (l2 == null) {
                    // Previously an unboxing NullPointerException; IllegalArgumentException is
                    // translated into a BadRequestException by processError.
                    throw new IllegalArgumentException(
                            "No start position given: a from-beginning flag, a timestamp or an offset is required.");
                }
                long earliest = consumer.beginningOffsets(partitionSet).get(topicPartition).longValue();
                // Never start before the first offset that is still available.
                startOffset = Math.max(earliest, l2.longValue());
            }
            return doConsume(consumer, topicPartition, startOffset, num3);
        }, this.simpleConsumeExecutor).handle((partitionConsumeData, th) -> {
            log.debug("consumeFromPartition(end): {}", Long.valueOf(System.nanoTime()));
            return handleConsumeResult(partitionConsumeData, th, consumer);
        });
    }

    /**
     * Obtains a consumer from the provider, overriding {@code max.poll.records} and
     * {@code fetch.max.bytes} when the corresponding argument is non-null.
     *
     * @param num value for {@code max.poll.records}, or null to leave it unset
     * @param num2 value for {@code fetch.max.bytes}, or null to leave it unset
     * @return the consumer returned by the provider
     * @throws BadRequestException if the provider rejects the overrides with a
     *     {@link ConfigException}; the original exception is kept as the cause
     */
    private KafkaConsumer<byte[], byte[]> getConsumerWithConfigs(Integer num, Integer num2) {
        Properties overrides = new Properties();
        if (num != null) {
            overrides.put("max.poll.records", num);
        }
        if (num2 != null) {
            overrides.put("fetch.max.bytes", num2);
        }
        try {
            return this.consumerProvider.getConsumer(overrides);
        } catch (ConfigException e) {
            // Keep the cause so the offending config shows up in stack traces and logs.
            throw new BadRequestException(e.getMessage(), e);
        }
    }

    /**
     * Completion step shared by all asynchronous operations: releases the consumer, then either
     * returns the result or rethrows the failure after translating it with {@code processError}.
     *
     * @param t result of the asynchronous work; returned as-is when {@code th} is null
     * @param th failure of the asynchronous work, or null on success
     * @param kafkaConsumer consumer to hand back to the provider
     */
    private <T> T handleConsumeResult(T t, Throwable th, KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        // Release before inspecting the outcome so the consumer is returned on both paths.
        consumerProvider.releaseConsumer(kafkaConsumer);
        if (th == null) {
            return t;
        }
        throw processError(th);
    }

    /**
     * Reads the records returned by a single poll of {@code topicPartition}, starting at offset
     * {@code j}.
     *
     * <p>The consumer is only assigned, sought and polled when {@code canConsumeFromOffsets}
     * accepts the start offset against the partition's current end offset; otherwise an empty
     * record list is reported.
     *
     * @param kafkaConsumer consumer to use
     * @param topicPartition partition to read
     * @param j offset to seek to before polling
     * @param num forwarded to {@code PartitionConsumeRecord.listFromConsumerRecordList}
     */
    @VisibleForTesting
    PartitionConsumeData doConsume(KafkaConsumer<byte[], byte[]> kafkaConsumer, TopicPartition topicPartition, long j, Integer num) {
        Set<TopicPartition> assignment = Collections.singleton(topicPartition);
        Map<Integer, Long> startOffsets =
                Collections.singletonMap(Integer.valueOf(topicPartition.partition()), Long.valueOf(j));

        log.debug("endOffsets(start): {}", System.nanoTime());
        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(assignment);
        log.debug("endOffsets(end): {}", System.nanoTime());

        if (!canConsumeFromOffsets(startOffsets, endOffsets)) {
            List<ConsumerRecord<byte[], byte[]>> nothing = Collections.emptyList();
            return getPartitionConsumeData(nothing, topicPartition, startOffsets, endOffsets, num);
        }

        kafkaConsumer.assign(assignment);
        kafkaConsumer.seek(topicPartition, j);
        log.debug("poll(start): {}", System.nanoTime());
        List<ConsumerRecord<byte[], byte[]>> fetched = kafkaConsumer.poll(POLL_TIMEOUT).records(topicPartition);
        log.debug("poll(end): {}", System.nanoTime());
        return getPartitionConsumeData(fetched, topicPartition, startOffsets, endOffsets, num);
    }

    /**
     * Tells whether at least one partition has an end offset that is greater than or equal to its
     * requested start offset.
     *
     * @param map start offsets keyed by partition id; null or empty yields {@code false}
     * @param map2 end offsets keyed by topic partition
     * @return {@code true} as soon as one partition satisfies {@code end >= start}
     */
    @VisibleForTesting
    static boolean canConsumeFromOffsets(Map<Integer, Long> map, Map<TopicPartition, Long> map2) {
        if (map == null || map.isEmpty()) {
            return false;
        }
        return map2.entrySet().stream().anyMatch(end -> {
            long endOffset = end.getValue().longValue();
            long startOffset = map.get(Integer.valueOf(end.getKey().partition())).longValue();
            return endOffset >= startOffset;
        });
    }

    /**
     * Builds the response entry for one partition.
     *
     * <p>The reported next offset is, in increasing priority: the partition's end offset; the
     * smaller of that and the start offset when {@code map} has an entry for the partition; the
     * offset following the last record when any records are present.
     *
     * @param list records read for the partition; may be null or empty
     * @param topicPartition partition the records belong to
     * @param map start offsets keyed by partition id; may be null
     * @param map2 end offsets; must contain {@code topicPartition}
     * @param num forwarded to {@code PartitionConsumeRecord.listFromConsumerRecordList}
     */
    private static PartitionConsumeData getPartitionConsumeData(List<ConsumerRecord<byte[], byte[]>> list, @Nonnull TopicPartition topicPartition, Map<Integer, Long> map, @Nonnull Map<TopicPartition, Long> map2, Integer num) {
        Integer partitionId = Integer.valueOf(topicPartition.partition());

        long nextOffset = map2.get(topicPartition).longValue();
        if (map != null && map.containsKey(partitionId)) {
            long startOffset = map.get(partitionId).longValue();
            if (startOffset < nextOffset) {
                nextOffset = startOffset;
            }
        }

        int recordCount = (list == null) ? 0 : list.size();
        if (recordCount > 0) {
            ConsumerRecord<byte[], byte[]> last = list.get(recordCount - 1);
            nextOffset = last.offset() + 1;
        }

        return PartitionConsumeData.builder()
                .setPartitionId(partitionId)
                .setNextOffset(Long.valueOf(nextOffset))
                .setRecords(PartitionConsumeRecord.listFromConsumerRecordList(list, num))
                .build();
    }

    /**
     * Translates a failure of the asynchronous work into the exception reported to the caller.
     *
     * <ul>
     *   <li>A {@link CompletionException} is unwrapped when it carries a cause.</li>
     *   <li>{@link IllegalArgumentException} and {@link IllegalStateException} become
     *       {@link BadRequestException}, with the original kept as the cause.</li>
     *   <li>Kafka's {@link TimeoutException} becomes the kafka-rest
     *       {@code io.confluent.kafkarest.exceptions.TimeoutException}.</li>
     *   <li>Any other {@link RuntimeException} is returned unchanged; everything else is wrapped
     *       in a {@link RuntimeException}.</li>
     * </ul>
     *
     * @param th the failure to translate
     * @return the exception to throw; never null
     */
    private static RuntimeException processError(Throwable th) {
        Throwable failure = th;
        // Only unwrap when there is something inside; a cause-less CompletionException is
        // reported as itself rather than as RuntimeException(null).
        if (failure instanceof CompletionException && failure.getCause() != null) {
            failure = failure.getCause();
        }
        if (failure instanceof IllegalArgumentException || failure instanceof IllegalStateException) {
            return new BadRequestException(failure.getMessage(), failure);
        }
        if (failure instanceof TimeoutException) {
            return new io.confluent.kafkarest.exceptions.TimeoutException(failure);
        }
        if (failure instanceof RuntimeException) {
            return (RuntimeException) failure;
        }
        return new RuntimeException(failure);
    }

    /**
     * Asynchronously reads records from several partitions of topic {@code str}.
     *
     * <p>The consumer's {@code max.poll.records} is {@code num} when it is set and larger than
     * {@code i}; otherwise {@code SimpleConsumeAction.MAX_POLL_RECORDS_LIMIT} is used. The
     * partitions read are the keys of {@code map} when it is non-empty, else partitions
     * {@code 0..i-1}. The consumer is released when the returned future completes.
     *
     * @param str topic name
     * @param i partition count of the topic
     * @param map explicit start offsets keyed by partition id; may be null
     * @param bool start from the beginning ({@code TRUE}) or the end ({@code FALSE}); may be null
     * @param l timestamp used to look up start offsets; may be null
     * @param num requested maximum number of records; may be null
     * @param num2 value for the consumer's {@code fetch.max.bytes}; may be null
     * @param num3 forwarded to {@code PartitionConsumeRecord.listFromConsumerRecordList}
     * @param z selects {@code doConsumeMultiGuaranteeProgress} when true, {@code doConsumeMulti}
     *     otherwise
     */
    @Override // io.confluent.kafkarest.controllers.SimpleConsumeManager
    public CompletableFuture<List<PartitionConsumeData>> consumeFromMultiplePartitions(@Nonnull String str, int i, Map<Integer, Long> map, Boolean bool, Long l, Integer num, Integer num2, Integer num3, boolean z) {
        log.debug("consumeFromMultiPartitions(start): {}", System.nanoTime());

        int maxPollRecords;
        if (num != null && num.intValue() > i) {
            maxPollRecords = num.intValue();
        } else {
            maxPollRecords = SimpleConsumeAction.MAX_POLL_RECORDS_LIMIT;
        }
        KafkaConsumer<byte[], byte[]> consumer = getConsumerWithConfigs(Integer.valueOf(maxPollRecords), num2);
        List<TopicPartition> partitions = getPartitions(str, i, map);

        return CompletableFuture
                .supplyAsync(
                        () -> z
                                ? doConsumeMultiGuaranteeProgress(consumer, partitions, map, bool, l, num, num3)
                                : doConsumeMulti(consumer, partitions, map, bool, l, num3),
                        this.simpleConsumeExecutor)
                .handle((result, failure) -> {
                    log.debug("consumeFromMultiPartitions(end): {}", System.nanoTime());
                    return handleConsumeResult(result, failure, consumer);
                });
    }

    /**
     * Lists the partitions of topic {@code str} to operate on: the keys of {@code map} when it is
     * non-empty, otherwise partitions {@code 0} (inclusive) to {@code i} (exclusive).
     *
     * @param str topic name
     * @param i partition count, used only when {@code map} is null or empty
     * @param map start offsets keyed by partition id; may be null
     */
    private static List<TopicPartition> getPartitions(String str, int i, Map<Integer, Long> map) {
        List<TopicPartition> partitions = new ArrayList<>();
        if (map == null || map.isEmpty()) {
            for (int partitionId = 0; partitionId < i; partitionId++) {
                partitions.add(new TopicPartition(str, partitionId));
            }
        } else {
            for (Integer partitionId : map.keySet()) {
                partitions.add(new TopicPartition(str, partitionId.intValue()));
            }
        }
        return partitions;
    }

    /**
     * Reads the records returned by a single poll across all partitions in {@code list}.
     *
     * <p>Start offsets come from {@code calculateInitialOffsets}. The consumer is assigned,
     * sought and polled only when {@code canConsumeFromOffsets} accepts those offsets; otherwise
     * every partition is reported with the records of an empty {@code ConsumerRecords}.
     *
     * @param kafkaConsumer consumer to use
     * @param list partitions to read
     * @param map explicit start offsets keyed by partition id; may be null
     * @param bool start from the beginning ({@code TRUE}) or the end ({@code FALSE}); may be null
     * @param l timestamp used to look up start offsets; may be null
     * @param num forwarded to {@code PartitionConsumeRecord.listFromConsumerRecordList}
     * @return one entry per partition, in the order of {@code list}
     */
    @VisibleForTesting
    List<PartitionConsumeData> doConsumeMulti(KafkaConsumer<byte[], byte[]> kafkaConsumer, List<TopicPartition> list, Map<Integer, Long> map, Boolean bool, Long l, Integer num) {
        log.debug("endOffsets(start): {}", System.nanoTime());
        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(list);
        log.debug("endOffsets(end): {}", System.nanoTime());

        Map<Integer, Long> startOffsets = calculateInitialOffsets(kafkaConsumer, list, map, endOffsets, bool, l);

        ConsumerRecords<byte[], byte[]> polled = new ConsumerRecords<>(Collections.emptyMap());
        if (canConsumeFromOffsets(startOffsets, endOffsets)) {
            kafkaConsumer.assign(list);
            for (TopicPartition partition : list) {
                long startOffset = startOffsets.get(Integer.valueOf(partition.partition())).longValue();
                kafkaConsumer.seek(partition, startOffset);
            }
            log.debug("poll(start): {}", System.nanoTime());
            polled = kafkaConsumer.poll(POLL_TIMEOUT);
            log.debug("poll(end): {}", System.nanoTime());
        }

        ImmutableList.Builder<PartitionConsumeData> results = new ImmutableList.Builder<>();
        for (TopicPartition partition : list) {
            List<ConsumerRecord<byte[], byte[]>> partitionRecords = polled.records(partition);
            results.add(getPartitionConsumeData(partitionRecords, partition, startOffsets, endOffsets, num));
        }
        return results.build();
    }

    /**
     * Sums the serialized key and value sizes of the given records.
     *
     * <p>{@code ConsumerRecord} reports a size of {@code -1} for a null key or value; such sizes
     * are counted as zero bytes so that they cannot shrink the total or make it negative.
     *
     * @param list records to measure
     * @return total number of serialized key and value bytes, never negative for valid input
     */
    @VisibleForTesting
    public static int getOverallSerializedMessageSize(List<ConsumerRecord<byte[], byte[]>> list) {
        int totalBytes = 0;
        for (ConsumerRecord<byte[], byte[]> rec : list) {
            totalBytes += Math.max(0, rec.serializedKeySize()) + Math.max(0, rec.serializedValueSize());
        }
        return totalBytes;
    }

    /**
     * Reads records from all partitions in {@code list}, polling repeatedly so that partitions
     * that returned nothing on an earlier poll get further chances.
     *
     * <p>Polling stops when every partition has records, after {@code MAX_POLLS} polls, or once
     * the collected key/value bytes exceed {@code MAX_RESPONSE_BYTES}. Partitions that already
     * have records are paused before the next poll. For each partition that returns records at
     * least one record is kept, even when the record budget is already used up.
     *
     * @param kafkaConsumer consumer to use
     * @param list partitions to read
     * @param map explicit start offsets keyed by partition id; may be null
     * @param bool start from the beginning ({@code TRUE}) or the end ({@code FALSE}); may be null
     * @param l timestamp used to look up start offsets; may be null
     * @param num record budget shared by all partitions; null falls back to
     *     {@code SimpleConsumeAction.MAX_POLL_RECORDS_LIMIT}
     * @param num2 forwarded to {@code PartitionConsumeRecord.listFromConsumerRecordList}
     * @return one entry per partition, in the order of {@code list}
     */
    @VisibleForTesting
    List<PartitionConsumeData> doConsumeMultiGuaranteeProgress(KafkaConsumer<byte[], byte[]> kafkaConsumer, List<TopicPartition> list, Map<Integer, Long> map, Boolean bool, Long l, Integer num, Integer num2) {
        log.debug("endOffsets(start): {}", Long.valueOf(System.nanoTime()));
        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(list);
        log.debug("endOffsets(end): {}", Long.valueOf(System.nanoTime()));
        Map<Integer, Long> startOffsets = calculateInitialOffsets(kafkaConsumer, list, map, endOffsets, bool, l);
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> collected = new HashMap<>();
        if (canConsumeFromOffsets(startOffsets, endOffsets)) {
            kafkaConsumer.assign(list);
            for (TopicPartition partition : list) {
                kafkaConsumer.seek(partition, startOffsets.get(Integer.valueOf(partition.partition())).longValue());
            }
            // consumeFromMultiplePartitions accepts a null record limit, so it must not be
            // unboxed blindly; use the same fallback that method applies to the consumer config.
            int remainingRecords = (num != null) ? num.intValue() : SimpleConsumeAction.MAX_POLL_RECORDS_LIMIT;
            int polls = 0;
            int collectedBytes = 0;
            while (collected.size() < list.size() && polls < MAX_POLLS && collectedBytes <= MAX_RESPONSE_BYTES) {
                polls++;
                log.debug("poll#{} (start): {}", Integer.valueOf(polls), Long.valueOf(System.nanoTime()));
                ConsumerRecords<byte[], byte[]> polled = kafkaConsumer.poll(POLL_TIMEOUT);
                log.debug("poll#{} (end): {}", Integer.valueOf(polls), Long.valueOf(System.nanoTime()));
                for (TopicPartition partition : list) {
                    List<ConsumerRecord<byte[], byte[]>> records = polled.records(partition);
                    if (records == null || records.isEmpty()) {
                        continue;
                    }
                    // Always keep at least one record so the partition makes progress even when
                    // the budget is exhausted (remainingRecords may be zero or negative here).
                    int keep = Math.max(1, Math.min(records.size(), remainingRecords));
                    List<ConsumerRecord<byte[], byte[]>> kept = records.subList(0, keep);
                    collectedBytes += getOverallSerializedMessageSize(kept);
                    remainingRecords -= kept.size();
                    collected.computeIfAbsent(partition, unused -> new ArrayList<>()).addAll(kept);
                }
                // Stop fetching from partitions that already contributed records.
                kafkaConsumer.pause(collected.keySet());
            }
        }
        ImmutableList.Builder<PartitionConsumeData> results = new ImmutableList.Builder<>();
        for (TopicPartition partition : list) {
            results.add(getPartitionConsumeData(collected.get(partition), partition, startOffsets, endOffsets, num2));
        }
        return results.build();
    }

    /**
     * Resolves the start offset of every partition in {@code list}, keyed by partition id.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>{@code bool} is {@code TRUE}: beginning offsets;</li>
     *   <li>{@code bool} is {@code FALSE}: the end offsets in {@code map2};</li>
     *   <li>{@code l} is set: the offset found by {@code offsetsForTimes} for that timestamp, or
     *       the partition's end offset when none is found;</li>
     *   <li>{@code map} is non-empty: the larger of the beginning offset and the requested
     *       offset; a partition without a requested offset starts at its beginning offset;</li>
     *   <li>otherwise: the end offsets in {@code map2}.</li>
     * </ol>
     *
     * <p>An empty {@code map} is treated like a null one, matching {@code getPartitions}.
     *
     * @param kafkaConsumer consumer used for the offset lookups
     * @param list partitions to resolve
     * @param map explicit start offsets keyed by partition id; may be null or empty
     * @param map2 end offsets of the partitions
     * @param bool start from the beginning ({@code TRUE}) or the end ({@code FALSE}); may be null
     * @param l timestamp used to look up start offsets; may be null
     */
    private static Map<Integer, Long> calculateInitialOffsets(KafkaConsumer<byte[], byte[]> kafkaConsumer, List<TopicPartition> list, @Nullable Map<Integer, Long> map, Map<TopicPartition, Long> map2, @Nullable Boolean bool, @Nullable Long l) {
        if (Boolean.TRUE.equals(bool)) {
            return keyByPartitionId(kafkaConsumer.beginningOffsets(list));
        }
        if (Boolean.FALSE.equals(bool)) {
            return keyByPartitionId(map2);
        }
        if (l != null) {
            Map<TopicPartition, Long> timestamps = list.stream().collect(Collectors.toMap(tp -> tp, tp -> l));
            log.debug("offsetsForTimes(start): {}", Long.valueOf(System.nanoTime()));
            Map<TopicPartition, OffsetAndTimestamp> found = kafkaConsumer.offsetsForTimes(timestamps);
            log.debug("offsetsForTimes(end): {}", Long.valueOf(System.nanoTime()));
            return found.entrySet().stream().collect(Collectors.toMap(
                    e -> Integer.valueOf(e.getKey().partition()),
                    // No offset for the timestamp: fall back to the end offset.
                    e -> e.getValue() != null ? Long.valueOf(e.getValue().offset()) : map2.get(e.getKey())));
        }
        if (map != null && !map.isEmpty()) {
            return kafkaConsumer.beginningOffsets(list).entrySet().stream().collect(Collectors.toMap(
                    e -> Integer.valueOf(e.getKey().partition()),
                    e -> {
                        Long requested = map.get(Integer.valueOf(e.getKey().partition()));
                        if (requested == null) {
                            return e.getValue();
                        }
                        // Never start before the first offset that is still available.
                        return Long.valueOf(Math.max(e.getValue().longValue(), requested.longValue()));
                    }));
        }
        return keyByPartitionId(map2);
    }

    /** Re-keys a per-partition offset map by partition id. */
    private static Map<Integer, Long> keyByPartitionId(Map<TopicPartition, Long> offsets) {
        return offsets.entrySet().stream().collect(Collectors.toMap(
                e -> Integer.valueOf(e.getKey().partition()),
                Map.Entry::getValue));
    }

    /**
     * Asynchronously resolves a single offset of {@code topicPartition}.
     *
     * <p>{@code bool == TRUE} yields the beginning offset and {@code bool == FALSE} the end
     * offset. Otherwise {@code l} is required and the offset found by {@code offsetsForTimes}
     * for that timestamp is returned, or the end offset when none is found. The consumer is
     * released when the returned future completes.
     *
     * @param topicPartition partition to inspect
     * @param bool beginning ({@code TRUE}) or end ({@code FALSE}); may be null
     * @param l timestamp to look up; must be non-null when {@code bool} is null
     */
    @Override // io.confluent.kafkarest.controllers.SimpleConsumeManager
    public CompletableFuture<Long> getOffsetForPartition(@Nonnull TopicPartition topicPartition, Boolean bool, Long l) {
        log.debug("getOffsetForPartition(start): {}", System.nanoTime());
        KafkaConsumer<byte[], byte[]> consumer = getConsumerWithConfigs(null, null);
        return CompletableFuture.supplyAsync(() -> {
            Set<TopicPartition> single = Collections.singleton(topicPartition);
            if (Boolean.TRUE.equals(bool)) {
                return Long.valueOf(consumer.beginningOffsets(single).get(topicPartition).longValue());
            }
            if (Boolean.FALSE.equals(bool)) {
                return Long.valueOf(consumer.endOffsets(single).get(topicPartition).longValue());
            }
            Map<TopicPartition, Long> query = Collections.singletonMap(topicPartition, Objects.requireNonNull(l));
            log.debug("offsetsForTimes(start): {}", System.nanoTime());
            Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);
            log.debug("offsetsForTimes(end): {}", System.nanoTime());
            OffsetAndTimestamp match = found.get(topicPartition);
            if (match != null) {
                return Long.valueOf(match.offset());
            }
            // No offset found for the timestamp: report the end offset.
            return Long.valueOf(consumer.endOffsets(single).get(topicPartition).longValue());
        }, this.simpleConsumeExecutor).handle((offset, failure) -> {
            log.debug("getOffsetForPartition(end): {}", System.nanoTime());
            return handleConsumeResult(offset, failure, consumer);
        });
    }

    /**
     * Asynchronously resolves an offset for each of partitions {@code 0..i-1} of topic
     * {@code str}, together with the sum over those partitions of
     * {@code endOffset - beginningOffset}.
     *
     * <p>The per-partition offsets come from {@code calculateInitialOffsets} with no explicit
     * offsets map. The consumer is released when the returned future completes.
     *
     * @param str topic name
     * @param i partition count of the topic
     * @param bool beginning ({@code TRUE}) or end ({@code FALSE}); may be null
     * @param l timestamp used to look up offsets; may be null
     */
    @Override // io.confluent.kafkarest.controllers.SimpleConsumeManager
    public CompletableFuture<PartitionsOffsetsData> getOffsetsForPartitions(@Nonnull String str, int i, Boolean bool, Long l) {
        log.debug("getOffsetsForPartitions(start): {}", System.nanoTime());
        KafkaConsumer<byte[], byte[]> consumer = getConsumerWithConfigs(null, null);
        List<TopicPartition> partitions = getPartitions(str, i, null);
        return CompletableFuture.supplyAsync(() -> {
            log.debug("beginningOffsets(start): {}", System.nanoTime());
            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
            log.debug("beginningOffsets(end): {}", System.nanoTime());
            log.debug("endOffsets(start): {}", System.nanoTime());
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            log.debug("endOffsets(end): {}", System.nanoTime());

            long totalRecords = 0L;
            for (TopicPartition partition : partitions) {
                totalRecords += endOffsets.get(partition).longValue() - beginningOffsets.get(partition).longValue();
            }

            Map<Integer, Long> resolved = calculateInitialOffsets(consumer, partitions, null, endOffsets, bool, l);
            PartitionsOffsetsData offsetsData = new PartitionsOffsetsData(totalRecords);
            for (Map.Entry<Integer, Long> entry : resolved.entrySet()) {
                offsetsData.addOffset(
                        PartitionOffsetData.builder()
                                .setPartitionId(entry.getKey())
                                .setNextOffset(entry.getValue())
                                .build());
            }
            return offsetsData;
        }, this.simpleConsumeExecutor).handle((result, failure) -> {
            log.debug("getOffsetsForPartitions(end): {}", System.nanoTime());
            return handleConsumeResult(result, failure, consumer);
        });
    }
}
