package pl.allegro.tech.hermes.domain.topic.schema;

import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SchemaSource;
import pl.allegro.tech.hermes.api.Topic;

/* loaded from: input_file:pl/allegro/tech/hermes/domain/topic/schema/DefaultCachedSchemaSourceProvider.class */
/**
 * {@link CachedSchemaSourceProvider} that keeps schema sources in a Guava {@link LoadingCache}
 * keyed by {@link Topic}.
 *
 * <p>Entries are refreshed and expired after configurable numbers of minutes since they were
 * written. Refreshes are executed on the executor passed to the constructor. Registered consumers
 * are told when a refresh yields a schema source different from the cached one and when an entry
 * holding a schema source is removed from the cache.
 *
 * <p>NOTE(review): the consumer lists are plain lists that are appended to by
 * {@link #onReload(Consumer)} / {@link #onRemove(Consumer)} and iterated when notifications are
 * sent (reload notifications run on the reload executor). Presumably all consumers are registered
 * before the cache is used - confirm against callers.
 */
public class DefaultCachedSchemaSourceProvider implements CachedSchemaSourceProvider {
    private static final Logger logger = LoggerFactory.getLogger(DefaultCachedSchemaSourceProvider.class);

    private final LoadingCache<Topic, Optional<SchemaSource>> cache;
    private final List<Consumer<TopicWithSchema<SchemaSource>>> schemaReloadedConsumers;
    private final List<Consumer<TopicWithSchema<SchemaSource>>> schemaRemovedConsumers;

    /**
     * Cache loader that fetches schema sources from the underlying provider and performs
     * refreshes on a dedicated executor.
     */
    private class SchemaSourceCacheLoader extends CacheLoader<Topic, Optional<SchemaSource>> {
        private final SchemaSourceProvider schemaSourceProvider;
        private final ExecutorService reloadSchemaSourceExecutor;

        public SchemaSourceCacheLoader(SchemaSourceProvider schemaSourceProvider, ExecutorService executorService) {
            this.schemaSourceProvider = schemaSourceProvider;
            this.reloadSchemaSourceExecutor = executorService;
        }

        /**
         * Loads the schema source for a topic that is not in the cache yet.
         *
         * @param topic topic whose schema source should be fetched
         * @return whatever the underlying provider returns for the topic
         */
        @Override
        public Optional<SchemaSource> load(Topic topic) throws Exception {
            logger.info("Loading schema source for topic {}", topic.getQualifiedName());
            return schemaSourceProvider.get(topic);
        }

        /**
         * Refreshes the schema source of an already cached topic on the reload executor.
         *
         * <p>Reload consumers are notified only when the fetched value differs from the cached one
         * and actually contains a schema source.
         *
         * @param topic topic being refreshed
         * @param oldSchemaSource value currently held by the cache
         * @return future completed with the freshly fetched value, or failed with the exception
         *     thrown while fetching it
         */
        @Override
        public ListenableFuture<Optional<SchemaSource>> reload(Topic topic, Optional<SchemaSource> oldSchemaSource)
                throws Exception {
            ListenableFutureTask<Optional<SchemaSource>> reloadTask = ListenableFutureTask.create(() -> {
                logger.info("Reloading schema for topic {}", topic.getQualifiedName());
                try {
                    Optional<SchemaSource> newSchemaSource = schemaSourceProvider.get(topic);
                    if (!oldSchemaSource.equals(newSchemaSource)) {
                        // The new value may be empty (schema no longer available); calling get()
                        // unconditionally would turn that into a spurious reload failure.
                        newSchemaSource.ifPresent(schemaSource -> notifyConsumersAboutSchemaReload(topic, schemaSource));
                    }
                    return newSchemaSource;
                } catch (Exception e) {
                    logger.warn("Could not reload schema source for topic {}", topic.getQualifiedName(), e);
                    throw e;
                }
            });
            reloadSchemaSourceExecutor.execute(reloadTask);
            return reloadTask;
        }
    }

    /**
     * Creates a provider that measures time with the system ticker.
     *
     * @param refreshAfterWriteMinutes minutes after a write when an entry becomes eligible for refresh
     * @param expireAfterWriteMinutes minutes after a write when an entry expires
     * @param executorService executor on which refreshes are run
     * @param schemaSourceProvider provider the cache delegates to when loading or refreshing
     */
    public DefaultCachedSchemaSourceProvider(int refreshAfterWriteMinutes, int expireAfterWriteMinutes,
            ExecutorService executorService, SchemaSourceProvider schemaSourceProvider) {
        this(refreshAfterWriteMinutes, expireAfterWriteMinutes, executorService, schemaSourceProvider,
                Ticker.systemTicker());
    }

