package me.tfeng.play.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoCursorNotFoundException;
import me.tfeng.play.spring.Startable;
import org.bson.types.BSONTimestamp;
import play.Logger;
import play.core.enhancers.PropertiesEnhancer;

/**
 * Reads entries from the {@code local.oplog.rs} collection on a background thread and passes
 * each one, converted to an {@link OplogItem}, to the configured {@link OplogItemHandler}.
 *
 * <p>A {@link MongoClient} and a handler must be supplied through the setters before
 * {@link #onStart()} is called. A namespace and a start timestamp may optionally be set to
 * narrow the query; see {@link #getQuery()}.
 */
@PropertiesEnhancer.GeneratedAccessor
@PropertiesEnhancer.RewrittenAccessor
public class OplogListener implements Startable {
    /** Name of the collection that is read. */
    public static final String COLLECTION_NAME = "oplog.rs";
    /** Name of the database that contains {@link #COLLECTION_NAME}. */
    public static final String DB_NAME = "local";
    private static final Logger.ALogger LOG = Logger.of(OplogListener.class);
    private DBCollection collection;
    private DBCursor cursor;
    private OplogItemHandler handler;
    private MongoClient mongoClient;
    private String namespace;
    private BSONTimestamp startTimestamp;
    private Thread thread;

    /**
     * Body of the background thread: repeatedly takes the next document from the enclosing
     * listener's cursor and hands it to the handler.
     */
    @PropertiesEnhancer.GeneratedAccessor
    @PropertiesEnhancer.RewrittenAccessor
    private class OplogListenerThread implements Runnable {
        private OplogListenerThread() {
        }

        /**
         * Loops until the cursor throws {@link MongoCursorNotFoundException}, which is treated
         * as the signal to stop. Any other exception thrown by the cursor, the converter or the
         * handler is not caught here and therefore terminates the thread.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    DBObject next = OplogListener.this.cursor.next();
                    if (OplogListener.LOG.isDebugEnabled()) {
                        // Emit at the same level the guard checks for.
                        OplogListener.LOG.debug("Received oplog item " + next);
                    }
                    OplogListener.this.handler.handle(RecordConverter.toRecord(OplogItem.class, next));
                } catch (MongoCursorNotFoundException e) {
                    // NOTE(review): presumably raised once onStop() has closed the cursor;
                    // confirm against the driver version in use.
                    OplogListener.LOG.info("Handler thread stopped");
                    return;
                }
            }
        }
    }

    /**
     * Opens a cursor on {@code local.oplog.rs} using {@link #getQuery()}, {@link #getSort()} and
     * {@link #getOptions()}, then starts the handler thread.
     *
     * @throws IllegalStateException if the Mongo client or the handler has not been set
     */
    public void onStart() throws Throwable {
        if (this.mongoClient == null || this.handler == null) {
            throw new IllegalStateException("mongoClient and handler must both be provided");
        }
        LOG.info("Connecting to local.oplog.rs in MongoDB");
        this.collection = this.mongoClient.getDB(DB_NAME).getCollection(COLLECTION_NAME);
        this.cursor = this.collection.find(getQuery()).sort(getSort()).setOptions(getOptions());
        this.thread = new Thread(new OplogListenerThread());
        this.thread.start();
        LOG.info("Handler thread started");
    }

    /**
     * Closes the cursor and waits for the handler thread to finish. Does nothing for whichever
     * of the two was never created, so it is safe to call when {@link #onStart()} did not run
     * or failed part-way.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void onStop() throws Throwable {
        if (this.cursor != null) {
            this.cursor.close();
        }
        if (this.thread != null) {
            LOG.info("Waiting for handler thread to stop");
            this.thread.join();
        }
    }

    /**
     * Sets the handler that receives each oplog item.
     *
     * @param oplogItemHandler the handler; must be non-null by the time {@link #onStart()} runs
     */
    public void setHandler(OplogItemHandler oplogItemHandler) {
        this.handler = oplogItemHandler;
    }

    /**
     * Sets the client used to reach the {@code local} database.
     *
     * @param mongoClient the client; must be non-null by the time {@link #onStart()} runs
     */
    public void setMongoClient(MongoClient mongoClient) {
        this.mongoClient = mongoClient;
    }

    /**
     * Sets the value that the {@code ns} field of returned entries must equal.
     *
     * @param str the namespace to match, or {@code null} to leave {@code ns} unconstrained
     */
    public void setNamespace(String str) {
        this.namespace = str;
    }

    /**
     * Sets the exclusive lower bound for the {@code ts} field of returned entries.
     *
     * @param bSONTimestamp the timestamp entries must be greater than, or {@code null} for no
     *     lower bound
     */
    public void setStartTimestamp(BSONTimestamp bSONTimestamp) {
        this.startTimestamp = bSONTimestamp;
    }

    /**
     * Returns the option bits applied to the cursor via {@code DBCursor.setOptions}.
     *
     * @return the cursor option bit mask
     */
    protected int getOptions() {
        // NOTE(review): 2 presumably corresponds to Bytes.QUERYOPTION_TAILABLE in the legacy
        // MongoDB Java driver; confirm before replacing the literal with the constant.
        return 2;
    }

    /**
     * Builds the query used to select oplog entries: {@code ts > startTimestamp} when a start
     * timestamp is set, and {@code ns == namespace} when a namespace is set. With neither set
     * the query is empty.
     *
     * @return the query document
     */
    protected DBObject getQuery() {
        BasicDBObject basicDBObject = new BasicDBObject();
        if (this.startTimestamp != null) {
            basicDBObject.put("ts", new BasicDBObject("$gt", this.startTimestamp));
        }
        if (this.namespace != null) {
            basicDBObject.put("ns", this.namespace);
        }
        return basicDBObject;
    }

    /**
     * Returns the sort specification applied to the cursor.
     *
     * @return the document {@code {$natural: 1}}
     */
    protected DBObject getSort() {
        return new BasicDBObject("$natural", 1);
    }
}
