package io.confluent.kafkarest.integration.v3;

import com.google.common.collect.ImmutableMap;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.TestUtils;
import io.confluent.kafkarest.controllers.CloudUIInternalModule;
import io.confluent.kafkarest.entities.v3.PartitionConsumeData;
import io.confluent.kafkarest.entities.v3.PartitionConsumeRecord;
import io.confluent.kafkarest.entities.v3.PartitionOffsetData;
import io.confluent.kafkarest.entities.v3.SimpleConsumeMultiPartitionResponse;
import io.confluent.kafkarest.entities.v3.SimpleConsumeOffsetsMultiPartitionResponse;
import io.confluent.kafkarest.entities.v3.SimpleConsumeOffsetsSinglePartitionResponse;
import io.confluent.kafkarest.entities.v3.SimpleConsumeSinglePartitionResponse;
import io.confluent.kafkarest.extension.RestResourceExtension;
import io.confluent.kafkarest.integration.CloudClusterTestHarness;
import io.confluent.kafkarest.resources.v3.CloudUIInternalResourcesFeature;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Configurable;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("IntegrationTest")
/* loaded from: input_file:io/confluent/kafkarest/integration/v3/SimpleConsumeActionIntegrationTest.class */
public final class SimpleConsumeActionIntegrationTest extends CloudClusterTestHarness {
    // Name of the topic created in setUp() and targeted by every request in this class.
    private static final String TOPIC_NAME = "topic-1";
    // Path templates for String.format. Placeholders, in order: cluster id, topic name and
    // (single-partition variants only) the partition id.
    private static final String SINGLE_PARTITION_REQUEST_PATH_TMPL = "/v3/clusters/%s/internal/topics/%s/partitions/%d/records";
    private static final String MULTI_PARTITION_REQUEST_PATH_TMPL = "/v3/clusters/%s/internal/topics/%s/partitions/-/records:consume";
    private static final String MULTI_PARTITION_GUARANTEE_REQUEST_PATH_TMPL = "/v3/clusters/%s/internal/topics/%s/partitions/-/records:consume_guarantee_progress";
    private static final String SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL = "/v3/clusters/%s/internal/topics/%s/partitions/%d/records:offsets";
    private static final String MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL = "/v3/clusters/%s/internal/topics/%s/partitions/-/records:offsets";
    // Number of partitions the test topic is created with; also the bound of the per-partition loops.
    private static final int NUM_PARTITIONS = 3;
    // NOTE(review): not referenced in the portion of the file reviewed here; presumably a record
    // count used by tests further down. Confirm before relying on it.
    private static final int NUM_RECORDS = 5;
    // Shared random source passed to TestUtils.generateAlphanumericString when padding keys/values.
    private static final Random RNG = new Random();

    /* loaded from: input_file:io/confluent/kafkarest/integration/v3/SimpleConsumeActionIntegrationTest$SimpleConsumeExtension.class */
    /**
     * Resource extension installed through the {@code kafka.rest.resource.extension.class} config
     * so that the Cloud UI internal feature and module classes are registered for these tests.
     */
    public static final class SimpleConsumeExtension implements RestResourceExtension {

        /**
         * Registers {@link CloudUIInternalResourcesFeature} first and {@link CloudUIInternalModule}
         * second on the supplied configurable. The REST config argument is not consulted.
         */
        public void register(Configurable<?> configurable, KafkaRestConfig kafkaRestConfig) {
            for (Class<?> component :
                    Arrays.asList(CloudUIInternalResourcesFeature.class, CloudUIInternalModule.class)) {
                configurable.register(component);
            }
        }

        /** Intentionally a no-op: this extension holds nothing that needs releasing. */
        public void clean() {
            // nothing to clean up
        }
    }

    /**
     * Runs the harness set-up and then creates {@code TOPIC_NAME} with {@code NUM_PARTITIONS}
     * partitions before each test.
     *
     * @param testInfo JUnit-provided test metadata, forwarded to the parent set-up
     * @throws Exception if the parent set-up fails
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws Exception {
        super.setUp(testInfo);
        // presumably the replication factor expected by the harness -- confirm against createTopic
        final short thirdCreateTopicArg = 1;
        createTopic(TOPIC_NAME, NUM_PARTITIONS, thirdCreateTopicArg);
    }

    /**
     * Points the REST server's resource-extension setting at {@link SimpleConsumeExtension}.
     *
     * @param properties REST configuration being assembled; mutated in place
     */
    protected void overrideKafkaRestConfigs(Properties properties) {
        final String extensionClassName = SimpleConsumeExtension.class.getName();
        properties.put("kafka.rest.resource.extension.class", extensionClassName);
    }

    /**
     * Sets the two reporter-related broker settings below to {@code false} and then hands the
     * properties to the parent implementation.
     *
     * @param i value forwarded unchanged to the parent implementation
     * @param properties broker properties; mutated in place
     * @return whatever the parent implementation returns for the adjusted properties
     */
    public Properties overrideBrokerProperties(int i, Properties properties) {
        final Boolean disabled = Boolean.FALSE;
        properties.put("auto.include.jmx.reporter", disabled);
        properties.put("confluent.reporters.telemetry.auto.enable", disabled);
        Properties adjusted = super.overrideBrokerProperties(i, properties);
        return adjusted;
    }

    /**
     * Produces {@code recordCount} records to one partition of {@code TOPIC_NAME} and blocks until
     * every send has been acknowledged.
     *
     * <p>Record {@code n} (0-based) has the UTF-8 key {@code "key" + n} followed by
     * {@code TestUtils.generateAlphanumericString(RNG, keyPaddingLength)}, and the UTF-8 value
     * {@code "value" + n} followed by
     * {@code TestUtils.generateAlphanumericString(RNG, valuePaddingLength)}.
     *
     * <p>If any send fails, the matching future is completed exceptionally, so the final
     * {@code join()} throws a {@code CompletionException} (caused by an {@link AssertionError}
     * that wraps the producer error) instead of blocking forever. The producer is closed on every
     * exit path.
     *
     * @param partitionId partition every record is sent to
     * @param recordCount number of records to produce
     * @param keyPaddingLength length argument for the generated key suffix
     * @param valuePaddingLength length argument for the generated value suffix
     * @return the timestamps reported in the send callbacks, indexed by send order
     */
    private long[] produceToOnePartition(
            int partitionId, int recordCount, int keyPaddingLength, int valuePaddingLength) {
        Properties producerProperties = this.restConfig.getProducerProperties();
        producerProperties.put("bootstrap.servers", this.brokerList);
        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(
                        producerProperties, new ByteArraySerializer(), new ByteArraySerializer());
        long[] timestamps = new long[recordCount];
        try {
            CompletableFuture<?>[] acks = new CompletableFuture<?>[recordCount];
            for (int n = 0; n < recordCount; n++) {
                CompletableFuture<Void> ack = new CompletableFuture<>();
                acks[n] = ack;
                int slot = n;
                // Key first, then value: keeps the order in which RNG is consumed.
                byte[] key =
                        ("key" + n + TestUtils.generateAlphanumericString(RNG, keyPaddingLength))
                                .getBytes(StandardCharsets.UTF_8);
                byte[] value =
                        ("value" + n + TestUtils.generateAlphanumericString(RNG, valuePaddingLength))
                                .getBytes(StandardCharsets.UTF_8);
                producer.send(
                        new ProducerRecord<>(TOPIC_NAME, partitionId, key, value),
                        (metadata, error) -> {
                            if (error != null) {
                                // Throwing here would only hit the producer's callback thread and
                                // leave the future pending; fail the future so join() surfaces it.
                                ack.completeExceptionally(
                                        new AssertionError(
                                                "Failed producing during simple consume integration test setup",
                                                error));
                            } else {
                                // Store the timestamp BEFORE completing: completion is what
                                // publishes this write to the thread waiting in join().
                                timestamps[slot] = metadata.timestamp();
                                ack.complete(null);
                            }
                        });
            }
            CompletableFuture.allOf(acks).join();
        } finally {
            producer.close(Duration.ofMillis(100L));
        }
        return timestamps;
    }

    /**
     * Convenience overload: produces {@code recordCount} records to partition 0 with both length
     * arguments set to 0.
     *
     * @param recordCount number of records to produce
     * @return the array returned by the four-argument overload
     */
    private long[] produceToOnePartition(int recordCount) {
        final int firstPartition = 0;
        final int noPadding = 0;
        return produceToOnePartition(firstPartition, recordCount, noPadding, noPadding);
    }

