package io.confluent.command;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.UnknownFieldSet;
import io.confluent.command.record.Command;
import io.confluent.serializers.ProtoSerde;
import io.confluent.serializers.UberSerde;
import java.lang.Thread;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/command/CommandStore.class */
public class CommandStore {
    public static final String DEFAULT_COMMAND_TOPIC = "_confluent-command";
    private static final Logger log = LoggerFactory.getLogger(CommandStore.class);
    private static final UberSerde<Command.CommandKey> commandKeySerde = new ProtoSerde(Command.CommandKey.getDefaultInstance());
    private static final UberSerde<Command.CommandMessage> commandMessageSerde = new ProtoSerde(Command.CommandMessage.getDefaultInstance());
    private static final long DEFAULT_START_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
    private static final long DEFAULT_STOP_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
    private static final long DEFAULT_AWAIT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
    public static final String COMMANDER = "commander";
    private final String commandTopic;
    private final KafkaStreams streamsJob;
    private final KafkaProducer<Command.CommandKey, Command.CommandMessage> producer;
    private final AtomicLong highOffset;

    /* loaded from: input_file:io/confluent/command/CommandStore$Builder.class */
    public static class Builder {
        private static final Map<String, Object> DEFAULT_PROPERTIES = ImmutableMap.of("application.id", "test-commander", "bootstrap.servers", "localhost:9092", "topology.optimization", "all");
        private String commandTopic = CommandStore.DEFAULT_COMMAND_TOPIC;
        private StreamsConfig streamsConfig = null;
        private Map<String, Object> producerConfig = DEFAULT_PROPERTIES;

        public Builder topic(String str) {
            this.commandTopic = str;
            return this;
        }

        public Builder streamsConfig(StreamsConfig streamsConfig) {
            this.streamsConfig = streamsConfig;
            return this;
        }

        public Builder producerConfig(Map<String, Object> map) {
            this.producerConfig = map;
            return this;
        }

        public CommandStore build() {
            final AtomicLong atomicLong = new AtomicLong();
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            streamsBuilder.table(this.commandTopic, Consumed.with(Topology.AutoOffsetReset.EARLIEST).withKeySerde(CommandStore.commandKeySerde).withValueSerde(CommandStore.commandMessageSerde), Materialized.as(CommandStore.COMMANDER)).toStream().process(new ProcessorSupplier<Command.CommandKey, Command.CommandMessage>() { // from class: io.confluent.command.CommandStore.Builder.1
                public Processor<Command.CommandKey, Command.CommandMessage> get() {
                    return new AbstractProcessor<Command.CommandKey, Command.CommandMessage>() { // from class: io.confluent.command.CommandStore.Builder.1.1
                        public void process(Command.CommandKey commandKey, Command.CommandMessage commandMessage) {
                            atomicLong.set(context().offset());
                            synchronized (atomicLong) {
                                atomicLong.notifyAll();
                            }
                        }
                    };
                }
            }, new String[0]);
            Properties properties = new Properties();
            if (this.streamsConfig == null) {
                properties.putAll(DEFAULT_PROPERTIES);
            } else {
                properties.putAll(this.streamsConfig.originals());
            }
            return new CommandStore(this.commandTopic, new KafkaStreams(streamsBuilder.build(properties), this.streamsConfig == null ? new StreamsConfig(DEFAULT_PROPERTIES) : this.streamsConfig), new KafkaProducer(this.producerConfig, CommandStore.commandKeySerde, CommandStore.commandMessageSerde), atomicLong);
        }
    }

    protected CommandStore(String str, KafkaStreams kafkaStreams, KafkaProducer<Command.CommandKey, Command.CommandMessage> kafkaProducer, AtomicLong atomicLong) {
        this.commandTopic = str;
        this.streamsJob = kafkaStreams;
        this.producer = kafkaProducer;
        this.highOffset = atomicLong;
    }

    public String getCommandTopic() {
        return this.commandTopic;
    }

    public static UberSerde<Command.CommandKey> getCommandKeySerde() {
        return commandKeySerde;
    }

    public static UberSerde<Command.CommandMessage> getCommandMessageSerde() {
        return commandMessageSerde;
    }

    public void start() throws TimeoutException, InterruptedException {
        start(DEFAULT_START_TIMEOUT);
    }

