package io.eventuate.messaging.redis.spring.consumer;

import io.lettuce.core.RedisCommandExecutionException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;

/* loaded from: input_file:io/eventuate/messaging/redis/spring/consumer/ChannelProcessor.class */
/**
 * Consumes one Redis stream ({@code channel}) on behalf of one consumer group
 * ({@code subscriberId}) and hands every field value of every record to a
 * {@link RedisMessageHandler}, acknowledging the record after the handler returns.
 *
 * <p>{@link #process()} runs the consume loop on the calling thread until
 * {@link #stop()} is called or an error occurs. {@link #stop()} blocks until
 * {@link #process()} has exited.
 *
 * <p>NOTE(review): {@link #process()} unconditionally sets the running flag to
 * {@code true}, so a {@link #stop()} issued before {@link #process()} has started
 * is overridden and will wait until the loop ends for another reason. Callers
 * should only stop a processor whose {@link #process()} is already running.
 */
public class ChannelProcessor {
    private final String subscriptionIdentificationInfo;
    private final String subscriberId;
    private final String channel;
    private final RedisMessageHandler messageHandler;
    private final RedisTemplate<String, String> redisTemplate;
    private final long timeInMillisecondsToSleepWhenKeyDoesNotExist;
    private final long blockStreamTimeInMilliseconds;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    /** Released exactly when {@link #process()} exits, normally or exceptionally. */
    private final CountDownLatch stopCountDownLatch = new CountDownLatch(1);
    /** Loop guard checked by every polling loop; cleared by {@link #stop()}. */
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * @param redisTemplate template used for all stream operations
     * @param subscriberId used both as the consumer group name and as the consumer name
     * @param channel key of the Redis stream to consume
     * @param messageHandler callback invoked once per record field value
     * @param subscriptionIdentificationInfo free-form text included in log messages
     * @param timeInMillisecondsToSleepWhenKeyDoesNotExist delay between attempts to create
     *     the consumer group while the stream key does not exist yet
     * @param blockStreamTimeInMilliseconds block timeout of each read for new records
     */
    public ChannelProcessor(RedisTemplate<String, String> redisTemplate,
                            String subscriberId,
                            String channel,
                            RedisMessageHandler messageHandler,
                            String subscriptionIdentificationInfo,
                            long timeInMillisecondsToSleepWhenKeyDoesNotExist,
                            long blockStreamTimeInMilliseconds) {
        this.redisTemplate = redisTemplate;
        this.subscriberId = subscriberId;
        this.channel = channel;
        this.messageHandler = messageHandler;
        this.subscriptionIdentificationInfo = subscriptionIdentificationInfo;
        this.timeInMillisecondsToSleepWhenKeyDoesNotExist = timeInMillisecondsToSleepWhenKeyDoesNotExist;
        this.blockStreamTimeInMilliseconds = blockStreamTimeInMilliseconds;
        this.logger.info("Channel processor is created (channel = {}, {})", channel, subscriptionIdentificationInfo);
    }

