package io.confluent.kafkarest.integration.v3;

import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.controllers.SimpleConsumeModule;
import io.confluent.kafkarest.entities.v3.PartitionConsumeData;
import io.confluent.kafkarest.entities.v3.PartitionConsumeRecord;
import io.confluent.kafkarest.entities.v3.SimpleConsumeMultiPartitionResponse;
import io.confluent.kafkarest.entities.v3.SimpleConsumeSinglePartitionResponse;
import io.confluent.kafkarest.extension.RestResourceExtension;
import io.confluent.kafkarest.integration.ClusterTestHarness;
import io.confluent.kafkarest.resources.v3.SimpleConsumeResourcesFeature;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Configurable;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/confluent/kafkarest/integration/v3/SimpleConsumeActionIntegrationTest.class */
public final class SimpleConsumeActionIntegrationTest extends ClusterTestHarness {
    private static final String TOPIC_NAME = "topic-1";
    private static final String SINGLE_PARTITION_REQUEST_PATH_TMPL = "/v3/clusters/%s/internal/topics/%s/partitions/%d/records";
    private static final String MULTI_PARTITION_REQUEST_PATH_TMPL = "/v3/clusters/%s/internal/topics/%s/partitions/-/records:consume";
    private static final int NUM_PARTITIONS = 3;

    /* loaded from: input_file:io/confluent/kafkarest/integration/v3/SimpleConsumeActionIntegrationTest$SimpleConsumeExtension.class */
    public static final class SimpleConsumeExtension implements RestResourceExtension {
        public void register(Configurable<?> configurable, KafkaRestConfig kafkaRestConfig) {
            configurable.register(SimpleConsumeResourcesFeature.class);
            configurable.register(SimpleConsumeModule.class);
        }

