package io.confluent.kafkarest.integration.v3;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.controllers.CloudUIInternalModule;
import io.confluent.kafkarest.entities.v3.ListConsumerGroupOffsetsResultsData;
import io.confluent.kafkarest.entities.v3.TopicPartitionOffsetAndMetaData;
import io.confluent.kafkarest.entities.v3.UpdateConsumerGroupOffsetsRequest;
import io.confluent.kafkarest.entities.v3.UpdateConsumerGroupOffsetsResponse;
import io.confluent.kafkarest.extension.RestResourceExtension;
import io.confluent.kafkarest.integration.CloudClusterTestHarness;
import io.confluent.kafkarest.resources.v3.CloudUIInternalResourcesFeature;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Configurable;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.eclipse.jetty.http.HttpStatus;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

@Tag("IntegrationTest")
/* loaded from: input_file:io/confluent/kafkarest/integration/v3/ConsumerGroupOffsetsResourceIntegrationTest.class */
public class ConsumerGroupOffsetsResourceIntegrationTest extends CloudClusterTestHarness {
    /** Name of the topic created in {@code setUp} for every test. */
    private static final String TOPIC_NAME = "test_topic";

    /** Number of partitions {@link #TOPIC_NAME} is created with. */
    private static final int NUM_PARTITIONS = 3;

    /** Consumer group id the tests commit offsets for and query through the REST API. */
    private static final String CONSUMER_GROUP = "test_cg";

    /** Format template of the offsets endpoint; arguments are the cluster id and the consumer group id. */
    private static final String OFFSETS_API_PATH_TMP = "/v3/clusters/%s/internal/consumer-groups/%s/offsets";

    /** Offsets seeded in {@code setUp}: partition {@code i} maps to {@code (i + 1) * 10}. */
    private final Map<TopicPartition, Long> defaultResetOffsets = new HashMap<>();

    /* loaded from: input_file:io/confluent/kafkarest/integration/v3/ConsumerGroupOffsetsResourceIntegrationTest$ConsumerGroupInternalExtension.class */
    public static final class ConsumerGroupInternalExtension implements RestResourceExtension {
        public void register(Configurable<?> configurable, KafkaRestConfig kafkaRestConfig) {
            configurable.register(CloudUIInternalResourcesFeature.class);
            configurable.register(CloudUIInternalModule.class);
        }

        public void clean() {
        }
    }

    /**
     * Runnable that joins {@link #CONSUMER_GROUP}, subscribes to {@link #TOPIC_NAME} and keeps
     * polling until {@link #stop()} is called. Used to put the consumer group into an active state.
     *
     * <p>This is an inner (non-static) class because it reads {@code restConfig} and
     * {@code brokerList} from the enclosing test instance.
     */
    private class KafkaConsumerThread implements Runnable {
        private final Consumer<String, String> consumer;
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final CountDownLatch postFirstPollLatch;

        /**
         * Creates the consumer; it does not subscribe or poll until {@link #run()} executes.
         *
         * @param countDownLatch latch counted down after every poll, so a caller can wait for the
         *     first poll to have completed
         */
        public KafkaConsumerThread(CountDownLatch countDownLatch) {
            Properties consumerProperties = restConfig.getConsumerProperties();
            consumerProperties.put("bootstrap.servers", brokerList);
            consumerProperties.put("group.id", CONSUMER_GROUP);
            consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            this.consumer = new KafkaConsumer<>(consumerProperties);
            this.postFirstPollLatch = countDownLatch;
        }

        /** Subscribes and polls every 100 ms until stopped, then closes the consumer. */
        @Override // java.lang.Runnable
        public void run() {
            try {
                this.consumer.subscribe(Collections.singletonList(TOPIC_NAME));
                while (this.running.get()) {
                    this.consumer.poll(Duration.ofMillis(100L));
                    this.postFirstPollLatch.countDown();
                }
            } finally {
                // If subscribe/poll throws before the first countDown, a test blocked on the latch
                // would wait forever; release it so the test fails on its assertions instead.
                // Counting down an already-released latch is a no-op.
                this.postFirstPollLatch.countDown();
                // Always close so the consumer leaves the group even on failure.
                this.consumer.close();
            }
        }

        /** Asks the polling loop to exit after the current poll returns. */
        public void stop() {
            this.running.set(false);
        }
    }

