package io.confluent.kafkarest.integration.v3;

import com.google.common.collect.UnmodifiableIterator;
import io.confluent.kafkarest.KafkaRestResourceExtension;
import io.confluent.kafkarest.entities.v3.AlterMirrorsResponse;
import io.confluent.kafkarest.entities.v3.GetLinkConfigResponse;
import io.confluent.kafkarest.entities.v3.GetLinkResponse;
import io.confluent.kafkarest.entities.v3.GetMirrorResponse;
import io.confluent.kafkarest.entities.v3.LinkConfigData;
import io.confluent.kafkarest.entities.v3.LinkConfigDataList;
import io.confluent.kafkarest.entities.v3.LinkData;
import io.confluent.kafkarest.entities.v3.ListLinkConfigsResponse;
import io.confluent.kafkarest.entities.v3.ListLinksResponse;
import io.confluent.kafkarest.entities.v3.ListMirrorsResponse;
import io.confluent.kafkarest.entities.v3.MirrorData;
import io.confluent.kafkarest.entities.v3.MirrorLagData;
import io.confluent.kafkarest.exceptions.ClusterLinkNotFoundException;
import io.confluent.kafkarest.exceptions.MirrorTopicNotFoundException;
import io.confluent.kafkarest.integration.v3.CLTestHarness;
import io.confluent.shaded.com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import kafka.link.ClusterLinkTestHarness;
import kafka.server.link.ClusterLinkConfig;
import org.apache.commons.collections.map.HashedMap;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MirrorTopicError;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.apache.kafka.test.TestUtils;
import org.jetbrains.annotations.NotNull;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Option;
import scala.collection.Iterable;

@Disabled("This is the heaviest integration test, mainly responsible for flakiness and need major refactor")
/* loaded from: input_file:io/confluent/kafkarest/integration/v3/CLResourceIntegrationTest.class */
public class CLResourceIntegrationTest extends CLTestHarness {
    @Override // io.confluent.kafkarest.integration.v3.CLTestHarness
    protected void overrideKafkaRestConfigs(Properties properties) {
        properties.put("kafka.rest.resource.extension.class", KafkaRestResourceExtension.class.getName());
    }

    @Test
    public void testListEmptyLinks() {
        verifyNoLinks(getDestClusterId());
    }

