package io.streamnative.pulsar.handlers.kop;

import com.google.common.base.Preconditions;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaTopicManager.class */
public class KafkaTopicManager {
    private final KafkaRequestHandler requestHandler;
    private final PulsarService pulsarService;
    private final BrokerService brokerService;
    private InternalServerCnx internalServerCnx;
    private final ScheduledFuture<?> cursorExpireTask;
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicManager.class);
    public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> LOOKUP_CACHE = new ConcurrentHashMap<>();
    public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>> KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();
    private long checkPeriodMillis = 60000;
    private long expirePeriodMillis = 120000;
    private final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> consumerTopicManagers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CompletableFuture<PersistentTopic>> topics = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Producer> references = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private boolean closed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
        this.requestHandler = kafkaRequestHandler;
        this.pulsarService = kafkaRequestHandler.getPulsarService();
        this.brokerService = this.pulsarService.getBrokerService();
        this.internalServerCnx = new InternalServerCnx(this.requestHandler);
        this.cursorExpireTask = this.brokerService.executor().scheduleWithFixedDelay(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            if (log.isDebugEnabled()) {
                log.debug("[{}] Schedule a check of expired cursor", this.requestHandler.ctx.channel());
            }
            this.consumerTopicManagers.values().forEach(completableFuture -> {
                if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
                    return;
                }
                ((KafkaTopicConsumerManager) completableFuture.join()).deleteExpiredCursor(currentTimeMillis, this.expirePeriodMillis);
            });
        }, this.checkPeriodMillis, this.checkPeriodMillis, TimeUnit.MILLISECONDS);
    }

    public void updateCtx() {
        this.internalServerCnx.updateCtx();
    }

    public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(String str) {
        return this.consumerTopicManagers.computeIfAbsent(str, str2 -> {
            CompletableFuture<PersistentTopic> topic = getTopic(str2);
            return topic.thenApply(persistentTopic -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Call getTopicConsumerManager for {}, and create TCM for {}.", new Object[]{this.requestHandler.ctx.channel(), str, persistentTopic});
                }
                if (persistentTopic != null) {
                    return new KafkaTopicConsumerManager(this.requestHandler, persistentTopic);
                }
                removeTopicManagerCache(topic.toString());
                return null;
            });
        });
    }

    public static void removeTopicManagerCache(String str) {
        LOOKUP_CACHE.remove(str);
        KOP_ADDRESS_CACHE.remove(str);
    }

    public static void clearTopicManagerCache() {
        LOOKUP_CACHE.clear();
        KOP_ADDRESS_CACHE.clear();
    }

    private Producer registerInPersistentTopic(PersistentTopic persistentTopic) throws Exception {
        InternalProducer internalProducer = new InternalProducer(persistentTopic, this.internalServerCnx, this.pulsarService.getClient().newRequestId(), this.brokerService.generateUniqueProducerName());
        if (log.isDebugEnabled()) {
            log.debug("[{}] Register Mock Producer {} into PersistentTopic {}", new Object[]{this.requestHandler.ctx.channel(), internalProducer, persistentTopic.getName()});
        }
        persistentTopic.addProducer(internalProducer);
        return internalProducer;
    }

    public CompletableFuture<InetSocketAddress> getTopicBroker(String str) {
        this.rwLock.readLock().lock();
        try {
            if (!this.closed) {
                this.rwLock.readLock().unlock();
                return LOOKUP_CACHE.computeIfAbsent(str, str2 -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] topic {} not in Lookup_cache, call lookupBroker", this.requestHandler.ctx.channel(), str);
                    }
                    CompletableFuture<InetSocketAddress> completableFuture = new CompletableFuture<>();
                    lookupBroker(str, new Backoff(100L, TimeUnit.MILLISECONDS, 30L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS), completableFuture);
                    return completableFuture;
                });
            }
            CompletableFuture<InetSocketAddress> completableFuture = new CompletableFuture<>();
            completableFuture.complete(null);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Return null for getTopicBroker({}) since channel closing", this.requestHandler.ctx.channel(), str);
            }
            return completableFuture;
        } finally {
            this.rwLock.readLock().unlock();
        }
    }

    public InternalServerCnx getInternalServerCnx() {
        return this.internalServerCnx;
    }

    private void lookupBroker(String str, Backoff backoff, CompletableFuture<InetSocketAddress> completableFuture) {
        try {
            this.rwLock.readLock().lock();
            try {
                if (!this.closed) {
                    this.rwLock.readLock().unlock();
                    this.pulsarService.getClient().getLookup().getBroker(TopicName.get(str)).thenAccept(pair -> {
                        Preconditions.checkState(((InetSocketAddress) pair.getLeft()).equals(pair.getRight()));
                        completableFuture.complete(pair.getLeft());
                    }).exceptionally(th -> {
                        long next = backoff.next();
                        if (backoff.isMandatoryStopMade()) {
                            log.warn("[{}] getBroker for topic {} failed, retried too many times {}, return null. throwable: ", new Object[]{this.requestHandler.ctx.channel(), str, Long.valueOf(next), th});
                            completableFuture.complete(null);
                            return null;
                        }
                        log.warn("[{}] getBroker for topic failed, will retry in {} ms. throwable: ", new Object[]{str, Long.valueOf(next), th});
                        this.requestHandler.getPulsarService().getExecutor().schedule(() -> {
                            lookupBroker(str, backoff, completableFuture);
                        }, next, TimeUnit.MILLISECONDS);
                        return null;
                    });
                } else {
                    completableFuture.complete(null);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Return null for getTopic({}) since channel closing", this.requestHandler.ctx.channel(), str);
                    }
                }
            } finally {
                this.rwLock.readLock().unlock();
            }
        } catch (PulsarServerException e) {
            log.error("[{}] getTopicBroker for topic {} failed get pulsar client, return null. throwable: ", new Object[]{this.requestHandler.ctx.channel(), str, e});
            completableFuture.complete(null);
        }
    }

    public CompletableFuture<PersistentTopic> getTopic(String str) {
        CompletableFuture<PersistentTopic> completableFuture = new CompletableFuture<>();
        this.rwLock.readLock().lock();
        try {
            if (this.closed) {
                completableFuture.complete(null);
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Return null for getTopic({}) since channel is closing", this.requestHandler.ctx.channel(), str);
                }
                return completableFuture;
            }
            this.rwLock.readLock().unlock();
            this.brokerService.getTopic(str, this.brokerService.isAllowAutoTopicCreation(str)).whenComplete((optional, th) -> {
                if (th != null) {
                    log.error("[{}] Failed to getTopic {}. exception:", new Object[]{this.requestHandler.ctx.channel(), str, th});
                    removeTopicManagerCache(str);
                    completableFuture.complete(null);
                } else if (optional.isPresent()) {
                    completableFuture.complete((PersistentTopic) optional.get());
                } else {
                    log.error("[{}]Get empty topic for name {}", this.requestHandler.ctx.channel(), str);
                    completableFuture.complete(null);
                }
            });
            this.topics.put(str, completableFuture);
            return completableFuture;
        } finally {
            this.rwLock.readLock().unlock();
        }
    }

    public void registerProducerInPersistentTopic(String str, PersistentTopic persistentTopic) {
        try {
            if (this.references.containsKey(str)) {
                return;
            }
            synchronized (this) {
                if (this.references.containsKey(str)) {
                    return;
                }
                this.references.put(str, registerInPersistentTopic(persistentTopic));
            }
        } catch (Exception e) {
            log.error("[{}] Failed to register producer in PersistentTopic {}. exception:", new Object[]{this.requestHandler.ctx.channel(), str, e});
        }
    }

    public synchronized void close() {
        this.rwLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Closing TopicManager", this.requestHandler.ctx.channel());
            }
            try {
                this.cursorExpireTask.cancel(true);
                Iterator<CompletableFuture<KafkaTopicConsumerManager>> it = this.consumerTopicManagers.values().iterator();
                while (it.hasNext()) {
                    it.next().get().close();
                }
                this.consumerTopicManagers.clear();
                for (Map.Entry<String, CompletableFuture<PersistentTopic>> entry : this.topics.entrySet()) {
                    String key = entry.getKey();
                    CompletableFuture<PersistentTopic> value = entry.getValue();
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] remove producer {} for topic {} at close()", new Object[]{this.requestHandler.ctx.channel(), this.references.get(key), key});
                    }
                    if (this.references.get(key) != null) {
                        PersistentTopic persistentTopic = value.get();
                        if (persistentTopic != null) {
                            persistentTopic.removeProducer(this.references.get(key));
                        }
                        this.references.remove(key);
                    }
                }
                this.topics.clear();
            } catch (Exception e) {
                log.error("[{}] Failed to close KafkaTopicManager. exception:", this.requestHandler.ctx.channel(), e);
            }
        } finally {
            this.rwLock.writeLock().unlock();
        }
    }

    public Producer getReferenceProducer(String str) {
        return this.references.get(str);
    }

    public void deReference(String str) {
        try {
            removeTopicManagerCache(str);
            if (this.consumerTopicManagers.containsKey(str)) {
                this.consumerTopicManagers.get(str).get().close();
                this.consumerTopicManagers.remove(str);
            }
            if (this.topics.containsKey(str)) {
                PersistentTopic persistentTopic = this.topics.get(str).get();
                if (persistentTopic != null) {
                    persistentTopic.removeProducer(this.references.get(str));
                }
                this.topics.remove(str);
            }
        } catch (Exception e) {
            log.error("[{}] Failed to close reference for individual topic {}. exception:", new Object[]{this.requestHandler.ctx.channel(), str, e});
        }
    }

    public ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>> getConsumerTopicManagers() {
        return this.consumerTopicManagers;
    }
}
