package tech.mhuang.pacebox.springboot.rediskafkamiddle;

import com.alibaba.fastjson2.JSON;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import tech.mhuang.pacebox.core.util.StringUtil;
import tech.mhuang.pacebox.kafka.global.exception.JkafkaException;
import tech.mhuang.pacebox.kafka.producer.bean.KafkaMsg;
import tech.mhuang.pacebox.springboot.redis.commands.RedisExtCommands;
import tech.mhuang.pacebox.springboot.redis.lock.DistributedLockHandler;
import tech.mhuang.pacebox.springboot.redis.lock.Lock;
import tech.mhuang.pacebox.springboot.rediskafkamiddle.annaotion.RedisKafka;

@Aspect
@Component
@Order(100)
/* loaded from: input_file:tech/mhuang/pacebox/springboot/rediskafkamiddle/RedisKafkaAspect.class */
public class RedisKafkaAspect {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final String CONSUMER = "-consumer";
    private final String LOCK = "-lock-";
    private final RedisExtCommands redisExtCommands;
    private final DistributedLockHandler distributedLockHandler;

    @Value("${redisKafkaDataBase:0}")
    private int redisKafkaDataBase;

    public RedisKafkaAspect(RedisExtCommands redisExtCommands, DistributedLockHandler distributedLockHandler) {
        this.redisExtCommands = redisExtCommands;
        this.distributedLockHandler = distributedLockHandler;
    }

    @Pointcut("@annotation(tech.mhuang.pacebox.springboot.rediskafkamiddle.annaotion.RedisKafka)")
    private void kafkaMsgProcess() {
    }

    public static Boolean getMethodByRemark(MethodSignature methodSignature) {
        return Boolean.valueOf(((RedisKafka) methodSignature.getMethod().getAnnotation(RedisKafka.class)).notRepeat());
    }

    @Before("kafkaMsgProcess()")
    public void doBefore(JoinPoint joinPoint) {
        this.logger.debug("---正在拦截---kafka---");
        KafkaMsg kafkaMsg = (KafkaMsg) joinPoint.getArgs()[0];
        Lock lock = new Lock();
        try {
            try {
                Boolean methodByRemark = getMethodByRemark(joinPoint.getSignature());
                lock.setName(kafkaMsg.getTopic() + "-lock-" + kafkaMsg.getOffset());
                lock.setValue(kafkaMsg.getMsg().toString());
                if (this.distributedLockHandler.tryLock(lock, methodByRemark.booleanValue())) {
                    if (!StringUtil.isEmpty(this.redisExtCommands.hget(this.redisKafkaDataBase, kafkaMsg.getTopic() + "-consumer", kafkaMsg.getOffset()))) {
                        this.logger.error("数据为：{}", JSON.toJSONString(kafkaMsg));
                        throw new JkafkaException("kafka这条消息已经处理过了！");
                    }
                    this.redisExtCommands.hset(this.redisKafkaDataBase, kafkaMsg.getTopic() + "-consumer", kafkaMsg.getOffset(), kafkaMsg.getMsg());
                }
                this.distributedLockHandler.releaseLock(lock);
            } catch (Exception e) {
                if (!(e instanceof JkafkaException)) {
                    this.redisExtCommands.hdel(this.redisKafkaDataBase, kafkaMsg.getTopic() + "-consumer", String.valueOf(kafkaMsg.getOffset()));
                }
                this.distributedLockHandler.releaseLock(lock);
            }
        } catch (Throwable th) {
            this.distributedLockHandler.releaseLock(lock);
            throw th;
        }
    }
}
