package dev.sanda.apifi.service.graphql_subcriptions.pubsub.redis_pubsub;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.sanda.apifi.service.graphql_subcriptions.pubsub.PubSubMessagingService;
import dev.sanda.apifi.service.graphql_subcriptions.pubsub.PubSubTopicHandler;
import dev.sanda.datafi.reflection.runtime_services.ReflectionCache;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

@ConditionalOnBean({DefaultRedisConfig.class})
@Component
/* loaded from: input_file:dev/sanda/apifi/service/graphql_subcriptions/pubsub/redis_pubsub/RedisPubSubMessagingService.class */
public class RedisPubSubMessagingService implements PubSubMessagingService {
    private static final Logger log = LoggerFactory.getLogger(RedisPubSubMessagingService.class);
    private final RedisMessagePublisher redisPublisher;
    private final RedisMessageListenerContainer redisListeners;
    private final ReflectionCache reflectionCache;
    private final Map<String, Map<String, PubSubTopicHandler>> topicHandlers = new ConcurrentHashMap();
    private final Map<String, MessageListener> topicListeners = new ConcurrentHashMap();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    @Autowired
    public RedisPubSubMessagingService(RedisMessagePublisher redisMessagePublisher, RedisMessageListenerContainer redisMessageListenerContainer, ReflectionCache reflectionCache) {
        this.redisPublisher = redisMessagePublisher;
        this.redisListeners = redisMessageListenerContainer;
        this.reflectionCache = reflectionCache;
    }

    @Transactional
    public void handleDataPayload(String str, Object obj) {
        synchronized (this.topicHandlers.get(str)) {
            this.topicHandlers.get(str).values().forEach(pubSubTopicHandler -> {
                pubSubTopicHandler.handleDataInTransaction(obj);
            });
        }
    }

    @Override // dev.sanda.apifi.service.graphql_subcriptions.pubsub.PubSubMessagingService
    public void publishToTopic(String str, Object obj) {
        this.redisPublisher.publish(str, publicationMapper().writeValueAsString(obj));
    }

    private ObjectMapper publicationMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.disable(new MapperFeature[]{MapperFeature.AUTO_DETECT_CREATORS, MapperFeature.AUTO_DETECT_GETTERS, MapperFeature.AUTO_DETECT_IS_GETTERS});
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
        return objectMapper;
    }

    @Override // dev.sanda.apifi.service.graphql_subcriptions.pubsub.PubSubMessagingService
    public void registerTopicHandler(String str, PubSubTopicHandler pubSubTopicHandler) {
        this.lock.writeLock().lock();
        log.info("registering handler for topic \"{}\"", str);
        this.topicHandlers.putIfAbsent(str, new ConcurrentHashMap());
        this.topicHandlers.get(str).put(pubSubTopicHandler.getId(), pubSubTopicHandler);
        if (!this.topicListeners.containsKey(str)) {
            RedisMessageSubscriber redisMessageSubscriber = new RedisMessageSubscriber(str, this, this.reflectionCache);
            this.topicListeners.put(str, redisMessageSubscriber);
            try {
                this.redisListeners.addMessageListener(redisMessageSubscriber, redisMessageSubscriber.getTopic());
            } catch (Exception e) {
                log.info(e.toString());
            }
        }
        this.lock.writeLock().unlock();
    }

    @Override // dev.sanda.apifi.service.graphql_subcriptions.pubsub.PubSubMessagingService
    public void removeTopicHandler(String str, String str2) {
        this.lock.readLock().lock();
        log.info("removing handler with id \"{}\" from topic \"{}\"", str2, str);
        this.topicHandlers.get(str).remove(str2);
        if (this.topicHandlers.get(str).isEmpty()) {
            log.info("no listeners left on topic \"{}\", unsubscribing from redis channel \"{}\"", str, str);
            this.redisListeners.removeMessageListener(this.topicListeners.get(str));
            this.topicListeners.remove(str);
        }
        this.lock.readLock().unlock();
    }
}
