package ch.squaredesk.nova.comm.kafka;

import ch.squaredesk.nova.comm.CommAdapter;
import ch.squaredesk.nova.comm.CommAdapterBuilder;
import ch.squaredesk.nova.comm.DefaultMessageTranscriberForStringAsTransportType;
import ch.squaredesk.nova.comm.MessageTranscriber;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/squaredesk/nova/comm/kafka/KafkaAdapter.class */
/**
 * {@link CommAdapter} with {@code String} as transport type that delegates sending to a
 * {@link MessageSender} and receiving to a {@link MessageReceiver}.
 *
 * <p>Instances are normally obtained through {@link #builder()}.
 */
public class KafkaAdapter extends CommAdapter<String> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaAdapter.class);
    private final MessageSender messageSender;
    private final MessageReceiver messageReceiver;

    /**
     * Fluent builder for {@link KafkaAdapter}.
     *
     * <p>Collects the server address, an optional identifier, consumer/producer properties and
     * the polling timeout. A {@link MessageSender} and/or {@link MessageReceiver} may be supplied
     * directly; otherwise they are created from the collected settings.
     */
    public static class Builder extends CommAdapterBuilder<String, KafkaAdapter> {
        // Both property sets are created once and only ever added to, so they are never null.
        private final Properties consumerProperties = new Properties();
        private final Properties producerProperties = new Properties();
        private String serverAddress;
        private String identifier;
        private MessageSender messageSender;
        private MessageReceiver messageReceiver;
        // NOTE(review): assigned by the setter and defaulted in validate(), but not read anywhere
        // else in this class - confirm whether it is still needed.
        private Scheduler subscriptionScheduler;
        // Default polling timeout: 1 second.
        private long pollTimeout = 1L;
        private TimeUnit pollTimeUnit = TimeUnit.SECONDS;

        private Builder() {
        }

        /**
         * Sets the polling timeout that is handed to the {@link MessageReceiver} created by this
         * builder. Defaults to one second.
         *
         * @param j        the timeout amount
         * @param timeUnit the unit of {@code j}
         * @return this builder
         * @throws NullPointerException if {@code timeUnit} is {@code null}
         */
        public Builder setMessagePollingTimeout(long j, TimeUnit timeUnit) {
            Objects.requireNonNull(timeUnit, "pollTimeUnit must not be null");
            this.pollTimeout = j;
            this.pollTimeUnit = timeUnit;
            return this;
        }

        /**
         * Copies all entries of the given properties into the consumer properties collected so
         * far. Despite the name, this merges rather than replaces: entries under other keys are
         * kept.
         *
         * @param properties the entries to add; {@code null} is ignored
         * @return this builder
         */
        public Builder setConsumerProperties(Properties properties) {
            if (properties != null) {
                this.consumerProperties.putAll(properties);
            }
            return this;
        }

        /** Stores a single non-null key/value pair in the given property set. */
        private Builder addProperty(Properties target, String key, String value) {
            Objects.requireNonNull(key, "property key must not be null");
            Objects.requireNonNull(value, "value for property " + key + " must not be null");
            target.setProperty(key, value);
            return this;
        }

        /**
         * Adds (or overwrites) a single consumer property.
         *
         * @param str  the property key
         * @param str2 the property value
         * @return this builder
         * @throws NullPointerException if the key or the value is {@code null}
         */
        public Builder addConsumerProperty(String str, String str2) {
            return addProperty(this.consumerProperties, str, str2);
        }

        /**
         * Adds (or overwrites) a single producer property.
         *
         * @param str  the property key
         * @param str2 the property value
         * @return this builder
         * @throws NullPointerException if the key or the value is {@code null}
         */
        public Builder addProducerProperty(String str, String str2) {
            return addProperty(this.producerProperties, str, str2);
        }

        /**
         * Copies all entries of the given properties into the producer properties collected so
         * far. Despite the name, this merges rather than replaces: entries under other keys are
         * kept.
         *
         * @param properties the entries to add; {@code null} is ignored
         * @return this builder
         */
        public Builder setProducerProperties(Properties properties) {
            if (properties != null) {
                this.producerProperties.putAll(properties);
            }
            return this;
        }

        /**
         * Sets the server address. It is used as the {@code bootstrap.servers} value for both
         * consumer and producer unless that property was set explicitly.
         *
         * @param str the server address; required by {@link #validate()}
         * @return this builder
         */
        public Builder setServerAddress(String str) {
            this.serverAddress = str;
            return this;
        }

        /**
         * Sets the scheduler for subscriptions. If none is set, {@link #validate()} installs a
         * scheduler backed by a single daemon thread named {@code KafkaSubscription}.
         *
         * @param scheduler the scheduler to use
         * @return this builder
         */
        public Builder setSubscriptionScheduler(Scheduler scheduler) {
            this.subscriptionScheduler = scheduler;
            return this;
        }

        /**
         * Sets the identifier from which the default {@code client.id} and {@code group.id}
         * are derived and which is passed to the sender and receiver created by this builder.
         *
         * @param str the identifier; may be {@code null}, in which case generated defaults are used
         *            for the two properties
         * @return this builder
         */
        public Builder setIdentifier(String str) {
            this.identifier = str;
            return this;
        }

        /**
         * Supplies the sender to use instead of letting the builder create one.
         *
         * @param messageSender the sender; {@code null} means "create one from the producer properties"
         * @return this builder
         */
        public Builder setMessageSender(MessageSender messageSender) {
            this.messageSender = messageSender;
            return this;
        }

        /**
         * Supplies the receiver to use instead of letting the builder create one.
         *
         * @param messageReceiver the receiver; {@code null} means "create one from the consumer
         *                        properties and polling timeout"
         * @return this builder
         */
        public Builder setMessageReceiver(MessageReceiver messageReceiver) {
            this.messageReceiver = messageReceiver;
            return this;
        }

        /**
         * Checks that the mandatory settings are present and fills in the default subscription
         * scheduler if none was configured.
         *
         * @throws NullPointerException if the server address or the metrics are missing
         */
        public void validate() {
            Objects.requireNonNull(this.serverAddress, "serverAddress must be provided");
            Objects.requireNonNull(this.metrics, "metrics must be provided");
            if (this.subscriptionScheduler == null) {
                // Daemon thread so that the default scheduler never keeps the JVM alive.
                this.subscriptionScheduler = Schedulers.from(Executors.newSingleThreadExecutor(runnable -> {
                    Thread thread = new Thread(runnable, "KafkaSubscription");
                    thread.setDaemon(true);
                    return thread;
                }));
            }
        }

        /**
         * Creates the adapter from the collected settings.
         *
         * <p>Default Kafka properties (bootstrap servers, String (de)serializers, client id and
         * consumer group id) are only applied for keys that were not set explicitly. Sender,
         * receiver and transcriber are created only if none was supplied; the created instances
         * are stored in this builder, so a repeated call reuses them.
         *
         * <p>The method name carries a decompiler-assigned prefix; according to the decompiler
         * note it was originally {@code createInstance} (merged with its bridge method).
         *
         * @return the new adapter
         */
        public KafkaAdapter m1createInstance() {
            boolean anonymous = this.identifier == null;
            String clientId = anonymous ? "KafkaAdapter-" + UUID.randomUUID() : this.identifier;
            String groupId = anonymous ? "KafkaAdapter-ReadGroup" : this.identifier + "ReadGroup";

            setPropertyIfNotPresent(this.consumerProperties, "bootstrap.servers", this.serverAddress);
            setPropertyIfNotPresent(this.consumerProperties, "key.deserializer", StringDeserializer.class.getName());
            setPropertyIfNotPresent(this.consumerProperties, "value.deserializer", StringDeserializer.class.getName());
            setPropertyIfNotPresent(this.consumerProperties, "client.id", clientId);
            setPropertyIfNotPresent(this.consumerProperties, "group.id", groupId);

            setPropertyIfNotPresent(this.producerProperties, "bootstrap.servers", this.serverAddress);
            setPropertyIfNotPresent(this.producerProperties, "key.serializer", StringSerializer.class.getName());
            setPropertyIfNotPresent(this.producerProperties, "value.serializer", StringSerializer.class.getName());
            setPropertyIfNotPresent(this.producerProperties, "client.id", clientId);

            // NOTE(review): sender and receiver get the raw identifier (possibly null), not the
            // derived client id - confirm that this is intended.
            if (this.messageReceiver == null) {
                this.messageReceiver = new MessageReceiver(this.identifier, this.consumerProperties, this.pollTimeout, this.pollTimeUnit, this.metrics);
            }
            if (this.messageSender == null) {
                this.messageSender = new MessageSender(this.identifier, this.producerProperties, this.metrics);
            }
            if (this.messageTranscriber == null) {
                this.messageTranscriber = new DefaultMessageTranscriberForStringAsTransportType();
            }
            return new KafkaAdapter(this.messageSender, this.messageReceiver, this.messageTranscriber, this.metrics);
        }

        /** Sets {@code key} to {@code value} unless the property set already contains the key. */
        private static void setPropertyIfNotPresent(Properties target, String key, String value) {
            if (!target.containsKey(key)) {
                target.setProperty(key, value);
            }
        }
    }

    /**
     * Creates an adapter on top of the given collaborators.
     *
     * @param messageSender      used by all {@code sendMessage} variants
     * @param messageReceiver    used by all {@code messages} variants and by {@link #shutdown()}
     * @param messageTranscriber passed to the superclass; provides the default transcribers
     * @param metrics            passed to the superclass
     */
    KafkaAdapter(MessageSender messageSender, MessageReceiver messageReceiver, MessageTranscriber<String> messageTranscriber, Metrics metrics) {
        super(messageTranscriber, metrics);
        this.messageReceiver = messageReceiver;
        this.messageSender = messageSender;
    }

    /**
     * Sends a String message as is, without transcription.
     *
     * @param str  the destination, used to build the {@link OutgoingMessageMetaData}
     *             (presumably the Kafka topic - verify against {@link MessageSender})
     * @param str2 the message
     * @return the {@link Completable} returned by the sender
     */
    public Completable sendMessage(String str, String str2) {
        return this.messageSender.send(str2, new OutgoingMessageMetaData(str, new SendInfo()));
    }

    /**
     * Sends a message, converting it with the outgoing transcriber that the configured
     * {@link MessageTranscriber} provides for the message's runtime class.
     *
     * @param str the destination, used to build the {@link OutgoingMessageMetaData}
     * @param t   the message
     * @param <T> the message type
     * @return the {@link Completable} returned by the sender
     * @throws NullPointerException if {@code t} is {@code null}
     */
    public <T> Completable sendMessage(String str, T t) {
        // Validate before touching the message so that null fails with a descriptive error
        // rather than a bare NullPointerException from getClass().
        Objects.requireNonNull(t, "message must not be null");
        // Safe for the lookup: the runtime class of a T instance is T or a subclass of T.
        @SuppressWarnings("unchecked")
        Class<T> messageType = (Class<T>) t.getClass();
        return sendMessage(str, t, this.messageTranscriber.getOutgoingMessageTranscriber(messageType));
    }

    /**
     * Sends a message, converting it to its String representation with the given function.
     *
     * @param str      the destination, used to build the {@link OutgoingMessageMetaData}
     * @param t        the message
     * @param function the transcriber handed to the sender together with the message
     * @param <T>      the message type
     * @return the {@link Completable} returned by the sender
     * @throws NullPointerException if {@code t} is {@code null}
     */
    public <T> Completable sendMessage(String str, T t, Function<T, String> function) {
        Objects.requireNonNull(t, "message must not be null");
        OutgoingMessageMetaData metaData = new OutgoingMessageMetaData(str, new SendInfo());
        return this.messageSender.send(t, metaData, function);
    }

    /**
     * Returns the messages the receiver delivers for the given destination as plain Strings.
     *
     * @param str the destination, forwarded to {@link MessageReceiver}
     * @return a stream of message payloads, stripped of their metadata
     */
    public Flowable<String> messages(String str) {
        return this.messageReceiver.messages(str).map(incomingMessage -> (String) incomingMessage.message);
    }

    /**
     * Returns the messages for the given destination, converted with the incoming transcriber
     * that the configured {@link MessageTranscriber} provides for {@code cls}.
     *
     * @param str the destination, forwarded to {@link MessageReceiver}
     * @param cls the type to convert each message to
     * @param <T> the message type
     * @return a stream of converted message payloads
     */
    public <T> Flowable<T> messages(String str, Class<T> cls) {
        return messages(str, this.messageTranscriber.getIncomingMessageTranscriber(cls));
    }

    /**
     * Returns the messages for the given destination, converted with the given function.
     *
     * @param str      the destination, forwarded to {@link MessageReceiver}
     * @param function the transcriber handed to the receiver
     * @param <T>      the message type
     * @return a stream of converted message payloads, stripped of their metadata
     */
    public <T> Flowable<T> messages(String str, Function<String, T> function) {
        return this.messageReceiver.messages(str, function).map(incomingMessage -> incomingMessage.message);
    }

    /**
     * Shuts down the message receiver, if there is one, and logs the shutdown.
     */
    public void shutdown() {
        // NOTE(review): only the receiver is shut down here - confirm whether MessageSender
        // holds resources that need releasing as well.
        if (this.messageReceiver != null) {
            this.messageReceiver.shutdown();
        }
        logger.info("KafkaAdapter shut down");
    }

    /**
     * Creates a new, empty {@link Builder}.
     *
     * @return the builder
     */
    public static Builder builder() {
        return new Builder();
    }
}
