package io.github.rapid.queue.core.file;

import io.github.rapid.queue.core.EventMessage;
import io.github.rapid.queue.core.MessageCallback;
import io.github.rapid.queue.core.SequenceListener;
import io.github.rapid.queue.core.SnapshotReader;
import io.github.rapid.queue.core.file.FileSequencerCircularCache;
import io.github.rapid.queue.core.kit.UUIDKit;
import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/github/rapid/queue/core/file/FileSequenceListener.class */
/**
 * {@link SequenceListener} implementation backed by a {@link FileSequencer}.
 *
 * <p>{@link #start(Long, MessageCallback)} replays already-stored messages on a background
 * thread (from the sequencer's circular cache, falling back to a snapshot read) and then
 * registers this listener with the sequencer through {@code putListener}. Durable messages are
 * filtered by offset in {@link #onMessage(EventMessage)} so that a message at or below the last
 * delivered offset is not handed to the callback again.
 */
public final class FileSequenceListener implements SequenceListener {
    private static final Logger logger = LoggerFactory.getLogger(FileSequenceListener.class);

    /** Timeout handed to {@code FileSequencer.tryLock}; presumably milliseconds - TODO confirm. */
    private static final long LOCK_WAIT_TIMEOUT = 3000L;

    private final FileSequencer fileSequencer;

    /** Offset of the last durable message delivered, or the requested start offset; may be null. */
    private volatile Long lastOffsetId;
    private volatile MessageCallback messageCallback;

    /**
     * Runs the replay task submitted by {@link #start(Long, MessageCallback)}.
     *
     * <p>A cached pool is used because this executor is never shut down: a fixed pool would keep
     * its worker thread alive forever for every listener instance, whereas cached-pool threads
     * terminate after being idle.
     */
    private final Executor executor = Executors.newCachedThreadPool();

    private final AtomicBoolean stop = new AtomicBoolean(false);
    private final AtomicBoolean active = new AtomicBoolean(false);