    @MethodSource({"destLinkModeCombinations"})
    @ParameterizedTest
    public void testCreateAndListLinks(final String str, String str2) {
        boolean booleanValue = Boolean.valueOf(str2).booleanValue();
        String destClusterId = getDestClusterId();
        final String srcClusterId = getSrcClusterId();
        final String str3 = "my-new-link-1";
        final String str4 = "my-new-link-2";
        final String str5 = "my-new-link-3";
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.1
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put("link.mode", str);
            }
        };
        Response createDestinationLinkValidateLink = createDestinationLinkValidateLink(destClusterId, srcClusterId, "my-new-link-1", hashMap, booleanValue);
        Assertions.assertEquals(Response.Status.CREATED.getStatusCode(), createDestinationLinkValidateLink.getStatus());
        Assertions.assertTrue(((String) createDestinationLinkValidateLink.readEntity(String.class)).isEmpty());
        listAndVerifyLinks(this.destRestProxy, destClusterId, new ArrayList<CLTestHarness.LinkMatcher>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.2
            {
                add(CLTestHarness.LinkMatcher.buildLinkMatcher(str, srcClusterId, str3, ImmutableList.of()));
            }
        });
        getAndVerifyLinkConfigs(destClusterId, "my-new-link-1", "link.mode", str, this.destRestProxy);
        createDestinationLinkValidateLink(destClusterId, srcClusterId, "my-new-link-2", hashMap, booleanValue);
        createDestinationLinkValidateLink(destClusterId, srcClusterId, "my-new-link-3", hashMap, booleanValue);
        listAndVerifyLinks(this.destRestProxy, destClusterId, new ArrayList<CLTestHarness.LinkMatcher>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.3
            {
                add(CLTestHarness.LinkMatcher.buildLinkMatcher(str, srcClusterId, str3, ImmutableList.of()));
                add(CLTestHarness.LinkMatcher.buildLinkMatcher(str, srcClusterId, str4, ImmutableList.of()));
                add(CLTestHarness.LinkMatcher.buildLinkMatcher(str, srcClusterId, str5, ImmutableList.of()));
            }
        });
        getAndVerifyLinkConfigs(destClusterId, "my-new-link-1", "link.mode", str, this.destRestProxy);
        getAndVerifyLinkConfigs(destClusterId, "my-new-link-2", "link.mode", str, this.destRestProxy);
        getAndVerifyLinkConfigs(destClusterId, "my-new-link-3", "link.mode", str, this.destRestProxy);
    }

    @Test
    public void testCreateAndListBothSidesOfBidirectionalLink() {
        startSourceRestProxy();
        testOneSideOfBidirectionalLink(getDestClusterId(), getSrcClusterId(), "my-new-link-1", this.destRestProxy, this.sourceCluster, Optional.empty());
        Response link = getLink(getDestClusterId(), "my-new-link-1");
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), link.getStatus());
        testOneSideOfBidirectionalLink(getSrcClusterId(), getDestClusterId(), "my-new-link-1", this.sourceRestProxy, this.destCluster, Optional.of(((GetLinkResponse) link.readEntity(GetLinkResponse.class)).getValue().getClusterLinkId()));
    }

    private void testOneSideOfBidirectionalLink(String str, final String str2, final String str3, RestProxyTestHarness restProxyTestHarness, ClusterLinkTestHarness clusterLinkTestHarness, Optional<String> optional) {
        HashMap hashMap = new HashMap();
        hashMap.put("link.mode", "BIDIRECTIONAL");
        hashMap.put("bootstrap.servers", clusterLinkTestHarness.bootstrapServers(clusterLinkTestHarness.listenerName()));
        Response createBidirectionalLinkValidateLink = createBidirectionalLinkValidateLink(str, str2, str3, hashMap, restProxyTestHarness, optional);
        Assertions.assertEquals(Response.Status.CREATED.getStatusCode(), createBidirectionalLinkValidateLink.getStatus());
        Assertions.assertTrue(((String) createBidirectionalLinkValidateLink.readEntity(String.class)).isEmpty());
        listAndVerifyLinks(restProxyTestHarness, str, new ArrayList<CLTestHarness.LinkMatcher>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.4
            {
                add(CLTestHarness.LinkMatcher.bidirectionalLinkMatcher(str2, str3, ImmutableList.of()));
            }
        });
        getAndVerifyLinkConfigs(str, str3, "link.mode", "BIDIRECTIONAL", restProxyTestHarness);
    }

    @Test
    public void testCreateBidirectionalLinkHandlesCorruptedRemoteLinkId() {
        startSourceRestProxy();
        String destClusterId = getDestClusterId();
        HashMap hashMap = new HashMap();
        hashMap.put("link.mode", "BIDIRECTIONAL");
        hashMap.put("bootstrap.servers", this.sourceCluster.bootstrapServers(this.sourceCluster.listenerName()));
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), createBidirectionalLinkValidateLink(destClusterId, getSrcClusterId(), "my-new-link-1", hashMap, this.destRestProxy, Optional.of("corrupted link id")).getStatus());
    }

    @Test
    public void testCreateBidirectionalLinkHandlesWrongRemoteLinkId() {
        startSourceRestProxy();
        String destClusterId = getDestClusterId();
        HashMap hashMap = new HashMap();
        hashMap.put("link.mode", "BIDIRECTIONAL");
        hashMap.put("bootstrap.servers", this.sourceCluster.bootstrapServers(this.sourceCluster.listenerName()));
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), createBidirectionalLinkValidateLink(destClusterId, getSrcClusterId(), "my-new-link-1", hashMap, this.destRestProxy, Optional.of(Uuid.randomUuid().toString())).getStatus());
    }

    @MethodSource({"destLinkModeCombinations"})
    @ParameterizedTest
    public void testCreateLinksValidateLinks(final String str, String str2) {
        boolean booleanValue = Boolean.valueOf(str2).booleanValue();
        String destClusterId = getDestClusterId();
        final String srcClusterId = getSrcClusterId();
        final String str3 = "my-new-link-1";
        verifyResponse(createLink(destClusterId, destClusterId, "", destClusterId, "my-new-link-1", new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.5
            {
                put("bootstrap.servers", "example.com:1234");
                put("link.mode", str);
            }
        }, true, false, booleanValue, this.destRestProxy, Optional.empty()), Response.Status.BAD_REQUEST);
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.6
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put("link.mode", str);
            }
        };
        verifyResponse(createLink(destClusterId, destClusterId, "", destClusterId, "my-new-link-1", hashMap, true, false, booleanValue, this.destRestProxy, Optional.empty()), Response.Status.BAD_REQUEST);
        verifyResponse(createLink(destClusterId, srcClusterId, "", srcClusterId, "my-new-link-1", hashMap, true, false, booleanValue, this.destRestProxy, Optional.empty()), Response.Status.CREATED);
        listAndVerifyLinks(this.destRestProxy, destClusterId, new ArrayList<CLTestHarness.LinkMatcher>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.7
            {
                add(CLTestHarness.LinkMatcher.buildLinkMatcher(str, srcClusterId, str3, ImmutableList.of()));
            }
        });
    }

    @MethodSource({"destLinkModeCombinations"})
    @ParameterizedTest
    public void testCreateLinksMalformedBootstrapServerProperty(final String str, String str2) {
        boolean booleanValue = Boolean.valueOf(str2).booleanValue();
        String destClusterId = getDestClusterId();
        verifyResponse(createDestinationLink(destClusterId, "src-cluster-id", "my-new-link-1", new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.8
            {
                put("acl.sync.enable", "false");
                put("link.mode", str);
            }
        }, booleanValue), Response.Status.BAD_REQUEST);
        verifyResponse(createDestinationLink(destClusterId, "123", "my-new-link-2", new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.9
            {
                put("bootstrap.servers", "example.com");
                put("link.mode", str);
            }
        }, booleanValue), Response.Status.BAD_REQUEST);
    }

    @MethodSource({"destLinkModeCombinations"})
    @ParameterizedTest
    public void testCreateLinksDuplication(final String str, String str2) {
        boolean booleanValue = Boolean.valueOf(str2).booleanValue();
        String destClusterId = getDestClusterId();
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.10
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put("link.mode", str);
            }
        };
        Response createDestinationLink = createDestinationLink(destClusterId, "src-cluster-id", "my-new-link-1", hashMap, booleanValue);
        Assertions.assertEquals(Response.Status.CREATED.getStatusCode(), createDestinationLink.getStatus());
        Assertions.assertTrue(((String) createDestinationLink.readEntity(String.class)).isEmpty());
        verifyResponse(createDestinationLink(destClusterId, "src-cluster-id", "my-new-link-1", hashMap, booleanValue), Response.Status.BAD_REQUEST);
    }

    @MethodSource({"destLinkModeCombinations"})
    @ParameterizedTest
    public void testCreateLinksInvalidLinkName(final String str, String str2) {
        verifyResponse(createDestinationLink(getDestClusterId(), "src-cluster-id", ".!@##$$%%%^^^&&", new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.11
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put("link.mode", str);
            }
        }, Boolean.valueOf(str2).booleanValue()), Response.Status.BAD_REQUEST);
    }

    @MethodSource({"destLinkModeCombinations"})
    @ParameterizedTest
    public void testCreateLinksValidateOnly(final String str, String str2) {
        boolean booleanValue = Boolean.valueOf(str2).booleanValue();
        String destClusterId = getDestClusterId();
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.12
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put("link.mode", str);
            }
        };
        Assertions.assertTrue(((String) createDestinationLinkValidateOnly(destClusterId, "src-cluster-id", "my-new-link-1", hashMap, booleanValue).readEntity(String.class)).isEmpty());
        verifyNoLinks(destClusterId);
        verifyResponse(createDestinationLinkValidateOnly(destClusterId, "src-cluster-id", "!@#$$%%^^^&&&**", hashMap, booleanValue), Response.Status.BAD_REQUEST);
    }

    @MethodSource({"destLinkModeCombinations"})
    @ParameterizedTest
    public void testCreateLinksValidateLink(final String str, String str2) {
        boolean booleanValue = Boolean.valueOf(str2).booleanValue();
        String destClusterId = getDestClusterId();
        verifyResponse(createDestinationLinkValidateLink(destClusterId, destClusterId, "my-new-link-1", new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.13
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put("link.mode", str);
            }
        }, booleanValue), Response.Status.BAD_REQUEST);
        verifyNoLinks(destClusterId);
    }

    @MethodSource({"destLinkModeCombinations"})
    @ParameterizedTest
    public void testCreateSourceInitiatedDestinationLink(String str, String str2) {
        boolean booleanValue = Boolean.valueOf(str2).booleanValue();
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        HashMap hashMap = new HashMap();
        hashMap.put("link.mode", str);
        hashMap.put("connection.mode", "INBOUND");
        verifyResponse(createDestinationLinkValidateLink(destClusterId, srcClusterId, "my-new-link-1", hashMap, booleanValue), Response.Status.CREATED);
        listAndVerifyLinks(this.destRestProxy, destClusterId, Collections.singletonList(CLTestHarness.LinkMatcher.buildLinkMatcher(str, srcClusterId, "my-new-link-1", ImmutableList.of())));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("link.mode", str);
        hashMap2.put("connection.mode", "INBOUND");
        verifyResponse(createDestinationLinkValidateLink(destClusterId, srcClusterId, "my-new-link-2", hashMap2, booleanValue), Response.Status.CREATED);
        ArrayList arrayList = new ArrayList();
        arrayList.add(CLTestHarness.LinkMatcher.buildLinkMatcher(str, srcClusterId, "my-new-link-1", ImmutableList.of()));
        arrayList.add(CLTestHarness.LinkMatcher.buildLinkMatcher(str, srcClusterId, "my-new-link-2", ImmutableList.of()));
        listAndVerifyLinks(this.destRestProxy, destClusterId, arrayList);
        verifyResponse(createDestinationLinkValidateLink(destClusterId, srcClusterId, "invalid-link", Collections.singletonMap("link.mode", str), booleanValue), Response.Status.BAD_REQUEST);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testCreateSourceInitiatedLink(boolean z) {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        HashMap hashMap = new HashMap();
        hashMap.put("link.mode", "DESTINATION");
        hashMap.put("connection.mode", "INBOUND");
        hashMap.put("bootstrap.servers", "can be removed");
        Assertions.assertEquals(Response.Status.CREATED.getStatusCode(), createDestinationLinkValidateLink(destClusterId, srcClusterId, "my-new-link-1", hashMap, z).getStatus());
        listAndVerifyLinks(this.destRestProxy, destClusterId, Collections.singletonList(CLTestHarness.LinkMatcher.destinationLinkMatcher(srcClusterId, "my-new-link-1", ImmutableList.of())));
        try {
            this.destRestProxy.stopRestProxy();
            startSourceRestProxy();
            HashMap hashMap2 = new HashMap();
            hashMap2.put("link.mode", "SOURCE");
            hashMap2.put("connection.mode", "OUTBOUND");
            hashMap2.put("bootstrap.servers", destCluster().bootstrapServers(this.sourceCluster.listenerName()));
            Assertions.assertEquals(Response.Status.CREATED.getStatusCode(), createSourceLinkValidateLink(srcClusterId, destClusterId, "my-new-link-1", hashMap2, z).getStatus());
            listAndVerifyLinks(this.sourceRestProxy, srcClusterId, Collections.singletonList(CLTestHarness.LinkMatcher.sourceLinkMatcher(destClusterId, "my-new-link-1", ImmutableList.of())));
        } catch (Exception e) {
            e.printStackTrace();
            throw new Error("Failed to stop destination rest proxy");
        }
    }

    @Test
    public void testListLinksInvalidClusterId() {
        verifyResponse(listLinks(this.destRestProxy, "!@##$$%^&&*(*("), Response.Status.NOT_FOUND);
    }

    @Test
    public void testGetLinks() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.14
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", properties, Option.apply(srcClusterId), false);
        this.destCluster.createClusterLink("my-new-link-2", properties, Option.apply(srcClusterId), false);
        this.destCluster.createClusterLink("my-new-link-3", properties, Option.apply(srcClusterId), false);
        verifyGetLink(destClusterId, "my-new-link-1", true, CLTestHarness.LinkMatcher.destinationLinkMatcher(srcClusterId, "my-new-link-1", ImmutableList.of()));
        verifyGetLink(destClusterId, "my-new-link-2", true, CLTestHarness.LinkMatcher.destinationLinkMatcher(srcClusterId, "my-new-link-2", ImmutableList.of()));
        verifyGetLink(destClusterId, "my-new-link-3", false, CLTestHarness.LinkMatcher.destinationLinkMatcher(srcClusterId, "my-new-link-3", ImmutableList.of()));
    }

    @Test
    public void testDeleteLink() {
        final String srcClusterId = getSrcClusterId();
        String destClusterId = getDestClusterId();
        final String str = "link-nb-1";
        this.destCluster.createClusterLink("link-nb-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.15
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        listAndVerifyLinks(this.destRestProxy, destClusterId, new ArrayList<CLTestHarness.LinkMatcher>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.16
            {
                add(CLTestHarness.LinkMatcher.destinationLinkMatcher(srcClusterId, str, ImmutableList.of()));
            }
        });
        Response deleteLink = deleteLink(destClusterId, "link-nb-1", false, false);
        Assertions.assertEquals(Response.Status.NO_CONTENT.getStatusCode(), deleteLink.getStatus());
        Assertions.assertTrue(((String) deleteLink.readEntity(String.class)).isEmpty());
        verifyNoLinks(destClusterId);
    }

    @Test
    public void testDeleteLinkInUse() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        Properties properties = new Properties();
        properties.setProperty("unclean.leader.election.enable", "false");
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.17
            {
                put("unclean.leader.election.enable", "true");
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.18
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        this.sourceCluster.createTopic("topic-1", 5, 3, properties, this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, (Short) 1, (Map<String, String>) hashMap), Response.Status.CREATED);
        verifyResponse(deleteLink(destClusterId, "my-new-link-1", false, false), Response.Status.FORBIDDEN);
    }

    @Test
    public void testDeleteLinkLinkDoesNotExist() {
        verifyResponse(deleteLink(getDestClusterId(), "link-nb-1", false, false), Response.Status.NOT_FOUND);
    }

    @Test
    public void testDeleteLinkValidateOnly() {
        final String str = "my-new-link";
        final String str2 = "bitcoin-cash";
        String destClusterId = getDestClusterId();
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.19
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        };
        verifyResponse(deleteLink(destClusterId, "my-new-link", false, true), Response.Status.NOT_FOUND);
        this.destCluster.createClusterLink("my-new-link", properties, Option.apply("bitcoin-cash"), false);
        listAndVerifyLinks(this.destRestProxy, destClusterId, new ArrayList<CLTestHarness.LinkMatcher>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.20
            {
                add(CLTestHarness.LinkMatcher.destinationLinkMatcher(str2, str, ImmutableList.of()));
            }
        });
        listAndVerifyLinks(this.destRestProxy, destClusterId, new ArrayList<CLTestHarness.LinkMatcher>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.21
            {
                add(CLTestHarness.LinkMatcher.destinationLinkMatcher(str2, str, ImmutableList.of()));
            }
        });
        Assertions.assertTrue(((String) deleteLink(destClusterId, "my-new-link", false, true).readEntity(String.class)).isEmpty());
        listAndVerifyLinks(this.destRestProxy, destClusterId, new ArrayList<CLTestHarness.LinkMatcher>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.22
            {
                add(CLTestHarness.LinkMatcher.destinationLinkMatcher(str2, str, ImmutableList.of()));
            }
        });
    }

    @MethodSource({"destLinkModeCombinations"})
    @ParameterizedTest
    public void testListLinkConfigs(final String str, String str2) {
        boolean booleanValue = Boolean.valueOf(str2).booleanValue();
        String destClusterId = getDestClusterId();
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.23
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put(ClusterLinkConfig.RetryTimeoutMsProp(), "2538940");
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
                put("link.mode", str);
            }
        };
        createDestinationLink(destClusterId, "bitcoin-cash", "link-nb-1", hashMap, booleanValue);
        verifyListLinkConfigs(destClusterId, "link-nb-1", hashMap);
    }

    @Test
    public void testListLinkConfigsLinkDoesNotExist() {
        verifyResponse(listLinkConfigs(getDestClusterId(), "link-nb-1"), Response.Status.NOT_FOUND);
    }

    @Test
    public void testGetLinkConfig() {
        String srcClusterId = getSrcClusterId();
        String destClusterId = getDestClusterId();
        String str = "link-db-1";
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.24
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
                put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "3825940");
            }
        };
        this.destCluster.createClusterLink("link-db-1", properties, Option.apply(srcClusterId), false);
        properties.forEach((obj, obj2) -> {
            getAndVerifyLinkConfigs(destClusterId, str, (String) obj, (String) obj2, this.destRestProxy);
        });
    }

    @Test
    public void testGetLinkInvalidLink() {
        verifyResponse(getLink(getDestClusterId(), "|(*&^%$"), Response.Status.NOT_FOUND);
    }

    @Test
    public void testGetLinkConfigLinkDoesNotExist() {
        verifyResponse(getLinkConfigs(getDestClusterId(), "link-sb-1", ClusterLinkConfig.ConsumerOffsetSyncMsProp(), this.destRestProxy), Response.Status.NOT_FOUND);
    }

    @Test
    public void testGetLinkConfigInvalidConfigName() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        this.destCluster.createClusterLink("link-sb-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.25
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        verifyResponse(getLinkConfigs(destClusterId, "link-sb-1", "tron.balance", this.destRestProxy), Response.Status.BAD_REQUEST);
    }

    @Test
    public void testDeleteLinkConfig() {
        String srcClusterId = getSrcClusterId();
        String destClusterId = getDestClusterId();
        final HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.26
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
                put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "3825940");
            }
        };
        this.destCluster.createClusterLink("link-nb-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.27
            {
                putAll(hashMap);
            }
        }, Option.apply(srcClusterId), false);
        verifyDeleteLinkConfig(destClusterId, "link-nb-1", ClusterLinkConfig.ConsumerOffsetSyncEnableProp());
        hashMap.remove(ClusterLinkConfig.ConsumerOffsetSyncEnableProp());
        verifyListLinkConfigs(destClusterId, "link-nb-1", hashMap);
        verifyDeleteLinkConfig(destClusterId, "link-nb-1", ClusterLinkConfig.ConsumerOffsetSyncEnableProp());
        hashMap.remove(ClusterLinkConfig.ConsumerOffsetSyncMsProp());
        verifyListLinkConfigs(destClusterId, "link-nb-1", hashMap);
    }

    @Test
    public void testDeleteLinkConfigLinkDoesNotExist() {
        verifyResponse(deleteLinkConfig(getDestClusterId(), "link-nb-1", ClusterLinkConfig.ConsumerOffsetSyncEnableProp()), Response.Status.NOT_FOUND);
    }

    @Test
    public void testDeleteLinkConfigInvalidConfigName() {
        String destClusterId = getDestClusterId();
        final HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.28
            {
                put("bootstrap.servers", "example.com:1234");
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
                put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), "3825940");
            }
        };
        this.destCluster.createClusterLink("link-nb-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.29
            {
                putAll(hashMap);
            }
        }, Option.apply("bitcoin-cash"), false);
        verifyResponse(deleteLinkConfig(destClusterId, "link-nb-1", "orchid"), Response.Status.NOT_FOUND);
    }

    @Test
    public void testUpdateLinkConfig() {
        String destClusterId = getDestClusterId();
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.30
            {
                put("bootstrap.servers", "example.com:1234");
                put(ClusterLinkConfig.RetryTimeoutMsProp(), "2538940");
            }
        }, Option.apply("src-cluster-id"), false);
        verifyResponse(updateLinkConfig(destClusterId, "my-new-link-1", ClusterLinkConfig.RetryTimeoutMsProp(), "142857"), Response.Status.NO_CONTENT);
        getAndVerifyLinkConfigs(destClusterId, "my-new-link-1", ClusterLinkConfig.RetryTimeoutMsProp(), "142857", this.destRestProxy);
    }

    @Test
    public void testAlterLinkConfigs() {
        String destClusterId = getDestClusterId();
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.31
            {
                put("bootstrap.servers", "example.com:1234");
                put(ClusterLinkConfig.RetryTimeoutMsProp(), "2538940");
            }
        };
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.32
            {
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
                put(ClusterLinkConfig.RetryTimeoutMsProp(), "DELETE");
            }
        };
        HashMap<String, String> hashMap2 = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.33
            {
                put("bootstrap.servers", "example.com:1234");
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", properties, Option.apply("src-cluster-id"), false);
        Assertions.assertTrue(((String) alterLinkConfigs(destClusterId, "my-new-link-1", hashMap, false).readEntity(String.class)).isEmpty());
        verifyListLinkConfigs(destClusterId, "my-new-link-1", hashMap2);
    }

    @Test
    public void testAlterLinkConfigsLinkDoesNotExist() {
        verifyResponse(alterLinkConfigs(getDestClusterId(), "my-new-link-1", new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.34
            {
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
                put(ClusterLinkConfig.RetryTimeoutMsProp(), "DELETE");
            }
        }, false), Response.Status.NOT_FOUND);
    }

    @Test
    public void testAlterLinkConfigsInvalidConfig() {
        String destClusterId = getDestClusterId();
        final String str = "bitcoin.balance";
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.35
            {
                put("bootstrap.servers", "example.com:1234");
                put(ClusterLinkConfig.RetryTimeoutMsProp(), "2538940");
            }
        };
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.36
            {
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
                put(str, "DELETE");
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", properties, Option.apply("src-cluster-id"), false);
        verifyResponse(alterLinkConfigs(destClusterId, "my-new-link-1", hashMap, false), Response.Status.NOT_FOUND);
    }

    @Test
    public void testAlterLinkConfigsValidateOnly() {
        String destClusterId = getDestClusterId();
        final String str = "bitcoin.balance";
        final HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.37
            {
                put("bootstrap.servers", "example.com:1234");
                put(ClusterLinkConfig.RetryTimeoutMsProp(), "2538940");
            }
        };
        HashMap<String, String> hashMap2 = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.38
            {
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
                put(ClusterLinkConfig.RetryTimeoutMsProp(), "DELETE");
            }
        };
        HashMap<String, String> hashMap3 = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.39
            {
                put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
                put(str, "DELETE");
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.40
            {
                putAll(hashMap);
            }
        }, Option.apply("src-cluster-id"), false);
        verifyResponse(alterLinkConfigs(destClusterId, "my-new-link-1", hashMap3, true), Response.Status.NOT_FOUND);
        verifyResponse(alterLinkConfigs(destClusterId, "litecoin-link-25", hashMap2, true), Response.Status.NOT_FOUND);
        Assertions.assertTrue(((String) alterLinkConfigs(destClusterId, "my-new-link-1", hashMap2, true).readEntity(String.class)).isEmpty());
        verifyListLinkConfigs(destClusterId, "my-new-link-1", hashMap);
    }

    @Test
    public void testCreateMirror() {
        String destClusterId = getDestClusterId();
        final String srcClusterId = getSrcClusterId();
        final String str = "my-new-link-1";
        final String str2 = "topic-1";
        final String str3 = "topic-2";
        Short sh = 3;
        Short sh2 = 1;
        Properties properties = new Properties();
        properties.setProperty("unclean.leader.election.enable", "false");
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.41
            {
                put("unclean.leader.election.enable", "true");
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.42
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        this.sourceCluster.createTopic("topic-1", 5, 3, properties, this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, sh, hashMap), Response.Status.CREATED);
        Assertions.assertEquals("true", ((ConfigEntry) ((List) this.destCluster.describeTopicConfig("topic-1").entries().stream().filter(configEntry -> {
            return configEntry.name().equals("unclean.leader.election.enable");
        }).collect(Collectors.toList())).get(0)).value());
        Assertions.assertEquals(sh.intValue(), ((TopicPartitionInfo) this.destCluster.describeTopic("topic-1").partitions().iterator().next()).replicas().size());
        this.sourceCluster.createTopic("topic-2", 5, 3, properties, this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-2", (String) null, Optional.empty(), hashMap), Response.Status.CREATED);
        Assertions.assertEquals(sh2.intValue(), ((TopicPartitionInfo) this.destCluster.describeTopic("topic-2").partitions().iterator().next()).replicas().size());
        listAndVerifyLinks(this.destRestProxy, destClusterId, new ArrayList<CLTestHarness.LinkMatcher>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.43
            {
                add(CLTestHarness.LinkMatcher.destinationLinkMatcher(srcClusterId, str, ImmutableList.of(str2, str3)));
            }
        });
    }

    @Test
    public void testCreateMirrorWithLinkPrefix() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        Properties properties = new Properties();
        properties.setProperty("unclean.leader.election.enable", "false");
        Map<String, String> singletonMap = Collections.singletonMap("unclean.leader.election.enable", "true");
        Properties properties2 = new Properties();
        properties2.put("bootstrap.servers", sourceCluster().bootstrapServers(this.sourceCluster.listenerName()));
        properties2.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "src_");
        this.destCluster.createClusterLink("my-new-link-1", properties2, Option.apply(srcClusterId), false);
        this.sourceCluster.createTopic("topic-1", 5, 3, properties, this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", "src_topic-1", (Short) 1, singletonMap), Response.Status.CREATED);
        Assertions.assertEquals("true", ((ConfigEntry) ((List) this.destCluster.describeTopicConfig("src_topic-1").entries().stream().filter(configEntry -> {
            return configEntry.name().equals("unclean.leader.election.enable");
        }).collect(Collectors.toList())).get(0)).value());
        listAndVerifyLinks(this.destRestProxy, destClusterId, Collections.singletonList(CLTestHarness.LinkMatcher.destinationLinkMatcher(srcClusterId, "my-new-link-1", ImmutableList.of("src_topic-1"))));
    }

    @Test
    public void testCreateMirrorInvalidRequestBody() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        Properties properties = new Properties();
        properties.setProperty("unclean.leader.election.enable", "false");
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.44
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        this.sourceCluster.createTopic("topic-1", 5, 3, properties, this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        JSONObject jSONObject = new JSONObject();
        verifyResponse(this.destRestProxy.request("/v3/clusters/" + destClusterId + "/links/my-new-link-1/mirrors").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE)), Response.Status.BAD_REQUEST);
        jSONObject.put("source_topic_name", "topic-1");
        verifyResponse(this.destRestProxy.request("/v3/clusters/" + destClusterId + "/links/my-new-link-1/mirrors").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE)), Response.Status.CREATED);
        verifyGetMirror(getMirror(destClusterId, "my-new-link-1", "topic-1", false), "topic-1", "");
    }

    @Test
    public void testCreateMirrorInvalidConfigs() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        Properties properties = new Properties();
        properties.setProperty("unclean.leader.election.enable", "false");
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.45
            {
                put("unclean.leader.election.enable", "true");
            }
        };
        HashMap<String, String> hashMap2 = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.46
            {
                put("unclean.leader.election.enable", "123");
            }
        };
        HashMap<String, String> hashMap3 = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.47
            {
                put("unclean.leader.election.enable", null);
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.48
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        this.sourceCluster.createTopic("topic-1", 5, 3, properties, this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, (Short) 888, (Map<String, String>) hashMap), Response.Status.BAD_REQUEST);
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, (Short) 1, (Map<String, String>) hashMap2), Response.Status.BAD_REQUEST);
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, (Short) 1, (Map<String, String>) hashMap3), Response.Status.BAD_REQUEST);
    }

    @Test
    public void testCreateMirrorLinkDoesNotExist() {
        String destClusterId = getDestClusterId();
        Properties properties = new Properties();
        properties.setProperty("unclean.leader.election.enable", "false");
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.49
            {
                put("unclean.leader.election.enable", "true");
            }
        };
        this.sourceCluster.createTopic("topic-1", 5, 3, properties, this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, (Short) 1, (Map<String, String>) hashMap), Response.Status.NOT_FOUND);
    }

    @Test
    public void testCreateMirrorSourceTopicNotFound() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.50
            {
                put("unclean.leader.election.enable", "true");
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.51
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, (Short) 3, (Map<String, String>) hashMap), Response.Status.NOT_FOUND);
    }

    @Disabled
    @Test
    public void testCreateMirrorImmutableMirrorTopicConfig() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.52
            {
                put("file.delete.delay.ms", "142857");
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.53
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, (Short) 3, (Map<String, String>) hashMap), Response.Status.NOT_FOUND);
    }

    @Test
    public void testListAllMirrors() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        String str = "my-new-link-1";
        HashSet<String> hashSet = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.54
            {
                add("topic-1");
                add("topic-2");
            }
        };
        short s = 3;
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.55
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        hashSet.forEach(str2 -> {
            this.sourceCluster.createTopic(str2, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str2, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyListMirrors(listAllMirror(destClusterId), hashSet, "");
    }

    @Test
    public void testListAllMirrorsWithLinkPrefix() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        String str = "my-new-link-1";
        String str2 = "src_";
        HashSet hashSet = new HashSet(Arrays.asList("topic-1", "topic-2"));
        short s = 3;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", sourceCluster().bootstrapServers(this.sourceCluster.listenerName()));
        properties.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "src_");
        this.destCluster.createClusterLink("my-new-link-1", properties, Option.apply(srcClusterId), false);
        hashSet.forEach(str3 -> {
            this.sourceCluster.createTopic(str3, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str3, str2 + str3, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyListMirrors(listAllMirror(destClusterId), (Set) hashSet.stream().map(str4 -> {
            return str2 + str4;
        }).collect(Collectors.toSet()), "src_");
    }

    @Test
    public void testListMirrors() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        HashSet<String> hashSet = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.56
            {
                add("link-1");
                add("link-2");
            }
        };
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.57
            {
                put("link-1", "topic-1");
                put("link-2", "topic-2");
            }
        };
        short s = 3;
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.58
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        };
        hashSet.forEach(str -> {
            this.destCluster.createClusterLink(str, properties, Option.apply(srcClusterId), false);
        });
        hashMap.forEach((str2, str3) -> {
            this.sourceCluster.createTopic(str3, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str2, str3, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        hashSet.forEach(str4 -> {
            verifyListMirrors(listMirror(destClusterId, str4, null), new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.59
                {
                    add(hashMap.get(str4));
                }
            }, "");
        });
    }

    @Test
    public void testListMirrorsWithBadMirrorStatus() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        HashSet<String> hashSet = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.60
            {
                add("link-1");
            }
        };
        HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.61
            {
                put("link-1", "topic-1");
            }
        };
        short s = 3;
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.62
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        };
        hashSet.forEach(str -> {
            this.destCluster.createClusterLink(str, properties, Option.apply(srcClusterId), false);
        });
        hashMap.forEach((str2, str3) -> {
            this.sourceCluster.createTopic(str3, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str2, str3, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        Assertions.assertTrue(listMirror(destClusterId, "link-1", "exploded").getStatus() >= 400);
    }

    @Test
    public void testGetMirror() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.63
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        this.sourceCluster.createTopic("topic-1", 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, (Short) 3, Collections.emptyMap()), Response.Status.CREATED);
        verifyGetMirror(getMirror(destClusterId, "my-new-link-1", "topic-1", false), "topic-1", "");
        verifyGetMirror(getMirror(destClusterId, "my-new-link-1", "topic-1", true), "topic-1", "");
    }

    @Test
    public void testGetMirrorWithDeletedLink() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.64
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        this.sourceCluster.createTopic("topic-1", 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", (String) null, (Short) 3, Collections.emptyMap()), Response.Status.CREATED);
        verifyGetMirror(getMirror(destClusterId, "my-new-link-1", "topic-1", false), "topic-1", "");
        Response deleteLink = deleteLink(destClusterId, "my-new-link-1", true, false);
        Assertions.assertEquals(Response.Status.NO_CONTENT.getStatusCode(), deleteLink.getStatus());
        Assertions.assertTrue(((String) deleteLink.readEntity(String.class)).isEmpty());
        verifyNoLinks(destClusterId);
        Assertions.assertTrue(waitAndVerifyClusterLinkNotFound(destClusterId, "my-new-link-1", "topic-1", 5000L));
    }

    private boolean waitAndVerifyClusterLinkNotFound(String str, String str2, String str3, Long l) {
        Assertions.assertTrue(l.longValue() >= 0);
        Long valueOf = Long.valueOf(System.nanoTime() + (l.longValue() * 1000000));
        Response mirror = getMirror(str, str2, str3, false);
        while (true) {
            Response response = mirror;
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
                String str4 = (String) response.readEntity(String.class);
                Assertions.assertTrue(str4.contains(new ClusterLinkNotFoundException("").getTitle()));
                Assertions.assertFalse(str4.contains(new MirrorTopicNotFoundException("").getTitle()));
                return true;
            }
            if (Long.valueOf(System.nanoTime()).longValue() > valueOf.longValue()) {
                return false;
            }
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            mirror = getMirror(str, str2, str3, false);
        }
    }

    @Test
    public void testGetMirrorWithLinkPrefix() {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", sourceCluster().bootstrapServers(this.sourceCluster.listenerName()));
        properties.put(ClusterLinkConfig.ClusterLinkPrefixProp(), "src_");
        this.destCluster.createClusterLink("my-new-link-1", properties, Option.apply(srcClusterId), false);
        this.sourceCluster.createTopic("topic-1", 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-1", "src_topic-1", (Short) 3, Collections.emptyMap()), Response.Status.CREATED);
        verifyGetMirror(getMirror(destClusterId, "my-new-link-1", "src_topic-1", false), "src_topic-1", "src_");
    }

    @Test
    public void testGetMirrorWrongLinkName() {
        String destClusterId = getDestClusterId();
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.65
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(getSrcClusterId()), false);
        this.sourceCluster.createTopic("topic-1", 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(getMirror(destClusterId, "my-new-link-1wrong", "topic-1", false), Response.Status.NOT_FOUND);
    }

    @Test
    public void testPausedMirrorTopicLag() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        Map<String, Integer> singletonMap = Collections.singletonMap("topic-to-pause-1", null);
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.66
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        this.sourceCluster.createTopic("topic-to-pause-1", 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-to-pause-1", (String) null, (Short) 1, Collections.emptyMap()), Response.Status.CREATED);
        verifyAlterMirror(pauseMirrors(destClusterId, "my-new-link-1", singletonMap.keySet(), null), singletonMap);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "paused", singletonMap.keySet());
        }, 15000L, "paused mirrors should transit to paused status");
        Assertions.assertEquals(-1L, ((MirrorLagData) verifyGetMirror(getMirror(destClusterId, "my-new-link-1", "topic-to-pause-1", false), "topic-to-pause-1", "").getValue().getMirrorLags().get(0)).getLag().longValue());
    }

    @Test
    public void testAlterMirror() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        String str = "my-new-link-1";
        final String str2 = "topic-to-promote-1";
        final String str3 = "topic-to-promote-2";
        final String str4 = "topic-to-failover-1";
        final String str5 = "topic-to-failover-2";
        final String str6 = "topic-to-pause-1";
        final String str7 = "topic-to-pause-2";
        final HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.67
            {
                put(str2, null);
                put(str3, null);
            }
        };
        final HashMap<String, Integer> hashMap2 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.68
            {
                put(str4, null);
                put(str5, null);
            }
        };
        final HashMap<String, Integer> hashMap3 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.69
            {
                put(str6, null);
                put(str7, null);
            }
        };
        HashSet<String> hashSet = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.70
            {
                addAll(hashMap.keySet());
                addAll(hashMap2.keySet());
            }
        };
        HashSet<String> hashSet2 = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.71
            {
                addAll(hashMap.keySet());
                addAll(hashMap2.keySet());
                addAll(hashMap3.keySet());
            }
        };
        short s = 3;
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.72
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
                put("request.timeout.ms", "1000");
                put("default.api.timeout.ms", "1000");
            }
        }, Option.apply(srcClusterId), false);
        hashSet2.forEach(str8 -> {
            this.sourceCluster.createTopic(str8, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str8, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyListMirrors(listAllMirror(destClusterId, "active"), hashSet2, "");
        this.sourceCluster.killAllBrokers();
        Response promoteMirrors = promoteMirrors(destClusterId, "my-new-link-1", hashMap.keySet(), null);
        for (String str9 : hashMap.keySet()) {
            TestUtils.waitForCondition(() -> {
                Assertions.assertTrue(verifyGetMirror(getMirror(destClusterId, str, str9, true), str9, "").getValue().getMirrorStateTransitionErrors().size() > 0);
                Assertions.assertEquals(0, verifyGetMirror(getMirror(destClusterId, str, str9, false), str9, "").getValue().getMirrorStateTransitionErrors().size());
                return true;
            }, 15000L, "Failed to get mirror description as expected");
        }
        this.sourceCluster.restartDeadBrokers(false);
        this.sourceCluster.updateBootstrapServers();
        HashMap hashMap4 = new HashMap();
        hashMap4.put("bootstrap.servers", sourceCluster().bootstrapServers(this.sourceCluster.listenerName()));
        alterLinkConfigs(destClusterId, "my-new-link-1", hashMap4, false);
        for (String str10 : hashMap.keySet()) {
            TestUtils.waitForCondition(() -> {
                Assertions.assertEquals(0, verifyGetMirror(getMirror(destClusterId, str, str10, true), str10, "").getValue().getMirrorStateTransitionErrors().size());
                return true;
            }, 15000L, "Failed to get mirror description as expected");
        }
        verifyAlterMirror(promoteMirrors, hashMap);
        verifyAlterMirror(failoverMirrors(destClusterId, "my-new-link-1", hashMap2.keySet(), null), hashMap2);
        verifyAlterMirror(failoverMirrors(destClusterId, "my-new-link-1", new HashSet(), null), new HashMap());
        verifyAlterMirror(pauseMirrors(destClusterId, "my-new-link-1", hashMap3.keySet(), null), hashMap3);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "stopped", hashSet);
        }, 15000L, "promoted and failovered mirror topics should transit to stopped status");
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "paused", hashMap3.keySet());
        }, 15000L, "paused mirrors should transit to paused status");
        verifyAlterMirror(resumeMirrors(destClusterId, "my-new-link-1", hashMap3.keySet(), null), hashMap3);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "active", hashMap3.keySet());
        }, 15000L, "resumed mirrors which were paused before should transit to paused status");
    }

    @Test
    public void testReverseAndStartMirror() throws InterruptedException {
        ClusterLinkTestHarness clusterLinkTestHarness = this.destCluster;
        ClusterLinkTestHarness clusterLinkTestHarness2 = this.sourceCluster;
        RestProxyTestHarness restProxyTestHarness = this.destRestProxy;
        RestProxyTestHarness restProxyTestHarness2 = this.sourceRestProxy;
        startSourceRestProxy();
        String str = "my-new-link-1";
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        testOneSideOfBidirectionalLink(destClusterId, srcClusterId, "my-new-link-1", restProxyTestHarness, clusterLinkTestHarness2, Optional.empty());
        Response link = getLink(destClusterId, "my-new-link-1");
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), link.getStatus());
        testOneSideOfBidirectionalLink(srcClusterId, destClusterId, "my-new-link-1", restProxyTestHarness2, clusterLinkTestHarness, Optional.of(((GetLinkResponse) link.readEntity(GetLinkResponse.class)).getValue().getClusterLinkId()));
        final String str2 = "topic-to-reverse-and-start-1";
        final String str3 = "topic-to-reverse-and-start-2";
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.73
            {
                put(str2, null);
                put(str3, null);
            }
        };
        short s = 1;
        hashMap.keySet().forEach(str4 -> {
            clusterLinkTestHarness2.createTopic(str4, 1, 1, new Properties(), clusterLinkTestHarness2.listenerName(), clusterLinkTestHarness2.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str4, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyListMirrors(listAllMirror(destClusterId, "active"), hashMap.keySet(), "");
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), reverseAndStartMirrors(destClusterId, "my-new-link-1", hashMap.keySet(), null, restProxyTestHarness).getStatus());
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "stopped", hashMap.keySet(), restProxyTestHarness);
        }, 15000L, "local mirror topics should transit to stopped status");
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(srcClusterId, "active", hashMap.keySet(), restProxyTestHarness2);
        }, 15000L, "remote mirror topics should transit to active status");
    }

    @Test
    public void testReverseAndPauseMirror() throws InterruptedException {
        ClusterLinkTestHarness clusterLinkTestHarness = this.destCluster;
        ClusterLinkTestHarness clusterLinkTestHarness2 = this.sourceCluster;
        RestProxyTestHarness restProxyTestHarness = this.destRestProxy;
        RestProxyTestHarness restProxyTestHarness2 = this.sourceRestProxy;
        startSourceRestProxy();
        String str = "my-new-link-1";
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        testOneSideOfBidirectionalLink(destClusterId, srcClusterId, "my-new-link-1", restProxyTestHarness, clusterLinkTestHarness2, Optional.empty());
        Response link = getLink(destClusterId, "my-new-link-1");
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), link.getStatus());
        testOneSideOfBidirectionalLink(srcClusterId, destClusterId, "my-new-link-1", restProxyTestHarness2, clusterLinkTestHarness, Optional.of(((GetLinkResponse) link.readEntity(GetLinkResponse.class)).getValue().getClusterLinkId()));
        final String str2 = "topic-to-reverse-and-pause-1";
        final String str3 = "topic-to-reverse-and-pause-2";
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.74
            {
                put(str2, null);
                put(str3, null);
            }
        };
        short s = 1;
        hashMap.keySet().forEach(str4 -> {
            clusterLinkTestHarness2.createTopic(str4, 1, 1, new Properties(), clusterLinkTestHarness2.listenerName(), clusterLinkTestHarness2.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str4, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyListMirrors(listAllMirror(destClusterId, "active"), hashMap.keySet(), "");
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), reverseAndPauseMirrors(destClusterId, "my-new-link-1", hashMap.keySet(), null, restProxyTestHarness).getStatus());
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "stopped", hashMap.keySet(), restProxyTestHarness);
        }, 15000L, "local mirror topics should transit to stopped status");
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(srcClusterId, "paused", hashMap.keySet(), restProxyTestHarness2);
        }, 15000L, "remote mirror topics should transit to paused status");
    }

    @Test
    public void testAlterAllPromoteTopics() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        String str = "my-new-link-1";
        final String str2 = "topic-to-promote-1";
        final String str3 = "topic-to-promote-2";
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.75
            {
                put(str2, null);
                put(str3, null);
            }
        };
        short s = 3;
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.76
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        hashMap.keySet().forEach(str4 -> {
            this.sourceCluster.createTopic(str4, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str4, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyListMirrors(listAllMirror(destClusterId, "active"), hashMap.keySet(), "");
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), promoteMirrors(destClusterId, "my-new-link-1", new HashSet(Arrays.asList("topic-to-promote-1", "topic-to-promote-2")), ".*").getStatus());
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), promoteMirrors(destClusterId, "my-new-link-1", new HashSet(), null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), promoteMirrors(destClusterId, "my-new-link-1", null, null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), promoteMirrors(destClusterId, "my-new-link-1", null, "abc(def").getStatus());
        verifyAlterMirror(promoteMirrors(destClusterId, "my-new-link-1", null, ".*"), hashMap);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "stopped", hashMap.keySet());
        }, 15000L, "promoted mirror topics should transit to stopped status");
    }

    @Test
    public void testAlterPromoteTopicsWithRegex() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        final String str = "topic-to-promote-1";
        final String str2 = "topic-to-promote-2";
        HashSet hashSet = new HashSet();
        hashSet.add("topic-to-promote-1");
        hashSet.add("topic-to-promote-2");
        hashSet.add("test-1");
        hashSet.add("test-2");
        HashSet hashSet2 = new HashSet();
        hashSet2.add("topic-to-promote-3");
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.77
            {
                put(str, null);
                put(str2, null);
            }
        };
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.78
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", properties, Option.apply(srcClusterId), false);
        this.destCluster.createClusterLink("my-new-link-2", properties, Option.apply(srcClusterId), false);
        createSourceAndMirrorTopics("my-new-link-1", hashSet, destClusterId);
        createSourceAndMirrorTopics("my-new-link-2", hashSet2, destClusterId);
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), promoteMirrors(destClusterId, "my-new-link-1", new HashSet(Arrays.asList("topic-to-promote-1", "topic-to-promote-2", "test-1", "test-2")), "^topic-to-promote.*").getStatus());
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), promoteMirrors(destClusterId, "my-new-link-1", new HashSet(), null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), promoteMirrors(destClusterId, "my-new-link-1", null, null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), promoteMirrors(destClusterId, "my-new-link-1", null, "abc(def").getStatus());
        verifyAlterMirror(promoteMirrors(destClusterId, "my-new-link-1", null, "^topic-to-promote.*"), hashMap);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "stopped", hashMap.keySet());
        }, 15000L, "promoted mirror topics should transit to stopped status");
        verifyAlterMirror(promoteMirrors(destClusterId, "my-new-link-1", null, "^topic-to-promote.*"), Collections.emptyMap());
    }

    @Test
    public void testAlterAllFailoverTopics() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        String str = "my-new-link-1";
        final String str2 = "topic-to-failover-1";
        final String str3 = "topic-to-failover-2";
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.79
            {
                put(str2, null);
                put(str3, null);
            }
        };
        short s = 3;
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.80
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        hashMap.keySet().forEach(str4 -> {
            this.sourceCluster.createTopic(str4, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str4, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), failoverMirrors(destClusterId, "my-new-link-1", new HashSet(Arrays.asList("topic-to-failover-1", "topic-to-failover-2")), ".*").getStatus());
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), failoverMirrors(destClusterId, "my-new-link-1", new HashSet(), null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), failoverMirrors(destClusterId, "my-new-link-1", null, null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), failoverMirrors(destClusterId, "my-new-link-1", null, "abc(def").getStatus());
        verifyAlterMirror(failoverMirrors(destClusterId, "my-new-link-1", null, ".*"), hashMap);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "stopped", hashMap.keySet());
        }, 15000L, "failovered mirror topics should transit to stopped status");
    }

    @Test
    public void testAlterFailoverTopicsWithRegex() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        final String str = "topic-to-failover-1";
        final String str2 = "topic-to-failover-2";
        HashSet hashSet = new HashSet();
        hashSet.add("topic-to-failover-1");
        hashSet.add("topic-to-failover-2");
        hashSet.add("test-1");
        hashSet.add("test-2");
        HashSet hashSet2 = new HashSet();
        hashSet2.add("topic-to-failover-3");
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.81
            {
                put(str, null);
                put(str2, null);
            }
        };
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.82
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", properties, Option.apply(srcClusterId), false);
        this.destCluster.createClusterLink("my-new-link-2", properties, Option.apply(srcClusterId), false);
        createSourceAndMirrorTopics("my-new-link-1", hashSet, destClusterId);
        createSourceAndMirrorTopics("my-new-link-2", hashSet2, destClusterId);
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), failoverMirrors(destClusterId, "my-new-link-1", new HashSet(Arrays.asList("topic-to-failover-1", "topic-to-failover-2", "test-1", "test-2")), "^topic-to-failover.*").getStatus());
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), failoverMirrors(destClusterId, "my-new-link-1", new HashSet(), null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), failoverMirrors(destClusterId, "my-new-link-1", null, null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), failoverMirrors(destClusterId, "my-new-link-1", null, "abc(def").getStatus());
        verifyAlterMirror(failoverMirrors(destClusterId, "my-new-link-1", null, "^topic-to-failover.*"), hashMap);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "stopped", hashMap.keySet());
        }, 15000L, "failovered mirror topics should transit to stopped status");
        verifyAlterMirror(failoverMirrors(destClusterId, "my-new-link-1", null, "^topic-to-promote.*"), Collections.emptyMap());
    }

    @Test
    public void testAlterAllPauseAndResumeTopics() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        String str = "my-new-link-1";
        final String str2 = "topic-to-pause-1";
        final String str3 = "topic-to-pause-2";
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.83
            {
                put(str2, null);
                put(str3, null);
            }
        };
        short s = 3;
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.84
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        hashMap.keySet().forEach(str4 -> {
            this.sourceCluster.createTopic(str4, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str4, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), pauseMirrors(destClusterId, "my-new-link-1", new HashSet(Arrays.asList("topic-to-pause-1", "topic-to-pause-2")), ".*").getStatus());
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), pauseMirrors(destClusterId, "my-new-link-1", new HashSet(), null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), pauseMirrors(destClusterId, "my-new-link-1", null, null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), pauseMirrors(destClusterId, "my-new-link-1", null, "abc(def").getStatus());
        verifyAlterMirror(pauseMirrors(destClusterId, "my-new-link-1", null, ".*"), hashMap);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "paused", hashMap.keySet());
        }, 15000L, "paused mirrors should transit to paused status");
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resumeMirrors(destClusterId, "my-new-link-1", new HashSet(Arrays.asList("topic-to-pause-1", "topic-to-pause-2")), ".*").getStatus());
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), resumeMirrors(destClusterId, "my-new-link-1", new HashSet(), null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resumeMirrors(destClusterId, "my-new-link-1", null, null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resumeMirrors(destClusterId, "my-new-link-1", null, "abc(def").getStatus());
        verifyAlterMirror(resumeMirrors(destClusterId, "my-new-link-1", null, ".*"), hashMap);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "active", hashMap.keySet());
        }, 15000L, "resumed mirrors which were paused before should transit to active status");
    }

    @Test
    public void testAlterPauseAndResumeTopicsWithRegex() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        final String str = "topic-to-pause-1";
        final String str2 = "topic-to-pause-2";
        HashSet hashSet = new HashSet();
        hashSet.add("topic-to-pause-1");
        hashSet.add("topic-to-pause-2");
        hashSet.add("test-1");
        hashSet.add("test-2");
        HashSet hashSet2 = new HashSet();
        hashSet2.add("topic-to-pause-3");
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.85
            {
                put(str, null);
                put(str2, null);
            }
        };
        HashMap<String, Integer> hashMap2 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.86
            {
                put(str, null);
            }
        };
        Properties properties = new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.87
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        };
        this.destCluster.createClusterLink("my-new-link-1", properties, Option.apply(srcClusterId), false);
        this.destCluster.createClusterLink("my-new-link-2", properties, Option.apply(srcClusterId), false);
        createSourceAndMirrorTopics("my-new-link-1", hashSet, destClusterId);
        createSourceAndMirrorTopics("my-new-link-2", hashSet2, destClusterId);
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), pauseMirrors(destClusterId, "my-new-link-1", new HashSet(Arrays.asList("topic-to-pause-1", "topic-to-pause-2", "test-1", "test-2")), "^topic-to-pause.*").getStatus());
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), pauseMirrors(destClusterId, "my-new-link-1", new HashSet(), null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), pauseMirrors(destClusterId, "my-new-link-1", null, null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), pauseMirrors(destClusterId, "my-new-link-1", null, "abc(def").getStatus());
        verifyAlterMirror(pauseMirrors(destClusterId, "my-new-link-1", null, "^topic-to-pause.*"), hashMap);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "paused", hashMap.keySet());
        }, 15000L, "paused mirrors should transit to paused status");
        failoverMirrors(destClusterId, "my-new-link-1", new HashSet(Arrays.asList("test-1", "test-2")), null);
        failoverMirrors(destClusterId, "my-new-link-2", new HashSet(Arrays.asList("topic-to-pause-3")), null);
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resumeMirrors(destClusterId, "my-new-link-1", new HashSet(Arrays.asList("topic-to-pause-1", "topic-to-pause-2", "test-1", "test-2")), ".*pause-1$").getStatus());
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), resumeMirrors(destClusterId, "my-new-link-1", new HashSet(), null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resumeMirrors(destClusterId, "my-new-link-1", null, null).getStatus());
        Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resumeMirrors(destClusterId, "my-new-link-1", null, "abc(def").getStatus());
        verifyAlterMirror(resumeMirrors(destClusterId, "my-new-link-1", null, ".*pause-1$"), hashMap2);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "active", hashMap2.keySet());
        }, 15000L, "resumed mirrors which were paused before should transit to active status");
    }

    @Test
    public void testAlterMirrorMissingRequestBodyField() {
        String destClusterId = getDestClusterId();
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.88
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(getSrcClusterId()), false);
        this.sourceCluster.createTopic("topic-to-promote-1", 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "my-new-link-1", "topic-to-promote-1", (String) null, (Short) 3, Collections.emptyMap()), Response.Status.CREATED);
        verifyResponse(this.destRestProxy.request("/v3/clusters/" + destClusterId + "/links/my-new-link-1/mirrors:promote").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(new JSONObject().toString(), MediaType.APPLICATION_JSON_TYPE)), Response.Status.BAD_REQUEST);
    }

    @Test
    public void testAlterMirrorPartialSuccessIdempotence() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        String str = "my-new-link-1";
        final String str2 = "topic-to-promote-1";
        final String str3 = "topic-to-promote-2";
        final String str4 = "topic-to-promote-3";
        final String str5 = "topic-to-promote-4";
        final String str6 = "topic-to-failover-1";
        final String str7 = "topic-to-failover-2";
        final String str8 = "topic-to-failover-3";
        final String str9 = "topic-to-failover-4";
        final String str10 = "topic-to-pause-1";
        final String str11 = "topic-to-pause-2";
        final String str12 = "topic-to-pause-3";
        final String str13 = "topic-to-pause-4";
        final HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.89
            {
                put(str2, null);
                put(str3, null);
            }
        };
        final HashMap<String, Integer> hashMap2 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.90
            {
                put(str2, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
                put(str3, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
                put(str4, null);
                put(str5, null);
            }
        };
        final HashMap<String, Integer> hashMap3 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.91
            {
                put(str6, null);
                put(str7, null);
            }
        };
        final HashMap<String, Integer> hashMap4 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.92
            {
                put(str6, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
                put(str7, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
                put(str8, null);
                put(str9, null);
            }
        };
        final HashMap<String, Integer> hashMap5 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.93
            {
                put(str10, null);
                put(str11, null);
            }
        };
        final HashMap<String, Integer> hashMap6 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.94
            {
                put(str10, null);
                put(str11, null);
                put(str12, null);
                put(str13, null);
            }
        };
        final HashSet<String> hashSet = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.95
            {
                addAll(hashMap.keySet());
                addAll(hashMap2.keySet());
                addAll(hashMap3.keySet());
                addAll(hashMap4.keySet());
            }
        };
        HashSet<String> hashSet2 = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.96
            {
                addAll(hashSet);
                addAll(hashMap5.keySet());
                addAll(hashMap6.keySet());
            }
        };
        short s = 3;
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.97
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        hashSet2.forEach(str14 -> {
            this.sourceCluster.createTopic(str14, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str14, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyListMirrors(listAllMirror(destClusterId, "active"), hashSet2, "");
        verifyAlterMirror(promoteMirrors(destClusterId, "my-new-link-1", hashMap.keySet(), null), hashMap);
        verifyAlterMirror(promoteMirrors(destClusterId, "my-new-link-1", hashMap2.keySet(), null), hashMap2);
        verifyAlterMirror(failoverMirrors(destClusterId, "my-new-link-1", hashMap3.keySet(), null), hashMap3);
        verifyAlterMirror(failoverMirrors(destClusterId, "my-new-link-1", hashMap4.keySet(), null), hashMap4);
        verifyAlterMirror(pauseMirrors(destClusterId, "my-new-link-1", hashMap5.keySet(), null), hashMap5);
        verifyAlterMirror(pauseMirrors(destClusterId, "my-new-link-1", hashMap6.keySet(), null), hashMap6);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "stopped", hashSet);
        }, 15000L, "promoted and failovered mirror topics should transit to stopped status");
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "paused", hashMap6.keySet());
        }, 15000L, "paused mirrors should transit to paused status");
        verifyAlterMirror(resumeMirrors(destClusterId, "my-new-link-1", hashMap5.keySet(), null), hashMap5);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "active", hashMap5.keySet());
        }, 15000L, "resumed mirrors which were paused before should transit to paused status");
    }

    @Test
    public void testAlterMirrorPartialSuccessTopicDoesNotExist() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        String str = "my-new-link-1";
        final String str2 = "topic-to-promote-1";
        final String str3 = "topic-to-promote-2";
        final String str4 = "topic-to-promote-3";
        final String str5 = "topic-to-promote-4";
        final String str6 = "topic-to-failover-1";
        final String str7 = "topic-to-failover-2";
        final String str8 = "topic-to-failover-3";
        final String str9 = "topic-to-failover-4";
        final String str10 = "topic-to-pause-1";
        final String str11 = "topic-to-pause-2";
        final String str12 = "topic-to-pause-3";
        final String str13 = "topic-to-pause-4";
        final HashSet<String> hashSet = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.98
            {
                add(str2);
                add(str3);
            }
        };
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.99
            {
                put(str2, null);
                put(str3, null);
                put(str4, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
                put(str5, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
            }
        };
        final HashSet<String> hashSet2 = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.100
            {
                add(str6);
                add(str7);
            }
        };
        HashMap<String, Integer> hashMap2 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.101
            {
                put(str6, null);
                put(str7, null);
                put(str8, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
                put(str9, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
            }
        };
        final HashSet<String> hashSet3 = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.102
            {
                add(str10);
                add(str11);
            }
        };
        HashMap<String, Integer> hashMap3 = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.103
            {
                put(str10, null);
                put(str11, null);
                put(str12, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
                put(str13, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
            }
        };
        HashSet<String> hashSet4 = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.104
            {
                addAll(hashSet);
                addAll(hashSet2);
            }
        };
        HashSet<String> hashSet5 = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.105
            {
                addAll(hashSet);
                addAll(hashSet2);
                addAll(hashSet3);
            }
        };
        short s = 3;
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.106
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        hashSet5.forEach(str14 -> {
            this.sourceCluster.createTopic(str14, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str14, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyListMirrors(listAllMirror(destClusterId, "active"), hashSet5, "");
        verifyAlterMirror(promoteMirrors(destClusterId, "my-new-link-1", hashMap.keySet(), null), hashMap);
        verifyAlterMirror(failoverMirrors(destClusterId, "my-new-link-1", hashMap2.keySet(), null), hashMap2);
        verifyAlterMirror(pauseMirrors(destClusterId, "my-new-link-1", hashMap3.keySet(), null), hashMap3);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "stopped", hashSet4);
        }, 15000L, "promoted and failovered mirror topics should transit to stopped status");
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "paused", hashSet3);
        }, 15000L, "paused mirrors should transit to paused status");
        verifyAlterMirror(resumeMirrors(destClusterId, "my-new-link-1", hashMap3.keySet(), null), hashMap3);
        TestUtils.waitForCondition(() -> {
            return verifyListMirrorsSync(destClusterId, "active", hashSet3);
        }, 15000L, "resumed mirrors which were paused before should transit to paused status");
    }

    @Test
    public void testAlterMirrorPartialSuccessWrongLink() throws InterruptedException {
        String destClusterId = getDestClusterId();
        String srcClusterId = getSrcClusterId();
        String str = "my-new-link-1";
        final String str2 = "topic-to-promote-1";
        final String str3 = "topic-to-promote-2";
        HashSet<String> hashSet = new HashSet<String>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.107
            {
                add(str2);
                add(str3);
            }
        };
        HashMap<String, Integer> hashMap = new HashMap<String, Integer>() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.108
            {
                put(str2, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
                put(str3, Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode()));
            }
        };
        short s = 3;
        this.destCluster.createClusterLink("my-new-link-1", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.109
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(srcClusterId), false);
        hashSet.forEach(str4 -> {
            this.sourceCluster.createTopic(str4, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(destClusterId, str, str4, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyAlterMirror(promoteMirrors(destClusterId, "my-new-link-1wrong", hashMap.keySet(), null), hashMap);
    }

    @Test
    public void testLastFetchedOffsetFailedOverMirrorTopicDescription() throws InterruptedException {
        testLastFetchedOffsetStoppedMirrorTopicDescription(false);
    }

    @Test
    public void testLastFetchedOffsetPromotedMirrorTopicDescription() throws InterruptedException {
        testLastFetchedOffsetStoppedMirrorTopicDescription(true);
    }

    private void testLastFetchedOffsetStoppedMirrorTopicDescription(boolean z) throws InterruptedException {
        String destClusterId = getDestClusterId();
        String str = "linkedTopic-1";
        Short sh = 2;
        Short sh2 = 1;
        this.destCluster.createClusterLink("testLinkName", new Properties() { // from class: io.confluent.kafkarest.integration.v3.CLResourceIntegrationTest.110
            {
                put("bootstrap.servers", CLResourceIntegrationTest.this.sourceCluster().bootstrapServers(CLResourceIntegrationTest.this.sourceCluster.listenerName()));
            }
        }, Option.apply(getSrcClusterId()), false);
        this.sourceCluster.createTopic("linkedTopic-1", sh2.shortValue(), sh.shortValue(), new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
        verifyResponse(createMirror(destClusterId, "testLinkName", "linkedTopic-1", (String) null, sh, Collections.emptyMap()), Response.Status.CREATED);
        io.confluent.kafkarest.TestUtils.waitForCondition(() -> {
            return Boolean.valueOf(this.destCluster.leaderEpoch(new TopicPartition(str, 0)) >= 1);
        }, 15000L, "Destination leader epoch not updated");
        produceToTopicPartition(this.sourceCluster, "linkedTopic-1", 0, 0, 10);
        waitForMirror("testLinkName", "linkedTopic-1", 15000L);
        Assertions.assertEquals(10L, getLastSourceFetchHighWatermarkForMirrorPartition("testLinkName", "linkedTopic-1", 0));
        verifyAlterMirror(z ? promoteMirrors(destClusterId, "testLinkName", Collections.singleton("linkedTopic-1"), null) : failoverMirrors(destClusterId, "testLinkName", Collections.singleton("linkedTopic-1"), null), Collections.singletonMap("linkedTopic-1", null));
        io.confluent.kafkarest.TestUtils.waitForCondition(() -> {
            return Boolean.valueOf(verifyListMirrorsSync(destClusterId, "stopped", Collections.singleton(str)));
        }, 15000L, "Mirror did not stop after trying to stop it.");
        produceToTopicPartition(this.destCluster, "linkedTopic-1", 0, 10, 10);
        Assertions.assertEquals(10L, getLastSourceFetchHighWatermarkForMirrorPartition("testLinkName", "linkedTopic-1", 0));
    }

    private long getLastSourceFetchHighWatermarkForMirrorPartition(String str, String str2, int i) {
        Response mirror = getMirror(getDestClusterId(), str, str2, false);
        Assertions.assertEquals(mirror.getStatus(), Response.Status.OK.getStatusCode());
        return ((MirrorLagData) ((GetMirrorResponse) mirror.readEntity(GetMirrorResponse.class)).getValue().getMirrorLags().get(i)).getLastSourceFetchHighWatermark().longValue();
    }

    private void produceToTopicPartition(@NotNull ClusterLinkTestHarness clusterLinkTestHarness, String str, int i, int i2, int i3) {
        KafkaProducer orCreateProducer = clusterLinkTestHarness.getOrCreateProducer();
        for (int i4 = i2; i4 < i3; i4++) {
            orCreateProducer.send(new ProducerRecord(str, Integer.valueOf(i), (Long) null, Integer.toString(i4).getBytes(), Integer.toString(i4).getBytes()));
        }
        orCreateProducer.flush();
        orCreateProducer.close();
    }

    private void waitForMirror(String str, String str2, long j) throws InterruptedException {
        Response mirror = getMirror(getDestClusterId(), str, str2, false);
        GetMirrorResponse getMirrorResponse = (GetMirrorResponse) mirror.readEntity(GetMirrorResponse.class);
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), mirror.getStatus());
        Assertions.assertEquals(getMirrorResponse.getValue().getLinkName(), str);
        Assertions.assertEquals(getMirrorResponse.getValue().getMirrorTopicName(), str2);
        int numPartitions = getMirrorResponse.getValue().getNumPartitions();
        Assertions.assertTrue(numPartitions > 0);
        HashedMap hashedMap = new HashedMap();
        for (int i = 0; i < numPartitions; i++) {
            hashedMap.put(Integer.valueOf(i), Long.valueOf(((ReplicaStatus) ((Iterable) this.destCluster.replicaStatus(str2, i, false).filter(replicaStatus -> {
                return Boolean.valueOf(replicaStatus.isLeader());
            })).head()).logEndOffset()));
        }
        for (int i2 = 0; i2 < numPartitions; i2++) {
            long longValue = ((Long) hashedMap.get(Integer.valueOf(i2))).longValue();
            int i3 = i2;
            io.confluent.kafkarest.TestUtils.waitForCondition(() -> {
                return Boolean.valueOf(((ReplicaStatus) ((Iterable) this.destCluster.replicaStatus(str2, i3, false).filter(replicaStatus2 -> {
                    return Boolean.valueOf(replicaStatus2.isLeader());
                })).head()).logEndOffset() == longValue);
            }, j, "Mirror partition " + new TopicPartition(str2, i2) + " didn't reach expected offset " + longValue);
        }
    }

    private Response createLink(String str, String str2, String str3, String str4, String str5, Map<String, String> map, boolean z, boolean z2, boolean z3, RestProxyTestHarness restProxyTestHarness, Optional<String> optional) {
        HashMap hashMap = new HashMap();
        hashMap.put("link_name", str5);
        hashMap.put("validate_link", z ? "true" : "false");
        hashMap.put("validate_only", z2 ? "true" : "false");
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("configs", (Collection) map.entrySet().stream().map(entry -> {
            JSONObject jSONObject2 = new JSONObject();
            jSONObject2.put("name", entry.getKey());
            jSONObject2.put("value", entry.getValue());
            return jSONObject2;
        }).collect(Collectors.toList()));
        if (!z3) {
            if (!str2.isEmpty()) {
                jSONObject.put("source_cluster_id", str2);
            }
            if (!str3.isEmpty()) {
                jSONObject.put("destination_cluster_id", str3);
            }
        } else if (!str4.isEmpty()) {
            jSONObject.put("remote_cluster_id", str4);
        }
        if (optional.isPresent()) {
            jSONObject.put("cluster_link_id", optional.get());
        }
        return restProxyTestHarness.request("/v3/clusters/" + str + "/links", hashMap).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private Response createDestinationLink(String str, String str2, String str3, Map<String, String> map, boolean z) {
        return createLink(str, str2, "", str2, str3, map, false, false, z, this.destRestProxy, Optional.empty());
    }

    private Response createDestinationLinkValidateOnly(String str, String str2, String str3, Map<String, String> map, boolean z) {
        return createLink(str, str2, "", str2, str3, map, false, true, z, this.destRestProxy, Optional.empty());
    }

    private Response createDestinationLinkValidateLink(String str, String str2, String str3, Map<String, String> map, boolean z) {
        return createLink(str, str2, "", str2, str3, map, true, false, z, this.destRestProxy, Optional.empty());
    }

    private Response createBidirectionalLinkValidateLink(String str, String str2, String str3, Map<String, String> map, RestProxyTestHarness restProxyTestHarness, Optional<String> optional) {
        return createLink(str, "", "", str2, str3, map, true, false, true, restProxyTestHarness, optional);
    }

    private Response createSourceLinkValidateLink(String str, String str2, String str3, Map<String, String> map, boolean z) {
        return createLink(str, "", str2, str2, str3, map, true, false, z, this.sourceRestProxy, Optional.empty());
    }

    private Response listLinks(RestProxyTestHarness restProxyTestHarness, String str) {
        return restProxyTestHarness.request("/v3/clusters/" + str + "/links").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).get();
    }

    private Response deleteLink(String str, String str2, boolean z, boolean z2) {
        HashMap hashMap = new HashMap();
        hashMap.put("force", z ? "true" : "false");
        hashMap.put("validate_only", z2 ? "true" : "false");
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2, hashMap).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).delete();
    }

    private void verifyNoLinks(String str) {
        listAndVerifyLinks(this.destRestProxy, str, Collections.emptyList());
    }

    private void listAndVerifyLinks(RestProxyTestHarness restProxyTestHarness, String str, List<CLTestHarness.LinkMatcher> list) {
        String restBaseUrl = restProxyTestHarness.getRestBaseUrl();
        ListLinksResponse listLinksResponse = (ListLinksResponse) listLinks(restProxyTestHarness, str).readEntity(ListLinksResponse.class);
        Assertions.assertEquals("KafkaLinkDataList", listLinksResponse.getValue().getKind());
        ArrayList arrayList = new ArrayList((Collection) listLinksResponse.getValue().getData());
        Assertions.assertEquals(list.size(), arrayList.size());
        list.sort(Comparator.comparing((v0) -> {
            return v0.getLinkName();
        }));
        arrayList.sort(Comparator.comparing((v0) -> {
            return v0.getLinkName();
        }));
        for (int i = 0; i < list.size(); i++) {
            LinkData linkData = (LinkData) arrayList.get(i);
            CLTestHarness.LinkMatcher linkMatcher = list.get(i);
            linkMatcher.match(linkData);
            Assertions.assertEquals(restBaseUrl + "/v3/clusters/" + str + "/links/" + linkMatcher.getLinkName(), linkData.getMetadata().getSelf());
            Assertions.assertEquals("KafkaLinkData", linkData.getKind());
        }
    }

    private Response getLink(String str, String str2, boolean z) {
        HashMap hashMap = new HashMap();
        if (z) {
            hashMap.put("include_tasks", "true");
        }
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2, hashMap).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).get();
    }

    private Response getLink(String str, String str2) {
        return getLink(str, str2, false);
    }

    private void verifyGetLink(String str, String str2, boolean z, CLTestHarness.LinkMatcher linkMatcher) {
        String restBaseUrl = this.destRestProxy.getRestBaseUrl();
        Response link = getLink(str, str2, z);
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), link.getStatus());
        LinkData value = ((GetLinkResponse) link.readEntity(GetLinkResponse.class)).getValue();
        linkMatcher.match(value);
        Assertions.assertEquals(restBaseUrl + "/v3/clusters/" + str + "/links/" + str2, value.getMetadata().getSelf());
        Assertions.assertEquals("KafkaLinkData", value.getKind());
        if (!z) {
            Assertions.assertEquals(0, value.getTasks().size());
            return;
        }
        Assertions.assertTrue(value.getTasks().size() >= 4);
        Set set = (Set) value.getTasks().stream().map(linkTaskData -> {
            return linkTaskData.getTaskName();
        }).collect(Collectors.toSet());
        Assertions.assertTrue(set.contains("ConsumerOffsetSync"));
        Assertions.assertTrue(set.contains("AutoCreateMirror"));
        Assertions.assertTrue(set.contains("AclSync"));
        Assertions.assertTrue(set.contains("TopicConfigsSync"));
    }

    private Response listLinkConfigs(String str, String str2) {
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/configs").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).get();
    }

    private void verifyListLinkConfigs(String str, String str2, Map<String, String> map) {
        String restBaseUrl = this.destRestProxy.getRestBaseUrl();
        Response listLinkConfigs = listLinkConfigs(str, str2);
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), listLinkConfigs.getStatus());
        LinkConfigDataList value = ((ListLinkConfigsResponse) listLinkConfigs.readEntity(ListLinkConfigsResponse.class)).getValue();
        Assertions.assertEquals("KafkaLinkConfigDataList", value.getKind());
        Assertions.assertEquals(restBaseUrl + "/v3/clusters/" + str + "/links/" + str2 + "/configs", value.getMetadata().getSelf());
        for (LinkConfigData linkConfigData : value.getData().asList()) {
            Assertions.assertEquals(str2, linkConfigData.getLinkName());
            if (map.get(linkConfigData.getName()) != null) {
                Assertions.assertTrue(linkConfigData.getValue().isPresent());
                Assertions.assertEquals(map.get(linkConfigData.getName()), linkConfigData.getValue().get());
            }
        }
    }

    private Response getLinkConfigs(String str, String str2, String str3, RestProxyTestHarness restProxyTestHarness) {
        return restProxyTestHarness.request("/v3/clusters/" + str + "/links/" + str2 + "/configs/" + str3).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).get();
    }

    private void getAndVerifyLinkConfigs(String str, String str2, String str3, String str4, RestProxyTestHarness restProxyTestHarness) {
        String restBaseUrl = restProxyTestHarness.getRestBaseUrl();
        Response linkConfigs = getLinkConfigs(str, str2, str3, restProxyTestHarness);
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), linkConfigs.getStatus());
        LinkConfigData value = ((GetLinkConfigResponse) linkConfigs.readEntity(GetLinkConfigResponse.class)).getValue();
        Assertions.assertEquals("KafkaLinkConfigData", value.getKind());
        Assertions.assertEquals(restBaseUrl + "/v3/clusters/" + str + "/links/" + str2 + "/configs/" + str3, value.getMetadata().getSelf());
        Assertions.assertEquals(str2, value.getLinkName());
        Assertions.assertEquals(str3, value.getName());
        Assertions.assertTrue(value.getValue().isPresent());
        Assertions.assertEquals(str4, value.getValue().get());
    }

    private Response deleteLinkConfig(String str, String str2, String str3) {
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/configs/" + str3).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).delete();
    }

    private void verifyDeleteLinkConfig(String str, String str2, String str3) {
        Assertions.assertEquals(Response.Status.NO_CONTENT.getStatusCode(), deleteLinkConfig(str, str2, str3).getStatus());
    }

    private Response updateLinkConfig(String str, String str2, String str3, String str4) {
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("value", str4);
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/configs/" + str3).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).put(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private Response alterLinkConfigs(String str, String str2, Map<String, String> map, boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put("linkName", str2);
        hashMap.put("validate_only", z ? "true" : "false");
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("data", (Collection) map.entrySet().stream().map(entry -> {
            JSONObject jSONObject2 = new JSONObject();
            jSONObject2.put("name", entry.getKey());
            return (((String) entry.getValue()).equals("DELETE") || ((String) entry.getValue()).equals("SET")) ? jSONObject2.put("operation", entry.getValue()) : jSONObject2.put("value", entry.getValue());
        }).collect(Collectors.toList()));
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/configs:alter", hashMap).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).put(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private Response createMirror(String str, String str2, String str3, String str4, Short sh, Map<String, String> map) {
        return createMirror(str, str2, str3, str4, Optional.of(sh), map);
    }

    private Response createMirror(String str, String str2, String str3, String str4, Optional<Short> optional, Map<String, String> map) {
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("source_topic_name", str3);
        if (str4 != null) {
            jSONObject.put("mirror_topic_name", str4);
        }
        if (optional.isPresent()) {
            jSONObject.put("replication_factor", optional.get());
        }
        jSONObject.put("configs", (Collection) map.entrySet().stream().map(entry -> {
            JSONObject jSONObject2 = new JSONObject();
            jSONObject2.put("name", entry.getKey());
            jSONObject2.put("value", entry.getValue());
            return jSONObject2;
        }).collect(Collectors.toList()));
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/mirrors").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private Response listAllMirror(String str) {
        return listAllMirror(str, null);
    }

    private Response listAllMirror(String str, String str2) {
        return listAllMirror(str, str2, this.destRestProxy);
    }

    private Response listAllMirror(String str, String str2, RestProxyTestHarness restProxyTestHarness) {
        HashMap hashMap = new HashMap();
        if (str2 != null) {
            hashMap.put("mirror_status", str2);
        }
        return restProxyTestHarness.request("/v3/clusters/" + str + "/links/-/mirrors", hashMap).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).get();
    }

    private Response listMirror(String str, String str2, String str3) {
        HashMap hashMap = new HashMap();
        if (str3 != null) {
            hashMap.put("mirror_status", str3);
        }
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/mirrors", hashMap).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).get();
    }

    private Response getMirror(String str, String str2, String str3, boolean z) {
        HashMap hashMap = new HashMap();
        if (z) {
            hashMap.put("include_state_transition_errors", "true");
        }
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/mirrors/" + str3, hashMap).accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).get();
    }

    private Response promoteMirrors(String str, String str2, Set<String> set, String str3) {
        JSONObject jSONObject = new JSONObject();
        if (set != null) {
            jSONObject.put("mirror_topic_names", (Collection) set);
        }
        if (str3 != null) {
            jSONObject.put("mirror_topic_name_pattern", str3);
        }
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/mirrors:promote").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private Response failoverMirrors(String str, String str2, Set<String> set, String str3) {
        JSONObject jSONObject = new JSONObject();
        if (set != null) {
            jSONObject.put("mirror_topic_names", (Collection) set);
        }
        if (str3 != null) {
            jSONObject.put("mirror_topic_name_pattern", str3);
        }
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/mirrors:failover").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private Response pauseMirrors(String str, String str2, Set<String> set, String str3) {
        JSONObject jSONObject = new JSONObject();
        if (set != null) {
            jSONObject.put("mirror_topic_names", (Collection) set);
        }
        if (str3 != null) {
            jSONObject.put("mirror_topic_name_pattern", str3);
        }
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/mirrors:pause").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private Response resumeMirrors(String str, String str2, Set<String> set, String str3) {
        JSONObject jSONObject = new JSONObject();
        if (set != null) {
            jSONObject.put("mirror_topic_names", (Collection) set);
        }
        if (str3 != null) {
            jSONObject.put("mirror_topic_name_pattern", str3);
        }
        return this.destRestProxy.request("/v3/clusters/" + str + "/links/" + str2 + "/mirrors:resume").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private Response reverseAndStartMirrors(String str, String str2, Set<String> set, String str3, RestProxyTestHarness restProxyTestHarness) {
        JSONObject jSONObject = new JSONObject();
        if (set != null) {
            jSONObject.put("mirror_topic_names", (Collection) set);
        }
        if (str3 != null) {
            jSONObject.put("mirror_topic_name_pattern", str3);
        }
        return restProxyTestHarness.request("/v3/clusters/" + str + "/links/" + str2 + "/mirrors:reverse-and-start-mirror").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private Response reverseAndPauseMirrors(String str, String str2, Set<String> set, String str3, RestProxyTestHarness restProxyTestHarness) {
        JSONObject jSONObject = new JSONObject();
        if (set != null) {
            jSONObject.put("mirror_topic_names", (Collection) set);
        }
        if (str3 != null) {
            jSONObject.put("mirror_topic_name_pattern", str3);
        }
        return restProxyTestHarness.request("/v3/clusters/" + str + "/links/" + str2 + "/mirrors:reverse-and-pause-mirror").accept(new MediaType[]{MediaType.APPLICATION_JSON_TYPE}).post(Entity.entity(jSONObject.toString(), MediaType.APPLICATION_JSON_TYPE));
    }

    private boolean verifyListMirrorsSync(String str, String str2, Set<String> set) {
        Response listAllMirror = listAllMirror(str, str2);
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), listAllMirror.getStatus());
        ListMirrorsResponse listMirrorsResponse = (ListMirrorsResponse) listAllMirror.readEntity(ListMirrorsResponse.class);
        Predicate predicate = mirrorData -> {
            return set.contains(mirrorData.getMirrorTopicName()) && mirrorData.getSourceTopicName().equals(mirrorData.getMirrorTopicName()) && mirrorData.getMirrorStatus() != null;
        };
        UnmodifiableIterator it = listMirrorsResponse.getValue().getData().iterator();
        while (it.hasNext()) {
            Assertions.assertEquals(MirrorTopicError.NO_ERROR.name(), ((MirrorData) it.next()).getMirrorTopicError());
        }
        return set.size() == listMirrorsResponse.getValue().getData().size() && ((long) set.size()) == listMirrorsResponse.getValue().getData().stream().filter(predicate).count();
    }

    private boolean verifyListMirrorsSync(String str, String str2, Set<String> set, RestProxyTestHarness restProxyTestHarness) {
        Response listAllMirror = listAllMirror(str, str2, restProxyTestHarness);
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), listAllMirror.getStatus());
        ListMirrorsResponse listMirrorsResponse = (ListMirrorsResponse) listAllMirror.readEntity(ListMirrorsResponse.class);
        Predicate predicate = mirrorData -> {
            return set.contains(mirrorData.getMirrorTopicName()) && mirrorData.getSourceTopicName().equals(mirrorData.getMirrorTopicName()) && mirrorData.getMirrorStatus() != null;
        };
        UnmodifiableIterator it = listMirrorsResponse.getValue().getData().iterator();
        while (it.hasNext()) {
            Assertions.assertEquals(MirrorTopicError.NO_ERROR.name(), ((MirrorData) it.next()).getMirrorTopicError());
        }
        return set.size() == listMirrorsResponse.getValue().getData().size() && ((long) set.size()) == listMirrorsResponse.getValue().getData().stream().filter(predicate).count();
    }

    private void verifyListMirrors(Response response, Set<String> set, String str) {
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ListMirrorsResponse listMirrorsResponse = (ListMirrorsResponse) response.readEntity(ListMirrorsResponse.class);
        Assertions.assertEquals(set.size(), listMirrorsResponse.getValue().getData().stream().filter(mirrorData -> {
            return set.contains(mirrorData.getMirrorTopicName()) && new StringBuilder().append(str).append(mirrorData.getSourceTopicName()).toString().equals(mirrorData.getMirrorTopicName()) && mirrorData.getMirrorStatus() != null;
        }).count());
        UnmodifiableIterator it = listMirrorsResponse.getValue().getData().iterator();
        while (it.hasNext()) {
            Assertions.assertEquals(MirrorTopicError.NO_ERROR.name(), ((MirrorData) it.next()).getMirrorTopicError());
        }
    }

    private GetMirrorResponse verifyGetMirror(Response response, String str, String str2) {
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        GetMirrorResponse getMirrorResponse = (GetMirrorResponse) response.readEntity(GetMirrorResponse.class);
        Assertions.assertEquals(str, getMirrorResponse.getValue().getMirrorTopicName());
        Assertions.assertEquals(str, str2 + getMirrorResponse.getValue().getSourceTopicName());
        Assertions.assertEquals(MirrorTopicError.NO_ERROR.name(), getMirrorResponse.getValue().getMirrorTopicError());
        return getMirrorResponse;
    }

    private void verifyAlterMirror(Response response, Map<String, Integer> map) {
        Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        AlterMirrorsResponse alterMirrorsResponse = (AlterMirrorsResponse) response.readEntity(AlterMirrorsResponse.class);
        Assertions.assertEquals(map.size(), alterMirrorsResponse.getValue().getData().size());
        Assertions.assertEquals(map.size(), alterMirrorsResponse.getValue().getData().stream().filter(alterMirrorsData -> {
            return Objects.equals(map.get(alterMirrorsData.getMirrorTopicName()), alterMirrorsData.getErrorCode());
        }).count());
    }

    private void verifyResponse(Response response, Response.Status status) {
        Assertions.assertEquals(status.getStatusCode(), response.getStatus());
    }

    private static Stream<Arguments> destLinkModeCombinations() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{"DESTINATION", "true"}), Arguments.of(new Object[]{"DESTINATION", "false"}), Arguments.of(new Object[]{"BIDIRECTIONAL", "true"})});
    }

    private void createSourceAndMirrorTopics(String str, Set<String> set, String str2) {
        short s = 3;
        set.forEach(str3 -> {
            this.sourceCluster.createTopic(str3, 1, 1, new Properties(), this.sourceCluster.listenerName(), this.sourceCluster.adminClientConfig());
            verifyResponse(createMirror(str2, str, str3, (String) null, s, Collections.emptyMap()), Response.Status.CREATED);
        });
        verifyListMirrors(listAllMirror(str2, "active"), set, "");
    }
}
