package io.github.icodegarden.wing.distribution.sync;

import io.github.icodegarden.commons.lang.serialization.KryoDeserializer;
import io.github.icodegarden.commons.lang.serialization.KryoSerializer;
import io.github.icodegarden.wing.common.EnvException;
import io.github.icodegarden.wing.common.SyncFailedException;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/github/icodegarden/wing/distribution/sync/KafkaBroadcast.class */
public class KafkaBroadcast extends AbstractDistributionSyncStrategy {
    private static final String TOPIC = "io.cahce.sync";
    private KafkaProducer<String, byte[]> producer;
    private KafkaConsumer<String, byte[]> consumer;
    private boolean closed = false;
    private final String bootstrapServers;
    private static final Logger log = LoggerFactory.getLogger(KafkaBroadcast.class);
    private static final KryoSerializer KRYO_SERIALIZER = new KryoSerializer();
    private static final KryoDeserializer KRYO_DESERIALIZER = new KryoDeserializer();

    public KafkaBroadcast(String str) {
        this.bootstrapServers = str;
    }

    @Override // io.github.icodegarden.wing.distribution.sync.AbstractDistributionSyncStrategy, io.github.icodegarden.wing.distribution.sync.DistributionSyncStrategy
    public boolean injectCacher(DistributionSyncCacher distributionSyncCacher) {
        boolean injectCacher = super.injectCacher(distributionSyncCacher);
        if (injectCacher) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.bootstrapServers);
            this.producer = buildProducer(properties);
            this.consumer = buildConsumer(properties);
            subBroadcast();
        }
        return injectCacher;
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [io.github.icodegarden.wing.distribution.sync.KafkaBroadcast$1] */
    private void subBroadcast() throws EnvException {
        this.consumer.subscribe(Arrays.asList(TOPIC));
        new Thread(getClass().getSimpleName() + "-subscribe") { // from class: io.github.icodegarden.wing.distribution.sync.KafkaBroadcast.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!KafkaBroadcast.this.closed) {
                    try {
                        Iterator it = KafkaBroadcast.this.consumer.poll(Duration.ofMillis(100L)).iterator();
                        while (it.hasNext()) {
                            try {
                                KafkaBroadcast.this.receiveSync((DistributionSyncDTO) KafkaBroadcast.KRYO_DESERIALIZER.deserialize((byte[]) ((ConsumerRecord) it.next()).value()));
                            } catch (Exception e) {
                                KafkaBroadcast.log.error("ex on handle cache sync from kafka", e);
                            }
                        }
                    } finally {
                        KafkaBroadcast.this.consumer.close(Duration.ofMillis(3000L));
                    }
                }
            }
        }.start();
    }

    @Override // io.github.icodegarden.wing.distribution.sync.AbstractDistributionSyncStrategy
    protected void broadcast(DistributionSyncDTO distributionSyncDTO) throws SyncFailedException {
        this.producer.send(new ProducerRecord(TOPIC, KRYO_SERIALIZER.serialize(distributionSyncDTO)));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closed = true;
        this.producer.close(Duration.ofMillis(3000L));
    }

    private KafkaProducer<String, byte[]> buildProducer(Properties properties) {
        Properties properties2 = new Properties();
        properties2.put("acks", "1");
        properties2.put("retries", 2);
        properties2.put("max.request.size", 1000012);
        properties2.put("delivery.timeout.ms", 3000);
        properties2.put("linger.ms", 5);
        properties2.put("request.timeout.ms", 2500);
        properties2.put("compression.type", "none");
        properties2.putAll(properties);
        properties2.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties2.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(properties2);
    }

    private KafkaConsumer<String, byte[]> buildConsumer(Properties properties) {
        Properties properties2 = new Properties();
        properties2.put("fetch.min.bytes", 1);
        properties2.put("fetch.max.bytes", 52428800);
        properties2.put("fetch.max.wait.ms", 500);
        properties2.put("max.poll.interval.ms", 600000);
        properties2.put("max.poll.records", 100);
        properties2.put("heartbeat.interval.ms", 3000);
        properties2.put("session.timeout.ms", 10000);
        properties2.put("max.partition.fetch.bytes", 1048576);
        properties2.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        properties2.put("auto.offset.reset", "latest");
        properties2.put("connections.max.idle.ms", Integer.MAX_VALUE);
        properties2.putAll(properties);
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("group.id", this.distributionSyncCacher.getApplicationName() + "-" + UUID.randomUUID().toString().hashCode());
        properties2.put("enable.auto.commit", true);
        return new KafkaConsumer<>(properties2);
    }
}