    /**
     * Runs the consume loop on the calling thread: ensures the consumer group exists,
     * drains records already delivered to this consumer but not acknowledged, then
     * polls for new records until {@link #stop()} is called.
     *
     * <p>Any failure is logged and rethrown. In every case the stop latch is released
     * on exit so that a concurrent {@link #stop()} cannot block forever.
     */
    public void process() {
        try {
            this.logger.info("Channel processor started processing (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
            this.running.set(true);
            makeSureConsumerGroupExists();
            processPendingRecords();
            processRegularRecords();
            this.logger.info("Channel processor finished processing (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
        } catch (Throwable th) {
            this.logger.error("Channel processor got error: (channel = {}, subscriberId = {})", this.channel, this.subscriberId);
            this.logger.error("Channel processor got error", th);
            throw th;
        } finally {
            // Release waiters on every exit path, not only on success or handler failure;
            // otherwise a Redis error in createGroup/read would leave stop() hanging.
            this.stopCountDownLatch.countDown();
        }
    }

    /**
     * Requests the consume loop to end and blocks until {@link #process()} has exited.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling
     *     thread is interrupted while waiting; the interrupt status is restored first
     */
    public void stop() {
        this.logger.info("stopping channel (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
        this.running.set(false);
        try {
            this.stopCountDownLatch.await();
            this.logger.info("Stopped channel (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
        } catch (InterruptedException e) {
            this.logger.error("Stopping channel failed (channel = {}, {})", this.channel, this.subscriptionIdentificationInfo);
            this.logger.error("Stopping channel failed", e);
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the consumer group, retrying while the stream key does not exist and
     * treating "group already exists" as success. Gives up silently once the
     * processor is no longer running; any other Redis error is rethrown.
     */
    private void makeSureConsumerGroupExists() {
        this.logger.info("Ensuring consumer group exists {} {}", this.channel, this.subscriberId);
        while (this.running.get()) {
            try {
                this.logger.info("Creating group {} {}", this.channel, this.subscriberId);
                this.redisTemplate.opsForStream().createGroup(this.channel, ReadOffset.from("0"), this.subscriberId);
                this.logger.info("Ensured consumer group exists {}", this.channel);
                return;
            } catch (RedisSystemException e) {
                if (isKeyDoesNotExist(e)) {
                    // The stream has not been written to yet: wait and try again.
                    this.logger.info("Stream {} does not exist!", this.channel);
                    sleep(this.timeInMillisecondsToSleepWhenKeyDoesNotExist);
                } else if (isGroupExistsAlready(e)) {
                    this.logger.info("Ensured consumer group exists {}", this.channel);
                    return;
                } else {
                    this.logger.error("Got exception ensuring consumer group exists: " + this.channel, e);
                    throw e;
                }
            }
        }
    }

    /** Whether the error reports that the stream key is missing. */
    private boolean isKeyDoesNotExist(RedisSystemException redisSystemException) {
        return isRedisCommandExceptionContainingMessage(redisSystemException, "ERR The XGROUP subcommand requires the key to exist");
    }

    /** Whether the error reports that the consumer group was already created. */
    private boolean isGroupExistsAlready(RedisSystemException redisSystemException) {
        return isRedisCommandExceptionContainingMessage(redisSystemException, "Consumer Group name already exists");
    }

    /**
     * Whether the exception was caused by a {@link RedisCommandExecutionException}
     * whose message contains {@code expectedFragment}. Returns {@code false} when
     * there is no cause or the cause has no message.
     */
    private boolean isRedisCommandExceptionContainingMessage(RedisSystemException redisSystemException, String expectedFragment) {
        Throwable cause = redisSystemException.getCause();
        if (!(cause instanceof RedisCommandExecutionException)) {
            return false;
        }
        String message = cause.getMessage();
        return message != null && message.contains(expectedFragment);
    }

    /**
     * Repeatedly reads and handles this consumer's pending (delivered but not yet
     * acknowledged) records until none are left or the processor is stopped.
     */
    private void processPendingRecords() {
        this.logger.info("Processing pending records {}", this.channel);
        while (this.running.get()) {
            List<MapRecord<String, Object, Object>> pendingRecords = getPendingRecords();
            if (pendingRecords.isEmpty()) {
                break;
            }
            processRecords(pendingRecords);
        }
        this.logger.info("Processing pending records finished {}", this.channel);
    }

    /** Polls for and handles newly arriving records until the processor is stopped. */
    private void processRegularRecords() {
        this.logger.trace("Processing regular records {}", this.channel);
        while (this.running.get()) {
            processRecords(getUnprocessedRecords());
        }
        this.logger.trace("Processing regular records finished {}", this.channel);
    }

    /** Passes every field value of every record to {@link #processMessage}. */
    private void processRecords(List<MapRecord<String, Object, Object>> records) {
        for (MapRecord<String, Object, Object> record : records) {
            for (Object value : record.getValue().values()) {
                processMessage(value.toString(), record.getId());
            }
        }
    }

    /**
     * Invokes the message handler with the payload and, if it returns normally,
     * acknowledges the record in the consumer group. A failure is logged and
     * rethrown, which ends {@link #process()}; the record stays unacknowledged.
     */
    private void processMessage(String payload, RecordId recordId) {
        this.logger.trace("Channel processor {} with channel {} got message: {}", this.subscriptionIdentificationInfo, this.channel, payload);
        try {
            this.logger.trace("Invoked message handler");
            this.messageHandler.accept(new RedisMessage(payload));
            this.logger.trace("Message handler invoked");
            this.redisTemplate.opsForStream().acknowledge(this.channel, this.subscriberId, recordId);
        } catch (Throwable th) {
            this.logger.error("Message processing failed", th);
            throw th;
        }
    }

    /** Reads from offset {@code 0}, i.e. this consumer's pending entries, without blocking. */
    private List<MapRecord<String, Object, Object>> getPendingRecords() {
        return getRecords(ReadOffset.from("0"), StreamReadOptions.empty());
    }

    /** Reads from offset {@code >}, i.e. entries never delivered to the group, blocking up to the configured timeout. */
    private List<MapRecord<String, Object, Object>> getUnprocessedRecords() {
        return getRecords(ReadOffset.from(">"), StreamReadOptions.empty().block(Duration.ofMillis(this.blockStreamTimeInMilliseconds)));
    }

    /**
     * Reads records from the channel as consumer {@code subscriberId} of group
     * {@code subscriberId}.
     *
     * @return the records read; never {@code null} (a {@code null} result from the
     *     template is mapped to an empty list)
     */
    private List<MapRecord<String, Object, Object>> getRecords(ReadOffset readOffset, StreamReadOptions streamReadOptions) {
        List<MapRecord<String, Object, Object>> records = this.redisTemplate.opsForStream().read(
                Consumer.from(this.subscriberId, this.subscriberId),
                streamReadOptions,
                StreamOffset.create(this.channel, readOffset));
        if (records == null) {
            // The template API declares the result nullable; callers iterate it directly.
            return Collections.emptyList();
        }
        if (!records.isEmpty()) {
            this.logger.trace("getRecords {} {} found {} records", this.channel, readOffset, records.size());
        }
        return records;
    }

    /**
     * Sleeps for the given number of milliseconds.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread
     *     is interrupted while sleeping; the interrupt status is restored first
     */
    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            this.logger.error("Sleeping failed", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
