package io.eventuate.local.common;

import io.eventuate.cdc.producer.wrappers.DataProducer;
import io.eventuate.cdc.producer.wrappers.DataProducerFactory;
import io.eventuate.common.eventuate.local.BinLogEvent;
import io.eventuate.local.common.exception.EventuateLocalPublishingException;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/eventuate/local/common/CdcDataPublisher.class */
public class CdcDataPublisher<EVENT extends BinLogEvent> {
    protected MeterRegistry meterRegistry;
    protected PublishingStrategy<EVENT> publishingStrategy;
    protected DataProducerFactory dataProducerFactory;
    protected DataProducer producer;
    protected Counter meterEventsPublished;
    protected Counter meterEventsDuplicates;
    protected DistributionSummary distributionSummaryEventAge;
    private PublishingFilter publishingFilter;
    private volatile boolean lastMessagePublishingFailed;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private AtomicLong timeOfLastProcessedEvent = new AtomicLong(0);
    private AtomicInteger totallyProcessedEvents = new AtomicInteger(0);
    private long sendTimeAccumulator = 0;

    public CdcDataPublisher(DataProducerFactory dataProducerFactory, PublishingFilter publishingFilter, PublishingStrategy<EVENT> publishingStrategy, MeterRegistry meterRegistry) {
        this.dataProducerFactory = dataProducerFactory;
        this.publishingStrategy = publishingStrategy;
        this.publishingFilter = publishingFilter;
        this.meterRegistry = meterRegistry;
        initMetrics();
    }

    public boolean isLastMessagePublishingFailed() {
        return this.lastMessagePublishingFailed;
    }

    private void initMetrics() {
        if (this.meterRegistry != null) {
            this.distributionSummaryEventAge = this.meterRegistry.summary("eventuate.cdc.event.age", new String[0]);
            this.meterEventsPublished = this.meterRegistry.counter("eventuate.cdc.events.sent", new String[0]);
            this.meterEventsDuplicates = this.meterRegistry.counter("eventuate.cdc.events.duplicates", new String[0]);
        }
    }

    public int getTotallyProcessedEventCount() {
        return this.totallyProcessedEvents.get();
    }

    public long getTimeOfLastProcessedEvent() {
        return this.timeOfLastProcessedEvent.get();
    }

    public long getSendTimeAccumulator() {
        return this.sendTimeAccumulator;
    }

    public void start() {
        this.logger.info("Starting CdcDataPublisher");
        this.producer = this.dataProducerFactory.create();
        this.logger.info("Started CdcDataPublisher");
    }

    public void stop() {
        this.logger.info("Stopping CdcDataPublisher");
        if (this.producer != null) {
            this.producer.close();
        }
        this.logger.info("Stopped CdcDataPublisher");
    }

    public CompletableFuture<?> sendMessage(EVENT event) throws EventuateLocalPublishingException {
        Objects.requireNonNull(event);
        String json = this.publishingStrategy.toJson(event);
        this.logger.debug("Got record: {}", json);
        String str = this.publishingStrategy.topicFor(event);
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        if (!((Boolean) event.getBinlogFileOffset().map(binlogFileOffset -> {
            return Boolean.valueOf(this.publishingFilter.shouldBePublished(binlogFileOffset, str));
        }).orElse(true)).booleanValue()) {
            this.logger.debug("Duplicate event {}", event);
            this.meterEventsDuplicates.increment();
            completableFuture.complete(null);
            return completableFuture;
        }
        this.logger.debug("sending record: {}", json);
        long nanoTime = System.nanoTime();
        send(event, str, json, completableFuture);
        this.meterEventsPublished.increment();
        this.publishingStrategy.getCreateTime(event).ifPresent(l -> {
            this.distributionSummaryEventAge.record(System.currentTimeMillis() - l.longValue());
        });
        this.sendTimeAccumulator += System.nanoTime() - nanoTime;
        return completableFuture;
    }

    private void send(EVENT event, String str, String str2, CompletableFuture<Object> completableFuture) {
        this.producer.send(str, this.publishingStrategy.partitionKeyFor(event), str2).whenComplete((obj, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            completableFuture.complete(obj);
            this.timeOfLastProcessedEvent.set(System.nanoTime());
            this.totallyProcessedEvents.incrementAndGet();
        });
    }
}
