package cn.leancloud.kafka.consumer;

import java.io.Closeable;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

/* loaded from: input_file:cn/leancloud/kafka/consumer/LcKafkaConsumer.class */
public final class LcKafkaConsumer<K, V> implements Closeable {
    private final Consumer<K, V> consumer;
    private final Thread fetcherThread;
    private final Fetcher<K, V> fetcher;
    private final ExecutorService workerPool;
    private final CommitPolicy<K, V> policy;
    private final boolean shutdownWorkerPoolOnStop;
    private volatile State state = State.INIT;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cn/leancloud/kafka/consumer/LcKafkaConsumer$State.class */
    public enum State {
        INIT(0),
        SUBSCRIBED(1),
        CLOSED(2);

        private int code;

        State(int i) {
            this.code = i;
        }

        int code() {
            return this.code;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LcKafkaConsumer(LcKafkaConsumerBuilder<K, V> lcKafkaConsumerBuilder) {
        this.consumer = lcKafkaConsumerBuilder.getConsumer();
        this.workerPool = lcKafkaConsumerBuilder.getWorkerPool();
        this.shutdownWorkerPoolOnStop = lcKafkaConsumerBuilder.isShutdownWorkerPoolOnStop();
        this.policy = lcKafkaConsumerBuilder.getPolicy();
        this.fetcher = new Fetcher<>(lcKafkaConsumerBuilder);
        this.fetcherThread = new Thread(this.fetcher);
    }

    public synchronized CompletableFuture<UnsubscribedStatus> subscribe(Collection<String> collection) {
        Objects.requireNonNull(collection, ConsumerProtocol.TOPICS_KEY_NAME);
        if (collection.isEmpty()) {
            throw new IllegalArgumentException("subscribe empty topics");
        }
        for (String str : collection) {
            if (str == null || str.trim().isEmpty()) {
                throw new IllegalArgumentException("topic collection to subscribe to cannot contain null or empty topic");
            }
        }
        ensureInInit();
        this.consumer.subscribe(collection, new RebalanceListener(this.consumer, this.policy));
        this.fetcherThread.setName(fetcherThreadName(collection));
        this.fetcherThread.start();
        this.state = State.SUBSCRIBED;
        return setupUnsubscribedFuture();
    }

    public synchronized CompletableFuture<UnsubscribedStatus> subscribe(Pattern pattern) {
        Objects.requireNonNull(pattern, "pattern");
        ensureInInit();
        this.consumer.subscribe(pattern, new RebalanceListener(this.consumer, this.policy));
        this.fetcherThread.setName(fetcherThreadName(pattern));
        this.fetcherThread.start();
        this.state = State.SUBSCRIBED;
        return setupUnsubscribedFuture();
    }

    public Map<MetricName, ? extends Metric> metrics() {
        return this.consumer.metrics();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (closed()) {
            return;
        }
        synchronized (this) {
            if (closed()) {
                return;
            }
            this.state = State.CLOSED;
            try {
                if (Thread.currentThread() != this.fetcherThread) {
                    this.fetcher.close();
                    this.fetcherThread.join();
                }
                if (this.shutdownWorkerPoolOnStop) {
                    this.workerPool.shutdown();
                    this.workerPool.awaitTermination(1L, TimeUnit.DAYS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @VisibleForTesting
    boolean subscribed() {
        return this.state.code() > State.INIT.code();
    }

    @VisibleForTesting
    boolean closed() {
        return this.state == State.CLOSED;
    }

    @VisibleForTesting
    CommitPolicy<K, V> policy() {
        return this.policy;
    }

    private CompletableFuture<UnsubscribedStatus> setupUnsubscribedFuture() {
        if (!$assertionsDisabled && this.fetcher.unsubscribeStatusFuture().isDone()) {
            throw new AssertionError();
        }
        CompletableFuture<UnsubscribedStatus> completableFuture = new CompletableFuture<>();
        this.fetcher.unsubscribeStatusFuture().thenAccept(unsubscribedStatus -> {
            close();
            completableFuture.complete(unsubscribedStatus);
        });
        return completableFuture;
    }

    private void ensureInInit() {
        if (subscribed() || closed()) {
            throw new IllegalStateException("consumer is closed or have subscribed to some topics or pattern");
        }
    }

    private String fetcherThreadName(Collection<String> collection) {
        String next = collection.iterator().next();
        return "kafka-fetcher-for-" + (next.substring(0, Integer.min(50, next.length())) + ((collection.size() > 1 || next.length() > 50) ? "..." : ""));
    }

    private String fetcherThreadName(Pattern pattern) {
        String pattern2 = pattern.toString();
        return "kafka-fetcher-for-" + (pattern2.substring(0, Integer.min(50, pattern2.length())) + (pattern2.length() > 50 ? "..." : ""));
    }

    static {
        $assertionsDisabled = !LcKafkaConsumer.class.desiredAssertionStatus();
    }
}
