package io.eventuate.messaging.redis.spring.consumer;

import io.eventuate.messaging.partitionmanagement.CommonMessageConsumer;
import io.eventuate.messaging.partitionmanagement.CoordinatorFactory;
import io.eventuate.messaging.partitionmanagement.SubscriptionLeaderHook;
import io.eventuate.messaging.partitionmanagement.SubscriptionLifecycleHook;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

/* loaded from: input_file:io/eventuate/messaging/redis/spring/consumer/MessageConsumerRedisImpl.class */
/**
 * Redis-backed message consumer that creates {@link Subscription}s and keeps
 * track of the ones that are still open so they can be closed together.
 *
 * <p>Open subscriptions are held in a {@link ConcurrentLinkedQueue}; each
 * subscription removes itself from that queue through its closing callback.
 */
public class MessageConsumerRedisImpl implements CommonMessageConsumer {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    public final String consumerId;
    private final Supplier<String> subscriptionIdSupplier;
    private final RedisTemplate<String, String> redisTemplate;
    private final ConcurrentLinkedQueue<Subscription> subscriptions = new ConcurrentLinkedQueue<>();
    private final CoordinatorFactory coordinatorFactory;
    private final long timeInMillisecondsToSleepWhenKeyDoesNotExist;
    private final long blockStreamTimeInMilliseconds;

    /**
     * Creates a consumer with a random UUID as consumer id and random UUIDs as
     * subscription ids.
     *
     * @param redisTemplate template handed to every created subscription
     * @param coordinatorFactory factory handed to every created subscription
     * @param timeInMillisecondsToSleepWhenKeyDoesNotExist value handed to every created subscription
     * @param blockStreamTimeInMilliseconds value handed to every created subscription
     */
    public MessageConsumerRedisImpl(RedisTemplate<String, String> redisTemplate,
                                    CoordinatorFactory coordinatorFactory,
                                    long timeInMillisecondsToSleepWhenKeyDoesNotExist,
                                    long blockStreamTimeInMilliseconds) {
        this(() -> UUID.randomUUID().toString(),
                UUID.randomUUID().toString(),
                redisTemplate,
                coordinatorFactory,
                timeInMillisecondsToSleepWhenKeyDoesNotExist,
                blockStreamTimeInMilliseconds);
    }

    /**
     * Creates a consumer with an explicit consumer id and subscription id source.
     *
     * @param subscriptionIdSupplier invoked once per {@link #subscribe} call to obtain the subscription id
     * @param consumerId id reported by {@link #getId()} and passed to every created subscription
     * @param redisTemplate template handed to every created subscription
     * @param coordinatorFactory factory handed to every created subscription
     * @param timeInMillisecondsToSleepWhenKeyDoesNotExist value handed to every created subscription
     * @param blockStreamTimeInMilliseconds value handed to every created subscription
     */
    public MessageConsumerRedisImpl(Supplier<String> subscriptionIdSupplier,
                                    String consumerId,
                                    RedisTemplate<String, String> redisTemplate,
                                    CoordinatorFactory coordinatorFactory,
                                    long timeInMillisecondsToSleepWhenKeyDoesNotExist,
                                    long blockStreamTimeInMilliseconds) {
        this.subscriptionIdSupplier = subscriptionIdSupplier;
        this.consumerId = consumerId;
        this.redisTemplate = redisTemplate;
        this.coordinatorFactory = coordinatorFactory;
        this.timeInMillisecondsToSleepWhenKeyDoesNotExist = timeInMillisecondsToSleepWhenKeyDoesNotExist;
        this.blockStreamTimeInMilliseconds = blockStreamTimeInMilliseconds;
        this.logger.info("Consumer created (consumer id = {})", consumerId);
    }

    /**
     * Creates a new subscription, registers it with this consumer and returns it.
     *
     * @param subscriberId subscriber id passed to the subscription
     * @param channels channels passed to the subscription
     * @param redisMessageHandler handler passed to the subscription
     * @return the newly created subscription
     */
    public Subscription subscribe(String subscriberId, Set<String> channels, RedisMessageHandler redisMessageHandler) {
        this.logger.info("Consumer subscribes to channels (consumer id = {}, subscriber id {}, channels = {})",
                this.consumerId, subscriberId, channels);

        Subscription subscription = new Subscription(
                this.subscriptionIdSupplier.get(),
                this.consumerId,
                this.redisTemplate,
                subscriberId,
                channels,
                redisMessageHandler,
                this.coordinatorFactory,
                this.timeInMillisecondsToSleepWhenKeyDoesNotExist,
                this.blockStreamTimeInMilliseconds);

        this.subscriptions.add(subscription);
        // A subscription that is closed drops out of the tracked set on its own.
        subscription.setClosingCallback(() -> this.subscriptions.remove(subscription));

        this.logger.info("Consumer subscribed to channels (consumer id = {}, subscriber id {}, channels = {})",
                this.consumerId, subscriberId, channels);
        return subscription;
    }

    /**
     * Passes the lifecycle hook to every subscription that is registered at the
     * time of the call. Subscriptions created afterwards do not receive it.
     *
     * @param subscriptionLifecycleHook hook to forward
     */
    public void setSubscriptionLifecycleHook(SubscriptionLifecycleHook subscriptionLifecycleHook) {
        this.subscriptions.forEach(subscription -> subscription.setSubscriptionLifecycleHook(subscriptionLifecycleHook));
    }

    /**
     * Passes the leader hook to every subscription that is registered at the
     * time of the call. Subscriptions created afterwards do not receive it.
     *
     * @param subscriptionLeaderHook hook to forward
     */
    public void setLeaderHook(SubscriptionLeaderHook subscriptionLeaderHook) {
        this.subscriptions.forEach(subscription -> subscription.setLeaderHook(subscriptionLeaderHook));
    }

    /**
     * Closes every registered subscription and empties the registry.
     *
     * <p>Every subscription gets a close attempt even when an earlier one fails.
     * After all attempts the registry is cleared; if any attempt threw, the
     * first exception is rethrown with the later ones attached as suppressed.
     *
     * @throws RuntimeException the first failure raised while closing a subscription
     */
    public void close() {
        this.logger.info("Closing consumer (consumer id = {})", this.consumerId);

        RuntimeException firstFailure = null;
        for (Subscription subscription : this.subscriptions) {
            try {
                subscription.close();
            } catch (RuntimeException e) {
                // Keep going: one broken subscription must not leave the others open.
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        this.subscriptions.clear();

        if (firstFailure != null) {
            throw firstFailure;
        }
        this.logger.info("Closed consumer (consumer id = {})", this.consumerId);
    }

    /**
     * @return the consumer id assigned at construction
     */
    public String getId() {
        return this.consumerId;
    }
}