    public void start(long j) throws TimeoutException, InterruptedException {
        long currentTimeMillis = System.currentTimeMillis() + j;
        this.streamsJob.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: io.confluent.command.CommandStore.1
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                CommandStore.log.error("streams uncaught exception thread={}", thread, th);
            }
        });
        this.streamsJob.start();
        while (this.streamsJob.state() != KafkaStreams.State.RUNNING) {
            log.info("waiting for streams to be in running state {}", this.streamsJob.state());
            if (currentTimeMillis < System.currentTimeMillis()) {
                log.warn("unable to start with allowance={}", Long.valueOf(j));
                throw new TimeoutException();
            }
            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
        }
        log.info("Streams state {}", this.streamsJob.state());
        ReadOnlyKeyValueStore<Command.CommandKey, Command.CommandMessage> readOnlyKeyValueStore = null;
        while (readOnlyKeyValueStore == null) {
            try {
                readOnlyKeyValueStore = getStore();
            } catch (Exception e) {
                log.info("waiting for command store to materialize {}", log.isDebugEnabled() ? e : "");
            }
            if (currentTimeMillis < System.currentTimeMillis()) {
                log.warn("unable to start with allowance={}", Long.valueOf(j));
                throw new TimeoutException();
            }
            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
        }
    }

    public void close() {
        close(false);
    }

    public void close(boolean z) {
        try {
            this.streamsJob.close(Duration.ofMillis(DEFAULT_STOP_TIMEOUT));
            if (z) {
                this.streamsJob.cleanUp();
            }
        } catch (Exception e) {
            log.error("closing streams job", e);
        }
        try {
            this.producer.close(Duration.ofMillis(DEFAULT_STOP_TIMEOUT));
        } catch (Exception e2) {
            log.error("closing producer", e2);
        }
    }

    public Future<RecordMetadata> sendCommand(Command.CommandKey commandKey, Command.CommandMessage commandMessage) {
        return this.producer.send(new ProducerRecord(this.commandTopic, 0, commandKey, commandMessage));
    }

    public RecordMetadata awaitCommand(Command.CommandKey commandKey, Command.CommandMessage commandMessage) throws InterruptedException, TimeoutException, ExecutionException {
        return awaitCommand(commandKey, commandMessage, DEFAULT_AWAIT_TIMEOUT);
    }

    public RecordMetadata awaitCommand(Command.CommandKey commandKey, Command.CommandMessage commandMessage, long j) throws InterruptedException, TimeoutException, ExecutionException {
        if (commandMessage == null) {
            return awaitDelete(commandKey, j);
        }
        UUID randomUUID = UUID.randomUUID();
        return awaitUpdate(commandKey, commandMessage.toBuilder().setUnknownFields(UnknownFieldSet.newBuilder().addField(999557, UnknownFieldSet.Field.newBuilder().addFixed64(randomUUID.getLeastSignificantBits()).addFixed64(randomUUID.getMostSignificantBits()).build()).build()).build(), j);
    }

    private RecordMetadata awaitUpdate(Command.CommandKey commandKey, Command.CommandMessage commandMessage, long j) throws InterruptedException, ExecutionException, TimeoutException {
        Preconditions.checkNotNull(commandMessage, "command must not be null");
        long currentTimeMillis = System.currentTimeMillis() + j;
        RecordMetadata recordMetadata = sendCommand(commandKey, commandMessage).get(j, TimeUnit.MILLISECONDS);
        synchronized (this.highOffset) {
            while (this.highOffset.get() < recordMetadata.offset() && currentTimeMillis > System.currentTimeMillis()) {
                this.highOffset.wait(Math.max(currentTimeMillis - System.currentTimeMillis(), 0L));
            }
        }
        if (this.highOffset.get() < recordMetadata.offset()) {
            throw new TimeoutException("Timed-out waiting to see " + commandKey + " update at offset " + recordMetadata.offset());
        }
        return recordMetadata;
    }

    private RecordMetadata awaitDelete(Command.CommandKey commandKey, long j) throws InterruptedException, ExecutionException, TimeoutException {
        RecordMetadata recordMetadata = sendCommand(commandKey, null).get(j, TimeUnit.MILLISECONDS);
        long currentTimeMillis = System.currentTimeMillis() + j;
        synchronized (this.highOffset) {
            while (get(commandKey) != null && currentTimeMillis > System.currentTimeMillis()) {
                this.highOffset.wait(100L);
            }
        }
        if (get(commandKey) != null) {
            throw new TimeoutException("Timed-out waiting to delete " + commandKey);
        }
        return recordMetadata;
    }

    public Command.CommandMessage get(Command.CommandKey commandKey) {
        return (Command.CommandMessage) getStore().get(commandKey);
    }

    public ReadOnlyKeyValueStore<Command.CommandKey, Command.CommandMessage> getStore() {
        return (ReadOnlyKeyValueStore) this.streamsJob.store(COMMANDER, QueryableStoreTypes.keyValueStore());
    }

    public static void main(String[] strArr) throws Throwable {
        CommandStore build = new Builder().build();
        try {
            build.start();
            Command.CommandKey build2 = Command.CommandKey.newBuilder().setConfigType(Command.CommandConfigType.LICENSE_INFO).setGuid("guid").build();
            build.awaitCommand(build2, Command.CommandMessage.newBuilder().setLicenseInfo(Command.LicenseInfo.newBuilder().setJwt("jwt 9").build()).build());
            log.info("{}", build.get(build2));
        } finally {
            build.close();
        }
    }
}
