package io.confluent.kafkarest.controllers;

import com.google.common.collect.ImmutableList;
import io.confluent.kafkarest.common.CompletableFutures;
import io.confluent.kafkarest.common.KafkaFutures;
import io.confluent.kafkarest.entities.AlterMirrors;
import io.confluent.kafkarest.entities.Cluster;
import io.confluent.kafkarest.entities.Mirror;
import io.confluent.kafkarest.entities.MirrorStatus;
import io.confluent.kafkarest.entities.TopicPartitionLag;
import io.confluent.kafkarest.exceptions.MirrorTopicNotFoundException;
import io.confluent.kafkarest.resources.v3.MirrorResource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.clients.admin.AlterMirrorsOptions;
import org.apache.kafka.clients.admin.AlterMirrorsResult;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeMirrorsOptions;
import org.apache.kafka.clients.admin.DescribeMirrorsResult;
import org.apache.kafka.clients.admin.ListMirrorsOptions;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.NewMirrorTopic;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.ReplicaStatusOptions;
import org.apache.kafka.clients.admin.ReplicaStatusResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.replica.ReplicaStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

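/**
 * {@link MirrorManager} implementation that serves mirror-topic operations through a
 * {@link ConfluentAdmin} client, after validating the target cluster with a {@link ClusterManager}.
 *
 * <p>Loaded from: input_file:io/confluent/kafkarest/controllers/MirrorManagerImpl.class
 */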
final class MirrorManagerImpl implements MirrorManager {
    /** Admin client used for every mirror, topic and replica-status request. */
    private final ConfluentAdmin adminClient;

    /** Used to verify that a requested cluster id exists before any admin call is made. */
    private final ClusterManager clusterManager;

    // Named after this class (not MirrorResource) so log lines are attributed to the controller
    // that actually emits them.
    private static final Logger log = LoggerFactory.getLogger(MirrorManagerImpl.class);

    /** Timeout applied to the list, describe, replica-status and alter-mirror admin requests. */
    private static final Duration DEFAULT_METADATA_TIMEOUT = Duration.ofSeconds(60);

    /** Options for listing mirrors; stopped mirrors are included in the listing. */
    private static final ListMirrorsOptions LIST_MIRRORS_OPTIONS =
            new ListMirrorsOptions()
                    .timeoutMs(Math.toIntExact(DEFAULT_METADATA_TIMEOUT.toMillis()))
                    .includeStopped(true);

    /** Options for describing mirrors. */
    private static final DescribeMirrorsOptions DESCRIBE_MIRRORS_OPTIONS =
            new DescribeMirrorsOptions()
                    .timeoutMs(Math.toIntExact(DEFAULT_METADATA_TIMEOUT.toMillis()));

    /** Options for fetching replica status (used to compute mirror lag). */
    private static final ReplicaStatusOptions REPLICA_STATUS_OPTIONS =
            new ReplicaStatusOptions()
                    .timeoutMs(Math.toIntExact(DEFAULT_METADATA_TIMEOUT.toMillis()));

    /**
     * Creates a manager bound to the given admin client and cluster manager.
     *
     * @param confluentAdmin admin client used for all mirror operations
     * @param clusterManager used to validate cluster ids
     * @throws NullPointerException if either argument is {@code null}
     */
    @Inject
    MirrorManagerImpl(ConfluentAdmin confluentAdmin, ClusterManager clusterManager) {
        // requireNonNull already returns its argument with the right type; no cast needed.
        this.adminClient = Objects.requireNonNull(confluentAdmin);
        this.clusterManager = Objects.requireNonNull(clusterManager);
    }

    /**
     * Lists the mirror topics known to the admin client, with their partition lags.
     *
     * @param str cluster id; only used to verify that the cluster exists
     * @param mirrorStatus when non-null, only mirrors whose state matches are returned;
     *     {@code null} disables the filter
     * @return future holding the matching mirrors
     */
    @Override
    public CompletableFuture<List<Mirror>> listMirrors(String str, MirrorStatus mirrorStatus) {
        return getValidCluster(str)
                .thenCompose(
                        cluster ->
                                KafkaFutures.toCompletableFuture(
                                        this.adminClient.listMirrors(LIST_MIRRORS_OPTIONS).result()))
                .thenCompose(mirrorTopicNames -> getMirrors(mirrorTopicNames, mirrorStatus));
    }

    /**
     * Looks up a single mirror topic on a given link.
     *
     * @param str cluster id; only used to verify that the cluster exists
     * @param str2 link name the mirror topic is expected to belong to
     * @param str3 mirror topic name
     * @return future holding the mirror, or an empty {@link Optional} when the topic is a mirror
     *     but belongs to a different link; the future fails with
     *     {@link MirrorTopicNotFoundException} when describing the topic yields no mirror at all
     */
    @Override
    public CompletableFuture<Optional<Mirror>> getMirror(String str, String str2, String str3) {
        return getValidCluster(str)
                .thenCompose(cluster -> getMirrors(Collections.singletonList(str3)))
                .thenApply(
                        mirrors -> {
                            if (mirrors.isEmpty()) {
                                throw new MirrorTopicNotFoundException(str3);
                            }
                            return mirrors.stream()
                                    .filter(mirror -> mirror.getLinkName().equals(str2))
                                    .findAny();
                        });
    }

    /**
     * Creates a mirror topic on the given link.
     *
     * @param str cluster id; only used to verify that the cluster exists
     * @param str2 link name the new mirror topic is attached to
     * @param str3 topic name, used both for the new topic and as the second argument of
     *     {@link NewMirrorTopic}
     * @param map topic configs; an empty {@link Optional} value is passed on as a {@code null}
     *     config value
     * @param optional replication factor, forwarded to {@link NewTopic}
     * @return future that completes when topic creation finishes; failures are routed through
     *     {@code ErrorUtils.catchClusterLinkingExceptions}
     */
    @Override
    public CompletableFuture<Void> createMirror(String str, String str2, String str3, Map<String, Optional<String>> map, Optional<Short> optional) {
        // Copy the caller's configs into the map handed to NewTopic; previously the map was left
        // empty, so requested configs were silently dropped.
        Map<String, String> nullableConfigs = new HashMap<>();
        map.forEach((key, value) -> nullableConfigs.put(key, value.orElse(null)));

        NewTopic mirrorTopic =
                new NewTopic(str3, Optional.empty(), optional)
                        .configs(nullableConfigs)
                        .mirror(Optional.of(new NewMirrorTopic(str2, str3)));

        return ErrorUtils.catchClusterLinkingExceptions(
                getValidCluster(str)
                        .thenCompose(
                                cluster ->
                                        KafkaFutures.toCompletableFuture(
                                                this.adminClient
                                                        .createTopics(Collections.singletonList(mirrorTopic))
                                                        .all())));
    }

    /**
     * Pauses the given mirror topics.
     *
     * @param str cluster id
     * @param str2 link name the topics must belong to
     * @param set mirror topic names
     * @param z validate-only flag forwarded to the admin request
     * @return future holding one result entry per requested topic
     */
    @Override
    public CompletableFuture<List<AlterMirrors>> pauseMirrors(String str, String str2, Set<String> set, boolean z) {
        // Long.MAX_VALUE as the lag threshold means no topic is ever rejected for lagging.
        Long lagThreshold = Long.MAX_VALUE;
        return alterMirrors(str, str2, set, z, AlterMirrorOp.PAUSE, lagThreshold);
    }

    /**
     * Resumes the given mirror topics.
     *
     * @param str cluster id
     * @param str2 link name the topics must belong to
     * @param set mirror topic names
     * @param z validate-only flag forwarded to the admin request
     * @return future holding one result entry per requested topic
     */
    @Override
    public CompletableFuture<List<AlterMirrors>> resumeMirrors(String str, String str2, Set<String> set, boolean z) {
        // Long.MAX_VALUE as the lag threshold means no topic is ever rejected for lagging.
        Long lagThreshold = Long.MAX_VALUE;
        return alterMirrors(str, str2, set, z, AlterMirrorOp.RESUME, lagThreshold);
    }

    /**
     * Promotes the given mirror topics.
     *
     * @param str cluster id
     * @param str2 link name the topics must belong to
     * @param set mirror topic names
     * @param z validate-only flag forwarded to the admin request
     * @return future holding one result entry per requested topic
     */
    @Override
    public CompletableFuture<List<AlterMirrors>> promoteMirrors(String str, String str2, Set<String> set, boolean z) {
        // A threshold of zero rejects any topic that has a partition with lag greater than 0.
        Long lagThreshold = 0L;
        return alterMirrors(str, str2, set, z, AlterMirrorOp.PROMOTE, lagThreshold);
    }

    /**
     * Fails over the given mirror topics.
     *
     * @param str cluster id
     * @param str2 link name the topics must belong to
     * @param set mirror topic names
     * @param z validate-only flag forwarded to the admin request
     * @return future holding one result entry per requested topic
     */
    @Override
    public CompletableFuture<List<AlterMirrors>> failOverMirrors(String str, String str2, Set<String> set, boolean z) {
        // Long.MAX_VALUE as the lag threshold means no topic is ever rejected for lagging.
        Long lagThreshold = Long.MAX_VALUE;
        return alterMirrors(str, str2, set, z, AlterMirrorOp.FAILOVER, lagThreshold);
    }

    /**
     * Describes the given mirror topics without filtering by status.
     *
     * @param collection mirror topic names
     * @return future holding the described mirrors with their partition lags
     */
    private CompletableFuture<List<Mirror>> getMirrors(Collection<String> collection) {
        final MirrorStatus noStatusFilter = null;
        return getMirrors(collection, noStatusFilter);
    }

    /**
     * Describes the given mirror topics and attaches per-partition lag to each of them.
     *
     * <p>Works in two steps: the topics are described first, then replica status is requested
     * for every partition of the mirrors that survived the describe step.
     *
     * @param collection mirror topic names
     * @param mirrorStatus optional status filter; {@code null} keeps every mirror
     * @return future holding the described mirrors with their partition lags
     */
    private CompletableFuture<List<Mirror>> getMirrors(Collection<String> collection, @Nullable MirrorStatus mirrorStatus) {
        CompletableFuture<List<Mirror>> described =
                fromDescribeMirrorResult(
                        this.adminClient.describeMirrors(collection, DESCRIBE_MIRRORS_OPTIONS),
                        mirrorStatus);
        return described.thenCompose(
                mirrors -> {
                    Set<TopicPartition> partitions =
                            topicPartitionsFromNumPartitionsMap(toNumPartitionsMap(mirrors));
                    ReplicaStatusResult replicaStatus =
                            this.adminClient.replicaStatus(partitions, REPLICA_STATUS_OPTIONS);
                    return composeFromReplicaStatusResult(mirrors, replicaStatus);
                });
    }

    /**
     * Validates the cluster, describes the requested topics and then applies {@code alterMirrorOp}
     * to them.
     *
     * @param str cluster id; only used to verify that the cluster exists
     * @param str2 link name the topics must belong to
     * @param set mirror topic names
     * @param z validate-only flag set on the admin request options
     * @param alterMirrorOp operation to apply
     * @param l lag threshold; topics with a partition lag above it are rejected
     * @return future holding one result entry per requested topic
     */
    private CompletableFuture<List<AlterMirrors>> alterMirrors(String str, String str2, Set<String> set, boolean z, AlterMirrorOp alterMirrorOp, Long l) {
        AlterMirrorsOptions options =
                new AlterMirrorsOptions()
                        .validateOnly(z)
                        .timeoutMs(Math.toIntExact(DEFAULT_METADATA_TIMEOUT.toMillis()));
        return getValidCluster(str)
                .thenCompose(cluster -> getMirrors(set))
                .thenCompose(
                        mirrors ->
                                alterMirrors(str2, set, mirrors, alterMirrorOp, options, l.longValue()));
    }

    /**
     * Applies {@code alterMirrorOp} to every requested topic that passes validation and reports
     * a 400 entry for each topic that does not.
     *
     * <p>A topic is rejected, in this order of precedence, when it has a partition lagging more
     * than {@code j}, when it is not among the described mirrors, or when it belongs to a link
     * other than {@code str}. Rejected entries come first in the returned list, followed by the
     * results of the admin request.
     *
     * @param str link name the topics must belong to
     * @param set requested mirror topic names
     * @param list mirrors that were successfully described
     * @param alterMirrorOp operation to apply
     * @param alterMirrorsOptions options for the admin request
     * @param j lag threshold
     * @return future holding one result entry per requested topic
     */
    private CompletableFuture<List<AlterMirrors>> alterMirrors(String str, Set<String> set, List<Mirror> list, AlterMirrorOp alterMirrorOp, AlterMirrorsOptions alterMirrorsOptions, long j) {
        Map<String, Mirror> mirrorsByTopic = toMirrorTopics(list);
        Set<String> laggingTopics = toLaggingTopics(list, Long.valueOf(j));
        Map<String, AlterMirrorOp> opsByTopic = new HashMap<>();
        List<AlterMirrors> rejected = new ArrayList<>();
        Integer badRequest = Integer.valueOf(Response.Status.BAD_REQUEST.getStatusCode());

        for (String topic : set) {
            if (laggingTopics.contains(topic)) {
                // Lagging topics are always present in mirrorsByTopic: both derive from `list`.
                rejected.add(
                        AlterMirrors.create(
                                topic,
                                badRequest,
                                laggingMessage(Long.valueOf(j)),
                                mirrorsByTopic.get(topic).getPartitionLagList()));
            } else if (!mirrorsByTopic.containsKey(topic)) {
                rejected.add(
                        AlterMirrors.create(
                                topic, badRequest, topicNotFoundMessage(topic), Collections.emptyList()));
            } else if (mirrorsByTopic.get(topic).getLinkName().equals(str)) {
                opsByTopic.put(topic, alterMirrorOp);
            } else {
                rejected.add(
                        AlterMirrors.create(
                                topic,
                                badRequest,
                                incorrectLinkMessage(topic, str),
                                Collections.emptyList()));
            }
        }

        return fromAlterMirrorResult(
                        this.adminClient.alterMirrors(opsByTopic, alterMirrorsOptions), mirrorsByTopic)
                .thenApply(
                        altered ->
                                ImmutableList.<AlterMirrors>builder()
                                        .addAll(rejected)
                                        .addAll(altered)
                                        .build());
    }

    /**
     * Builds the error message for a mirror topic that is attached to a different link.
     *
     * @param str mirror topic name
     * @param str2 link name that was requested
     * @return the formatted message
     */
    private String incorrectLinkMessage(String str, String str2) {
        // Grammar fix: the message previously read "does not belongs to".
        return String.format("Mirror topic %s does not belong to link %s", str, str2);
    }

    /**
     * Builds the error message for a requested topic that is not a known mirror topic.
     *
     * @param str mirror topic name
     * @return the formatted message
     */
    private String topicNotFoundMessage(String str) {
        final String template = "Mirror topic %s not found";
        return String.format(template, str);
    }

    /**
     * Builds the error message for a topic rejected because of mirror lag.
     *
     * @param l lag threshold that was exceeded
     * @return the formatted message
     */
    private String laggingMessage(Long l) {
        final String template =
                "Operation failed because some partitions have mirror lag greater than %s";
        return String.format(template, l);
    }

    /**
     * Converts a describe-mirrors result into a list of {@link Mirror}s with empty lag lists.
     *
     * <p>Topics are dropped from the result when their state does not match {@code mirrorStatus},
     * or when describing them failed with {@link UnknownTopicOrPartitionException} or
     * {@link AuthorizationException}. Any other failure fails the returned future.
     *
     * @param describeMirrorsResult admin-client result, one future per topic
     * @param mirrorStatus optional status filter; {@code null} keeps every mirror
     * @return future holding the successfully described, matching mirrors
     */
    private static CompletableFuture<List<Mirror>> fromDescribeMirrorResult(DescribeMirrorsResult describeMirrorsResult, @Nullable MirrorStatus mirrorStatus) {
        List<CompletableFuture<Mirror>> perTopic =
                describeMirrorsResult.result().entrySet().stream()
                        .map(
                                entry ->
                                        KafkaFutures.toCompletableFuture(entry.getValue())
                                                .handle(
                                                        (description, th) ->
                                                                describedMirrorOrNull(
                                                                        entry.getKey(),
                                                                        description,
                                                                        th,
                                                                        mirrorStatus)))
                        .collect(Collectors.toList());
        // Null entries mark topics that were filtered out or skipped; strip them here.
        return CompletableFutures.allAsList(perTopic)
                .thenApply(
                        mirrors ->
                                mirrors.stream().filter(Objects::nonNull).collect(Collectors.toList()));
    }

    /**
     * Maps the outcome of describing one topic to a {@link Mirror}, or to {@code null} when the
     * topic should be left out of the result.
     *
     * @param topic mirror topic name (key of the describe result)
     * @param description description returned by the admin client; only read when {@code th} is null
     * @param th failure of the describe call, or {@code null} on success
     * @param mirrorStatus optional status filter
     * @return the mirror, or {@code null} when filtered out, unknown or unauthorized
     */
    private static Mirror describedMirrorOrNull(String topic, MirrorTopicDescription description, Throwable th, @Nullable MirrorStatus mirrorStatus) {
        if (th == null) {
            if (!mirrorStatusMatched(mirrorStatus, description)) {
                return null;
            }
            return Mirror.create(
                    description.linkName(),
                    topic,
                    description.mirrorTopic(),
                    description.numPartitions(),
                    Collections.emptyList(),
                    description.state(),
                    description.stateTimeMs(),
                    description.stoppedLogEndOffsets());
        }
        if ((th instanceof UnknownTopicOrPartitionException) || (th instanceof AuthorizationException)) {
            return null;
        }
        if (th instanceof TimeoutException) {
            throw new io.confluent.kafkarest.exceptions.TimeoutException(th);
        }
        log.warn("Unexpected error happens during describing mirror", th);
        throw new InternalServerErrorException(th);
    }

    /**
     * Tells whether a described mirror passes the optional status filter.
     *
     * @param mirrorStatus requested status, or {@code null} for "any"
     * @param mirrorTopicDescription description whose state is compared
     * @return {@code true} when no filter is given or the states are identical
     */
    private static boolean mirrorStatusMatched(@Nullable MirrorStatus mirrorStatus, MirrorTopicDescription mirrorTopicDescription) {
        if (mirrorStatus == null) {
            return true;
        }
        return mirrorStatus.getState() == mirrorTopicDescription.state();
    }

    /**
     * Converts an alter-mirrors result into one {@link AlterMirrors} entry per topic.
     *
     * <p>Failures never fail the returned future; each one is turned into an entry carrying an
     * HTTP status code and the exception message.
     *
     * @param alterMirrorsResult admin-client result, one future per topic
     * @param map described mirrors keyed by topic name, used to report partition lags on success
     * @return future holding the per-topic outcomes
     */
    private static CompletableFuture<List<AlterMirrors>> fromAlterMirrorResult(AlterMirrorsResult alterMirrorsResult, Map<String, Mirror> map) {
        List<CompletableFuture<AlterMirrors>> perTopic =
                alterMirrorsResult.values().entrySet().stream()
                        .map(
                                entry ->
                                        KafkaFutures.toCompletableFuture(entry.getValue())
                                                .handle(
                                                        (ignored, th) ->
                                                                toAlterMirrorsOutcome(
                                                                        entry.getKey(), th, map)))
                        .collect(Collectors.toList());
        return CompletableFutures.allAsList(perTopic);
    }

    /**
     * Builds the outcome entry for a single topic.
     *
     * @param topic mirror topic name
     * @param th failure of the alter call, or {@code null} on success
     * @param map described mirrors keyed by topic name
     * @return a success entry (null status and message, with partition lags) or a failure entry
     *     (status code, message, empty lag list)
     */
    private static AlterMirrors toAlterMirrorsOutcome(String topic, Throwable th, Map<String, Mirror> map) {
        if (th == null) {
            return AlterMirrors.create(topic, null, null, map.get(topic).getPartitionLagList());
        }
        Response.Status status = statusForAlterFailure(th);
        return AlterMirrors.create(
                topic, Integer.valueOf(status.getStatusCode()), th.getMessage(), Collections.emptyList());
    }

    /**
     * Chooses the HTTP status reported for a failed alter-mirror call.
     *
     * <p>NOTE(review): Kafka error types other than {@link InvalidRequestException} are only
     * recognised when wrapped in an {@link ExecutionException}; an unwrapped one is reported as
     * 500. Confirm against how {@code KafkaFutures.toCompletableFuture} propagates failures.
     *
     * @param th the failure
     * @return the status to report
     */
    private static Response.Status statusForAlterFailure(Throwable th) {
        if (th instanceof ExecutionException) {
            Throwable cause = th.getCause();
            if ((cause instanceof InvalidRequestException) || (cause instanceof UnknownTopicOrPartitionException)) {
                return Response.Status.BAD_REQUEST;
            }
            if (cause instanceof AuthorizationException) {
                return Response.Status.UNAUTHORIZED;
            }
            if (cause instanceof TimeoutException) {
                return Response.Status.REQUEST_TIMEOUT;
            }
            log.warn("Unexpected execution exception during altering mirror", th);
            return Response.Status.INTERNAL_SERVER_ERROR;
        }
        if (th instanceof InvalidRequestException) {
            return Response.Status.BAD_REQUEST;
        }
        log.warn("Unexpected error happens during altering mirror", th);
        return Response.Status.INTERNAL_SERVER_ERROR;
    }

    /**
     * Rebuilds each mirror with the partition lags computed from the replica-status result.
     *
     * @param list mirrors as returned by the describe step
     * @param replicaStatusResult replica status for the partitions of those mirrors
     * @return future holding copies of the mirrors whose lag list contains the lags of their
     *     own partitions, adjusted for the mirror's state
     */
    private CompletableFuture<List<Mirror>> composeFromReplicaStatusResult(List<Mirror> list, ReplicaStatusResult replicaStatusResult) {
        return calculateMirrorLags(replicaStatusResult)
                .thenApply(
                        allLags -> {
                            List<Mirror> enriched = new ArrayList<>();
                            for (Mirror mirror : list) {
                                // Keep only the lags that belong to this mirror's topic.
                                List<TopicPartitionLag> ownLags =
                                        allLags.stream()
                                                .filter(
                                                        lag ->
                                                                lag.getTopicPartition()
                                                                        .topic()
                                                                        .equals(mirror.getMirrorTopicName()))
                                                .map(lag -> getTopicPartitionLag(lag, mirror))
                                                .collect(Collectors.toList());
                                enriched.add(
                                        Mirror.create(
                                                mirror.getLinkName(),
                                                mirror.getMirrorTopicName(),
                                                mirror.getSourceTopicName(),
                                                mirror.getNumPartitions(),
                                                ownLags,
                                                mirror.getMirrorStatus(),
                                                mirror.getStatusTimeMs(),
                                                mirror.getStoppedLogEndOffsets()));
                            }
                            return enriched;
                        });
    }

    /**
     * Adjusts a computed partition lag according to the state of its mirror.
     *
     * <ul>
     *   <li>STOPPED, and a stopped log end offset exists for the partition: lag 0 with that
     *       offset as the second value.</li>
     *   <li>PAUSED, LINK_PAUSED, SOURCE_UNAVAILABLE or UNKNOWN: both values are -1.</li>
     *   <li>Otherwise the computed lag is returned unchanged.</li>
     * </ul>
     *
     * @param topicPartitionLag lag computed from replica status
     * @param mirror mirror the partition belongs to
     * @return the adjusted lag
     */
    private TopicPartitionLag getTopicPartitionLag(TopicPartitionLag topicPartitionLag, Mirror mirror) {
        MirrorTopicDescription.State state = mirror.getMirrorStatus();

        if (state == MirrorTopicDescription.State.STOPPED
                && topicPartitionLag.getTopicPartition().partition() < mirror.getStoppedLogEndOffsets().size()) {
            long stoppedOffset =
                    mirror.getStoppedLogEndOffsets()
                            .get(topicPartitionLag.getTopicPartition().partition())
                            .longValue();
            return TopicPartitionLag.create(topicPartitionLag.getTopicPartition(), 0L, stoppedOffset);
        }

        boolean lagUnknown =
                state == MirrorTopicDescription.State.PAUSED
                        || state == MirrorTopicDescription.State.LINK_PAUSED
                        || state == MirrorTopicDescription.State.SOURCE_UNAVAILABLE
                        || state == MirrorTopicDescription.State.UNKNOWN;
        if (lagUnknown) {
            return TopicPartitionLag.create(topicPartitionLag.getTopicPartition(), -1L, -1L);
        }
        return topicPartitionLag;
    }

    /**
     * Computes one {@link TopicPartitionLag} per partition in the replica-status result.
     *
     * <p>For each partition the lag is taken from the first replica that carries mirror info and
     * is the leader. The returned future fails with {@link InternalServerErrorException} when the
     * replica status of a partition cannot be fetched or when no such replica exists.
     *
     * @param replicaStatusResult admin-client result, one future per partition
     * @return future holding the computed lags; an empty list when the result has no partitions
     */
    private CompletableFuture<List<TopicPartitionLag>> calculateMirrorLags(ReplicaStatusResult replicaStatusResult) {
        if (replicaStatusResult.result().isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        List<CompletableFuture<TopicPartitionLag>> perPartition =
                replicaStatusResult.result().entrySet().stream()
                        .map(
                                entry ->
                                        KafkaFutures.toCompletableFuture(entry.getValue())
                                                .handle(
                                                        (replicas, th) -> {
                                                            if (th != null) {
                                                                log.warn("Error happens during fetching replica status", th);
                                                                throw new InternalServerErrorException(
                                                                        "Replica status is temporarily unavailable.");
                                                            }
                                                            // A partition may momentarily have no leader replica with
                                                            // mirror info; report that explicitly rather than letting
                                                            // an index lookup on an empty list blow up.
                                                            return replicas.stream()
                                                                    .filter(replica -> replica.mirrorInfo().isPresent())
                                                                    .filter(ReplicaStatus::isLeader)
                                                                    .map(replica -> toTopicPartitionLag(entry.getKey(), replica))
                                                                    .findFirst()
                                                                    .orElseThrow(
                                                                            () ->
                                                                                    new InternalServerErrorException(
                                                                                            "Mirror info is temporarily unavailable."));
                                                        }))
                        .collect(Collectors.toList());
        return CompletableFutures.allAsList(perPartition);
    }

    /**
     * Computes the mirror lag of one partition from a replica's status.
     *
     * <p>The lag is the source high watermark seen at the last fetch minus the replica's log end
     * offset, floored at zero.
     *
     * @param topicPartition partition the replica belongs to
     * @param replicaStatus status of the replica
     * @return the lag together with the last fetched source high watermark
     * @throws InternalServerErrorException if the replica carries no mirror info
     */
    private TopicPartitionLag toTopicPartitionLag(TopicPartition topicPartition, ReplicaStatus replicaStatus) {
        ReplicaStatus.MirrorInfo mirrorInfo =
                replicaStatus
                        .mirrorInfo()
                        .orElseThrow(
                                () -> new InternalServerErrorException("Mirror info is temporarily unavailable."));
        long sourceHighWatermark = mirrorInfo.lastFetchSourceHighWatermark();
        long lag = Math.max(0L, sourceHighWatermark - replicaStatus.logEndOffset());
        return TopicPartitionLag.create(topicPartition, lag, sourceHighWatermark);
    }

    /**
     * Expands a topic-to-partition-count map into the set of all its topic partitions.
     *
     * @param map number of partitions keyed by topic name
     * @return one {@link TopicPartition} per index in {@code [0, count)} for every topic
     */
    private static Set<TopicPartition> topicPartitionsFromNumPartitionsMap(Map<String, Integer> map) {
        Set<TopicPartition> partitions = new HashSet<>();
        map.forEach(
                (topic, numPartitions) -> {
                    for (int partition = 0; partition < numPartitions; partition++) {
                        partitions.add(new TopicPartition(topic, partition));
                    }
                });
        return partitions;
    }

    /**
     * Indexes the partition count of each mirror by its mirror topic name.
     *
     * @param list mirrors to index
     * @return partition counts keyed by mirror topic name
     */
    private static Map<String, Integer> toNumPartitionsMap(List<Mirror> list) {
        return list.stream()
                .collect(Collectors.toMap(Mirror::getMirrorTopicName, Mirror::getNumPartitions));
    }

    /**
     * Indexes mirrors by their mirror topic name.
     *
     * @param list mirrors to index
     * @return the mirrors keyed by mirror topic name
     */
    private static Map<String, Mirror> toMirrorTopics(List<Mirror> list) {
        return list.stream()
                .collect(Collectors.toMap(Mirror::getMirrorTopicName, mirror -> mirror));
    }

    /**
     * Collects the names of mirror topics that have at least one partition lagging more than
     * the given threshold.
     *
     * @param list mirrors to inspect
     * @param l lag threshold (exclusive)
     * @return names of the lagging mirror topics
     */
    private static Set<String> toLaggingTopics(List<Mirror> list, Long l) {
        return list.stream()
                .filter(
                        mirror ->
                                mirror.getPartitionLagList().stream()
                                        .anyMatch(partitionLag -> partitionLag.getLag() > l.longValue()))
                .map(Mirror::getMirrorTopicName)
                .collect(Collectors.toSet());
    }

    /**
     * Resolves the cluster with the given id, failing when it does not exist.
     *
     * @param str cluster id
     * @return future holding the cluster; existence is enforced through
     *     {@code Entities.checkEntityExists} with a "cannot be found" message
     */
    private CompletableFuture<Cluster> getValidCluster(String str) {
        return this.clusterManager
                .getCluster(str)
                .thenApply(
                        maybeCluster ->
                                Entities.checkEntityExists(maybeCluster, "Cluster %s cannot be found.", str));
    }
}
