package co.cask.cdap.metrics.runtime;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.metrics.MetricsConstants;
import co.cask.cdap.metrics.process.KafkaMetricsProcessorServiceFactory;
import co.cask.cdap.watchdog.election.MultiLeaderElection;
import co.cask.cdap.watchdog.election.PartitionChangeHandler;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metrics/runtime/KafkaMetricsProcessorService.class */
public final class KafkaMetricsProcessorService extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsProcessorService.class);
    private final MultiLeaderElection multiElection;
    private final SettableFuture<?> completion = SettableFuture.create();

    @Inject
    public KafkaMetricsProcessorService(CConfiguration cConfiguration, ZKClientService zKClientService, KafkaMetricsProcessorServiceFactory kafkaMetricsProcessorServiceFactory) {
        this.multiElection = new MultiLeaderElection(zKClientService, "metrics-processor", cConfiguration.getInt(MetricsConstants.ConfigKeys.KAFKA_PARTITION_SIZE, 1), createPartitionChangeHandler(kafkaMetricsProcessorServiceFactory));
    }

    protected Executor executor() {
        return new Executor() { // from class: co.cask.cdap.metrics.runtime.KafkaMetricsProcessorService.1
            @Override // java.util.concurrent.Executor
            public void execute(Runnable runnable) {
                Thread thread = new Thread(runnable, KafkaMetricsProcessorService.this.getServiceName());
                thread.setDaemon(true);
                thread.start();
            }
        };
    }

    protected void startUp() throws Exception {
        LOG.info("Starting Metrics Processor ...");
        this.multiElection.start();
    }

    protected void run() throws Exception {
        this.completion.get();
    }

    protected void triggerShutdown() {
        this.completion.set((Object) null);
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping Metrics Processor ...");
        try {
            this.multiElection.stop();
        } catch (Exception e) {
            LOG.error("Exception while shutting down.", e);
            throw Throwables.propagate(e);
        }
    }

    private PartitionChangeHandler createPartitionChangeHandler(final KafkaMetricsProcessorServiceFactory kafkaMetricsProcessorServiceFactory) {
        return new PartitionChangeHandler() { // from class: co.cask.cdap.metrics.runtime.KafkaMetricsProcessorService.2
            private co.cask.cdap.metrics.process.KafkaMetricsProcessorService service;

            @Override // co.cask.cdap.watchdog.election.PartitionChangeHandler
            public void partitionsChanged(Set<Integer> set) {
                KafkaMetricsProcessorService.LOG.info("Metrics Kafka partition changed {}", set);
                try {
                    if (this.service != null) {
                        this.service.stopAndWait();
                    }
                    if (set.isEmpty() || !KafkaMetricsProcessorService.this.multiElection.isRunning()) {
                        this.service = null;
                    } else {
                        this.service = kafkaMetricsProcessorServiceFactory.create(set);
                        this.service.startAndWait();
                    }
                } catch (Throwable th) {
                    KafkaMetricsProcessorService.LOG.error("Failed to change Kafka partition.", th);
                    KafkaMetricsProcessorService.this.completion.setException(th);
                }
            }
        };
    }
}
