package io.confluent.kafkarest.integration;

import io.confluent.kafkarest.CloudKafkaRestResourceExtension;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.KafkaRestResourceExtension;
import io.confluent.kafkarest.auth.KafkaRestContextFactory;
import io.confluent.kafkarest.auth.RestRequestProxyProtocolInfo;
import java.nio.charset.StandardCharsets;
import java.security.Principal;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Priority;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.core.Configurable;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

/**
 * Integration test that fires many concurrent authenticated requests at the brokers endpoint and
 * checks, through a pair of JAX-RS filters, that the per-request {@link KafkaRestContext} has its
 * admin client fetched exactly once and is shut down exactly once.
 *
 * <p>The filters are installed through {@link StaticMockResourceExtension}, which is appended to
 * the configured resource-extension list in {@link #overrideKafkaRestConfigs(Properties)}.
 */
@Tag("IntegrationTest")
public class ContextCleanupIntegrationTest extends CloudClusterTestHarness {
    private static final String LKC = "lkc-abc123";
    private static final String PKC = "pkc-abc123";
    private static final String API_KEY = "api_key";
    private static final String API_SECRET = "api_secret";
    private static final Node NODE = new Node(1, "localhost", 10000);

    /** Artificial latency applied to the mocked {@code clusterId()} future. */
    private static final Duration DELAY = Duration.ofMillis(100);

    /** Number of requests issued by the test. */
    private static final int REQUEST_COUNT = 100;

    /** Number of client threads used to issue the requests. */
    private static final int CLIENT_THREADS = 10;

    /** Resource extension that registers the mock set-up and verification filters. */
    public static final class StaticMockResourceExtension extends KafkaRestResourceExtension {
        /**
         * Registers {@link StaticMockSetupFilter} and {@link StaticMockTearDownFilter}.
         *
         * @param configurable the JAX-RS configurable the filters are registered on
         * @param kafkaRestConfig unused
         */
        public void register(Configurable<?> configurable, KafkaRestConfig kafkaRestConfig) {
            configurable.register(StaticMockSetupFilter.class);
            configurable.register(StaticMockTearDownFilter.class);
        }
    }

    /**
     * Request filter that stubs {@link KafkaRestContextFactory#create} to hand out a fresh mock
     * {@link KafkaRestContext} and stores that mock under the {@code "context"} request property.
     */
    @Priority(0)
    public static final class StaticMockSetupFilter implements ContainerRequestFilter {
        // One static mock per thread that runs this filter; created lazily on first use.
        // NOTE(review): the MockedStatic is never closed, so it outlives the request.
        private static final ThreadLocal<MockedStatic<KafkaRestContextFactory>> contextFactory = new ThreadLocal<>();

        /**
         * Creates a mock context backed by a mock admin client, makes the statically mocked
         * factory return it, and exposes it to later filters via the {@code "context"} property.
         *
         * @param containerRequestContext the current request
         */
        public void filter(ContainerRequestContext containerRequestContext) {
            if (contextFactory.get() == null) {
                contextFactory.set(Mockito.mockStatic(KafkaRestContextFactory.class));
            }
            KafkaRestContext kafkaRestContext = Mockito.mock(KafkaRestContext.class);
            Mockito.when(kafkaRestContext.getAdmin()).thenReturn(createMockAdmin());
            contextFactory.get()
                    .when(() -> KafkaRestContextFactory.create(
                            (Principal) ArgumentMatchers.any(),
                            (KafkaRestConfig) ArgumentMatchers.any(),
                            (RestRequestProxyProtocolInfo) ArgumentMatchers.any()))
                    .thenReturn(kafkaRestContext);
            containerRequestContext.setProperty("context", kafkaRestContext);
        }

        /**
         * Builds a mock {@link Admin} whose {@code describeCluster} result completes immediately
         * except for {@code clusterId()}, which completes after {@link #DELAY} on a dedicated
         * single-thread executor. Closing the admin shuts that executor down.
         *
         * @return the configured mock admin client
         */
        private static Admin createMockAdmin() {
            Admin admin = Mockito.mock(Admin.class);
            DescribeClusterResult describeClusterResult = Mockito.mock(DescribeClusterResult.class);
            ExecutorService delayExecutor = Executors.newSingleThreadExecutor();

            Mockito.when(admin.describeCluster((DescribeClusterOptions) ArgumentMatchers.any()))
                    .thenReturn(describeClusterResult);
            Mockito.when(describeClusterResult.authorizedOperations())
                    .thenReturn(KafkaFuture.completedFuture(Collections.emptySet()));
            // thenAnswer (not thenReturn) so every clusterId() call schedules a new delayed future.
            Mockito.when(describeClusterResult.clusterId())
                    .thenAnswer(invocation -> completedFutureWithDelay(PKC, DELAY, delayExecutor));
            Mockito.when(describeClusterResult.controller())
                    .thenReturn(KafkaFuture.completedFuture(NODE));
            Mockito.when(describeClusterResult.nodes())
                    .thenReturn(KafkaFuture.completedFuture(Collections.singletonList(NODE)));
            // close() interrupts any pending delayed completion by shutting the executor down.
            Mockito.doAnswer(invocation -> delayExecutor.shutdownNow()).when(admin).close();
            return admin;
        }

