package io.streamnative.pulsar.handlers.kop;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.NonNull;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaTopicManager.class */
/**
 * Resolves topics, broker addresses and topic consumer managers on behalf of a single
 * {@link KafkaRequestHandler}.
 *
 * <p>Each instance is bound to one request handler, but most of the state is static and therefore
 * shared by every instance in the JVM: the topic futures, the reference producers, the lookup
 * caches and the periodic cursor-expiry task.
 */
public class KafkaTopicManager {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicManager.class);

    /** Delay, in milliseconds, before the first cursor-expiry sweep and between subsequent sweeps. */
    private static final long checkPeriodMillis = 60000;
    /** Expiry period, in milliseconds, handed to {@code deleteExpiredCursor} on every sweep. */
    private static final long expirePeriodMillis = 120000;

    private static final KafkaTopicConsumerManagerCache TCM_CACHE = KafkaTopicConsumerManagerCache.getInstance();
    /** Latest future returned by {@link #getTopic(String)} for each topic name. */
    private static final ConcurrentHashMap<String, CompletableFuture<Optional<PersistentTopic>>> topics = new ConcurrentHashMap<>();
    /** Internal producer registered in the persistent topic, keyed by topic name. */
    private static final ConcurrentHashMap<String, Producer> references = new ConcurrentHashMap<>();
    /** Handle of the shared sweep task; assigned and cleared under the {@code KafkaTopicManager.class} monitor. */
    private static volatile ScheduledFuture<?> cursorExpireTask = null;

    public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> LOOKUP_CACHE = new ConcurrentHashMap<>();
    public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>> KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();

    private final KafkaRequestHandler requestHandler;
    private final BrokerService brokerService;
    private final LookupClient lookupClient;
    private volatile SocketAddress remoteAddress;
    private final InternalServerCnx internalServerCnx;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a manager bound to the given request handler and makes sure the shared
     * cursor-expiry task is scheduled on the broker service executor.
     *
     * @param kafkaRequestHandler handler that owns this manager
     */
    public KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
        this.requestHandler = kafkaRequestHandler;
        PulsarService pulsarService = kafkaRequestHandler.getPulsarService();
        this.brokerService = pulsarService.getBrokerService();
        this.internalServerCnx = new InternalServerCnx(this.requestHandler);
        this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
        initializeCursorExpireTask(this.brokerService.executor());
    }

    /**
     * Schedules the shared cursor-expiry task unless one is already scheduled.
     *
     * @param scheduledExecutorService executor that will run the periodic sweep
     */
    private static void initializeCursorExpireTask(ScheduledExecutorService scheduledExecutorService) {
        if (cursorExpireTask != null) {
            return;
        }
        synchronized (KafkaTopicManager.class) {
            if (cursorExpireTask != null) {
                return;
            }
            cursorExpireTask = scheduledExecutorService.scheduleWithFixedDelay(
                    KafkaTopicManager::deleteExpiredCursors,
                    checkPeriodMillis,
                    checkPeriodMillis,
                    TimeUnit.MILLISECONDS);
        }
    }

    /**
     * One sweep of the cursor-expiry task: asks every successfully completed consumer manager in
     * the cache to delete its expired cursors.
     */
    private static void deleteExpiredCursors() {
        final long now = System.currentTimeMillis();
        try {
            TCM_CACHE.forEach(future -> {
                if (future == null || !future.isDone() || future.isCompletedExceptionally()) {
                    return;
                }
                KafkaTopicConsumerManager tcm = (KafkaTopicConsumerManager) future.join();
                // getTopicConsumerManager completes its future with null on failure, so a
                // normally completed future may still carry no manager.
                if (tcm != null) {
                    tcm.deleteExpiredCursor(now, expirePeriodMillis);
                }
            });
        } catch (RuntimeException e) {
            // An exception escaping a scheduleWithFixedDelay task suppresses every later run,
            // so report the failure and let the next sweep proceed.
            log.error("Failed to delete expired cursors. exception:", e);
        }
    }

    /**
     * Records the remote address of the connection and forwards it to the internal server cnx.
     *
     * @param socketAddress address of the remote peer
     */
    public void setRemoteAddress(SocketAddress socketAddress) {
        this.internalServerCnx.updateCtx(socketAddress);
        this.remoteAddress = socketAddress;
    }

    /**
     * Returns the consumer manager for a topic, creating it through the shared cache if absent.
     *
     * @param str full topic name
     * @return a future holding the consumer manager; it holds {@code null} when this manager is
     *         closed, when the remote address has not been set, or when the topic cannot be loaded
     */
    public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(String str) {
        if (this.closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Return null for getTopicConsumerManager({}) since channel closing", this.requestHandler.ctx.channel(), str);
            }
            return CompletableFuture.completedFuture(null);
        }
        // Read the volatile field once so the null check and the use see the same value.
        final SocketAddress address = this.remoteAddress;
        if (address == null) {
            log.error("[{}] Try to getTopicConsumerManager({}) while remoteAddress is not set", this.requestHandler.ctx.channel(), str);
            return CompletableFuture.completedFuture(null);
        }
        return TCM_CACHE.computeIfAbsent(str, address, () -> {
            final CompletableFuture<KafkaTopicConsumerManager> tcmFuture = new CompletableFuture<>();
            getTopic(str).whenComplete((topic, error) -> {
                // On exceptional completion `topic` is null, so the error must be checked first;
                // otherwise the callback would throw and tcmFuture would never complete.
                if (error == null && topic != null && topic.isPresent()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Call getTopicConsumerManager for {}, and create TCM for {}.", this.requestHandler.ctx.channel(), str, topic);
                    }
                    tcmFuture.complete(new KafkaTopicConsumerManager(this.requestHandler, topic.get()));
                    return;
                }
                if (error != null) {
                    log.error("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' throws {}", this.requestHandler.ctx.channel(), str, error.getMessage());
                } else {
                    log.error("[{}] Failed to getTopicConsumerManager caused by getTopic '{}' returns empty", this.requestHandler.ctx.channel(), str);
                }
                tcmFuture.complete(null);
            });
            return tcmFuture;
        });
    }

    /**
     * Drops the cached broker lookup and KoP address of one topic.
     *
     * @param str full topic name
     */
    public static void removeTopicManagerCache(String str) {
        LOOKUP_CACHE.remove(str);
        KOP_ADDRESS_CACHE.remove(str);
    }

    /** Drops every cached broker lookup and KoP address. */
    public static void clearTopicManagerCache() {
        LOOKUP_CACHE.clear();
        KOP_ADDRESS_CACHE.clear();
    }

    /**
     * Creates an {@link InternalProducer} for the topic and adds it to the topic.
     *
     * @param persistentTopic topic to register the producer in
     * @return the newly created producer
     */
    private Producer registerInPersistentTopic(PersistentTopic persistentTopic) {
        InternalProducer internalProducer = new InternalProducer(
                persistentTopic,
                this.internalServerCnx,
                this.lookupClient.getPulsarClient().newRequestId(),
                this.brokerService.generateUniqueProducerName());
        if (log.isDebugEnabled()) {
            log.debug("[{}] Register Mock Producer {} into PersistentTopic {}", this.requestHandler.ctx.channel(), internalProducer, persistentTopic.getName());
        }
        persistentTopic.addProducer(internalProducer, new CompletableFuture<>());
        return internalProducer;
    }

    /**
     * Returns the broker address that serves a topic, consulting {@link #LOOKUP_CACHE} first.
     *
     * @param str full topic name
     * @return a future holding the broker address, or holding {@code null} when this manager is closed
     */
    public CompletableFuture<InetSocketAddress> getTopicBroker(String str) {
        if (this.closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Return null for getTopicBroker({}) since channel closing", this.requestHandler.ctx.channel(), str);
            }
            return CompletableFuture.completedFuture(null);
        }
        return LOOKUP_CACHE.computeIfAbsent(str, key -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] topic {} not in Lookup_cache, call lookupBroker", this.requestHandler.ctx.channel(), str);
            }
            return lookupBroker(str);
        });
    }

    /**
     * Asks the lookup client for the broker address of a topic, bypassing the cache.
     *
     * @param str full topic name
     * @return a future holding the broker address, or holding {@code null} when this manager is closed
     */
    private CompletableFuture<InetSocketAddress> lookupBroker(String str) {
        if (this.closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Return null for lookupBroker({}) since channel closing", this.requestHandler.ctx.channel(), str);
            }
            return CompletableFuture.completedFuture(null);
        }
        return this.lookupClient.getBrokerAddress(TopicName.get(str));
    }

    /**
     * Loads an existing topic from the broker service.
     *
     * <p>When the name refers to partition 0 and that lookup fails or finds nothing, the
     * partitioned topic name is looked up as a fallback. Every call creates a new future and
     * stores it in the shared topic map under {@code str}, replacing any previous entry.
     *
     * @param str full topic name
     * @return a future holding the topic, or an empty {@link Optional} when this manager is closed
     *         or the topic does not exist; the future may also complete exceptionally
     */
    public CompletableFuture<Optional<PersistentTopic>> getTopic(String str) {
        if (this.closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Return null for getTopic({}) since channel is closing", this.requestHandler.ctx.channel(), str);
            }
            return CompletableFuture.completedFuture(Optional.empty());
        }
        final CompletableFuture<Optional<PersistentTopic>> result = new CompletableFuture<>();
        this.brokerService.getTopicIfExists(str).whenComplete((found, error) -> {
            final TopicName topicName = TopicName.get(str);
            final boolean isPartitionZero = topicName.getPartitionIndex() == 0;
            if (error != null) {
                removeTopicManagerCache(str);
                if (!isPartitionZero) {
                    handleGetTopicException(str, result, error);
                    return;
                }
                // For partition 0 the error is only reported; the fallback lookup below still runs.
                log.warn("Get partition-0 error [{}].", error.getMessage());
            }
            if (found != null && found.isPresent()) {
                result.complete(Optional.of((PersistentTopic) found.get()));
                return;
            }
            if (!isPartitionZero) {
                log.error("[{}]Get empty topic for name {}", this.requestHandler.ctx.channel(), str);
                removeTopicManagerCache(str);
                result.complete(Optional.empty());
                return;
            }
            getNonPartitionedTopic(topicName.getPartitionedTopicName(), result);
        });
        topics.put(str, result);
        return result;
    }

    /**
     * Fallback of {@link #getTopic(String)}: looks the topic up under its partitioned topic name
     * and completes {@code result} with the outcome.
     */
    private void getNonPartitionedTopic(String nonPartitionedName, CompletableFuture<Optional<PersistentTopic>> result) {
        if (log.isDebugEnabled()) {
            log.debug("[{}]Try to get non-partitioned topic for name {}", this.requestHandler.ctx.channel(), nonPartitionedName);
        }
        this.brokerService.getTopicIfExists(nonPartitionedName).whenComplete((fallbackTopic, fallbackError) -> {
            if (fallbackError != null) {
                handleGetTopicException(nonPartitionedName, result, fallbackError);
                removeTopicManagerCache(nonPartitionedName);
                return;
            }
            if (fallbackTopic.isPresent()) {
                result.complete(Optional.of((PersistentTopic) fallbackTopic.get()));
                return;
            }
            log.error("[{}]Get empty non-partitioned topic for name {}", this.requestHandler.ctx.channel(), nonPartitionedName);
            removeTopicManagerCache(nonPartitionedName);
            result.complete(Optional.empty());
        });
    }

    /**
     * Completes a topic future after a failed lookup: a
     * {@link BrokerServiceException.ServiceUnitNotReadyException} yields an empty result, any
     * other error completes the future exceptionally.
     *
     * @param str topic name used in the log message
     * @param completableFuture future to complete
     * @param th failure reported by the lookup
     * @throws NullPointerException if any argument is {@code null}
     */
    private void handleGetTopicException(@NonNull String str, @NonNull CompletableFuture<Optional<PersistentTopic>> completableFuture, @NonNull Throwable th) {
        if (str == null) {
            throw new NullPointerException("topicName is marked @NonNull but is null");
        }
        if (completableFuture == null) {
            throw new NullPointerException("topicCompletableFuture is marked @NonNull but is null");
        }
        if (th == null) {
            throw new NullPointerException("ex is marked @NonNull but is null");
        }
        if (th instanceof BrokerServiceException.ServiceUnitNotReadyException) {
            log.warn("[{}] Failed to getTopic {}: {}", this.requestHandler.ctx.channel(), str, th.getMessage());
            completableFuture.complete(Optional.empty());
        } else {
            log.error("[{}] Failed to getTopic {}. exception:", this.requestHandler.ctx.channel(), str, th);
            completableFuture.completeExceptionally(th);
        }
    }

    /**
     * Registers an internal producer in the topic unless one is already recorded for that name.
     * Does nothing when this manager is closed.
     *
     * @param str full topic name, used as the key of the shared reference map
     * @param persistentTopic topic in which the producer is registered
     */
    public void registerProducerInPersistentTopic(String str, PersistentTopic persistentTopic) {
        if (this.closed.get()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to registerProducerInPersistentTopic for topic '{}'", this.requestHandler.ctx.channel(), str);
            }
            return;
        }
        if (references.containsKey(str)) {
            return;
        }
        // The reference map is static, so the check-then-register must be serialized across all
        // managers; locking on `this` would only serialize callers of the same instance.
        synchronized (references) {
            if (references.containsKey(str)) {
                return;
            }
            references.put(str, registerInPersistentTopic(persistentTopic));
        }
    }

    /**
     * Closes this manager once: shuts down the shared consumer managers and removes every topic
     * currently tracked in the shared topic map together with its reference producer. Later calls
     * only emit a debug message.
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Closing TopicManager", this.requestHandler.ctx.channel());
            }
            return;
        }
        try {
            closeKafkaTopicConsumerManagers();
            for (String topicName : topics.keySet()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] remove producer {} for topic {} at close()", this.requestHandler.ctx.channel(), references.get(topicName), topicName);
                }
                removePersistentTopicAndReferenceProducer(topicName);
            }
        } catch (Exception e) {
            log.error("[{}] Failed to close KafkaTopicManager. exception:", this.requestHandler.ctx.channel(), e);
        }
    }

    /**
     * Returns the reference producer recorded for a topic.
     *
     * @param str full topic name
     * @return the producer, or {@code null} when none is recorded
     */
    public static Producer getReferenceProducer(String str) {
        return references.get(str);
    }

    /**
     * Removes a topic and its reference producer from the shared maps and, when both exist,
     * removes the producer from the topic. Waits for the topic future to complete.
     *
     * @param str full topic name
     */
    private static void removePersistentTopicAndReferenceProducer(String str) {
        CompletableFuture<Optional<PersistentTopic>> topicFuture = topics.remove(str);
        Producer producer = references.remove(str);
        if (topicFuture == null) {
            removeTopicManagerCache(str);
            return;
        }
        try {
            Optional<PersistentTopic> topic = topicFuture.get();
            if (producer != null && topic.isPresent()) {
                try {
                    topic.get().removeProducer(producer);
                } catch (IllegalArgumentException e) {
                    log.error("[{}] The producer's topic ({}) doesn't match the current PersistentTopic", str, producer.getTopic() == null ? "null" : producer.getTopic().getName());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.error("Failed to get topic '{}' in removeTopicAndReferenceProducer", str, e);
        } catch (ExecutionException e) {
            log.error("Failed to get topic '{}' in removeTopicAndReferenceProducer", str, e);
        }
    }

    /**
     * Releases everything held for one topic: lookup caches, the cached consumer manager, the
     * topic future and the reference producer. Failures are logged, not propagated.
     *
     * @param str full topic name
     */
    public static void deReference(String str) {
        try {
            removeTopicManagerCache(str);
            TCM_CACHE.removeAndClose(str);
            removePersistentTopicAndReferenceProducer(str);
        } catch (Exception e) {
            log.error("Failed to close reference for individual topic {}. exception:", str, e);
        }
    }

    /** Cancels the shared cursor-expiry task, if scheduled, and closes the consumer manager cache. */
    public static void closeKafkaTopicConsumerManagers() {
        synchronized (KafkaTopicManager.class) {
            if (cursorExpireTask != null) {
                cursorExpireTask.cancel(true);
                cursorExpireTask = null;
            }
        }
        TCM_CACHE.close();
    }

    /**
     * @return the live shared map of topic futures (not a copy)
     */
    public static ConcurrentHashMap<String, CompletableFuture<Optional<PersistentTopic>>> getTopics() {
        return topics;
    }

    /**
     * @return the live shared map of reference producers (not a copy)
     */
    public static ConcurrentHashMap<String, Producer> getReferences() {
        return references;
    }
}
