package io.eventuate.local.unified.cdc.pipeline.common.health;

import io.eventuate.messaging.kafka.basic.consumer.ConsumerPropertiesFactory;
import io.eventuate.messaging.kafka.spring.basic.consumer.EventuateKafkaConsumerSpringConfigurationProperties;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:io/eventuate/local/unified/cdc/pipeline/common/health/KafkaHealthCheck.class */
public class KafkaHealthCheck extends AbstractHealthCheck {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${eventuatelocal.kafka.bootstrap.servers}")
    private String kafkaServers;

    @Autowired
    private EventuateKafkaConsumerSpringConfigurationProperties eventuateKafkaConsumerSpringConfigurationProperties;
    private volatile KafkaConsumer<String, String> consumer;

    @Override // io.eventuate.local.unified.cdc.pipeline.common.health.AbstractHealthCheck
    protected void determineHealth(HealthBuilder healthBuilder) {
        if (this.consumer == null) {
            synchronized (this) {
                if (this.consumer == null) {
                    Properties makeDefaultConsumerProperties = ConsumerPropertiesFactory.makeDefaultConsumerProperties(this.kafkaServers, UUID.randomUUID().toString());
                    makeDefaultConsumerProperties.put("session.timeout.ms", "500");
                    makeDefaultConsumerProperties.put("request.timeout.ms", "1000");
                    makeDefaultConsumerProperties.put("heartbeat.interval.ms", "100");
                    makeDefaultConsumerProperties.putAll(this.eventuateKafkaConsumerSpringConfigurationProperties.getProperties());
                    this.consumer = new KafkaConsumer<>(makeDefaultConsumerProperties);
                }
            }
        }
        try {
            synchronized (this) {
                this.consumer.partitionsFor("__consumer_offsets");
            }
            healthBuilder.addDetail("Connected to Kafka");
        } catch (Exception e) {
            this.logger.error(e.getMessage(), e);
            healthBuilder.addError("Connection to kafka failed");
        }
    }
}