        public void clean() {
        }
    }

    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        createTopic(TOPIC_NAME, NUM_PARTITIONS, (short) 1);
    }

    protected void overrideKafkaRestConfigs(Properties properties) {
        properties.put("kafka.rest.resource.extension.class", SimpleConsumeExtension.class.getName());
    }

    public Properties overrideBrokerProperties(int i, Properties properties) {
        properties.put("auto.include.jmx.reporter", false);
        properties.put("confluent.reporters.telemetry.auto.enable", false);
        return super.overrideBrokerProperties(i, properties);
    }

    private long[] produceToAllPartitions() {
        Properties producerProperties = this.restConfig.getProducerProperties();
        producerProperties.put("bootstrap.servers", this.brokerList);
        KafkaProducer kafkaProducer = new KafkaProducer(producerProperties, new ByteArraySerializer(), new ByteArraySerializer());
        CompletableFuture[] completableFutureArr = new CompletableFuture[NUM_PARTITIONS];
        long[] jArr = new long[NUM_PARTITIONS];
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            CompletableFuture completableFuture = new CompletableFuture();
            completableFutureArr[i] = completableFuture;
            int i2 = i;
            kafkaProducer.send(new ProducerRecord(TOPIC_NAME, Integer.valueOf(i), ("key" + i).getBytes(StandardCharsets.UTF_8), ("value" + i).getBytes(StandardCharsets.UTF_8)), (recordMetadata, exc) -> {
                if (exc != null) {
                    Assertions.fail("Failed producing during simple consume integration test setup");
                } else {
                    completableFuture.complete(recordMetadata);
                    jArr[i2] = recordMetadata.timestamp();
                }
            });
        }
        CompletableFuture.allOf(completableFutureArr).join();
        return jArr;
    }

    @Test
    public void testSimpleConsume_fromSinglePartitionWithOffset_Ok() {
        long[] produceToAllPartitions = produceToAllPartitions();
        String clusterId = getClusterId();
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            Response response = request(String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, Integer.valueOf(i)), Collections.singletonMap("offset", "0")).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
            SimpleConsumeSinglePartitionResponse simpleConsumeSinglePartitionResponse = (SimpleConsumeSinglePartitionResponse) response.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(simpleConsumeSinglePartitionResponse);
            Assertions.assertEquals(clusterId, simpleConsumeSinglePartitionResponse.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, simpleConsumeSinglePartitionResponse.getTopicName());
            PartitionConsumeData partitionData = simpleConsumeSinglePartitionResponse.getPartitionData();
            Assertions.assertTrue((partitionData == null || partitionData.getRecords() == null || partitionData.getRecords().size() != 1) ? false : true);
            PartitionConsumeRecord partitionConsumeRecord = (PartitionConsumeRecord) partitionData.getRecords().get(0);
            Assertions.assertEquals(0L, partitionConsumeRecord.getOffset());
            Assertions.assertEquals(i, partitionConsumeRecord.getPartitionId());
            Assertions.assertEquals(produceToAllPartitions[i], partitionConsumeRecord.getTimestamp());
            Assertions.assertEquals("key" + i, partitionConsumeRecord.getKey());
            Assertions.assertEquals("value" + i, partitionConsumeRecord.getValue());
        }
    }

    @Test
    public void testSimpleConsume_fromSinglePartitionWithNegativeOffset_ReturnsHttp400() {
        String clusterId = getClusterId();
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), request(String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, Integer.valueOf(i)), Collections.singletonMap("offset", "-1")).get().getStatus());
        }
    }

    @Test
    public void testSimpleConsume_fromSinglePartitionWithTimestamp_Ok() {
        long[] produceToAllPartitions = produceToAllPartitions();
        String clusterId = getClusterId();
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            Response response = request(String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, Integer.valueOf(i)), Collections.singletonMap("timestamp", Long.toString(produceToAllPartitions[i]))).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
            SimpleConsumeSinglePartitionResponse simpleConsumeSinglePartitionResponse = (SimpleConsumeSinglePartitionResponse) response.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(simpleConsumeSinglePartitionResponse);
            Assertions.assertEquals(clusterId, simpleConsumeSinglePartitionResponse.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, simpleConsumeSinglePartitionResponse.getTopicName());
            PartitionConsumeData partitionData = simpleConsumeSinglePartitionResponse.getPartitionData();
            Assertions.assertTrue((partitionData == null || partitionData.getRecords() == null || partitionData.getRecords().size() != 1) ? false : true);
            PartitionConsumeRecord partitionConsumeRecord = (PartitionConsumeRecord) partitionData.getRecords().get(0);
            Assertions.assertEquals(0L, partitionConsumeRecord.getOffset());
            Assertions.assertEquals(i, partitionConsumeRecord.getPartitionId());
            Assertions.assertEquals(produceToAllPartitions[i], partitionConsumeRecord.getTimestamp());
            Assertions.assertEquals("key" + i, partitionConsumeRecord.getKey());
            Assertions.assertEquals("value" + i, partitionConsumeRecord.getValue());
        }
    }

    @Test
    public void testSimpleConsume_fromSinglePartitionWithNegativeTimestamp_ReturnsHttp400() {
        String clusterId = getClusterId();
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), request(String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, Integer.valueOf(i)), Collections.singletonMap("timestamp", "-1")).get().getStatus());
        }
    }

    @Test
    public void testSimpleConsume_fromSinglePartitionWithInvalidTimestamp_ReturnsHttp400() {
        String clusterId = getClusterId();
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), request(String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, Integer.valueOf(i)), Collections.singletonMap("timestamp", Long.toString(Long.MAX_VALUE))).get().getStatus());
        }
    }

    @Test
    public void testSimpleConsume_fromMultiplePartitionFromOffsets_Ok() {
        JSONObject jSONObject = new JSONObject();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            JSONObject jSONObject2 = new JSONObject();
            jSONObject2.put("partition_id", i);
            jSONObject2.put("offset", 0);
            arrayList.add(jSONObject2);
        }
        jSONObject.put("offsets", (Collection) arrayList);
        testSimpleConsumeFromMultiplePartitionsOk(jSONObject);
    }

    @Test
    public void testSimpleConsume_fromMultiplePartitionFromBeginning_Ok() {
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("from_beginning", "true");
        testSimpleConsumeFromMultiplePartitionsOk(jSONObject);
    }

    private void testSimpleConsumeFromMultiplePartitionsOk(JSONObject jSONObject) {
        long[] produceToAllPartitions = produceToAllPartitions();
        String clusterId = getClusterId();
        Response post = request(String.format(MULTI_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME)).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), post.getStatus());
        SimpleConsumeMultiPartitionResponse simpleConsumeMultiPartitionResponse = (SimpleConsumeMultiPartitionResponse) post.readEntity(SimpleConsumeMultiPartitionResponse.class);
        Assertions.assertNotNull(simpleConsumeMultiPartitionResponse);
        Assertions.assertEquals(clusterId, simpleConsumeMultiPartitionResponse.getClusterId());
        Assertions.assertEquals(TOPIC_NAME, simpleConsumeMultiPartitionResponse.getTopicName());
        List partitionDataList = simpleConsumeMultiPartitionResponse.getPartitionDataList();
        Assertions.assertTrue(partitionDataList != null && partitionDataList.size() == NUM_PARTITIONS);
        boolean z = false;
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            PartitionConsumeData partitionConsumeData = (PartitionConsumeData) partitionDataList.get(i);
            Assertions.assertNotNull(partitionConsumeData);
            Assertions.assertEquals(i, partitionConsumeData.getPartitionId());
            if (partitionConsumeData.getRecords() != null) {
                PartitionConsumeRecord partitionConsumeRecord = (PartitionConsumeRecord) partitionConsumeData.getRecords().get(0);
                Assertions.assertEquals(0L, partitionConsumeRecord.getOffset());
                Assertions.assertEquals(i, partitionConsumeRecord.getPartitionId());
                Assertions.assertEquals(produceToAllPartitions[i], partitionConsumeRecord.getTimestamp());
                Assertions.assertEquals("key" + i, partitionConsumeRecord.getKey());
                Assertions.assertEquals("value" + i, partitionConsumeRecord.getValue());
                z = true;
            }
        }
        Assertions.assertTrue(z);
    }

    @Test
    public void testSimpleConsume_fromMultiplePartitionInvalidOffset_ReturnsHttp400() {
        produceToAllPartitions();
        String format = String.format(MULTI_PARTITION_REQUEST_PATH_TMPL, getClusterId(), TOPIC_NAME);
        JSONObject jSONObject = new JSONObject();
        ArrayList arrayList = new ArrayList();
        int i = 0;
        while (i < NUM_PARTITIONS) {
            JSONObject jSONObject2 = new JSONObject();
            jSONObject2.put("partition_id", i);
            jSONObject2.put("offset", i == 2 ? -1 : 0);
            arrayList.add(jSONObject2);
            i++;
        }
        jSONObject.put("offsets", (Collection) arrayList);
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), request(format).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE)).getStatus());
    }
}
