package io.confluent.kafkarest.controllers;

import io.confluent.kafkarest.common.KafkaFutures;
import io.confluent.kafkarest.entities.Balancer;
import io.confluent.kafkarest.entities.BalancerAnyUnevenLoadStatus;
import io.confluent.kafkarest.entities.Broker;
import io.confluent.kafkarest.entities.Cluster;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.NotFoundException;
import org.apache.kafka.clients.admin.BalancerOperationError;
import org.apache.kafka.clients.admin.BalancerStatus;
import org.apache.kafka.clients.admin.BalancerStatusDescription;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeBalancerStatusResult;
import org.apache.kafka.clients.admin.DescribeEvenClusterLoadStatusResult;
import org.apache.kafka.clients.admin.EvenClusterLoadStatus;
import org.apache.kafka.clients.admin.EvenClusterLoadStatusDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.easymock.EasyMock;
import org.easymock.EasyMockExtension;
import org.easymock.Mock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({EasyMockExtension.class})
/* loaded from: input_file:io/confluent/kafkarest/controllers/BalancerManagerImplTest.class */
public final class BalancerManagerImplTest {

    @Mock
    private ConfluentAdmin confluentAdminClient;

    @Mock
    private ClusterManager clusterManager;

    @Mock
    private DescribeBalancerStatusResult getBalancerStatusResult;

    @Mock
    private DescribeEvenClusterLoadStatusResult getAnyUnevenLoadStatusResult;
    private BalancerManager balancerManager;
    private static final Node NODE_1 = new Node(1, "broker-1", 9091);
    private static final Node NODE_2 = new Node(2, "broker-2", 9092);
    private static final Node NODE_3 = new Node(3, "broker-3", 9093);
    private static final Node NODE_4 = new Node(4, "broker-4", 9094);
    private static final String CLUSTER_ID = "cluster-1";
    private static final Broker BROKER_1 = Broker.fromNode(CLUSTER_ID, NODE_1);
    private static final Broker BROKER_2 = Broker.fromNode(CLUSTER_ID, NODE_2);
    private static final Broker BROKER_3 = Broker.fromNode(CLUSTER_ID, NODE_3);
    private static final Broker BROKER_4 = Broker.fromNode(CLUSTER_ID, NODE_4);
    private static final Cluster CLUSTER = Cluster.create(CLUSTER_ID, BROKER_1, Arrays.asList(BROKER_1, BROKER_2, BROKER_3, BROKER_4));
    private static final BalancerOperationError BALANCER_OPERATION_ERROR_1 = new BalancerOperationError(Errors.BALANCER_JBOD_ENABLED_MISCONFIGURATION_EXCEPTION, "The Confluent Balancer failed to start as JBOD is enabled for the cluster.");
    private static final BalancerOperationError BALANCER_OPERATION_ERROR_2 = new BalancerOperationError(Errors.BALANCER_OPERATION_OVERRIDDEN, "The Confluent Balancer operation was overridden by a higher priority operation.");
    private static final BalancerStatusDescription BALANCER_STATUS_DESCRIPTION = new BalancerStatusDescription(BalancerStatus.ERROR, Collections.singleton(1), BALANCER_OPERATION_ERROR_1);
    private static final EvenClusterLoadStatusDescription ANY_UNEVEN_LOAD_STATUS_DESCRIPTION = new EvenClusterLoadStatusDescription(EvenClusterLoadStatus.ABORTED, EvenClusterLoadStatus.BALANCED, 120, 100, BALANCER_OPERATION_ERROR_2);
    private static final Balancer BALANCER = Balancer.builder().setClusterId(CLUSTER_ID).setStatus(BalancerStatus.ERROR).setErrorCode(10014).setErrorMessage("The Confluent Balancer failed to start as JBOD is enabled for the cluster.").build();
    private static final BalancerAnyUnevenLoadStatus BALANCER_ANY_UNEVEN_LOAD_STATUS = BalancerAnyUnevenLoadStatus.builder().setClusterId(CLUSTER_ID).setStatus(EvenClusterLoadStatus.ABORTED).setPreviousStatus(EvenClusterLoadStatus.BALANCED).setStatusUpdatedAt(Instant.ofEpochMilli(120)).setPreviousStatusUpdatedAt(Instant.ofEpochMilli(100)).setErrorCode(10013).setErrorMessage("The Confluent Balancer operation was overridden by a higher priority operation.").build();

    @BeforeEach
    public void setUp() {
        this.balancerManager = new BalancerManagerImpl(this.clusterManager, this.confluentAdminClient);
    }