    /**
     * Runs the harness set-up, creates {@link #TOPIC_NAME} with {@link #NUM_PARTITIONS} partitions
     * (replication factor 1) and seeds {@code defaultResetOffsets} with {@code (partition + 1) * 10}
     * for every partition.
     *
     * @param testInfo JUnit test info forwarded to the parent set-up
     * @throws Exception if the parent set-up or topic creation fails
     */
    @BeforeEach
    @Override
    public void setUp(TestInfo testInfo) throws Exception {
        super.setUp(testInfo);
        createTopic(TOPIC_NAME, NUM_PARTITIONS, (short) 1);
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            long seededOffset = (partition + 1) * 10;
            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
            this.defaultResetOffsets.put(topicPartition, seededOffset);
        }
    }

    /**
     * Points the REST application at {@link ConsumerGroupInternalExtension}.
     * NOTE(review): presumably a hook called by the test harness before the REST app starts — confirm.
     *
     * @param properties REST configuration properties to mutate in place
     */
    protected void overrideKafkaRestConfigs(Properties properties) {
        String extensionClassName = ConsumerGroupInternalExtension.class.getName();
        properties.put("kafka.rest.resource.extension.class", extensionClassName);
    }

    /**
     * Sets the JMX-reporter and telemetry-reporter auto-enable broker settings to {@code false},
     * then delegates to the parent implementation.
     *
     * @param i first argument of the parent hook, forwarded unchanged
     *     (NOTE(review): presumably the broker index — confirm)
     * @param properties broker properties to mutate in place
     * @return whatever the parent implementation returns for the mutated properties
     */
    @Override
    public Properties overrideBrokerProperties(int i, Properties properties) {
        final Boolean disabled = Boolean.FALSE;
        properties.put("auto.include.jmx.reporter", disabled);
        properties.put("confluent.reporters.telemetry.auto.enable", disabled);
        return super.overrideBrokerProperties(i, properties);
    }

    /**
     * GET on the offsets endpoint of an existing group must return 200 and exactly one entry per
     * partition, each carrying the offset committed by {@code setupConsumerGroupWithOffset}.
     */
    @Test
    public void test_getConsumerGroupOffsets_WithValidGroupId_ThenReturnsOffsets() {
        setupConsumerGroupWithOffset();

        String path = String.format(OFFSETS_API_PATH_TMP, getClusterId(), CONSUMER_GROUP);
        Response response = request(path).get();
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        ListConsumerGroupOffsetsResultsData result = response.readEntity(ListConsumerGroupOffsetsResultsData.class);
        Assertions.assertEquals(CONSUMER_GROUP, result.getConsumerGroupId());
        List<?> returnedOffsets = result.getOffsets();
        Assertions.assertEquals(NUM_PARTITIONS, returnedOffsets.size());

        // Tick off each expected partition as it is seen; anything left over was not returned,
        // and a duplicate or unknown partition fails the not-null check.
        Map<TopicPartition, Long> outstanding = new HashMap<>(this.defaultResetOffsets);
        for (Object element : returnedOffsets) {
            TopicPartitionOffsetAndMetaData returned = (TopicPartitionOffsetAndMetaData) element;
            TopicPartition topicPartition = new TopicPartition(returned.getTopicName(), returned.getPartitionId().intValue());
            Long expectedOffset = outstanding.get(topicPartition);
            Assertions.assertNotNull(expectedOffset, "Unexpected or duplicate partition returned: " + topicPartition);
            Assertions.assertEquals(expectedOffset, returned.getOffset());
            outstanding.remove(topicPartition);
        }
        Assertions.assertTrue(outstanding.isEmpty(), "Offsets for all partitions have not been returned.");
    }

    /**
     * GET on the offsets endpoint of an unknown group must return 404 with a body that contains
     * the "not found" message and the code 40405.
     */
    @Test
    public void test_getConsumerGroupOffsets_WithNonExistingGroupId_ThenReturns404() {
        String path = String.format(OFFSETS_API_PATH_TMP, getClusterId(), "invalid_consumer_group");
        Response response = request(path).get();

        Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus());

        String body = response.readEntity(String.class);
        int expectedErrorCode = 40405;
        Assertions.assertTrue(body.contains("Consumer group id not found."));
        Assertions.assertTrue(body.contains(String.valueOf(expectedErrorCode)));
    }

    /**
     * PATCH with one reset per partition — partition 0 to EARLIEST, partition 1 to LATEST and
     * partition 2 to an explicit offset — must return 207 with a 200 entry for every partition,
     * and the group's committed offsets must afterwards match the requested positions.
     */
    @Test
    public void test_updateConsumerGroupOffset_WithValidGroupId_ThenResetHappensCorrectly() throws ExecutionException, InterruptedException, JsonProcessingException {
        setupConsumerGroupWithOffset();
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logStartOffsets = getLogStartOffsets();
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEndOffsets = getLogEndOffsets();

        // Partition 2 is reset to a point strictly between its committed offset and the log end.
        TopicPartition explicitPartition = new TopicPartition(TOPIC_NAME, 2);
        long committed = this.defaultResetOffsets.get(explicitPartition).longValue();
        long logEnd = logEndOffsets.get(explicitPartition).offset();
        Assertions.assertTrue(committed < logEnd);
        long midpoint = (committed + logEnd) / 2;
        Assertions.assertTrue(committed < midpoint && midpoint < logEnd);

        List<UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData> resets = new ArrayList<>();
        resets.add(UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData.create(
                TOPIC_NAME, 0, UpdateConsumerGroupOffsetsRequest.OffsetData.create((Long) null, UpdateConsumerGroupOffsetsRequest.OffsetType.EARLIEST)));
        resets.add(UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData.create(
                TOPIC_NAME, 1, UpdateConsumerGroupOffsetsRequest.OffsetData.create((Long) null, UpdateConsumerGroupOffsetsRequest.OffsetType.LATEST)));
        resets.add(UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData.create(
                TOPIC_NAME, 2, UpdateConsumerGroupOffsetsRequest.OffsetData.create(Long.valueOf(midpoint), UpdateConsumerGroupOffsetsRequest.OffsetType.OFFSET)));

        String path = String.format(OFFSETS_API_PATH_TMP, getClusterId(), CONSUMER_GROUP);
        String json = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(
                UpdateConsumerGroupOffsetsRequest.builder().setTopicPartitionOffsetRequests(resets).build());
        Response response = request(path, true).accept("application/json").method("PATCH", Entity.json(json));
        Assertions.assertEquals(HttpStatus.Code.MULTI_STATUS.getCode(), response.getStatus());

        List<UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse> expectedResponses = new ArrayList<>();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            expectedResponses.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                    TOPIC_NAME, Integer.valueOf(partition), Integer.valueOf(Response.Status.OK.getStatusCode()), ""));
        }
        Assertions.assertEquals(
                UpdateConsumerGroupOffsetsResponse.create(expectedResponses),
                response.readEntity(UpdateConsumerGroupOffsetsResponse.class));

        Map<TopicPartition, OffsetAndMetadata> actualCommitted = listOffsetsForConsumerGroup();
        Map<TopicPartition, OffsetAndMetadata> expectedCommitted = new HashMap<>();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
            long expectedOffset;
            switch (partition) {
                case 0:
                    expectedOffset = logStartOffsets.get(topicPartition).offset();
                    break;
                case 1:
                    expectedOffset = logEndOffsets.get(topicPartition).offset();
                    break;
                default:
                    expectedOffset = midpoint;
                    break;
            }
            expectedCommitted.put(topicPartition, new OffsetAndMetadata(expectedOffset));
        }
        Assertions.assertEquals(expectedCommitted, actualCommitted);
    }

    /**
     * PATCH on the offsets endpoint of an unknown group must return 404 with a body that contains
     * the "not found" message and the code 40405.
     */
    @Test
    public void test_updateConsumerGroupOffset_WithNonExistingGroupId_ThenReturns404() throws JsonProcessingException {
        String path = String.format(OFFSETS_API_PATH_TMP, getClusterId(), "invalid_consumer_group");
        String json = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(
                UpdateConsumerGroupOffsetsRequest.builder().setTopicPartitionOffsetRequests(new ArrayList<>()).build());

        Response response = request(path, true).accept("application/json").method("PATCH", Entity.json(json));

        Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus());
        String body = response.readEntity(String.class);
        int expectedErrorCode = 40405;
        Assertions.assertTrue(body.contains("Consumer group id not found."));
        Assertions.assertTrue(body.contains(String.valueOf(expectedErrorCode)));
    }

    /**
     * PATCH with an empty list of resets against an existing group must return 207 (multi-status)
     * with an empty list of per-partition responses.
     */
    @Test
    public void test_updateConsumerGroupOffset_WithZeroPartitionsToReset_ThenReturns200() throws JsonProcessingException {
        setupConsumerGroupWithOffset();

        String path = String.format(OFFSETS_API_PATH_TMP, getClusterId(), CONSUMER_GROUP);
        String json = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(
                UpdateConsumerGroupOffsetsRequest.builder().setTopicPartitionOffsetRequests(new ArrayList<>()).build());
        Response response = request(path, true).accept("application/json").method("PATCH", Entity.json(json));

        Assertions.assertEquals(HttpStatus.Code.MULTI_STATUS.getCode(), response.getStatus());
        UpdateConsumerGroupOffsetsResponse body = response.readEntity(UpdateConsumerGroupOffsetsResponse.class);
        Assertions.assertTrue(body.getUpdateResponses().isEmpty());
    }

    /**
     * PATCH against a group that has an actively polling member must be rejected with 400 and a
     * message naming the group's STABLE state.
     */
    @Test
    public void test_updateConsumerGroupOffset_WithActiveConsumerGroup_ThatReturns400() throws JsonProcessingException, InterruptedException, ExecutionException {
        CountDownLatch firstPollLatch = new CountDownLatch(1);
        KafkaConsumerThread consumerTask = new KafkaConsumerThread(firstPollLatch);
        Thread consumerThread = new Thread(consumerTask);
        consumerThread.start();
        try {
            firstPollLatch.await();
            // Give the group time to settle after the first poll before checking its state.
            Thread.sleep(1000L);
            Assertions.assertEquals(ConsumerGroupState.STABLE, describeConsumerGroup().state());

            String path = String.format(OFFSETS_API_PATH_TMP, getClusterId(), CONSUMER_GROUP);
            String json = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(
                    UpdateConsumerGroupOffsetsRequest.builder().setTopicPartitionOffsetRequests(new ArrayList<>()).build());
            Response response = request(path, true).accept("application/json").method("PATCH", Entity.json(json));

            Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
            String body = response.readEntity(String.class);
            Assertions.assertTrue(body.contains(String.format("%s has state %s, alter offsets only allowed in state 'empty' or 'dead'.", CONSUMER_GROUP, "STABLE")));
        } finally {
            // Stop and join even when an assertion fails, so the polling thread and its consumer
            // do not outlive this test.
            consumerTask.stop();
            consumerThread.join();
        }
    }

    /**
     * Produces {@code i} records ({@code "key<n>"} / {@code "value<n>"}, UTF-8 encoded) to partition
     * {@code i2} of {@link #TOPIC_NAME} and waits until every send has been acknowledged.
     *
     * <p>A failed send fails the calling test. The failure is routed through the per-record future
     * rather than thrown from the producer callback, because an exception thrown on the producer's
     * I/O thread would leave the future incomplete and block the wait below forever.
     *
     * @param i number of records to produce
     * @param i2 partition to produce to
     */
    private void produceToTopicPartition(int i, int i2) {
        Properties producerProperties = this.restConfig.getProducerProperties();
        producerProperties.put("bootstrap.servers", this.brokerList);
        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(producerProperties, new ByteArraySerializer(), new ByteArraySerializer());
        try {
            CompletableFuture<?>[] acknowledgements = new CompletableFuture<?>[i];
            for (int n = 0; n < i; n++) {
                CompletableFuture<Void> acknowledgement = new CompletableFuture<>();
                acknowledgements[n] = acknowledgement;
                byte[] key = ("key" + n).getBytes(StandardCharsets.UTF_8);
                byte[] value = ("value" + n).getBytes(StandardCharsets.UTF_8);
                producer.send(new ProducerRecord<>(TOPIC_NAME, Integer.valueOf(i2), key, value), (recordMetadata, exc) -> {
                    if (exc != null) {
                        acknowledgement.completeExceptionally(exc);
                    } else {
                        acknowledgement.complete(null);
                    }
                });
            }
            try {
                CompletableFuture.allOf(acknowledgements).get();
            } catch (ExecutionException e) {
                Assertions.fail("Failed producing during simple consume integration test setup", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                Assertions.fail("Interrupted while waiting for produced records to be acknowledged", e);
            }
        } finally {
            // Close on every path so a failed send does not leak the producer.
            producer.close(Duration.ofMillis(100L));
        }
    }

    /**
     * Commits the given offsets for {@link #CONSUMER_GROUP} through a short-lived admin client.
     *
     * @param map offsets to commit, keyed by topic partition
     * @throws ExecutionException if the alter-offsets call fails
     * @throws InterruptedException if interrupted while waiting for the call to complete
     */
    private void resetOffsetsForConsumerGroup(Map<TopicPartition, OffsetAndMetadata> map) throws ExecutionException, InterruptedException {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        // try-with-resources closes the client even when the call throws.
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            adminClient.alterConsumerGroupOffsets(CONSUMER_GROUP, map).all().get();
        }
    }

    /**
     * Describes {@link #CONSUMER_GROUP} through a short-lived admin client.
     *
     * @return the description the broker reports for the group
     * @throws ExecutionException if the describe call fails
     * @throws InterruptedException if interrupted while waiting for the call to complete
     */
    private ConsumerGroupDescription describeConsumerGroup() throws ExecutionException, InterruptedException {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        // try-with-resources closes the client even when the call throws.
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            Map<String, ConsumerGroupDescription> descriptions =
                    adminClient.describeConsumerGroups(Collections.singletonList(CONSUMER_GROUP)).all().get();
            return descriptions.get(CONSUMER_GROUP);
        }
    }

    /**
     * Looks up the earliest available offset of every partition of {@link #TOPIC_NAME}.
     *
     * @return earliest-offset lookup result per partition
     * @throws ExecutionException if the list-offsets call fails
     * @throws InterruptedException if interrupted while waiting for the call to complete
     */
    private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> getLogStartOffsets() throws ExecutionException, InterruptedException {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        Map<TopicPartition, OffsetSpec> earliestSpecs = new HashMap<>();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            earliestSpecs.put(new TopicPartition(TOPIC_NAME, partition), OffsetSpec.earliest());
        }
        // The client was previously never closed; try-with-resources releases it on every path.
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            return adminClient.listOffsets(earliestSpecs).all().get();
        }
    }

    /**
     * Looks up the latest (log end) offset of every partition of {@link #TOPIC_NAME}.
     *
     * @return latest-offset lookup result per partition
     * @throws ExecutionException if the list-offsets call fails
     * @throws InterruptedException if interrupted while waiting for the call to complete
     */
    private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> getLogEndOffsets() throws ExecutionException, InterruptedException {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        Map<TopicPartition, OffsetSpec> latestSpecs = new HashMap<>();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            latestSpecs.put(new TopicPartition(TOPIC_NAME, partition), OffsetSpec.latest());
        }
        // The client was previously never closed; try-with-resources releases it on every path.
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            return adminClient.listOffsets(latestSpecs).all().get();
        }
    }

    /**
     * Reads the offsets currently committed for {@link #CONSUMER_GROUP}.
     *
     * @return committed offset and metadata per topic partition
     * @throws ExecutionException if the list-offsets call fails
     * @throws InterruptedException if interrupted while waiting for the call to complete
     */
    private Map<TopicPartition, OffsetAndMetadata> listOffsetsForConsumerGroup() throws ExecutionException, InterruptedException {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        // The client was previously never closed; try-with-resources releases it on every path.
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            return adminClient.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();
        }
    }

    /**
     * Produces 50 records to every partition of {@link #TOPIC_NAME}, then commits the offsets held
     * in {@code defaultResetOffsets} for {@link #CONSUMER_GROUP}. Any failure while committing
     * fails the calling test.
     */
    private void setupConsumerGroupWithOffset() {
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            produceToTopicPartition(50, partition);
        }
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
            long offset = this.defaultResetOffsets.get(topicPartition).longValue();
            offsetsToCommit.put(topicPartition, new OffsetAndMetadata(offset));
        }
        try {
            resetOffsetsForConsumerGroup(offsetsToCommit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe the interruption.
            Thread.currentThread().interrupt();
            Assertions.fail(e);
        } catch (ExecutionException e) {
            Assertions.fail(e);
        }
    }
}
