package io.confluent.kafkarest.controllers;

import io.confluent.kafkarest.entities.v3.UpdateConsumerGroupOffsetsRequest;
import io.confluent.kafkarest.entities.v3.UpdateConsumerGroupOffsetsResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.easymock.IArgumentMatcher;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

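/**
 * Unit tests for {@code ConsumerGroupOffsetsManagerImpl#updateConsumerGroupOffsets}, run against an
 * EasyMock {@code Admin} client.
 *
 * <p>Loaded from: input_file:io/confluent/kafkarest/controllers/ConsumerGroupOffsetsManagerImplTest.class
 */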
public class ConsumerGroupOffsetsManagerImplTest extends EasyMockSupport {
    // Mocked Kafka admin client; assigned in setup() and handed to the manager under test.
    Admin adminClient;
    // Manager under test; built in setup() on top of adminClient.
    ConsumerGroupOffsetsManager manager;
    // Offset that setup() requests for tp3 and that the alter-offsets expectation uses for tp3.
    long tp3NewOffset;
    // Request passed to the manager; (re)built by setupUpdateOffsetRequest.
    UpdateConsumerGroupOffsetsRequest updateOffsetsRequest;
    // Mocked result of Admin#alterConsumerGroupOffsets; assigned in setupAdminClientForAlterOffsets.
    AlterConsumerGroupOffsetsResult alterOffsetsResult;
    String consumerGroupId = "test_cg";
    // Three partitions of topic "test"; the request asks for EARLIEST, LATEST and an explicit OFFSET respectively.
    TopicPartition tp1 = new TopicPartition("test", 1);
    TopicPartition tp2 = new TopicPartition("test", 2);
    TopicPartition tp3 = new TopicPartition("test", 3);
    // Log start / log end offsets per partition, seeded in setup() and served by the mocked listOffsets calls.
    Map<TopicPartition, Long> startOffsets = new HashMap<>();
    Map<TopicPartition, Long> endOffsets = new HashMap<>();

    /**
     * Prepares the fixture before every test: a fresh {@code Admin} mock, the manager built on it,
     * the start/end offsets of the three partitions, and the default update request that targets
     * offset 350 on {@code tp3}.
     */
    @BeforeEach
    public void setup() {
        adminClient = mock(Admin.class);
        manager = new ConsumerGroupOffsetsManagerImpl(adminClient);

        // Log start offsets.
        startOffsets.put(tp1, 100L);
        startOffsets.put(tp2, 200L);
        startOffsets.put(tp3, 300L);

        // Log end offsets.
        endOffsets.put(tp1, 199L);
        endOffsets.put(tp2, 299L);
        endOffsets.put(tp3, 399L);

        tp3NewOffset = 350L;
        setupUpdateOffsetRequest(tp3NewOffset);
    }

    /**
     * Happy path: both offset listings and the alter call succeed for every partition, so each of
     * the three partitions is expected to come back with status OK and an empty message.
     */
    @Test
    public void testUpdateConsumerGroupOffsets_WithResetRequestOfAllOffsetTypes() throws ExecutionException, InterruptedException {
        setupAdminClientForListOffsets(true, Collections.emptySet());
        setupAdminClientForListOffsets(false, Collections.emptySet());
        setupAdminClientForAlterOffsets(Collections.emptySet(), Collections.emptySet());
        replayAll();

        Set<UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse> expected = new HashSet<>();
        for (TopicPartition partition : Arrays.asList(tp1, tp2, tp3)) {
            expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                    partition.topic(), partition.partition(), Response.Status.OK.getStatusCode(), ""));
        }

        UpdateConsumerGroupOffsetsResponse response =
                (UpdateConsumerGroupOffsetsResponse) manager.updateConsumerGroupOffsets(consumerGroupId, updateOffsetsRequest).get();

        // Compared as sets: the order of the per-partition responses is not part of the assertion.
        Assertions.assertEquals(expected, new HashSet<>(response.getUpdateResponses()));
        verifyAll();
    }

    /**
     * Requests an offset one below {@code tp3}'s log start offset. {@code tp3} is expected to be
     * answered with BAD_REQUEST and left out of the alter-offsets call, while {@code tp1} and
     * {@code tp2} still succeed.
     */
    @Test
    public void testUpdateConsumerGroupOffsets_WithInvalidOffsetBeforePartitionLogStart() throws ExecutionException, InterruptedException {
        // Arrange the request first; it does not touch any mock, so doing this before replayAll() is safe.
        setupUpdateOffsetRequest(startOffsets.get(tp3) - 1);
        setupAdminClientForListOffsets(true, Collections.emptySet());
        setupAdminClientForListOffsets(false, Collections.emptySet());
        setupAdminClientForAlterOffsets(Collections.singleton(tp3), Collections.emptySet());
        replayAll();

        Set<UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse> expected = new HashSet<>();
        expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                tp1.topic(), tp1.partition(), Response.Status.OK.getStatusCode(), ""));
        expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                tp2.topic(), tp2.partition(), Response.Status.OK.getStatusCode(), ""));
        expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                tp3.topic(),
                tp3.partition(),
                Response.Status.BAD_REQUEST.getStatusCode(),
                "For test-3, new offset 299 is less than start-offset 300."));

        UpdateConsumerGroupOffsetsResponse response =
                (UpdateConsumerGroupOffsetsResponse) manager.updateConsumerGroupOffsets(consumerGroupId, updateOffsetsRequest).get();

        Assertions.assertEquals(expected, new HashSet<>(response.getUpdateResponses()));
        verifyAll();
    }

    /**
     * Requests an offset one above {@code tp3}'s log end offset. {@code tp3} is expected to be
     * answered with BAD_REQUEST and left out of the alter-offsets call, while {@code tp1} and
     * {@code tp2} still succeed.
     */
    @Test
    public void testUpdateConsumerGroupOffsets_WithInvalidOffsetAfterPartitionLogEnd() throws ExecutionException, InterruptedException {
        // Arrange the request first; it does not touch any mock, so doing this before replayAll() is safe.
        setupUpdateOffsetRequest(endOffsets.get(tp3) + 1);
        setupAdminClientForListOffsets(true, Collections.emptySet());
        setupAdminClientForListOffsets(false, Collections.emptySet());
        setupAdminClientForAlterOffsets(Collections.singleton(tp3), Collections.emptySet());
        replayAll();

        Set<UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse> expected = new HashSet<>();
        expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                tp1.topic(), tp1.partition(), Response.Status.OK.getStatusCode(), ""));
        expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                tp2.topic(), tp2.partition(), Response.Status.OK.getStatusCode(), ""));
        expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                tp3.topic(),
                tp3.partition(),
                Response.Status.BAD_REQUEST.getStatusCode(),
                "For test-3, new offset 400 is greater than end-offset 399."));

        UpdateConsumerGroupOffsetsResponse response =
                (UpdateConsumerGroupOffsetsResponse) manager.updateConsumerGroupOffsets(consumerGroupId, updateOffsetsRequest).get();

        Assertions.assertEquals(expected, new HashSet<>(response.getUpdateResponses()));
        verifyAll();
    }

    /**
     * Makes the start-offset listing fail for the partition whose request uses the given offset
     * type ({@code tp1} for EARLIEST, {@code tp3} for OFFSET). That partition is expected to be
     * answered with INTERNAL_SERVER_ERROR and left out of the alter-offsets call; the other two
     * partitions are expected to succeed.
     *
     * @param offsetType offset type whose partition gets the failing start-offset lookup
     */
    @EnumSource(value = UpdateConsumerGroupOffsetsRequest.OffsetType.class, names = {"EARLIEST", "OFFSET"})
    @ParameterizedTest
    public void testUpdateConsumerGroupOffsets_ForOffsetType_WithListingStartOffsetFails(UpdateConsumerGroupOffsetsRequest.OffsetType offsetType) throws ExecutionException, InterruptedException {
        TopicPartition failingPartition =
                offsetType == UpdateConsumerGroupOffsetsRequest.OffsetType.EARLIEST ? tp1 : tp3;
        Set<TopicPartition> failedPartitions = Collections.singleton(failingPartition);

        setupAdminClientForListOffsets(true, failedPartitions);
        setupAdminClientForListOffsets(false, Collections.emptySet());
        setupAdminClientForAlterOffsets(failedPartitions, Collections.emptySet());
        replayAll();

        Set<UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse> expected = new HashSet<>();
        for (TopicPartition partition : Arrays.asList(tp1, tp2, tp3)) {
            boolean listingFailed = failedPartitions.contains(partition);
            expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                    partition.topic(),
                    partition.partition(),
                    (listingFailed ? Response.Status.INTERNAL_SERVER_ERROR : Response.Status.OK).getStatusCode(),
                    listingFailed
                            ? String.format("Unexpected server error while listing offsets for %s", partition)
                            : ""));
        }

        UpdateConsumerGroupOffsetsResponse response =
                (UpdateConsumerGroupOffsetsResponse) manager.updateConsumerGroupOffsets(consumerGroupId, updateOffsetsRequest).get();

        Assertions.assertEquals(expected, new HashSet<>(response.getUpdateResponses()));
        verifyAll();
    }

    /**
     * Makes the end-offset listing fail for the partition whose request uses the given offset type
     * ({@code tp2} for LATEST, {@code tp3} for OFFSET). That partition is expected to be answered
     * with INTERNAL_SERVER_ERROR and left out of the alter-offsets call; the other two partitions
     * are expected to succeed.
     *
     * @param offsetType offset type whose partition gets the failing end-offset lookup
     */
    @EnumSource(value = UpdateConsumerGroupOffsetsRequest.OffsetType.class, names = {"LATEST", "OFFSET"})
    @ParameterizedTest
    public void testUpdateConsumerGroupOffsets_ForOffsetType_WithListingEndOffsetFails(UpdateConsumerGroupOffsetsRequest.OffsetType offsetType) throws ExecutionException, InterruptedException {
        TopicPartition failingPartition =
                offsetType == UpdateConsumerGroupOffsetsRequest.OffsetType.LATEST ? tp2 : tp3;
        Set<TopicPartition> failedPartitions = Collections.singleton(failingPartition);

        setupAdminClientForListOffsets(true, Collections.emptySet());
        setupAdminClientForListOffsets(false, failedPartitions);
        setupAdminClientForAlterOffsets(failedPartitions, Collections.emptySet());
        replayAll();

        Set<UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse> expected = new HashSet<>();
        for (TopicPartition partition : Arrays.asList(tp1, tp2, tp3)) {
            boolean listingFailed = failedPartitions.contains(partition);
            expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                    partition.topic(),
                    partition.partition(),
                    (listingFailed ? Response.Status.INTERNAL_SERVER_ERROR : Response.Status.OK).getStatusCode(),
                    listingFailed
                            ? String.format("Unexpected server error while listing offsets for %s", partition)
                            : ""));
        }

        UpdateConsumerGroupOffsetsResponse response =
                (UpdateConsumerGroupOffsetsResponse) manager.updateConsumerGroupOffsets(consumerGroupId, updateOffsetsRequest).get();

        Assertions.assertEquals(expected, new HashSet<>(response.getUpdateResponses()));
        verifyAll();
    }

    /**
     * Both offset listings succeed, but the per-partition alter results of {@code tp1} and
     * {@code tp3} complete exceptionally. Those two partitions are expected to be answered with
     * INTERNAL_SERVER_ERROR while {@code tp2} is expected to succeed.
     */
    @Test
    public void testUpdateConsumerGroupOffsets_WithAlterConsumerGroupOffsetsFails() throws ExecutionException, InterruptedException {
        Set<TopicPartition> alterFailures = new HashSet<>(Arrays.asList(tp1, tp3));

        setupAdminClientForListOffsets(true, Collections.emptySet());
        setupAdminClientForListOffsets(false, Collections.emptySet());
        setupAdminClientForAlterOffsets(Collections.emptySet(), alterFailures);
        replayAll();

        Set<UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse> expected = new HashSet<>();
        for (TopicPartition partition : Arrays.asList(tp1, tp2, tp3)) {
            boolean alterFailed = alterFailures.contains(partition);
            expected.add(UpdateConsumerGroupOffsetsResponse.TopicPartitionResponse.create(
                    partition.topic(),
                    partition.partition(),
                    (alterFailed ? Response.Status.INTERNAL_SERVER_ERROR : Response.Status.OK).getStatusCode(),
                    alterFailed
                            ? String.format("Unexpected server error while altering offsets for %s", partition)
                            : ""));
        }

        UpdateConsumerGroupOffsetsResponse response =
                (UpdateConsumerGroupOffsetsResponse) manager.updateConsumerGroupOffsets(consumerGroupId, updateOffsetsRequest).get();

        Assertions.assertEquals(expected, new HashSet<>(response.getUpdateResponses()));
        verifyAll();
    }

    /**
     * Rebuilds {@link #updateOffsetsRequest} with one entry per partition: EARLIEST for
     * {@code tp1}, LATEST for {@code tp2}, and an explicit OFFSET for {@code tp3}.
     *
     * @param j the explicit offset requested for {@code tp3}
     */
    private void setupUpdateOffsetRequest(long j) {
        UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData earliestForTp1 =
                UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData.create(
                        tp1.topic(),
                        tp1.partition(),
                        UpdateConsumerGroupOffsetsRequest.OffsetData.create(
                                (Long) null, UpdateConsumerGroupOffsetsRequest.OffsetType.EARLIEST));
        UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData latestForTp2 =
                UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData.create(
                        tp2.topic(),
                        tp2.partition(),
                        UpdateConsumerGroupOffsetsRequest.OffsetData.create(
                                (Long) null, UpdateConsumerGroupOffsetsRequest.OffsetType.LATEST));
        UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData explicitForTp3 =
                UpdateConsumerGroupOffsetsRequest.TopicPartitionOffsetData.create(
                        tp3.topic(),
                        tp3.partition(),
                        UpdateConsumerGroupOffsetsRequest.OffsetData.create(
                                j, UpdateConsumerGroupOffsetsRequest.OffsetType.OFFSET));

        // Still handed over as a mutable ArrayList, exactly as before; only the raw type is gone.
        updateOffsetsRequest = UpdateConsumerGroupOffsetsRequest.builder()
                .setTopicPartitionOffsetRequests(
                        new ArrayList<>(Arrays.asList(earliestForTp1, latestForTp2, explicitForTp3)))
                .build();
    }

    /**
     * Records one expected {@code Admin#listOffsets} call on the mock and the result it returns.
     *
     * <p>The start-offset call ({@code z == true}) is expected for {@code tp1} and {@code tp3} with
     * {@code OffsetSpec.earliest()} and answers from {@link #startOffsets}; the end-offset call
     * ({@code z == false}) is expected for {@code tp2} and {@code tp3} with
     * {@code OffsetSpec.latest()} and answers from {@link #endOffsets}.
     *
     * @param z {@code true} to expect the start-offset listing, {@code false} for the end-offset listing
     * @param set partitions whose future should complete exceptionally; partitions that are not
     *     part of this listing are ignored
     */
    private void setupAdminClientForListOffsets(boolean z, Set<TopicPartition> set) {
        Map<TopicPartition, Long> knownOffsets = z ? startOffsets : endOffsets;

        Map<TopicPartition, OffsetSpec> expectedSpecs = new HashMap<>();
        Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures = new HashMap<>();
        for (TopicPartition partition : Arrays.asList(z ? tp1 : tp2, tp3)) {
            expectedSpecs.put(partition, z ? OffsetSpec.earliest() : OffsetSpec.latest());

            KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo> future = new KafkaFutureImpl<>();
            if (set.contains(partition)) {
                future.completeExceptionally(new KafkaException("test_exception"));
            } else {
                future.complete(
                        new ListOffsetsResult.ListOffsetsResultInfo(knownOffsets.get(partition), -1L, Optional.of(-1)));
            }
            futures.put(partition, future);
        }

        // The raw cast widens the KafkaFutureImpl values to the future type the ListOffsetsResult
        // constructor declares, without naming a type this file does not import.
        @SuppressWarnings({"unchecked", "rawtypes"})
        ListOffsetsResult listOffsetsResult = new ListOffsetsResult((Map) futures);

        EasyMock.expect(adminClient.listOffsets(partitionOffsetSpecMatcher(expectedSpecs)))
                .andReturn(listOffsetsResult);
    }

    /**
     * Records the expected {@code Admin#alterConsumerGroupOffsets} call and the per-partition
     * results of the mocked {@link #alterOffsetsResult}.
     *
     * <p>The expected offsets are {@code tp1}'s start offset, {@code tp2}'s end offset and
     * {@link #tp3NewOffset} for {@code tp3}, minus any partition listed in {@code set}.
     *
     * @param set partitions expected to be absent from the alter request; no per-partition result
     *     is expected for them unless they also appear in {@code set2}
     * @param set2 partitions whose per-partition result completes exceptionally
     */
    private void setupAdminClientForAlterOffsets(Set<TopicPartition> set, Set<TopicPartition> set2) {
        Map<TopicPartition, OffsetAndMetadata> expectedOffsets = new HashMap<>();
        if (!set.contains(tp1)) {
            expectedOffsets.put(tp1, new OffsetAndMetadata(startOffsets.get(tp1)));
        }
        if (!set.contains(tp2)) {
            expectedOffsets.put(tp2, new OffsetAndMetadata(endOffsets.get(tp2)));
        }
        if (!set.contains(tp3)) {
            expectedOffsets.put(tp3, new OffsetAndMetadata(tp3NewOffset));
        }

        alterOffsetsResult = mock(AlterConsumerGroupOffsetsResult.class);

        KafkaFutureImpl<Void> succeeded = new KafkaFutureImpl<>();
        succeeded.complete(null);
        KafkaFutureImpl<Void> failed = new KafkaFutureImpl<>();
        failed.completeExceptionally(new KafkaException("test exception"));

        // Partitions that are neither skipped nor failing report success.
        for (TopicPartition partition : Arrays.asList(tp1, tp2, tp3)) {
            if (!set.contains(partition) && !set2.contains(partition)) {
                EasyMock.expect(alterOffsetsResult.partitionResult(partition)).andReturn(succeeded);
            }
        }
        for (TopicPartition partition : set2) {
            EasyMock.expect(alterOffsetsResult.partitionResult(partition)).andReturn(failed);
        }

        EasyMock.expect(adminClient.alterConsumerGroupOffsets(consumerGroupId, expectedOffsets))
                .andReturn(alterOffsetsResult);
    }

    /**
     * Registers an EasyMock argument matcher that accepts a map with the same size and keys as
     * {@code map} and whose values have the same runtime class as the corresponding expected
     * {@code OffsetSpec}. Only the classes of the values are compared, not the values themselves.
     *
     * <p>Anything that is not a map, including {@code null}, is rejected rather than causing an
     * exception inside the matcher.
     *
     * @param map the expected partition-to-spec mapping
     * @return always {@code null}; the matcher is registered through {@code EasyMock.reportMatcher}
     *     and the return value only serves as a placeholder in the mocked call
     */
    public static Map<TopicPartition, OffsetSpec> partitionOffsetSpecMatcher(final Map<TopicPartition, OffsetSpec> map) {
        EasyMock.reportMatcher(new IArgumentMatcher() {
            @Override
            public boolean matches(Object obj) {
                if (!(obj instanceof Map)) {
                    return false;
                }
                Map<?, ?> actual = (Map<?, ?>) obj;
                if (actual.size() != map.size()) {
                    return false;
                }
                for (Map.Entry<TopicPartition, OffsetSpec> expected : map.entrySet()) {
                    // A missing key and a null value are both treated as a mismatch.
                    Object actualSpec = actual.get(expected.getKey());
                    if (actualSpec == null || actualSpec.getClass() != expected.getValue().getClass()) {
                        return false;
                    }
                }
                return true;
            }

            @Override
            public void appendTo(StringBuffer stringBuffer) {
                stringBuffer.append(map);
            }
        });
        return null;
    }
}
