package io.eventuate.messaging.redis.spring.consumer;

import io.eventuate.messaging.partitionmanagement.Assignment;
import io.eventuate.messaging.partitionmanagement.Coordinator;
import io.eventuate.messaging.partitionmanagement.CoordinatorFactory;
import io.eventuate.messaging.partitionmanagement.SubscriptionLeaderHook;
import io.eventuate.messaging.partitionmanagement.SubscriptionLifecycleHook;
import io.eventuate.messaging.redis.spring.common.RedisUtil;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

/* loaded from: input_file:io/eventuate/messaging/redis/spring/consumer/Subscription.class */
public class Subscription {
    private final String subscriptionId;
    private String consumerId;
    private RedisTemplate<String, String> redisTemplate;
    private String subscriberId;
    private RedisMessageHandler handler;
    private long timeInMillisecondsToSleepWhenKeyDoesNotExist;
    private long blockStreamTimeInMilliseconds;
    private Coordinator coordinator;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private ExecutorService executorService = Executors.newCachedThreadPool();
    private Map<String, Set<Integer>> currentPartitionsByChannel = new HashMap();
    private ConcurrentHashMap<ChannelPartition, ChannelProcessor> channelProcessorsByChannelAndPartition = new ConcurrentHashMap<>();
    private Optional<SubscriptionLifecycleHook> subscriptionLifecycleHook = Optional.empty();
    private Optional<SubscriptionLeaderHook> leaderHook = Optional.empty();
    private Optional<Runnable> closingCallback = Optional.empty();

    /* loaded from: input_file:io/eventuate/messaging/redis/spring/consumer/Subscription$ChannelPartition.class */
    private static class ChannelPartition {
        private String channel;
        private int partition;

        public ChannelPartition() {
        }

        public ChannelPartition(String str, int i) {
            this.channel = str;
            this.partition = i;
        }

        public String getChannel() {
            return this.channel;
        }

        public void setChannel(String str) {
            this.channel = str;
        }

        public int getPartition() {
            return this.partition;
        }

        public void setPartition(int i) {
            this.partition = i;
        }

        public int hashCode() {
            return HashCodeBuilder.reflectionHashCode(this);
        }

        public boolean equals(Object obj) {
            return EqualsBuilder.reflectionEquals(this, obj);
        }
    }

    public Subscription(String str, String str2, RedisTemplate<String, String> redisTemplate, String str3, Set<String> set, RedisMessageHandler redisMessageHandler, CoordinatorFactory coordinatorFactory, long j, long j2) {
        this.subscriptionId = str;
        this.consumerId = str2;
        this.redisTemplate = redisTemplate;
        this.subscriberId = str3;
        this.handler = redisMessageHandler;
        this.timeInMillisecondsToSleepWhenKeyDoesNotExist = j;
        this.blockStreamTimeInMilliseconds = j2;
        set.forEach(str4 -> {
            this.currentPartitionsByChannel.put(str4, new HashSet());
        });
        this.coordinator = coordinatorFactory.makeCoordinator(str3, set, str, this::assignmentUpdated, RedisKeyUtil.keyForLeaderLock(str3), leadershipController -> {
            this.leaderHook.ifPresent(subscriptionLeaderHook -> {
                subscriptionLeaderHook.leaderUpdated(true, str);
            });
        }, () -> {
            this.leaderHook.ifPresent(subscriptionLeaderHook -> {
                subscriptionLeaderHook.leaderUpdated(false, str);
            });
        });
        this.logger.info("subscription created (channels = {}, {})", set, identificationInformation());
    }

    public void setSubscriptionLifecycleHook(SubscriptionLifecycleHook subscriptionLifecycleHook) {
        this.subscriptionLifecycleHook = Optional.ofNullable(subscriptionLifecycleHook);
    }

    public void setLeaderHook(SubscriptionLeaderHook subscriptionLeaderHook) {
        this.leaderHook = Optional.ofNullable(subscriptionLeaderHook);
    }

    public void setClosingCallback(Runnable runnable) {
        this.closingCallback = Optional.of(runnable);
    }

    private void assignmentUpdated(Assignment assignment) {
        this.logger.info("assignment is updated (assignment = {}, {})", assignment, identificationInformation());
        for (String str : this.currentPartitionsByChannel.keySet()) {
            Set<Integer> set = this.currentPartitionsByChannel.get(str);
            Set<Integer> set2 = (Set) assignment.getPartitionAssignmentsByChannel().get(str);
            Set set3 = (Set) set.stream().filter(num -> {
                return !set2.contains(num);
            }).collect(Collectors.toSet());
            this.logger.info("partitions resigned (resigned partitions = {}, {})", set3, identificationInformation());
            Set set4 = (Set) set2.stream().filter(num2 -> {
                return !set.contains(num2);
            }).collect(Collectors.toSet());
            this.logger.info("partitions asigned (resigned partitions = {}, {})", assignment, identificationInformation());
            set3.forEach(num3 -> {
                this.channelProcessorsByChannelAndPartition.remove(new ChannelPartition(str, num3.intValue())).stop();
            });
            set4.forEach(num4 -> {
                ChannelProcessor channelProcessor = new ChannelProcessor(this.redisTemplate, this.subscriberId, RedisUtil.channelToRedisStream(str, num4.intValue()), this.handler, identificationInformation(), this.timeInMillisecondsToSleepWhenKeyDoesNotExist, this.blockStreamTimeInMilliseconds);
                ExecutorService executorService = this.executorService;
                channelProcessor.getClass();
                executorService.submit(channelProcessor::process);
                this.channelProcessorsByChannelAndPartition.put(new ChannelPartition(str, num4.intValue()), channelProcessor);
            });
            this.currentPartitionsByChannel.put(str, set2);
            this.subscriptionLifecycleHook.ifPresent(subscriptionLifecycleHook -> {
                subscriptionLifecycleHook.partitionsUpdated(str, this.subscriptionId, set2);
            });
        }
    }

    public void close() {
        this.coordinator.close();
        this.channelProcessorsByChannelAndPartition.values().forEach((v0) -> {
            v0.stop();
        });
    }

    private String identificationInformation() {
        return String.format("(consumerId = %s, subscriptionId = %s, subscriberId = %s)", this.consumerId, this.subscriptionId, this.subscriberId);
    }
}
