package io.github.rapid.queue.core.file;

import io.github.rapid.queue.core.RapidQueueCallback;
import io.github.rapid.queue.core.RapidQueueListener;
import io.github.rapid.queue.core.RapidQueueMessage;
import io.github.rapid.queue.core.RapidQueueReader;
import io.github.rapid.queue.core.file.FileMessageCircularCache;
import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/github/rapid/queue/core/file/FileRapidQueueListener.class */
/**
 * {@link RapidQueueListener} backed by a {@link FileRapidQueue}.
 *
 * <p>{@link #start(Long, RapidQueueCallback)} schedules a background task that, while holding the
 * queue lock, replays messages from the queue's circular cache and/or from a snapshot reader and
 * then registers this listener with the queue via {@code putListener}. Every message, replayed or
 * handed in later through {@link #onMessage(RapidQueueMessage)}, is forwarded to the callback
 * unless the listener has been stopped.
 *
 * <p>NOTE(review): {@code putListener} presumably makes the queue push newly written messages
 * into {@link #onMessage(RapidQueueMessage)} - confirm against {@code FileRapidQueue}.
 */
public class FileRapidQueueListener implements RapidQueueListener {
    private static final Logger logger = LoggerFactory.getLogger(FileRapidQueueListener.class);

    /** Source of process-wide unique listener ids. */
    private static final AtomicLong ID = new AtomicLong();

    /** Timeout handed to {@code FileRapidQueue.tryLock}; presumably milliseconds - TODO confirm. */
    private static final long LOCK_TIMEOUT = 3000L;

    /** Upper bound on concurrently running replay tasks of one listener. */
    private static final int REPLAY_POOL_SIZE = 10;

    /** Seconds an idle replay thread is kept before it is allowed to terminate. */
    private static final long REPLAY_THREAD_IDLE_SECONDS = 60L;

    private final FileRapidQueue fileRapidQueue;

    /** Offset of the last durable message delivered, or the requested start offset before that. */
    private volatile Long lastOffsetId;

    private volatile RapidQueueCallback messageCallback;

    private final Executor executor = newReplayExecutor();

    /** Set once by {@link #stop()}; never reset. */
    private final AtomicBoolean stop = new AtomicBoolean(false);

    /** Set when the replay finished and the listener was registered (WITHIN / EMPTY paths). */
    private final AtomicBoolean active = new AtomicBoolean(false);

    /** True until the first durable message has been delivered to the callback. */
    private boolean firstMessage = true;

    private final long listenerId = ID.incrementAndGet();

    /**
     * Creates a listener bound to the given queue. Nothing is read until
     * {@link #start(Long, RapidQueueCallback)} is called.
     *
     * @param fileRapidQueue queue to replay from and to register with
     */
    public FileRapidQueueListener(FileRapidQueue fileRapidQueue) {
        this.fileRapidQueue = fileRapidQueue;
    }

    /**
     * Builds the executor used for replay tasks.
     *
     * <p>The pool has the same size as a fixed pool of {@value #REPLAY_POOL_SIZE} threads, but
     * idle threads are allowed to time out. A plain fixed pool that is never shut down would pin
     * one non-daemon thread per started listener for the lifetime of the JVM.
     */
    private static Executor newReplayExecutor() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                REPLAY_POOL_SIZE,
                REPLAY_POOL_SIZE,
                REPLAY_THREAD_IDLE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /**
     * Stores the callback and start offset, then replays existing messages asynchronously.
     * If the replay fails with an exception, the failure is logged and the listener is stopped.
     *
     * @param l                  offset to start from; {@code null} lets the first durable message
     *                           received define the starting offset
     * @param rapidQueueCallback receiver of the messages; must not be {@code null}
     * @throws NullPointerException if {@code rapidQueueCallback} is {@code null}
     */
    @Override // io.github.rapid.queue.core.RapidQueueListener
    public void start(@Nullable Long l, RapidQueueCallback rapidQueueCallback) {
        Objects.requireNonNull(rapidQueueCallback, "messageListener can not null");
        this.messageCallback = rapidQueueCallback;
        this.lastOffsetId = l;
        this.executor.execute(() -> {
            try {
                readExistingMessageActiveListener();
            } catch (Exception e) {
                logger.warn("FileSequenceListener readExistingMessageActiveListener error:", e);
                stop();
            }
        });
    }