    /**
     * Creates a provider with an explicit time source.
     *
     * @param refreshAfterWriteMinutes minutes after a write when an entry becomes eligible for refresh
     * @param expireAfterWriteMinutes minutes after a write when an entry expires
     * @param executorService executor on which refreshes are run
     * @param schemaSourceProvider provider the cache delegates to when loading or refreshing
     * @param ticker time source used by the cache
     */
    DefaultCachedSchemaSourceProvider(int refreshAfterWriteMinutes, int expireAfterWriteMinutes,
            ExecutorService executorService, SchemaSourceProvider schemaSourceProvider, Ticker ticker) {
        this.schemaReloadedConsumers = Lists.newArrayList();
        this.schemaRemovedConsumers = Lists.newArrayList();

        // A typed local is needed so that CacheBuilder can infer the key and value types.
        RemovalListener<Topic, Optional<SchemaSource>> removalListener = this::notifyConsumersAboutSchemaRemoval;
        this.cache = CacheBuilder.newBuilder()
                .ticker(ticker)
                .refreshAfterWrite(refreshAfterWriteMinutes, TimeUnit.MINUTES)
                .expireAfterWrite(expireAfterWriteMinutes, TimeUnit.MINUTES)
                .removalListener(removalListener)
                .build(new SchemaSourceCacheLoader(schemaSourceProvider, executorService));
    }

    /**
     * Registers a consumer invoked when a refresh produces a different, present schema source.
     *
     * @param consumer callback receiving the topic together with its new schema source
     */
    @Override // pl.allegro.tech.hermes.domain.topic.schema.CachedSchemaSourceProvider
    public void onReload(Consumer<TopicWithSchema<SchemaSource>> consumer) {
        schemaReloadedConsumers.add(consumer);
    }

    /**
     * Registers a consumer invoked when a cache entry holding a schema source is removed.
     *
     * @param consumer callback receiving the topic together with the removed schema source
     */
    @Override // pl.allegro.tech.hermes.domain.topic.schema.CachedSchemaSourceProvider
    public void onRemove(Consumer<TopicWithSchema<SchemaSource>> consumer) {
        schemaRemovedConsumers.add(consumer);
    }

    /**
     * Returns the schema source for the topic, loading it through the cache when necessary.
     *
     * @param topic topic whose schema source is requested
     * @return the cached or freshly loaded value; {@link Optional#empty()} when loading failed with
     *     an {@link ExecutionException} (the failure is logged)
     */
    @Override // pl.allegro.tech.hermes.domain.topic.schema.SchemaSourceProvider
    public Optional<SchemaSource> get(Topic topic) {
        try {
            return cache.get(topic);
        } catch (ExecutionException e) {
            logger.error("Error while loading schema source for topic {}", topic.getQualifiedName(), e);
            return Optional.empty();
        }
    }

    /**
     * Asks the cache to refresh the entry for the given topic.
     *
     * @param topic topic whose schema source should be refreshed
     */
    @Override // pl.allegro.tech.hermes.domain.topic.schema.CachedSchemaSourceProvider
    public void reload(Topic topic) {
        cache.refresh(topic);
    }

    /** Passes the topic and its new schema source to every registered reload consumer. */
    private void notifyConsumersAboutSchemaReload(Topic topic, SchemaSource schemaSource) {
        TopicWithSchema<SchemaSource> reloaded = new TopicWithSchema<>(topic, schemaSource);
        schemaReloadedConsumers.forEach(consumer -> consumer.accept(reloaded));
    }

    /**
     * Passes the removed entry to every registered removal consumer.
     *
     * <p>Entries that cached the absence of a schema carry nothing to report, so they are skipped
     * instead of failing on an empty {@link Optional}.
     */
    private void notifyConsumersAboutSchemaRemoval(RemovalNotification<Topic, Optional<SchemaSource>> notification) {
        Optional<SchemaSource> removed = notification.getValue();
        if (removed == null || !removed.isPresent()) {
            return;
        }
        TopicWithSchema<SchemaSource> removedSchema = new TopicWithSchema<>(notification.getKey(), removed.get());
        schemaRemovedConsumers.forEach(consumer -> consumer.accept(removedSchema));
    }
}