        /**
         * Returns a future that is completed with {@code t} by a task submitted to
         * {@code executor} after sleeping for {@code duration}.
         *
         * <p>If the sleeping thread is interrupted, its interrupt flag is restored and the future
         * is still completed with {@code t}.
         *
         * @param t the value to complete the future with
         * @param duration how long the task sleeps before completing the future
         * @param executor the executor that runs the delaying task
         * @param <T> the value type
         * @return the not-yet-completed future
         */
        public static <T> KafkaFuture<T> completedFutureWithDelay(T t, Duration duration, Executor executor) {
            KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
            executor.execute(() -> {
                try {
                    Thread.sleep(duration.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                future.complete(t);
            });
            return future;
        }
    }

    /**
     * Response filter that verifies the mock context stored by {@link StaticMockSetupFilter} was
     * used and shut down exactly once.
     */
    @Priority(0)
    public static final class StaticMockTearDownFilter implements ContainerResponseFilter {
        /**
         * Reads the mock from the {@code "context"} request property and verifies single calls to
         * {@code getAdmin()} and {@code shutdown()}.
         *
         * @param containerRequestContext the request carrying the {@code "context"} property
         * @param containerResponseContext unused
         */
        public void filter(ContainerRequestContext containerRequestContext, ContainerResponseContext containerResponseContext) {
            KafkaRestContext kafkaRestContext = (KafkaRestContext) containerRequestContext.getProperty("context");
            Mockito.verify(kafkaRestContext, Mockito.times(1)).getAdmin();
            Mockito.verify(kafkaRestContext, Mockito.times(1)).shutdown();
        }
    }

    /** Passes {@code (1, false)} to the harness constructor. */
    public ContextCleanupIntegrationTest() {
        super(1, false);
    }

    /**
     * Adds {@link StaticMockResourceExtension} after {@link CloudKafkaRestResourceExtension} in
     * the resource-extension list and sets the thread pool bounds to min 5 / max 6.
     *
     * @param properties the REST configuration properties to modify
     */
    protected void overrideKafkaRestConfigs(Properties properties) {
        properties.put("kafka.rest.resource.extension.class", CloudKafkaRestResourceExtension.class.getName() + "," + StaticMockResourceExtension.class.getName());
        properties.put("thread.pool.max", "6");
        properties.put("thread.pool.min", "5");
    }

    /**
     * Builds a request for {@code str} carrying an HTTP Basic {@code Authorization} header made
     * from {@link #API_KEY} and {@link #API_SECRET}.
     *
     * @param str the request path
     * @return the request builder with the header applied
     */
    private Invocation.Builder authenticatedRequest(String str) {
        String credentials = API_KEY + ":" + API_SECRET;
        String encoded = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.ISO_8859_1));
        return request(str).header("Authorization", String.format("Basic %s", encoded));
    }

    /**
     * Issues {@link #REQUEST_COUNT} concurrent requests to the brokers endpoint and expects every
     * one of them to return HTTP 200. The context clean-up itself is verified on the server side
     * by {@link StaticMockTearDownFilter}.
     *
     * @throws InterruptedException if interrupted while waiting for the client threads
     */
    @Disabled("Test is failed, Caused by: Wanted but not invoked:\nkafkaRestContext.getAdmin();\n-> at io.confluent.kafkarest.integration.ContextCleanupIntegrationTest$StaticMockTearDownFilter.filter(ContextCleanupIntegrationTest.java:207)\nActually, there were zero interactions with this mock.\n")
    @Test
    public void kafkaRestContextIsClosedAfterRequestResumes() throws InterruptedException {
        ExecutorService clientPool = Executors.newFixedThreadPool(CLIENT_THREADS);
        CopyOnWriteArrayList<Response> responses = new CopyOnWriteArrayList<>();
        String path = String.format("/v3/clusters/%s/brokers", LKC);
        try {
            for (int i = 0; i < REQUEST_COUNT; i++) {
                clientPool.execute(() -> responses.add(authenticatedRequest(path).accept("application/json").get()));
            }
            clientPool.shutdown();
            Assertions.assertTrue(clientPool.awaitTermination(10L, TimeUnit.SECONDS));
        } finally {
            // No-op after a clean termination; otherwise stops client threads that are still running.
            clientPool.shutdownNow();
        }
        try {
            // A task that threw never added a response; without this check the loop below would
            // pass vacuously on a short (or empty) list.
            Assertions.assertEquals(REQUEST_COUNT, responses.size(), "some requests did not produce a response");
            for (Response response : responses) {
                Assertions.assertEquals(200, response.getStatus());
            }
        } finally {
            responses.forEach(Response::close);
        }
    }
}