    /**
     * Convenience overload: produces {@code recordCount} records to the given partition with both
     * length arguments set to 0.
     *
     * @param partitionId first argument of the four-argument overload
     * @param recordCount second argument of the four-argument overload
     * @return the array returned by the four-argument overload
     */
    private long[] produceToOnePartition(int partitionId, int recordCount) {
        final int noPadding = 0;
        return produceToOnePartition(partitionId, recordCount, noPadding, noPadding);
    }

    /**
     * Produces exactly one record to each of the {@code NUM_PARTITIONS} partitions of
     * {@code TOPIC_NAME} and blocks until every send has been acknowledged.
     *
     * <p>The record for partition {@code p} has the UTF-8 key {@code "key" + p} and the UTF-8
     * value {@code "value" + p}.
     *
     * <p>If any send fails, the matching future is completed exceptionally, so the final
     * {@code join()} throws a {@code CompletionException} (caused by an {@link AssertionError}
     * that wraps the producer error) instead of blocking forever. The producer is closed on every
     * exit path.
     *
     * @return the timestamps reported in the send callbacks, indexed by partition id
     */
    private long[] produceToAllPartitions() {
        Properties producerProperties = this.restConfig.getProducerProperties();
        producerProperties.put("bootstrap.servers", this.brokerList);
        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(
                        producerProperties, new ByteArraySerializer(), new ByteArraySerializer());
        long[] timestamps = new long[NUM_PARTITIONS];
        try {
            CompletableFuture<?>[] acks = new CompletableFuture<?>[NUM_PARTITIONS];
            for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
                CompletableFuture<Void> ack = new CompletableFuture<>();
                acks[partition] = ack;
                int slot = partition;
                byte[] key = ("key" + partition).getBytes(StandardCharsets.UTF_8);
                byte[] value = ("value" + partition).getBytes(StandardCharsets.UTF_8);
                producer.send(
                        new ProducerRecord<>(TOPIC_NAME, partition, key, value),
                        (metadata, error) -> {
                            if (error != null) {
                                // Throwing here would only hit the producer's callback thread and
                                // leave the future pending; fail the future so join() surfaces it.
                                ack.completeExceptionally(
                                        new AssertionError(
                                                "Failed producing during simple consume integration test setup",
                                                error));
                            } else {
                                // Store the timestamp BEFORE completing: completion is what
                                // publishes this write to the thread waiting in join().
                                timestamps[slot] = metadata.timestamp();
                                ack.complete(null);
                            }
                        });
            }
            CompletableFuture.allOf(acks).join();
        } finally {
            producer.close(Duration.ofMillis(100L));
        }
        return timestamps;
    }

    /**
     * With nothing produced and no query parameters, a GET on each partition must return 200 with
     * the expected cluster id and topic name and a non-null, empty record list.
     */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithoutPositionCriteria_Ok() {
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            Response httpResponse = request(path).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());

            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            boolean hasEmptyRecordList =
                    data != null && data.getRecords() != null && data.getRecords().isEmpty();
            Assertions.assertTrue(hasEmptyRecordList);
        }
    }

    /**
     * After one record is produced per partition, {@code from_beginning=true} must return exactly
     * that record for each partition: offset 0, matching partition id, timestamp, key and value.
     */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithFromBeginningTrue_Ok() {
        final long[] producedTimestamps = produceToAllPartitions();
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            Response httpResponse =
                    request(path, Collections.singletonMap("from_beginning", "true")).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());

            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            boolean hasExactlyOneRecord =
                    data != null && data.getRecords() != null && data.getRecords().size() == 1;
            Assertions.assertTrue(hasExactlyOneRecord);

            PartitionConsumeRecord only = (PartitionConsumeRecord) data.getRecords().get(0);
            Assertions.assertEquals(0L, only.getOffset());
            Assertions.assertEquals(partition, only.getPartitionId());
            Assertions.assertEquals(producedTimestamps[partition], only.getTimestamp());
            Assertions.assertEquals("key" + partition, only.getKey());
            Assertions.assertEquals("value" + partition, only.getValue());
        }
    }

    /**
     * With nothing produced, {@code from_beginning=false} must return 200 and a non-null, empty
     * record list for every partition.
     */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithFromBeginningFalse_Ok() {
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            Response httpResponse =
                    request(path, Collections.singletonMap("from_beginning", "false")).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());

            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            boolean hasEmptyRecordList =
                    data != null && data.getRecords() != null && data.getRecords().isEmpty();
            Assertions.assertTrue(hasEmptyRecordList);
        }
    }

    /**
     * A non-boolean {@code from_beginning} value ({@code "invalid_value"}) is expected to be
     * accepted: the response is 200 with a non-null, empty record list for every partition.
     */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithInvalidFromBeginning_Ok() {
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            Response httpResponse =
                    request(path, Collections.singletonMap("from_beginning", "invalid_value")).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());

            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            boolean hasEmptyRecordList =
                    data != null && data.getRecords() != null && data.getRecords().isEmpty();
            Assertions.assertTrue(hasEmptyRecordList);
        }
    }

    /**
     * Requesting {@code offset=0} on an empty topic must return 200 and a non-null, empty record
     * list for every partition.
     */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithOffset_TopicEmpty_Ok() {
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            Response httpResponse = request(path, Collections.singletonMap("offset", "0")).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());

            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            boolean hasEmptyRecordList =
                    data != null && data.getRecords() != null && data.getRecords().isEmpty();
            Assertions.assertTrue(hasEmptyRecordList);
        }
    }

    /**
     * After one record is produced per partition, {@code offset=0} must return exactly that
     * record for each partition: offset 0, matching partition id, timestamp, key and value.
     */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithOffset_Ok() {
        final long[] producedTimestamps = produceToAllPartitions();
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            Response httpResponse = request(path, Collections.singletonMap("offset", "0")).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());

            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            boolean hasExactlyOneRecord =
                    data != null && data.getRecords() != null && data.getRecords().size() == 1;
            Assertions.assertTrue(hasExactlyOneRecord);

            PartitionConsumeRecord only = (PartitionConsumeRecord) data.getRecords().get(0);
            Assertions.assertEquals(0L, only.getOffset());
            Assertions.assertEquals(partition, only.getPartitionId());
            Assertions.assertEquals(producedTimestamps[partition], only.getTimestamp());
            Assertions.assertEquals("key" + partition, only.getKey());
            Assertions.assertEquals("value" + partition, only.getValue());
        }
    }

    /** A negative {@code offset} ({@code "-1"}) must be rejected with 400 on every partition. */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithNegativeOffset_ReturnsHttp400() {
        final String clusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            final int expectedStatus = Response.Status.BAD_REQUEST.getStatusCode();
            String path =
                    String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partition);
            Response httpResponse = request(path, Collections.singletonMap("offset", "-1")).get();
            Assertions.assertEquals(expectedStatus, httpResponse.getStatus());
        }
    }

    /** An empty {@code offset} value must be rejected with 400 on every partition. */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithEmptyOffset_ReturnsHttp400() {
        final String clusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            final int expectedStatus = Response.Status.BAD_REQUEST.getStatusCode();
            String path =
                    String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partition);
            Response httpResponse = request(path, Collections.singletonMap("offset", "")).get();
            Assertions.assertEquals(expectedStatus, httpResponse.getStatus());
        }
    }

    /** A non-numeric {@code offset} ({@code "fruitbat"}) must be rejected with 400 on every partition. */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithNonnumericOffset_ReturnsHttp400() {
        final String clusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            final int expectedStatus = Response.Status.BAD_REQUEST.getStatusCode();
            String path =
                    String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partition);
            Response httpResponse =
                    request(path, Collections.singletonMap("offset", "fruitbat")).get();
            Assertions.assertEquals(expectedStatus, httpResponse.getStatus());
        }
    }

    /**
     * After one record is produced per partition, querying each partition with the timestamp of
     * its own record must return exactly that record: offset 0, matching partition id, timestamp,
     * key and value.
     */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithTimestamp_Ok() {
        final long[] producedTimestamps = produceToAllPartitions();
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            String timestampParam = Long.toString(producedTimestamps[partition]);
            Response httpResponse =
                    request(path, Collections.singletonMap("timestamp", timestampParam)).get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());

            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            boolean hasExactlyOneRecord =
                    data != null && data.getRecords() != null && data.getRecords().size() == 1;
            Assertions.assertTrue(hasExactlyOneRecord);

            PartitionConsumeRecord only = (PartitionConsumeRecord) data.getRecords().get(0);
            Assertions.assertEquals(0L, only.getOffset());
            Assertions.assertEquals(partition, only.getPartitionId());
            Assertions.assertEquals(producedTimestamps[partition], only.getTimestamp());
            Assertions.assertEquals("key" + partition, only.getKey());
            Assertions.assertEquals("value" + partition, only.getValue());
        }
    }

    /** A negative {@code timestamp} ({@code "-1"}) must be rejected with 400 on every partition. */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithNegativeTimestamp_ReturnsHttp400() {
        final String clusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            final int expectedStatus = Response.Status.BAD_REQUEST.getStatusCode();
            String path =
                    String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partition);
            Response httpResponse = request(path, Collections.singletonMap("timestamp", "-1")).get();
            Assertions.assertEquals(expectedStatus, httpResponse.getStatus());
        }
    }

    /** An empty {@code timestamp} value must be rejected with 400 on every partition. */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithEmptyTimestamp_ReturnsHttp400() {
        final String clusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            final int expectedStatus = Response.Status.BAD_REQUEST.getStatusCode();
            String path =
                    String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partition);
            Response httpResponse = request(path, Collections.singletonMap("timestamp", "")).get();
            Assertions.assertEquals(expectedStatus, httpResponse.getStatus());
        }
    }

    /** A non-numeric {@code timestamp} ({@code "fruitbat"}) must be rejected with 400 on every partition. */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithNonnumericTimestamp_ReturnsHttp400() {
        final String clusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            final int expectedStatus = Response.Status.BAD_REQUEST.getStatusCode();
            String path =
                    String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partition);
            Response httpResponse =
                    request(path, Collections.singletonMap("timestamp", "fruitbat")).get();
            Assertions.assertEquals(expectedStatus, httpResponse.getStatus());
        }
    }

    /**
     * Supplying both {@code timestamp} and {@code offset} in the same request must be rejected
     * with 400 on every partition.
     */
    @Test
    public void testSimpleConsume_fromSinglePartitionMoreThanOnePositionParams_ReturnsHttp400() {
        final String clusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            final int expectedStatus = Response.Status.BAD_REQUEST.getStatusCode();
            String path =
                    String.format(SINGLE_PARTITION_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partition);
            String nowMillis = Long.toString(Instant.now().toEpochMilli());
            Response httpResponse =
                    request(path, ImmutableMap.of("timestamp", nowMillis, "offset", "0")).get();
            Assertions.assertEquals(expectedStatus, httpResponse.getStatus());
        }
    }

    /**
     * A {@code timestamp} of {@code Long.MAX_VALUE} (with {@code max_poll_records=10}) is expected
     * to be accepted: the response is 200 with a non-null, empty record list for every partition.
     */
    @Test
    public void testSimpleConsume_fromSinglePartitionWithInvalidTimestamp_ReturnsHttp200() {
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            String farFutureTimestamp = Long.toString(Long.MAX_VALUE);
            Response httpResponse =
                    request(
                                    path,
                                    ImmutableMap.of(
                                            "timestamp", farFutureTimestamp, "max_poll_records", "10"))
                            .get();
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());

            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            Assertions.assertNotNull(data);
            Assertions.assertNotNull(data.getRecords());
            Assertions.assertTrue(data.getRecords().isEmpty());
        }
    }

    /**
     * Produces 15 records to partition 0, then reads every partition from the beginning with
     * {@code max_poll_records} set to the parameter.
     *
     * <p>Values {@code <= 0} must yield 400. Otherwise the response must be 200; partition 0 must
     * return {@code min(i, 15)} records and the other partitions an empty list.
     *
     * @param i the {@code max_poll_records} value under test
     */
    @ValueSource(ints = {-1, 0, 10, 20})
    @ParameterizedTest
    public void testSimpleConsume_fromSinglePartition_WithMaxPollRecords(int i) {
        produceToOnePartition(15);
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            Response httpResponse =
                    request(
                                    path,
                                    ImmutableMap.of(
                                            "from_beginning", "true", "max_poll_records", String.valueOf(i)))
                            .get();

            if (i <= 0) {
                Assertions.assertEquals(
                        Response.Status.BAD_REQUEST.getStatusCode(), httpResponse.getStatus());
                continue;
            }

            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());
            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            Assertions.assertNotNull(data);
            Assertions.assertNotNull(data.getRecords());
            if (partition == 0) {
                // Only partition 0 received records; the cap is whichever is smaller.
                Assertions.assertEquals(Math.min(i, 15), data.getRecords().size());
            } else {
                Assertions.assertTrue(data.getRecords().isEmpty());
            }
        }
    }

    /**
     * Produces 10 records to partition 0, then reads every partition from the beginning with
     * {@code fetch_max_bytes} set to the parameter and {@code max_poll_records=100}.
     *
     * <p>Negative values must yield 400. Otherwise the response must be 200; partition 0 must
     * return all 10 records and the other partitions an empty list.
     *
     * @param i the {@code fetch_max_bytes} value under test
     */
    @ValueSource(ints = {-1, 0, 10, 1048576})
    @ParameterizedTest
    public void testSimpleConsume_fromSinglePartition_WithFetchMaxBytes(int i) {
        produceToOnePartition(10);
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            Response httpResponse =
                    request(
                                    path,
                                    ImmutableMap.of(
                                            "from_beginning", "true",
                                            "fetch_max_bytes", String.valueOf(i),
                                            "max_poll_records", "100"))
                            .get();

            if (i < 0) {
                Assertions.assertEquals(
                        Response.Status.BAD_REQUEST.getStatusCode(), httpResponse.getStatus());
                continue;
            }

            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());
            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            Assertions.assertNotNull(data);
            Assertions.assertNotNull(data.getRecords());
            if (partition == 0) {
                Assertions.assertEquals(10, data.getRecords().size());
            } else {
                Assertions.assertTrue(data.getRecords().isEmpty());
            }
        }
    }

    /**
     * Produces one record to partition 0 via {@code produceToOnePartition(0, 1, 0, 20)}, then
     * reads every partition from the beginning with {@code message_max_bytes} set to the
     * parameter.
     *
     * <p>Negative values must yield 400. Otherwise the response must be 200 and:
     * <ul>
     *   <li>partitions other than 0 return an empty list;</li>
     *   <li>partition 0 returns one record whose exceeded-fields marker is {@code null} when the
     *       limit is above 20, flags key and value (both returned as {@code null}) when the limit
     *       is 0, and flags only the value when the limit is 20.</li>
     * </ul>
     *
     * @param i the {@code message_max_bytes} value under test
     */
    @ValueSource(ints = {-1, 0, 20, 100})
    @ParameterizedTest
    public void testSimpleConsume_fromSinglePartition_WithMessageMaxBytes(int i) {
        produceToOnePartition(0, 1, 0, 20);
        final String expectedClusterId = getClusterId();
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            String path =
                    String.format(
                            SINGLE_PARTITION_REQUEST_PATH_TMPL, expectedClusterId, TOPIC_NAME, partition);
            Response httpResponse =
                    request(
                                    path,
                                    ImmutableMap.of(
                                            "from_beginning", "true", "message_max_bytes", String.valueOf(i)))
                            .get();

            if (i < 0) {
                Assertions.assertEquals(
                        Response.Status.BAD_REQUEST.getStatusCode(), httpResponse.getStatus());
                continue;
            }

            Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());
            SimpleConsumeSinglePartitionResponse body =
                    httpResponse.readEntity(SimpleConsumeSinglePartitionResponse.class);
            Assertions.assertNotNull(body);
            Assertions.assertEquals(expectedClusterId, body.getClusterId());
            Assertions.assertEquals(TOPIC_NAME, body.getTopicName());

            PartitionConsumeData data = body.getPartitionData();
            Assertions.assertNotNull(data);
            Assertions.assertNotNull(data.getRecords());

            if (partition != 0) {
                // Nothing was produced to the other partitions.
                Assertions.assertTrue(data.getRecords().isEmpty());
                continue;
            }

            Assertions.assertEquals(1, data.getRecords().size());
            PartitionConsumeRecord only = (PartitionConsumeRecord) data.getRecords().get(0);
            PartitionConsumeRecord.ExceededFields exceeded = only.getExceededFields();
            if (i > 20) {
                Assertions.assertNull(exceeded);
                continue;
            }

            Assertions.assertNotNull(exceeded);
            if (i == 0) {
                Assertions.assertTrue(exceeded.isKey());
                Assertions.assertNull(only.getKey());
                Assertions.assertTrue(exceeded.isValue());
                Assertions.assertNull(only.getValue());
            }
            if (i == 20) {
                Assertions.assertFalse(exceeded.isKey());
                Assertions.assertNotNull(only.getKey());
                Assertions.assertTrue(exceeded.isValue());
                Assertions.assertNull(only.getValue());
            }
        }
    }

    /**
     * Runs the shared multi-partition check with an empty JSON body and {@code false} as the
     * second argument.
     *
     * @param z forwarded as the third argument of the shared check
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartition_EmptyPostBody_Ok(boolean z) {
        JSONObject emptyBody = new JSONObject();
        testSimpleConsumeFromMultiplePartitionsOk(emptyBody, false, z);
    }

    /**
     * Runs the shared multi-partition check three times, each with a body containing a single
     * non-position setting: {@code max_poll_records}, then {@code fetch_max_bytes}, then
     * {@code message_max_bytes}. The second argument is {@code false} in every call.
     *
     * @param z forwarded as the third argument of the shared check
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartition_NoPositionParams_Ok(boolean z) {
        JSONObject maxPollRecordsOnly = new JSONObject().put("max_poll_records", 100);
        testSimpleConsumeFromMultiplePartitionsOk(maxPollRecordsOnly, false, z);

        JSONObject fetchMaxBytesOnly = new JSONObject().put("fetch_max_bytes", 100000);
        testSimpleConsumeFromMultiplePartitionsOk(fetchMaxBytesOnly, false, z);

        JSONObject messageMaxBytesOnly = new JSONObject().put("message_max_bytes", 1048576);
        testSimpleConsumeFromMultiplePartitionsOk(messageMaxBytesOnly, false, z);
    }

    /**
     * Consumes from every partition starting at explicit offsets.
     *
     * <p>The request body carries an {@code offsets} array with one
     * {@code {partition_id, offset}} entry per partition, each with offset {@code 0}. The
     * shared scenario is run with its "require records" flag set to {@code true}.
     *
     * @param z forwarded to the shared scenario, which uses it to choose the request path
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartitionFromOffsets_Ok(boolean z) {
        List<JSONObject> offsets = new ArrayList<>();
        for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
            JSONObject offsetEntry = new JSONObject();
            offsetEntry.put("partition_id", partitionId);
            offsetEntry.put("offset", 0);
            offsets.add(offsetEntry);
        }
        JSONObject requestBody = new JSONObject();
        // The cast keeps the Collection overload of put(), so the list is stored as a JSON array.
        requestBody.put("offsets", (Collection<?>) offsets);
        testSimpleConsumeFromMultiplePartitionsOk(requestBody, true, z);
    }

    /**
     * Runs the shared multi-partition consume scenario with {@code from_beginning} set to the
     * string {@code "true"}, requiring that at least one record is returned.
     *
     * @param z forwarded to the shared scenario, which uses it to choose the request path
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartitionFromBeginning_Ok(boolean z) {
        JSONObject fromBeginningBody = new JSONObject().put("from_beginning", "true");
        testSimpleConsumeFromMultiplePartitionsOk(fromBeginningBody, true, z);
    }

    /**
     * Runs the shared multi-partition consume scenario with {@code timestamp} set to
     * {@code 0}, requiring that at least one record is returned.
     *
     * @param z forwarded to the shared scenario, which uses it to choose the request path
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartitionFromTimestamp_Ok(boolean z) {
        JSONObject fromTimestampBody = new JSONObject().put("timestamp", 0L);
        testSimpleConsumeFromMultiplePartitionsOk(fromTimestampBody, true, z);
    }

    /**
     * Verifies that a request combining two position criteria ({@code from_beginning} and
     * {@code timestamp}) is rejected with HTTP 400.
     *
     * @param z forwarded to {@code getPath} to choose which multi-partition path template is used
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartition_MoreThanOnePositionsCriteria_returnHttp400(boolean z) {
        JSONObject requestBody = new JSONObject();
        requestBody.put("from_beginning", "true");
        requestBody.put("timestamp", 0L);
        String path = getPath(z, getClusterId());

        // try-with-resources closes the response exactly once, whether or not the assertion fails.
        try (Response response = request(path)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.entity(requestBody.toString(), MediaType.APPLICATION_JSON_TYPE))) {
            Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
        }
    }

    /**
     * Verifies that an {@code offsets} request containing a negative offset is rejected with
     * HTTP 400.
     *
     * <p>After producing to all partitions, the request lists one entry per partition; the
     * entry for partition index 2 carries offset {@code -1}, all others carry {@code 0}.
     *
     * @param z forwarded to {@code getPath} to choose which multi-partition path template is used
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartitionInvalidOffset_ReturnsHttp400(boolean z) {
        produceToAllPartitions();
        String path = getPath(z, getClusterId());

        List<JSONObject> offsets = new ArrayList<>();
        for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
            JSONObject offsetEntry = new JSONObject();
            offsetEntry.put("partition_id", partitionId);
            // Exactly one partition gets the invalid (negative) offset.
            offsetEntry.put("offset", partitionId == 2 ? -1 : 0);
            offsets.add(offsetEntry);
        }
        JSONObject requestBody = new JSONObject();
        // The cast keeps the Collection overload of put(), so the list is stored as a JSON array.
        requestBody.put("offsets", (Collection<?>) offsets);

        // try-with-resources closes the response exactly once, whether or not the assertion fails.
        try (Response response = request(path)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.entity(requestBody.toString(), MediaType.APPLICATION_JSON_TYPE))) {
            Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
        }
    }

    /**
     * Consumes by a timestamp of {@code 1} after producing {@code NUM_RECORDS} records via
     * {@code produceToOnePartition}, and expects every produced record back.
     *
     * <p>The first partition entry of the response must contain {@code NUM_RECORDS} records
     * with partition id 0, offsets {@code 0..NUM_RECORDS-1}, the produced timestamps, and
     * keys/values {@code "key" + i} / {@code "value" + i}.
     *
     * @param z forwarded to {@code getPath} to choose which multi-partition path template is used
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartitionByTimestampTooEarly_Ok(boolean z) {
        long[] producedTimestamps = produceToOnePartition(NUM_RECORDS);
        String clusterId = getClusterId();
        String path = getPath(z, clusterId);
        JSONObject requestBody = new JSONObject();
        requestBody.put("timestamp", 1L);

        SimpleConsumeMultiPartitionResponse consumeResponse;
        // The HTTP response is only needed to read the entity; close it before validating.
        try (Response response = request(path)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.entity(requestBody.toString(), MediaType.APPLICATION_JSON_TYPE))) {
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
            consumeResponse = response.readEntity(SimpleConsumeMultiPartitionResponse.class);
        }

        PartitionConsumeData firstPartition = validatePartitionData(consumeResponse, clusterId).get(0);
        Assertions.assertEquals(NUM_RECORDS, firstPartition.getRecords().size());
        boolean checkedAtLeastOneRecord = false;
        for (int i = 0; i < NUM_RECORDS; i++) {
            PartitionConsumeRecord record = firstPartition.getRecords().get(i);
            Assertions.assertEquals(0L, record.getPartitionId());
            Assertions.assertEquals(i, record.getOffset());
            Assertions.assertEquals(producedTimestamps[i], record.getTimestamp());
            Assertions.assertEquals("key" + i, record.getKey());
            Assertions.assertEquals("value" + i, record.getValue());
            checkedAtLeastOneRecord = true;
        }
        // Guards against the loop silently checking nothing.
        Assertions.assertTrue(checkedAtLeastOneRecord);
    }

    /**
     * Consumes by timestamp and expects only the later of two produced batches.
     *
     * <p>A first batch of {@code NUM_RECORDS} records is produced, then after a 500 ms pause a
     * second batch. The request uses the smallest timestamp of the second batch, and the
     * first partition entry of the response must hold exactly the second batch: offsets
     * {@code NUM_RECORDS..2*NUM_RECORDS-1} with the second batch's timestamps, keys and values.
     *
     * @param z forwarded to {@code getPath} to choose which multi-partition path template is used
     * @throws InterruptedException if the pause between the two batches is interrupted
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartitionByTimestampSkipEarlier_Ok(boolean z) throws InterruptedException {
        produceToOnePartition(NUM_RECORDS);
        // Separate the two batches in time so the timestamp lookup can tell them apart.
        Thread.sleep(500L);
        long[] secondBatchTimestamps = produceToOnePartition(NUM_RECORDS);
        String clusterId = getClusterId();
        String path = getPath(z, clusterId);
        JSONObject requestBody = new JSONObject();
        requestBody.put("timestamp", Arrays.stream(secondBatchTimestamps).min().getAsLong());

        SimpleConsumeMultiPartitionResponse consumeResponse;
        // The HTTP response is only needed to read the entity; close it before validating.
        try (Response response = request(path)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.entity(requestBody.toString(), MediaType.APPLICATION_JSON_TYPE))) {
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
            consumeResponse = response.readEntity(SimpleConsumeMultiPartitionResponse.class);
        }

        PartitionConsumeData firstPartition = validatePartitionData(consumeResponse, clusterId).get(0);
        Assertions.assertEquals(NUM_RECORDS, firstPartition.getRecords().size());
        boolean checkedAtLeastOneRecord = false;
        for (int i = 0; i < NUM_RECORDS; i++) {
            PartitionConsumeRecord record = firstPartition.getRecords().get(i);
            Assertions.assertEquals(0L, record.getPartitionId());
            // Offsets continue after the first batch.
            Assertions.assertEquals(i + NUM_RECORDS, record.getOffset());
            Assertions.assertEquals(secondBatchTimestamps[i], record.getTimestamp());
            Assertions.assertEquals("key" + i, record.getKey());
            Assertions.assertEquals("value" + i, record.getValue());
            checkedAtLeastOneRecord = true;
        }
        // Guards against the loop silently checking nothing.
        Assertions.assertTrue(checkedAtLeastOneRecord);
    }

    /**
     * Consumes by a timestamp one past the largest produced timestamp and expects HTTP 200
     * with no records in the first partition entry of the response.
     *
     * @param z forwarded to {@code getPath} to choose which multi-partition path template is used
     */
    @ValueSource(booleans = {false, true})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartitionByTimestampTooLate_Ok(boolean z) {
        long[] producedTimestamps = produceToOnePartition(NUM_RECORDS);
        String clusterId = getClusterId();
        String path = getPath(z, clusterId);
        JSONObject requestBody = new JSONObject();
        requestBody.put("timestamp", Arrays.stream(producedTimestamps).max().getAsLong() + 1);

        SimpleConsumeMultiPartitionResponse consumeResponse;
        // The HTTP response is only needed to read the entity; close it before validating.
        try (Response response = request(path)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.entity(requestBody.toString(), MediaType.APPLICATION_JSON_TYPE))) {
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
            consumeResponse = response.readEntity(SimpleConsumeMultiPartitionResponse.class);
        }

        PartitionConsumeData firstPartition = validatePartitionData(consumeResponse, clusterId).get(0);
        Assertions.assertEquals(0, firstPartition.getRecords().size());
    }

    /**
     * Produces {@code NUM_RECORDS} records to partition 0 and to partition 1, then checks the
     * {@code max_poll_records} behaviour against both request paths (first with the path flag
     * {@code false}, then {@code true}).
     *
     * @param i the {@code max_poll_records} value to send
     */
    @ValueSource(ints = {-1, 1, 4})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartition_WithMaxPollRecords_Ok(int i) {
        for (int partitionId = 0; partitionId <= 1; partitionId++) {
            produceToOnePartition(partitionId, NUM_RECORDS);
        }
        String clusterId = getClusterId();
        for (boolean guaranteePath : new boolean[] {false, true}) {
            testSimpleConsume_fromMultiplePartition_WithMaxPollRecords_Ok(i, clusterId, guaranteePath);
        }
    }

    /**
     * Posts a {@code from_beginning} consume request with the given {@code max_poll_records}
     * and checks how many records come back.
     *
     * <ul>
     *   <li>{@code i <= 1}: the total across partitions must be 10.</li>
     *   <li>{@code i > 1} and {@code z == false}: the total must equal {@code i}.</li>
     *   <li>{@code i > 1} and {@code z == true}: the first two partitions (ordered by partition
     *       id) must each contain at least one record, and the third must be empty.</li>
     * </ul>
     *
     * @param i the {@code max_poll_records} value, sent as a string
     * @param str the cluster id used in the request path and expected in the response
     * @param z forwarded to {@code getPath}; {@code true} selects the guarantee path template
     */
    private void testSimpleConsume_fromMultiplePartition_WithMaxPollRecords_Ok(int i, String str, boolean z) {
        String path = getPath(z, str);
        JSONObject requestBody = new JSONObject();
        requestBody.put("from_beginning", true);
        requestBody.put("max_poll_records", String.valueOf(i));

        SimpleConsumeMultiPartitionResponse consumeResponse;
        // The HTTP response is only needed to read the entity; close it before validating.
        try (Response response = request(path)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.entity(requestBody.toString(), MediaType.APPLICATION_JSON_TYPE))) {
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
            consumeResponse = response.readEntity(SimpleConsumeMultiPartitionResponse.class);
        }

        // Sort a copy so the response's own list is not mutated and need not be modifiable.
        List<PartitionConsumeData> partitions = new ArrayList<>(validatePartitionData(consumeResponse, str));
        partitions.sort(Comparator.comparing(PartitionConsumeData::getPartitionId));
        int totalRecords = partitions.stream().mapToInt(partition -> partition.getRecords().size()).sum();

        if (i <= 1) {
            Assertions.assertEquals(10, totalRecords);
        } else if (!z) {
            Assertions.assertEquals(i, totalRecords);
        } else {
            MatcherAssert.assertThat(partitions.get(0).getRecords().size(), Matchers.greaterThan(0));
            MatcherAssert.assertThat(partitions.get(1).getRecords().size(), Matchers.greaterThan(0));
            Assertions.assertTrue(partitions.get(2).getRecords().isEmpty());
        }
    }

    /**
     * Produces {@code NUM_RECORDS} records to partition 0 and to partition 2, then checks the
     * {@code fetch_max_bytes} behaviour against both request paths (first with the path flag
     * {@code false}, then {@code true}).
     *
     * @param i the {@code fetch_max_bytes} value to send
     */
    @ValueSource(ints = {-1, 10, 1048576})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartition_WithFetchMaxBytes(int i) {
        for (int partitionId : new int[] {0, 2}) {
            produceToOnePartition(partitionId, NUM_RECORDS);
        }
        String clusterId = getClusterId();
        for (boolean guaranteePath : new boolean[] {false, true}) {
            testSimpleConsume_fromMultiplePartition_WithFetchMaxBytes(i, clusterId, guaranteePath);
        }
    }

    /**
     * Posts a {@code from_beginning} consume request with the given {@code fetch_max_bytes}
     * and checks the outcome.
     *
     * <ul>
     *   <li>{@code i < 0}: the request must be rejected with HTTP 400; nothing else is checked.</li>
     *   <li>{@code i > 10}: the total record count across partitions must be 10.</li>
     *   <li>otherwise, {@code z == true}: the total must be greater than {@code NUM_RECORDS}.</li>
     *   <li>otherwise, {@code z == false}: the total must be at most {@code NUM_RECORDS}.</li>
     * </ul>
     *
     * @param i the {@code fetch_max_bytes} value, sent as a string
     * @param str the cluster id used in the request path and expected in the response
     * @param z forwarded to {@code getPath}; {@code true} selects the guarantee path template
     */
    private void testSimpleConsume_fromMultiplePartition_WithFetchMaxBytes(int i, String str, boolean z) {
        String path = getPath(z, str);
        JSONObject requestBody = new JSONObject();
        requestBody.put("from_beginning", true);
        requestBody.put("fetch_max_bytes", String.valueOf(i));

        SimpleConsumeMultiPartitionResponse consumeResponse = null;
        // try-with-resources closes the response exactly once, whether or not an assertion fails.
        try (Response response = request(path)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.entity(requestBody.toString(), MediaType.APPLICATION_JSON_TYPE))) {
            if (i < 0) {
                Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
            } else {
                Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                consumeResponse = response.readEntity(SimpleConsumeMultiPartitionResponse.class);
            }
        }
        if (i < 0) {
            // Rejected request: there is no payload to validate.
            return;
        }

        int totalRecords = validatePartitionData(consumeResponse, str).stream()
                .mapToInt(partition -> partition.getRecords().size())
                .sum();
        if (i > 10) {
            Assertions.assertEquals(10, totalRecords);
        } else if (z) {
            MatcherAssert.assertThat(totalRecords, Matchers.greaterThan(NUM_RECORDS));
        } else {
            MatcherAssert.assertThat(totalRecords, Matchers.lessThanOrEqualTo(NUM_RECORDS));
        }
    }

    /**
     * Produces records to partition 0 and to partition 2 via
     * {@code produceToOnePartition(partition, NUM_RECORDS, 0, 20)}, then checks the
     * {@code message_max_bytes} behaviour against both request paths (first with the path flag
     * {@code false}, then {@code true}).
     *
     * @param i the {@code message_max_bytes} value to send
     */
    @ValueSource(ints = {-1, 20, 1048576})
    @ParameterizedTest
    public void testSimpleConsume_fromMultiplePartition_WithMessageMaxBytes(int i) {
        for (int partitionId : new int[] {0, 2}) {
            produceToOnePartition(partitionId, NUM_RECORDS, 0, 20);
        }
        String clusterId = getClusterId();
        for (boolean guaranteePath : new boolean[] {false, true}) {
            testSimpleConsume_fromMultiplePartition_WithMessageMaxBytes(i, clusterId, guaranteePath);
        }
    }

    /**
     * Posts a {@code from_beginning} consume request with the given {@code message_max_bytes}
     * and checks the outcome.
     *
     * <ul>
     *   <li>{@code i < 0}: the request must be rejected with HTTP 400; nothing else is checked.</li>
     *   <li>otherwise exactly 10 records must be returned across all partitions.</li>
     *   <li>{@code i > 20}: every record has a key and a value and no exceeded-fields marker.</li>
     *   <li>{@code 0 <= i <= 20}: every record has a key but a null value, and its
     *       exceeded-fields marker flags the value (not the key).</li>
     * </ul>
     *
     * @param i the {@code message_max_bytes} value, sent as a string
     * @param str the cluster id used in the request path and expected in the response
     * @param z forwarded to {@code getPath}; {@code true} selects the guarantee path template
     */
    private void testSimpleConsume_fromMultiplePartition_WithMessageMaxBytes(int i, String str, boolean z) {
        String path = getPath(z, str);
        JSONObject requestBody = new JSONObject();
        requestBody.put("from_beginning", true);
        requestBody.put("message_max_bytes", String.valueOf(i));

        SimpleConsumeMultiPartitionResponse consumeResponse = null;
        // try-with-resources closes the response exactly once, whether or not an assertion fails.
        try (Response response = request(path)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.entity(requestBody.toString(), MediaType.APPLICATION_JSON_TYPE))) {
            if (i < 0) {
                Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
            } else {
                Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                consumeResponse = response.readEntity(SimpleConsumeMultiPartitionResponse.class);
            }
        }
        if (i < 0) {
            // Rejected request: there is no payload to validate.
            return;
        }

        List<PartitionConsumeRecord> allRecords = validatePartitionData(consumeResponse, str).stream()
                .flatMap(partition -> partition.getRecords().stream())
                .collect(Collectors.toList());
        Assertions.assertEquals(10, allRecords.size());

        for (PartitionConsumeRecord record : allRecords) {
            if (i > 20) {
                Assertions.assertNotNull(record.getKey());
                Assertions.assertNotNull(record.getValue());
                Assertions.assertNull(record.getExceededFields());
            } else {
                PartitionConsumeRecord.ExceededFields exceededFields = record.getExceededFields();
                Assertions.assertNotNull(exceededFields);
                Assertions.assertNotNull(record.getKey());
                Assertions.assertFalse(exceededFields.isKey());
                Assertions.assertNull(record.getValue());
                Assertions.assertTrue(exceededFields.isValue());
            }
        }
    }

    /**
     * Builds the multi-partition consume request path for {@code TOPIC_NAME}.
     *
     * @param z {@code true} to use {@code MULTI_PARTITION_GUARANTEE_REQUEST_PATH_TMPL},
     *     {@code false} to use {@code MULTI_PARTITION_REQUEST_PATH_TMPL}
     * @param str the cluster id substituted into the template
     * @return the formatted request path
     */
    private static String getPath(boolean z, String str) {
        String template;
        if (z) {
            template = MULTI_PARTITION_GUARANTEE_REQUEST_PATH_TMPL;
        } else {
            template = MULTI_PARTITION_REQUEST_PATH_TMPL;
        }
        return String.format(template, str, TOPIC_NAME);
    }

    /**
     * Asserts the envelope of a multi-partition consume response and returns its partition data.
     *
     * <p>Checks, in order: the response is non-null, its cluster id equals {@code str}, its
     * topic name equals {@code TOPIC_NAME}, and its partition list is non-null with exactly
     * {@code NUM_PARTITIONS} entries.
     *
     * @param simpleConsumeMultiPartitionResponse the deserialized response to validate
     * @param str the expected cluster id
     * @return the response's partition data list
     */
    private static List<PartitionConsumeData> validatePartitionData(SimpleConsumeMultiPartitionResponse simpleConsumeMultiPartitionResponse, String str) {
        Assertions.assertNotNull(simpleConsumeMultiPartitionResponse);

        String actualClusterId = simpleConsumeMultiPartitionResponse.getClusterId();
        Assertions.assertEquals(str, actualClusterId);

        String actualTopicName = simpleConsumeMultiPartitionResponse.getTopicName();
        Assertions.assertEquals(TOPIC_NAME, actualTopicName);

        List<PartitionConsumeData> partitions = simpleConsumeMultiPartitionResponse.getPartitionDataList();
        Assertions.assertNotNull(partitions);
        Assertions.assertEquals(NUM_PARTITIONS, partitions.size());
        return partitions;
    }

    /**
     * Shared scenario: produces to all partitions, posts the given body to the multi-partition
     * consume endpoint, expects HTTP 200 and validates the returned data.
     *
     * <p>Each partition entry must be non-null and appear at the index equal to its partition
     * id. For every partition that returned records, the first record must have offset 0, the
     * matching partition id and produced timestamp, and key/value {@code "key" + partition} /
     * {@code "value" + partition}.
     *
     * @param jSONObject the JSON request body to post
     * @param z when {@code true}, at least one partition must have returned a record
     * @param z2 forwarded to {@code getPath}; {@code true} selects the guarantee path template
     */
    private void testSimpleConsumeFromMultiplePartitionsOk(JSONObject jSONObject, boolean z, boolean z2) {
        long[] producedTimestamps = produceToAllPartitions();
        String clusterId = getClusterId();

        SimpleConsumeMultiPartitionResponse consumeResponse;
        // The HTTP response is only needed to read the entity; close it before validating.
        try (Response response = request(getPath(z2, clusterId))
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE))) {
            Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
            consumeResponse = response.readEntity(SimpleConsumeMultiPartitionResponse.class);
        }

        // Checks non-null response, cluster id, topic name and a list of NUM_PARTITIONS entries.
        List<PartitionConsumeData> partitions = validatePartitionData(consumeResponse, clusterId);

        boolean sawAnyRecord = false;
        for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
            PartitionConsumeData partition = partitions.get(partitionId);
            Assertions.assertNotNull(partition);
            Assertions.assertEquals(partitionId, partition.getPartitionId());

            List<PartitionConsumeRecord> records = partition.getRecords();
            if (records == null || records.isEmpty()) {
                continue;
            }
            PartitionConsumeRecord firstRecord = records.get(0);
            Assertions.assertEquals(0L, firstRecord.getOffset());
            Assertions.assertEquals(partitionId, firstRecord.getPartitionId());
            Assertions.assertEquals(producedTimestamps[partitionId], firstRecord.getTimestamp());
            Assertions.assertEquals("key" + partitionId, firstRecord.getKey());
            Assertions.assertEquals("value" + partitionId, firstRecord.getValue());
            sawAnyRecord = true;
        }
        if (z) {
            Assertions.assertTrue(sawAnyRecord);
        }
    }

    /**
     * Queries the single-partition offsets endpoint for every partition without any query
     * parameters and without producing any records first; expects HTTP 200 and a next offset
     * of 0 for each partition.
     */
    @Test
    public void testSimpleConsumeOffsets_fromSinglePartitionWithoutPositionCriteria_Ok() {
        String clusterId = getClusterId();
        for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
            String path = String.format(SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partitionId);
            // try-with-resources releases the response even when an assertion fails.
            try (Response response = request(path).get()) {
                Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                SimpleConsumeOffsetsSinglePartitionResponse offsets =
                        response.readEntity(SimpleConsumeOffsetsSinglePartitionResponse.class);
                Assertions.assertNotNull(offsets);
                Assertions.assertEquals(clusterId, offsets.getClusterId());
                Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());
                Assertions.assertEquals(partitionId, offsets.getPartitionId());
                Assertions.assertEquals(0L, offsets.getNextOffset());
            }
        }
    }

    /**
     * After producing to all partitions, queries the single-partition offsets endpoint with
     * {@code from_beginning=true}; expects HTTP 200 and a next offset of 0 for each partition.
     */
    @Test
    public void testSimpleConsumeOffsets_fromSinglePartitionWithFromBeginningTrue_Ok() {
        produceToAllPartitions();
        String clusterId = getClusterId();
        for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
            String path = String.format(SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partitionId);
            // try-with-resources releases the response even when an assertion fails.
            try (Response response = request(path, Collections.singletonMap("from_beginning", "true")).get()) {
                Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                SimpleConsumeOffsetsSinglePartitionResponse offsets =
                        response.readEntity(SimpleConsumeOffsetsSinglePartitionResponse.class);
                Assertions.assertNotNull(offsets);
                Assertions.assertEquals(clusterId, offsets.getClusterId());
                Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());
                Assertions.assertEquals(partitionId, offsets.getPartitionId());
                Assertions.assertEquals(0L, offsets.getNextOffset());
            }
        }
    }

    /**
     * After producing to all partitions, queries the single-partition offsets endpoint with
     * {@code from_beginning=false}; expects HTTP 200 and a next offset of 1 for each partition.
     */
    @Test
    public void testSimpleConsumeOffsets_fromSinglePartitionWithFromBeginningFalse_Ok() {
        produceToAllPartitions();
        String clusterId = getClusterId();
        for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
            String path = String.format(SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partitionId);
            // try-with-resources releases the response even when an assertion fails.
            try (Response response = request(path, Collections.singletonMap("from_beginning", "false")).get()) {
                Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                SimpleConsumeOffsetsSinglePartitionResponse offsets =
                        response.readEntity(SimpleConsumeOffsetsSinglePartitionResponse.class);
                Assertions.assertNotNull(offsets);
                Assertions.assertEquals(clusterId, offsets.getClusterId());
                Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());
                Assertions.assertEquals(partitionId, offsets.getPartitionId());
                Assertions.assertEquals(1L, offsets.getNextOffset());
            }
        }
    }

    /**
     * After producing to all partitions, queries the single-partition offsets endpoint with
     * {@code timestamp} set to the timestamp returned for that partition by
     * {@code produceToAllPartitions}; expects HTTP 200 and a next offset of 0 for each partition.
     */
    @Test
    public void testSimpleConsumeOffsets_fromSinglePartitionWithTimestamp_Ok() {
        long[] producedTimestamps = produceToAllPartitions();
        String clusterId = getClusterId();
        for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
            String path = String.format(SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partitionId);
            String timestamp = Long.toString(producedTimestamps[partitionId]);
            // try-with-resources releases the response even when an assertion fails.
            try (Response response = request(path, Collections.singletonMap("timestamp", timestamp)).get()) {
                Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                SimpleConsumeOffsetsSinglePartitionResponse offsets =
                        response.readEntity(SimpleConsumeOffsetsSinglePartitionResponse.class);
                Assertions.assertNotNull(offsets);
                Assertions.assertEquals(clusterId, offsets.getClusterId());
                Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());
                Assertions.assertEquals(partitionId, offsets.getPartitionId());
                Assertions.assertEquals(0L, offsets.getNextOffset());
            }
        }
    }

    /**
     * After producing to all partitions, queries the single-partition offsets endpoint with a
     * {@code timestamp} one past the timestamp returned for that partition by
     * {@code produceToAllPartitions}; expects HTTP 200 and a next offset of 1 for each partition.
     */
    @Test
    public void testSimpleConsumeOffsets_fromSinglePartitionWithTimestampExceeded_Ok() {
        long[] producedTimestamps = produceToAllPartitions();
        String clusterId = getClusterId();
        for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
            String path = String.format(SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, partitionId);
            // Add 1 numerically BEFORE converting to text; concatenating "1" onto the string
            // would instead append a digit and send a timestamp roughly ten times too large.
            String timestampAfterRecord = Long.toString(producedTimestamps[partitionId] + 1);
            // try-with-resources releases the response even when an assertion fails.
            try (Response response = request(path, Collections.singletonMap("timestamp", timestampAfterRecord)).get()) {
                Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                SimpleConsumeOffsetsSinglePartitionResponse offsets =
                        response.readEntity(SimpleConsumeOffsetsSinglePartitionResponse.class);
                Assertions.assertNotNull(offsets);
                Assertions.assertEquals(clusterId, offsets.getClusterId());
                Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());
                Assertions.assertEquals(partitionId, offsets.getPartitionId());
                Assertions.assertEquals(1L, offsets.getNextOffset());
            }
        }
    }

    /**
     * A {@code timestamp} query parameter of {@code -1} on the single-partition offsets endpoint
     * (partition 0) must be rejected with HTTP 400.
     */
    @Test
    public void testSimpleConsumeOffsets_fromSinglePartitionWithNegativeTimestamp_ReturnsHttp400() {
        String path =
            String.format(SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL, getClusterId(), TOPIC_NAME, 0);
        Response response = request(path, Collections.singletonMap("timestamp", "-1")).get();
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
    }

    /**
     * An empty {@code timestamp} query parameter on the single-partition offsets endpoint
     * (partition 0) must be rejected with HTTP 400.
     */
    @Test
    public void testSimpleConsumeOffsets_fromSinglePartitionWithEmptyTimestamp_ReturnsHttp400() {
        String path =
            String.format(SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL, getClusterId(), TOPIC_NAME, 0);
        Response response = request(path, Collections.singletonMap("timestamp", "")).get();
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
    }

    /**
     * A non-numeric {@code timestamp} query parameter on the single-partition offsets endpoint
     * (partition 0) must be rejected with HTTP 400.
     */
    @Test
    public void testSimpleConsumeOffsets_fromSinglePartitionWithNonnumericTimestamp_ReturnsHttp400() {
        String path =
            String.format(SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL, getClusterId(), TOPIC_NAME, 0);
        Response response = request(path, Collections.singletonMap("timestamp", "fruitbat")).get();
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
    }

    /**
     * Sends {@code Long.MAX_VALUE} as the {@code timestamp} to the single-partition offsets
     * endpoint (partition 0) without producing anything in this test, and expects HTTP 200 with
     * a next offset of 0.
     */
    @Test
    public void testSimpleConsumeOffsets_fromSinglePartitionWithInvalidTimestamp_ReturnsHttp200() {
        String clusterId = getClusterId();
        String path =
            String.format(SINGLE_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME, 0);
        String maxTimestamp = Long.toString(Long.MAX_VALUE);

        Response response =
            request(path, Collections.singletonMap("timestamp", maxTimestamp)).get();
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        SimpleConsumeOffsetsSinglePartitionResponse offsets =
            response.readEntity(SimpleConsumeOffsetsSinglePartitionResponse.class);
        Assertions.assertNotNull(offsets);
        Assertions.assertEquals(clusterId, offsets.getClusterId());
        Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());
        Assertions.assertEquals(0, offsets.getPartitionId());
        Assertions.assertEquals(0L, offsets.getNextOffset());
    }

    /**
     * Calls the multi-partition offsets endpoint with no query parameters and without producing
     * anything in this test. Expects HTTP 200, every returned partition entry at next offset 0
     * (entries are expected in partition-id order), and a total record count of 0.
     */
    @Test
    public void testSimpleConsumeOffsets_fromMultiplePartitionsWithoutPositionCriteria_Ok() {
        String clusterId = getClusterId();
        String path = String.format(MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME);

        Response response = request(path).get();
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        SimpleConsumeOffsetsMultiPartitionResponse offsets =
            response.readEntity(SimpleConsumeOffsetsMultiPartitionResponse.class);
        Assertions.assertNotNull(offsets);
        Assertions.assertEquals(clusterId, offsets.getClusterId());
        Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());

        // The list index doubles as the expected partition id.
        for (int idx = 0; idx < offsets.getPartitionOffsetList().size(); idx++) {
            PartitionOffsetData entry = (PartitionOffsetData) offsets.getPartitionOffsetList().get(idx);
            Assertions.assertEquals(idx, entry.getPartitionId());
            Assertions.assertEquals(0L, entry.getNextOffset());
        }

        Long totalRecords = (Long) offsets.getTotalRecords().get();
        Assertions.assertEquals(0L, totalRecords);
    }

    /**
     * After {@code produceToAllPartitions()}, calls the multi-partition offsets endpoint with
     * {@code from_beginning=true}. Expects HTTP 200, every returned partition entry at next
     * offset 0 (entries are expected in partition-id order), and a total record count of 3.
     */
    @Test
    public void testSimpleConsumeOffsets_fromMultiplePartitionsWithFromBeginningTrue_Ok() {
        produceToAllPartitions();
        String clusterId = getClusterId();
        String path = String.format(MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME);

        Response response =
            request(path, Collections.singletonMap("from_beginning", "true")).get();
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        SimpleConsumeOffsetsMultiPartitionResponse offsets =
            response.readEntity(SimpleConsumeOffsetsMultiPartitionResponse.class);
        Assertions.assertNotNull(offsets);
        Assertions.assertEquals(clusterId, offsets.getClusterId());
        Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());

        // The list index doubles as the expected partition id.
        for (int idx = 0; idx < offsets.getPartitionOffsetList().size(); idx++) {
            PartitionOffsetData entry = (PartitionOffsetData) offsets.getPartitionOffsetList().get(idx);
            Assertions.assertEquals(idx, entry.getPartitionId());
            Assertions.assertEquals(0L, entry.getNextOffset());
        }

        Long totalRecords = (Long) offsets.getTotalRecords().get();
        Assertions.assertEquals(3L, totalRecords);
    }

    /**
     * After {@code produceToAllPartitions()}, calls the multi-partition offsets endpoint with
     * {@code from_beginning=false}. Expects HTTP 200, every returned partition entry at next
     * offset 1 (entries are expected in partition-id order), and a total record count of 3.
     */
    @Test
    public void testSimpleConsumeOffsets_fromMultiplePartitionsWithFromBeginningFalse_Ok() {
        produceToAllPartitions();
        String clusterId = getClusterId();
        String path = String.format(MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME);

        Response response =
            request(path, Collections.singletonMap("from_beginning", "false")).get();
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        SimpleConsumeOffsetsMultiPartitionResponse offsets =
            response.readEntity(SimpleConsumeOffsetsMultiPartitionResponse.class);
        Assertions.assertNotNull(offsets);
        Assertions.assertEquals(clusterId, offsets.getClusterId());
        Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());

        // The list index doubles as the expected partition id.
        for (int idx = 0; idx < offsets.getPartitionOffsetList().size(); idx++) {
            PartitionOffsetData entry = (PartitionOffsetData) offsets.getPartitionOffsetList().get(idx);
            Assertions.assertEquals(idx, entry.getPartitionId());
            Assertions.assertEquals(1L, entry.getNextOffset());
        }

        Long totalRecords = (Long) offsets.getTotalRecords().get();
        Assertions.assertEquals(3L, totalRecords);
    }

    /**
     * Calls the multi-partition offsets endpoint with the first value returned by
     * {@code produceToAllPartitions()} as the {@code timestamp}. Expects HTTP 200, every returned
     * partition entry at next offset 0 (entries are expected in partition-id order), and a total
     * record count of 3.
     */
    @Test
    public void testSimpleConsumeOffsets_fromMultiplePartitionsWithTimestamp_Ok() {
        // NOTE(review): presumably one record timestamp per partition - confirm against the helper.
        long[] timestamps = produceToAllPartitions();
        String clusterId = getClusterId();
        String path = String.format(MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME);
        String firstTimestamp = Long.toString(timestamps[0]);

        Response response =
            request(path, Collections.singletonMap("timestamp", firstTimestamp)).get();
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        SimpleConsumeOffsetsMultiPartitionResponse offsets =
            response.readEntity(SimpleConsumeOffsetsMultiPartitionResponse.class);
        Assertions.assertNotNull(offsets);
        Assertions.assertEquals(clusterId, offsets.getClusterId());
        Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());

        // The list index doubles as the expected partition id.
        for (int idx = 0; idx < offsets.getPartitionOffsetList().size(); idx++) {
            PartitionOffsetData entry = (PartitionOffsetData) offsets.getPartitionOffsetList().get(idx);
            Assertions.assertEquals(idx, entry.getPartitionId());
            Assertions.assertEquals(0L, entry.getNextOffset());
        }

        Long totalRecords = (Long) offsets.getTotalRecords().get();
        Assertions.assertEquals(3L, totalRecords);
    }

    /**
     * Calls the multi-partition offsets endpoint with a {@code timestamp} one greater than the
     * value at index 2 of the array returned by {@code produceToAllPartitions()}. Expects HTTP
     * 200, every returned partition entry at next offset 1 (entries are expected in partition-id
     * order), and a total record count of 3.
     */
    @Test
    public void testSimpleConsumeOffsets_fromMultiplePartitionsWithTimestampExceeded_Ok() {
        // NOTE(review): presumably one record timestamp per partition - confirm against the helper.
        long[] timestamps = produceToAllPartitions();
        String clusterId = getClusterId();
        String path = String.format(MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME);
        String exceededTimestamp = Long.toString(timestamps[2] + 1);

        Response response =
            request(path, Collections.singletonMap("timestamp", exceededTimestamp)).get();
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        SimpleConsumeOffsetsMultiPartitionResponse offsets =
            response.readEntity(SimpleConsumeOffsetsMultiPartitionResponse.class);
        Assertions.assertNotNull(offsets);
        Assertions.assertEquals(clusterId, offsets.getClusterId());
        Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());

        // The list index doubles as the expected partition id.
        for (int idx = 0; idx < offsets.getPartitionOffsetList().size(); idx++) {
            PartitionOffsetData entry = (PartitionOffsetData) offsets.getPartitionOffsetList().get(idx);
            Assertions.assertEquals(idx, entry.getPartitionId());
            Assertions.assertEquals(1L, entry.getNextOffset());
        }

        Long totalRecords = (Long) offsets.getTotalRecords().get();
        Assertions.assertEquals(3L, totalRecords);
    }

    /**
     * A {@code timestamp} query parameter of {@code -1} on the multi-partition offsets endpoint
     * must be rejected with HTTP 400.
     */
    @Test
    public void testSimpleConsumeOffsets_fromMultiplePartitionsWithNegativeTimestamp_ReturnsHttp400() {
        String path =
            String.format(MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL, getClusterId(), TOPIC_NAME);
        Response response = request(path, Collections.singletonMap("timestamp", "-1")).get();
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
    }

    /**
     * An empty {@code timestamp} query parameter on the multi-partition offsets endpoint must be
     * rejected with HTTP 400.
     */
    @Test
    public void testSimpleConsumeOffsets_fromMultiplePartitionsWithEmptyTimestamp_ReturnsHttp400() {
        String path =
            String.format(MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL, getClusterId(), TOPIC_NAME);
        Response response = request(path, Collections.singletonMap("timestamp", "")).get();
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
    }

    /**
     * A non-numeric {@code timestamp} query parameter on the multi-partition offsets endpoint
     * must be rejected with HTTP 400.
     */
    @Test
    public void testSimpleConsumeOffsets_fromMultiplePartitionsWithNonnumericTimestamp_ReturnsHttp400() {
        String path =
            String.format(MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL, getClusterId(), TOPIC_NAME);
        Response response = request(path, Collections.singletonMap("timestamp", "fruitbat")).get();
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
    }

    /**
     * Sends {@code Long.MAX_VALUE} as the {@code timestamp} to the multi-partition offsets
     * endpoint without producing anything in this test. Expects HTTP 200, every returned
     * partition entry at next offset 0 (entries are expected in partition-id order), and a total
     * record count of 0.
     */
    @Test
    public void testSimpleConsumeOffsets_fromMultiplePartitionsWithInvalidTimestamp_ReturnsHttp200() {
        String clusterId = getClusterId();
        String path = String.format(MULTI_PARTITION_OFFSETS_REQUEST_PATH_TMPL, clusterId, TOPIC_NAME);
        String maxTimestamp = Long.toString(Long.MAX_VALUE);

        Response response =
            request(path, Collections.singletonMap("timestamp", maxTimestamp)).get();
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

        SimpleConsumeOffsetsMultiPartitionResponse offsets =
            response.readEntity(SimpleConsumeOffsetsMultiPartitionResponse.class);
        Assertions.assertNotNull(offsets);
        Assertions.assertEquals(clusterId, offsets.getClusterId());
        Assertions.assertEquals(TOPIC_NAME, offsets.getTopicName());

        // The list index doubles as the expected partition id.
        for (int idx = 0; idx < offsets.getPartitionOffsetList().size(); idx++) {
            PartitionOffsetData entry = (PartitionOffsetData) offsets.getPartitionOffsetList().get(idx);
            Assertions.assertEquals(idx, entry.getPartitionId());
            Assertions.assertEquals(0L, entry.getNextOffset());
        }

        Long totalRecords = (Long) offsets.getTotalRecords().get();
        Assertions.assertEquals(0L, totalRecords);
    }
}