    @Test
    public void getBalancerStatus_existingCluster_returnsBalancer() throws Exception {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.of(CLUSTER)));
        EasyMock.expect(this.confluentAdminClient.describeBalancerStatus()).andReturn(this.getBalancerStatusResult);
        EasyMock.expect(this.getBalancerStatusResult.description()).andReturn(KafkaFuture.completedFuture(BALANCER_STATUS_DESCRIPTION));
        EasyMock.replay(new Object[]{this.clusterManager, this.confluentAdminClient, this.getBalancerStatusResult});
        Assertions.assertEquals(BALANCER, (Balancer) ((Optional) this.balancerManager.getBalancer(CLUSTER_ID).get()).get());
    }

    @Test
    public void getBalancerStatus_nonExistingCluster_throwsNotFound() throws Exception {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.empty()));
        EasyMock.replay(new Object[]{this.clusterManager});
        try {
            this.balancerManager.getBalancer(CLUSTER_ID).get();
            Assertions.fail("Fetching balancer status for non existing cluster did not fail!");
        } catch (ExecutionException e) {
            Assertions.assertEquals(NotFoundException.class, e.getCause().getClass());
        }
    }

    @Test
    public void getBalancerStatus_timeoutException_throwsTimeoutwithCause() throws Exception {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.of(CLUSTER)));
        EasyMock.expect(this.confluentAdminClient.describeBalancerStatus()).andReturn(this.getBalancerStatusResult);
        EasyMock.expect(this.getBalancerStatusResult.description()).andReturn(KafkaFutures.failedFuture(new TimeoutException("Call(callName=describeEvenClusterLoad, deadlineMs=1615829377969, tries=1, nextAllowedTryMs=-92233720", new TimeoutException("Timed out waiting for a node assignment. Call: describeEvenClusterLoad"))));
        EasyMock.replay(new Object[]{this.clusterManager, this.confluentAdminClient, this.getBalancerStatusResult});
        try {
            this.balancerManager.getBalancer(CLUSTER_ID).get();
            Assertions.fail();
        } catch (ExecutionException e) {
            assertTimeoutExceptionCause(e);
        }
    }

    @Test
    public void getAnyUnevenLoad_existingCluster_returnsAnyUnevenLoad() throws Exception {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.of(CLUSTER)));
        EasyMock.expect(this.confluentAdminClient.describeEvenClusterLoadStatus()).andReturn(this.getAnyUnevenLoadStatusResult);
        EasyMock.expect(this.getAnyUnevenLoadStatusResult.description()).andReturn(KafkaFuture.completedFuture(ANY_UNEVEN_LOAD_STATUS_DESCRIPTION));
        EasyMock.replay(new Object[]{this.clusterManager, this.confluentAdminClient, this.getAnyUnevenLoadStatusResult});
        Assertions.assertEquals(BALANCER_ANY_UNEVEN_LOAD_STATUS, (BalancerAnyUnevenLoadStatus) ((Optional) this.balancerManager.getAnyUnevenLoad(CLUSTER_ID).get()).get());
    }

    @Test
    public void getAnyUnevenLoad_nonExistingCluster_throwsNotFound() throws Exception {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.empty()));
        EasyMock.replay(new Object[]{this.clusterManager});
        try {
            this.balancerManager.getAnyUnevenLoad(CLUSTER_ID).get();
            Assertions.fail("Fetching even load status for non existing cluster did not fail!");
        } catch (ExecutionException e) {
            Assertions.assertEquals(NotFoundException.class, e.getCause().getClass());
        }
    }

    @Test
    public void getAnyUnevenLoad_timeoutException_throwsTimeoutwithCause() throws Exception {
        EasyMock.expect(this.clusterManager.getCluster(CLUSTER_ID)).andReturn(CompletableFuture.completedFuture(Optional.of(CLUSTER)));
        EasyMock.expect(this.confluentAdminClient.describeEvenClusterLoadStatus()).andReturn(this.getAnyUnevenLoadStatusResult);
        EasyMock.expect(this.getAnyUnevenLoadStatusResult.description()).andReturn(KafkaFutures.failedFuture(new TimeoutException("Call(callName=describeEvenClusterLoad, deadlineMs=1615829377969, tries=1, nextAllowedTryMs=-92233720", new TimeoutException("Timed out waiting for a node assignment. Call: describeEvenClusterLoad"))));
        EasyMock.replay(new Object[]{this.clusterManager, this.confluentAdminClient, this.getAnyUnevenLoadStatusResult});
        try {
            this.balancerManager.getAnyUnevenLoad(CLUSTER_ID).get();
            Assertions.fail();
        } catch (ExecutionException e) {
            assertTimeoutExceptionCause(e);
        }
    }

    private void assertTimeoutExceptionCause(Exception exc) {
        Assertions.assertEquals(io.confluent.kafkarest.exceptions.TimeoutException.class, exc.getCause().getClass());
        Assertions.assertEquals(exc.getCause().getClass().getName() + ": Call(callName=describeEvenClusterLoad, deadlineMs=1615829377969, tries=1, nextAllowedTryMs=-92233720 Caused by: Timed out waiting for a node assignment. Call: describeEvenClusterLoad", exc.getMessage());
    }
}
