/*
 * Decompiled with CFR 0.152.
 */
package me.tfeng.toolbox.mongodb;

import com.mongodb.CursorType;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import java.util.concurrent.atomic.AtomicBoolean;
import me.tfeng.toolbox.mongodb.OplogItem;
import me.tfeng.toolbox.mongodb.OplogItemHandler;
import me.tfeng.toolbox.mongodb.RecordConverter;
import me.tfeng.toolbox.spring.Startable;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OplogListener
implements Startable {
    public static final String COLLECTION_NAME = "oplog.rs";
    public static final String DB_NAME = "local";
    private static final Logger LOG = LoggerFactory.getLogger(OplogListener.class);
    private static final AtomicBoolean stopping = new AtomicBoolean(false);
    private MongoCollection<Document> collection;
    private MongoCursor<Document> cursor;
    private OplogItemHandler handler;
    private MongoClient mongoClient;
    private String namespace;
    private BsonTimestamp startTimestamp;
    private Thread thread;

    public void onStart() throws Throwable {
        if (this.mongoClient == null || this.handler == null) {
            throw new Exception("mongoClient and handler must both be provided");
        }
        LOG.info("Connecting to local.oplog.rs in MongoDB");
        this.collection = this.mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
        this.cursor = this.collection.find((Bson)this.getQuery()).sort((Bson)this.getSort()).cursorType(this.getCursorType()).iterator();
        stopping.set(false);
        this.thread = new Thread(new OplogListenerThread());
        this.thread.start();
        LOG.info("Handler thread started");
    }

    public void onStop() throws Throwable {
        stopping.set(true);
        this.cursor.close();
        LOG.info("Waiting for handler thread to stop");
    }

    public void setHandler(OplogItemHandler oplogItemHandler) {
        this.handler = oplogItemHandler;
    }

    public void setMongoClient(MongoClient mongoClient) {
        this.mongoClient = mongoClient;
    }

    public void setNamespace(String string) {
        this.namespace = string;
    }

    public void setStartTimestamp(BsonTimestamp bsonTimestamp) {
        this.startTimestamp = bsonTimestamp;
    }

    protected CursorType getCursorType() {
        return CursorType.TailableAwait;
    }

    protected Document getQuery() {
        Document document = new Document();
        if (this.startTimestamp != null) {
            document.put("ts", (Object)new Document("$gt", (Object)this.startTimestamp));
        }
        if (this.namespace != null) {
            document.put("ns", (Object)this.namespace);
        }
        return document;
    }

    protected Document getSort() {
        return new Document("$natural", (Object)1);
    }

    private class OplogListenerThread
    implements Runnable {
        private OplogListenerThread() {
        }

        @Override
        public void run() {
            do {
                Document document;
                try {
                    document = (Document)OplogListener.this.cursor.next();
                }
                catch (Exception exception) {
                    if (stopping.get()) break;
                    if (exception instanceof RuntimeException) {
                        throw (RuntimeException)exception;
                    }
                    throw new RuntimeException("Unexpected exception occurred while trying to read the next oplog item", exception);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.info("Received oplog item " + document);
                }
                OplogItem oplogItem = RecordConverter.toRecord(OplogItem.class, document);
                OplogListener.this.handler.handle(oplogItem);
            } while (!stopping.get());
        }
    }
}