    /**
     * Replays already stored messages and registers this listener with the queue.
     *
     * <p>Each pass takes the queue lock, creates a cache reader at {@link #lastOffsetId} and acts
     * on its status:
     * <ul>
     *   <li>{@code GREATER}: register the listener, done.</li>
     *   <li>{@code WITHIN}: deliver the cached messages, register, mark active, done.</li>
     *   <li>{@code EMPTY} right after a snapshot pass: register, mark active, done.</li>
     *   <li>anything else (including the first {@code EMPTY}): deliver the snapshot reader's
     *       messages and run another pass.</li>
     * </ul>
     * The method returns early, without registering, as soon as the listener is stopped.
     *
     * @throws IllegalArgumentException if the queue lock could not be obtained in time
     * @throws IOException              propagated from the snapshot reader
     */
    private void readExistingMessageActiveListener() throws IOException {
        boolean snapshotPending = false;
        do {
            // Acquire BEFORE entering the try block so that the finally clause only ever releases
            // a lock this thread actually holds (a timed-out tryLock must not trigger unLock).
            if (!this.fileRapidQueue.tryLock(LOCK_TIMEOUT)) {
                throw new IllegalArgumentException("reader wait time out");
            }
            try {
                FileMessageCircularCache.Reader cacheReader =
                        this.fileRapidQueue.circularCache.createReader(this.lastOffsetId);
                FileMessageCircularCache.ReaderStatus status = cacheReader.getStatus();

                if (status.equals(FileMessageCircularCache.ReaderStatus.GREATER)) {
                    // NOTE(review): unlike the other registering paths this one does not set
                    // 'active'; kept as found - confirm whether that is intentional.
                    register(false);
                    snapshotPending = false;
                } else if (status.equals(FileMessageCircularCache.ReaderStatus.WITHIN)) {
                    if (!deliverUntilStopped(cacheReader.iterator())) {
                        return; // stopped mid-replay; the finally clause releases the lock once
                    }
                    register(true);
                    snapshotPending = false;
                } else if (status.equals(FileMessageCircularCache.ReaderStatus.EMPTY) && snapshotPending) {
                    // Cache still empty after the snapshot was replayed: nothing left to catch up.
                    register(true);
                    snapshotPending = false;
                } else {
                    snapshotPending = true;
                }

                if (snapshotPending) {
                    try (RapidQueueReader snapshot = this.fileRapidQueue.readSnapshot(this.lastOffsetId)) {
                        if (!deliverUntilStopped(snapshot.iterator())) {
                            return;
                        }
                    }
                }
            } finally {
                this.fileRapidQueue.unLock();
            }
        } while (snapshotPending);
    }

    /**
     * Feeds every message of the iterator through {@link #onMessage(RapidQueueMessage)}, checking
     * the stop flag before and after each delivery.
     *
     * @return {@code true} if the iterator was exhausted, {@code false} if the listener was stopped
     */
    private boolean deliverUntilStopped(Iterator<? extends RapidQueueMessage> messages) {
        while (messages.hasNext()) {
            RapidQueueMessage message = messages.next();
            if (this.stop.get()) {
                return false;
            }
            onMessage(message);
            if (this.stop.get()) {
                return false;
            }
        }
        return true;
    }

    /** Registers this listener with the queue under its id and optionally flags it active. */
    private void register(boolean markActive) {
        this.fileRapidQueue.putListener(Long.valueOf(this.listenerId), this);
        if (markActive) {
            this.active.set(true);
        }
    }

    /**
     * Marks the listener stopped and inactive and removes it from the queue's listeners.
     * After this call {@link #onMessage(RapidQueueMessage)} ignores all messages.
     */
    @Override // io.github.rapid.queue.core.RapidQueueListener
    public void stop() {
        this.stop.set(true);
        this.active.set(false);
        this.fileRapidQueue.removeTailListener(Long.valueOf(this.listenerId));
    }

    /** @return {@code true} once the replay finished and the listener was flagged active */
    @Override // io.github.rapid.queue.core.RapidQueueListener
    public boolean statusActive() {
        return this.active.get();
    }

    /** @return {@code true} once {@link #stop()} has been called */
    @Override // io.github.rapid.queue.core.RapidQueueListener
    public boolean statusStop() {
        return this.stop.get();
    }

    /**
     * Forwards a message to the callback.
     *
     * <p>Non-durable messages are always forwarded. Durable messages are forwarded only when
     * their offset is newer than {@link #lastOffsetId} according to
     * {@code StoreBase.compareOffset}, or equal to it while no durable message has been delivered
     * yet (so the requested start offset itself is included). Older messages, and repeats of the
     * last delivered offset, are dropped. Any exception while handling a durable message is
     * logged and stops the listener.
     *
     * @param rapidQueueMessage message to deliver
     */
    public void onMessage(RapidQueueMessage rapidQueueMessage) {
        if (this.stop.get()) {
            return;
        }
        if (!rapidQueueMessage.isDurable()) {
            this.messageCallback.onMessage(rapidQueueMessage);
            return;
        }
        try {
            long offset = rapidQueueMessage.getOffset();
            if (this.firstMessage && this.lastOffsetId == null) {
                // No start offset requested: the first durable message defines it.
                this.lastOffsetId = Long.valueOf(offset);
            }
            int order = StoreBase.compareOffset(offset, this.lastOffsetId.longValue());
            boolean newer = order > 0;
            boolean inclusiveStart = order == 0 && this.firstMessage;
            if (newer || inclusiveStart) {
                this.messageCallback.onMessage(rapidQueueMessage);
                // Clear the flag after ANY first delivery. If it were cleared only on an
                // equal-offset delivery, a first message with a greater offset would leave it
                // set, and a later re-read of that same offset (e.g. the next cache reader
                // created at lastOffsetId) would be delivered a second time.
                this.firstMessage = false;
                this.lastOffsetId = Long.valueOf(offset);
            }
        } catch (Exception e) {
            logger.warn("FileSequenceListener error: ", e);
            stop();
        }
    }
}