    /**
     * True until the first durable message has been delivered.
     * NOTE(review): not volatile although onMessage may run on more than one thread - confirm the
     * sequencer lock provides the required visibility.
     */
    private boolean firstMessage = true;
    private final String listenerId = UUIDKit.randomUUID();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a listener bound to the given sequencer. Nothing is read or registered until
     * {@link #start(Long, MessageCallback)} is called.
     *
     * @param fileSequencer sequencer that supplies stored messages and holds the listener registry
     */
    public FileSequenceListener(FileSequencer fileSequencer) {
        this.fileSequencer = fileSequencer;
    }

    /**
     * Stores the callback and start offset, then replays existing messages asynchronously.
     * If the replay fails with an exception the failure is logged and the listener is stopped.
     *
     * @param l offset to start from, or {@code null} to start from the first durable message seen
     * @param messageCallback receiver of the messages; must not be {@code null}
     * @throws NullPointerException if {@code messageCallback} is {@code null}
     */
    @Override // io.github.rapid.queue.core.SequenceListener
    public void start(@Nullable Long l, MessageCallback messageCallback) {
        Objects.requireNonNull(messageCallback, "messageListener can not null");
        this.messageCallback = messageCallback;
        this.lastOffsetId = l;
        this.executor.execute(() -> {
            try {
                readExistingMessageActiveListener();
            } catch (Exception e) {
                logger.warn("FileSequenceListener readExistingMessageActiveListener error:", e);
                stop();
            }
        });
    }

    /**
     * Replays stored messages and registers this listener with the sequencer.
     *
     * <p>Each pass takes the sequencer lock, asks the circular cache for a reader positioned at
     * {@link #lastOffsetId} and acts on the reader status:
     * <ul>
     *   <li>GREATER: register the listener and finish;</li>
     *   <li>WITHIN: deliver the cached messages, register, mark active and finish;</li>
     *   <li>EMPTY: on the first encounter read the snapshot and retry; if the previous pass
     *       already read the snapshot, register, mark active and finish;</li>
     *   <li>any other status: read the snapshot and retry.</li>
     * </ul>
     * The method returns early, without registering, as soon as the listener is stopped.
     *
     * @throws IOException propagated from the snapshot reader
     * @throws IllegalArgumentException if the sequencer lock cannot be obtained in time
     */
    private void readExistingMessageActiveListener() throws IOException {
        boolean replayFromSnapshot = false;
        do {
            // Acquire outside the try block: a timed-out acquisition must not reach the unLock()
            // in finally, because this thread would then release a lock it does not hold.
            if (!this.fileSequencer.tryLock(LOCK_WAIT_TIMEOUT)) {
                throw new IllegalArgumentException("reader wait time out");
            }
            try {
                FileSequencerCircularCache.FullReader cacheReader =
                        this.fileSequencer.circularCache.createReader(this.lastOffsetId);
                FileSequencerCircularCache.ReaderStatus status = cacheReader.getStatus();
                if (status.equals(FileSequencerCircularCache.ReaderStatus.GREATER)) {
                    // NOTE(review): unlike the other registration paths this one does not set
                    // active to true; kept as in the original - confirm whether intentional.
                    this.fileSequencer.putListener(this.listenerId, this);
                    replayFromSnapshot = false;
                } else if (status.equals(FileSequencerCircularCache.ReaderStatus.WITHIN)) {
                    Iterator<EventMessage> cached = cacheReader.iterator();
                    while (cached.hasNext()) {
                        if (!replay(cached.next())) {
                            // The finally block releases the lock exactly once.
                            return;
                        }
                    }
                    registerAsActive();
                    replayFromSnapshot = false;
                } else if (!status.equals(FileSequencerCircularCache.ReaderStatus.EMPTY)) {
                    replayFromSnapshot = true;
                } else if (replayFromSnapshot) {
                    // Cache still empty after a snapshot pass: nothing more to replay.
                    registerAsActive();
                    replayFromSnapshot = false;
                } else {
                    replayFromSnapshot = true;
                }
                if (replayFromSnapshot) {
                    try (SnapshotReader snapshot = this.fileSequencer.readSnapshot(this.lastOffsetId)) {
                        for (EventMessage eventMessage : snapshot) {
                            if (!replay(eventMessage)) {
                                // Snapshot is closed by try-with-resources, lock by finally.
                                return;
                            }
                        }
                    }
                }
            } finally {
                this.fileSequencer.unLock();
            }
        } while (replayFromSnapshot);
    }

    /** Delivers one replayed message; returns false once the listener has been stopped. */
    private boolean replay(EventMessage eventMessage) {
        onMessage(eventMessage);
        return !this.stop.get();
    }

    /** Registers this listener with the sequencer and flags it as active. */
    private void registerAsActive() {
        this.fileSequencer.putListener(this.listenerId, this);
        this.active.set(true);
    }

    /**
     * Marks the listener as stopped and inactive and removes it from the sequencer's listeners.
     */
    @Override // io.github.rapid.queue.core.SequenceListener
    public void stop() {
        this.stop.set(true);
        this.active.set(false);
        this.fileSequencer.removeTailListener(this.listenerId);
    }

    /** @return {@code true} while the listener is flagged active */
    @Override // io.github.rapid.queue.core.SequenceListener
    public boolean statusActive() {
        return this.active.get();
    }

    /** @return {@code true} once {@link #stop()} has been called */
    @Override // io.github.rapid.queue.core.SequenceListener
    public boolean statusStop() {
        return this.stop.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands a message to the callback unless the listener is stopped.
     *
     * <p>Non-durable messages are always forwarded. A durable message is forwarded when its
     * offset is greater than {@link #lastOffsetId}, or equal to it while no durable message has
     * been delivered yet (so the requested start offset itself is included exactly once). Any
     * exception raised while handling a durable message is logged and stops the listener.
     *
     * @param eventMessage message to deliver
     */
    public void onMessage(EventMessage eventMessage) {
        if (this.stop.get()) {
            return;
        }
        if (!eventMessage.isDurable()) {
            this.messageCallback.onMessage(eventMessage);
            return;
        }
        try {
            long offset = eventMessage.getOffset();
            if (this.firstMessage && this.lastOffsetId == null) {
                // No start offset requested: begin at the first durable message seen.
                this.lastOffsetId = Long.valueOf(offset);
            }
            int compareOffset = StoreBase.compareOffset(offset, this.lastOffsetId.longValue());
            if (compareOffset > 0 || (compareOffset == 0 && this.firstMessage)) {
                this.messageCallback.onMessage(eventMessage);
                // Clear the flag on every delivery, not only on an exact-offset match; otherwise
                // a later message carrying the already delivered offset would be sent twice.
                this.firstMessage = false;
                this.lastOffsetId = Long.valueOf(offset);
            }
        } catch (Exception e) {
            logger.warn("FileSequenceListener error: ", e);
            stop();
        }
    }
}
