/*
 * Decompiled with CFR 0.152.
 */
package ai.platon.pulsar.crawl.index.jit.indexer;

import ai.platon.pulsar.common.ExceptionsKt;
import ai.platon.pulsar.common.NetUtil;
import ai.platon.pulsar.common.config.ImmutableConfig;
import ai.platon.pulsar.common.config.Parameterized;
import ai.platon.pulsar.common.config.Params;
import ai.platon.pulsar.common.urls.UrlUtils;
import ai.platon.pulsar.crawl.common.JobInitialized;
import ai.platon.pulsar.crawl.fetch.batch.JobFetchTask;
import ai.platon.pulsar.crawl.index.IndexDocument;
import ai.platon.pulsar.crawl.index.IndexWriters;
import ai.platon.pulsar.crawl.index.IndexingFilters;
import ai.platon.pulsar.crawl.index.jit.indexer.IndexThread;
import ai.platon.pulsar.crawl.scoring.ScoringFilters;
import ai.platon.pulsar.persist.ParseStatus;
import ai.platon.pulsar.persist.WebPage;
import com.google.common.collect.Queues;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Metadata(mv={1, 5, 1}, k=1, xi=48, d1={"\u0000\u008e\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\b\n\u0002\b\u0007\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000e\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010!\n\u0002\b\u0004\n\u0002\u0010\u000b\n\u0002\b\u0004\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u000b\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\u0018\u0000 I2\u00020\u00012\u00020\u00022\u00020\u0003:\u0001IB%\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\u0006\u0010\n\u001a\u00020\u000b\u00a2\u0006\u0002\u0010\fJ\b\u00103\u001a\u000204H\u0016J\b\u00105\u001a\u0004\u0018\u00010$J\b\u00106\u001a\u000207H\u0016J\u0010\u00108\u001a\u0002042\b\u00109\u001a\u0004\u0018\u00010$J\u0006\u0010:\u001a\u00020/J\u000e\u0010;\u001a\u0002042\u0006\u00109\u001a\u00020$J\u0015\u0010<\u001a\u0002042\u0006\u0010=\u001a\u00020\u000fH\u0000\u00a2\u0006\u0002\b>J\u0010\u0010?\u001a\u0002042\u0006\u0010@\u001a\u00020\u000bH\u0016J\u0012\u0010A\u001a\u00020/2\b\u0010B\u001a\u0004\u0018\u00010CH\u0002J\u0010\u0010D\u001a\u00020/2\u0006\u0010E\u001a\u00020FH\u0002J\u0015\u0010G\u001a\u0002042\u0006\u0010=\u001a\u00020\u000fH\u0000\u00a2\u0006\u0002\bHR\u0014\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u000f0\u000eX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u0011\u0010\n\u001a\u00020\u000b\u00a2\u0006\b\n\u0000\u001a\u0004\b\u0012\u0010\u0013R\u000e\u0010\u0014\u001a\u00020\u0011X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0011\u0010\u0015\u001a\u00020\u00118F\u00a2\u0006\u0006\u001a\u0004\b\u
0016\u0010\u0017R\u000e\u0010\u0018\u001a\u00020\u0019X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u001a\u001a\u00020\u001bX\u0082.\u00a2\u0006\u0002\n\u0000R\u0011\u0010\u001c\u001a\u00020\u001d\u00a2\u0006\b\n\u0000\u001a\u0004\b\u001e\u0010\u001fR\u0011\u0010 \u001a\u00020\u0011\u00a2\u0006\b\n\u0000\u001a\u0004\b!\u0010\u0017R\u0014\u0010\"\u001a\b\u0012\u0004\u0012\u00020$0#X\u0082.\u00a2\u0006\u0002\n\u0000R\u001a\u0010%\u001a\u00020\u0011X\u0086\u000e\u00a2\u0006\u000e\n\u0000\u001a\u0004\b&\u0010\u0017\"\u0004\b'\u0010(R\u0014\u0010)\u001a\b\u0012\u0004\u0012\u00020\u000f0*X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0011\u0010+\u001a\u00020\u00118F\u00a2\u0006\u0006\u001a\u0004\b,\u0010\u0017R\u000e\u0010-\u001a\u00020\u0019X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u001e\u00100\u001a\u00020/2\u0006\u0010.\u001a\u00020/@BX\u0086\u000e\u00a2\u0006\b\n\u0000\u001a\u0004\b0\u00101R\u000e\u00102\u001a\u00020\u0011X\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006J"}, d2={"Lai/platon/pulsar/crawl/index/jit/indexer/JITIndexer;", "Lai/platon/pulsar/common/config/Parameterized;", "Lai/platon/pulsar/crawl/common/JobInitialized;", "Ljava/lang/AutoCloseable;", "scoringFilters", "Lai/platon/pulsar/crawl/scoring/ScoringFilters;", "indexingFilters", "Lai/platon/pulsar/crawl/index/IndexingFilters;", "indexWriters", "Lai/platon/pulsar/crawl/index/IndexWriters;", "conf", "Lai/platon/pulsar/common/config/ImmutableConfig;", "(Lai/platon/pulsar/crawl/scoring/ScoringFilters;Lai/platon/pulsar/crawl/index/IndexingFilters;Lai/platon/pulsar/crawl/index/IndexWriters;Lai/platon/pulsar/common/config/ImmutableConfig;)V", "activeIndexThreads", "Ljava/util/concurrent/ConcurrentSkipListSet;", "Lai/platon/pulsar/crawl/index/jit/indexer/IndexThread;", "batchSize", 
"", "getConf", "()Lai/platon/pulsar/common/config/ImmutableConfig;", "id", "ignoredPageCount", "getIgnoredPageCount", "()I", "ignoredPages", "Ljava/util/concurrent/atomic/AtomicInteger;", "indexDocumentBuilder", "Lai/platon/pulsar/crawl/index/IndexDocument$Builder;", "indexServerHost", "", "getIndexServerHost", "()Ljava/lang/String;", "indexServerPort", "getIndexServerPort", "indexTasks", "Ljava/util/Queue;", "Lai/platon/pulsar/crawl/fetch/batch/JobFetchTask;", "indexThreadCount", "getIndexThreadCount", "setIndexThreadCount", "(I)V", "indexThreads", "", "indexedPageCount", "getIndexedPageCount", "indexedPages", "<set-?>", "", "isEnabled", "()Z", "minTextLength", "close", "", "consume", "getParams", "Lai/platon/pulsar/common/config/Params;", "index", "fetchTask", "isIndexServerAvailable", "produce", "registerFetchThread", "indexThread", "registerFetchThread$pulsar_skeleton", "setup", "jobConf", "shouldIndex", "doc", "Lai/platon/pulsar/crawl/index/IndexDocument;", "shouldProduce", "page", "Lai/platon/pulsar/persist/WebPage;", "unregisterFetchThread", "unregisterFetchThread$pulsar_skeleton", "Companion", "pulsar-skeleton"})
public final class JITIndexer
implements Parameterized,
JobInitialized,
AutoCloseable {
    /** Singleton holder for the Kotlin companion object; exposes the shared logger via {@code getLOG()}. */
    @NotNull
    public static final Companion Companion = new Companion(null);
    /** Scoring filters handed to the {@link IndexDocument.Builder} created in {@code setup}. */
    @NotNull
    private final ScoringFilters scoringFilters;
    /** Indexing filters handed to the {@link IndexDocument.Builder} created in {@code setup}. */
    @NotNull
    private final IndexingFilters indexingFilters;
    /** Destination for built documents; opened in {@code setup} and used as the lock object in {@code index}. */
    @NotNull
    private final IndexWriters indexWriters;
    /** Configuration supplied at construction; the {@code index.*} settings below are read from it. */
    @NotNull
    private final ImmutableConfig conf;
    /** Per-instance sequence number taken from {@code instanceSequence}; only used in log output. */
    private final int id;
    /** Set in {@code setup} from the job setting {@code indexer.just.in.time} (default {@code false}). */
    private boolean isEnabled;
    /** Value of {@code index.server.hostname}, default {@code "master"}. */
    @NotNull
    private final String indexServerHost;
    /** Value of {@code index.server.port}, default {@code 8183}. */
    private final int indexServerPort;
    /** Value of {@code index.index.batch.size}, default {@code 2000}; used as the capacity of {@code indexTasks}. */
    private int batchSize;
    /** Value of {@code index.index.thread.count}, default {@code 1}; may be changed through its setter. */
    private int indexThreadCount;
    /** Value of {@code index.minimal.text.length}, default {@code 300}; shorter texts are not indexed. */
    private int minTextLength;
    /** Number of documents written by {@code index}. */
    @NotNull
    private final AtomicInteger indexedPages;
    /** Number of pages rejected by {@code shouldIndex} / {@code shouldProduce} because their text is too short. */
    @NotNull
    private final AtomicInteger ignoredPages;
    /**
     * Threads that {@code close} asks to exit and then joins.
     * NOTE(review): nothing in this class adds to this list - confirm whether it is populated elsewhere.
     */
    @NotNull
    private final List<IndexThread> indexThreads;
    /** Threads currently registered through {@code registerFetchThread$pulsar_skeleton}. */
    @NotNull
    private final ConcurrentSkipListSet<IndexThread> activeIndexThreads;
    /** Pending tasks; assigned in {@code setup} only when indexing is enabled, otherwise stays {@code null}. */
    private Queue<JobFetchTask> indexTasks;
    /** Document builder; assigned in {@code setup} only when indexing is enabled, otherwise stays {@code null}. */
    private IndexDocument.Builder indexDocumentBuilder;
    /** Logger shared by all instances. */
    private static final Logger LOG = LoggerFactory.getLogger(JITIndexer.class);
    /** Source of the per-instance {@code id}. */
    @NotNull
    private static final AtomicInteger instanceSequence = new AtomicInteger(0);

    /**
     * Creates an indexer and reads its static settings from {@code conf}.
     *
     * <p>Settings read here (with their defaults): {@code index.server.hostname} ("master"),
     * {@code index.server.port} (8183), {@code index.index.batch.size} (2000),
     * {@code index.index.thread.count} (1) and {@code index.minimal.text.length} (300).
     * The indexer stays disabled until {@code setup} is called with a job configuration
     * that turns it on.
     *
     * @param scoringFilters  scoring filters later passed to the document builder
     * @param indexingFilters indexing filters later passed to the document builder
     * @param indexWriters    writers that receive the built documents
     * @param conf            configuration the settings above are read from
     */
    public JITIndexer(@NotNull ScoringFilters scoringFilters, @NotNull IndexingFilters indexingFilters, @NotNull IndexWriters indexWriters, @NotNull ImmutableConfig conf) {
        Intrinsics.checkNotNullParameter(scoringFilters, "scoringFilters");
        Intrinsics.checkNotNullParameter(indexingFilters, "indexingFilters");
        Intrinsics.checkNotNullParameter(indexWriters, "indexWriters");
        Intrinsics.checkNotNullParameter(conf, "conf");

        this.scoringFilters = scoringFilters;
        this.indexingFilters = indexingFilters;
        this.indexWriters = indexWriters;
        this.conf = conf;
        this.id = instanceSequence.incrementAndGet();

        this.indexServerHost = conf.get("index.server.hostname", "master");
        this.indexServerPort = conf.getInt("index.server.port", 8183);
        this.batchSize = conf.getInt("index.index.batch.size", 2000);
        this.indexThreadCount = conf.getInt("index.index.thread.count", 1);
        this.minTextLength = conf.getInt("index.minimal.text.length", 300);

        this.indexedPages = new AtomicInteger(0);
        this.ignoredPages = new AtomicInteger(0);
        // Diamond instead of raw types so the element type is checked by the compiler.
        this.indexThreads = new ArrayList<>();
        this.activeIndexThreads = new ConcurrentSkipListSet<>();
    }

    /**
     * @return the configuration this indexer was constructed with
     */
    @NotNull
    public final ImmutableConfig getConf() {
        return conf;
    }

    /**
     * @return {@code true} once {@code setup} has seen {@code indexer.just.in.time} switched on
     */
    public final boolean isEnabled() {
        return isEnabled;
    }

    /**
     * @return the host name read from {@code index.server.hostname}
     */
    @NotNull
    public final String getIndexServerHost() {
        return indexServerHost;
    }

    /**
     * @return the port read from {@code index.server.port}
     */
    public final int getIndexServerPort() {
        return indexServerPort;
    }

    /**
     * @return the configured number of index threads
     */
    public final int getIndexThreadCount() {
        return indexThreadCount;
    }

    /**
     * Overrides the number of index threads read from the configuration.
     *
     * @param n the new thread count; stored as given, no validation is performed
     */
    public final void setIndexThreadCount(int n) {
        indexThreadCount = n;
    }

    /**
     * @return how many documents have been written by {@code index} so far
     */
    public final int getIndexedPageCount() {
        return indexedPages.get();
    }

    /**
     * @return how many pages have been skipped so far because their text was too short
     */
    public final int getIgnoredPageCount() {
        return ignoredPages.get();
    }

    /**
     * Applies the job configuration and, if just-in-time indexing is switched on,
     * allocates the task queue and document builder and opens the index writers.
     *
     * <p>The enabled flag comes from {@code jobConf}; the document builder is still
     * created from the configuration passed to the constructor.
     *
     * @param jobConf job-level configuration carrying {@code indexer.just.in.time}
     */
    @Override
    public void setup(@NotNull ImmutableConfig jobConf) {
        Intrinsics.checkNotNullParameter((Object)jobConf, "jobConf");

        boolean enabled = jobConf.getBoolean("indexer.just.in.time", false);
        this.isEnabled = enabled;
        if (!enabled) {
            // Disabled: the queue and the builder are deliberately left unassigned.
            return;
        }

        // The queue is bounded by batchSize.
        LinkedBlockingQueue<JobFetchTask> tasks = Queues.newLinkedBlockingQueue(this.batchSize);
        Intrinsics.checkNotNullExpressionValue((Object)tasks, "newLinkedBlockingQueue<JobFetchTask>(batchSize)");
        this.indexTasks = tasks;

        this.indexDocumentBuilder = new IndexDocument.Builder(this.conf)
                .with(this.indexingFilters)
                .with(this.scoringFilters);
        this.indexWriters.open();
    }

    /**
     * Reports the current settings of this indexer.
     *
     * @return a {@link Params} holding {@code isEnabled}, {@code batchSize},
     *         {@code indexThreadCount} and {@code minTextLength}
     */
    @NotNull
    public Params getParams() {
        // Name/value pairs that follow the leading "isEnabled" entry.
        Object[] remainingPairs = new Object[]{
                "batchSize", this.batchSize,
                "indexThreadCount", this.indexThreadCount,
                "minTextLength", this.minTextLength
        };
        Params result = Params.of("isEnabled", (Object)this.isEnabled, (Object[])remainingPairs);
        Intrinsics.checkNotNullExpressionValue((Object)result, "of(\n                \"isE\u2026, minTextLength\n        )");
        return result;
    }

    /**
     * Probes the configured index server through {@code NetUtil.testHttpNetwork}.
     *
     * @return the result of the probe against {@code indexServerHost}:{@code indexServerPort}
     */
    public final boolean isIndexServerAvailable() {
        String host = this.indexServerHost;
        int port = this.indexServerPort;
        return NetUtil.testHttpNetwork(host, port);
    }

    /**
     * Adds a thread to the set of active index threads.
     *
     * @param indexThread the thread to register; must not be {@code null}
     */
    public final void registerFetchThread$pulsar_skeleton(@NotNull IndexThread indexThread) {
        Intrinsics.checkNotNullParameter(indexThread, "indexThread");
        activeIndexThreads.add(indexThread);
    }

    /**
     * Removes a thread from the set of active index threads.
     *
     * @param indexThread the thread to unregister; must not be {@code null}
     */
    public final void unregisterFetchThread$pulsar_skeleton(@NotNull IndexThread indexThread) {
        Intrinsics.checkNotNullParameter(indexThread, "indexThread");
        activeIndexThreads.remove(indexThread);
    }

    /**
     * Queues a fetched page for indexing.
     *
     * <p>Does nothing when the indexer is disabled or when {@code shouldProduce}
     * rejects the task's page.
     *
     * @param fetchTask the finished fetch task; must not be {@code null}
     * @throws IllegalStateException if the task queue is full: the queue is created
     *         with a capacity of {@code batchSize} and the task is inserted with {@code add}
     */
    public final void produce(@NotNull JobFetchTask fetchTask) {
        Intrinsics.checkNotNullParameter((Object)fetchTask, "fetchTask");
        if (!this.isEnabled) {
            return;
        }

        WebPage page = fetchTask.getPage();
        if (!this.shouldProduce(page)) {
            return;
        }

        Queue<JobFetchTask> tasks = this.indexTasks;
        if (tasks == null) {
            // Mirrors Kotlin's lateinit check; this call always throws.
            Intrinsics.throwUninitializedPropertyAccessException("indexTasks");
        }
        tasks.add(fetchTask);
    }

    /**
     * Takes the next pending task off the queue without blocking.
     *
     * <p>There is no enabled check here: if the queue was never created (that is,
     * {@code setup} did not enable indexing) the call fails through
     * {@code Intrinsics.throwUninitializedPropertyAccessException}.
     *
     * @return the next task, or {@code null} when the queue is empty
     */
    @Nullable
    public final JobFetchTask consume() {
        Queue<JobFetchTask> tasks = this.indexTasks;
        if (tasks == null) {
            // Mirrors Kotlin's lateinit check; this call always throws.
            Intrinsics.throwUninitializedPropertyAccessException("indexTasks");
        }
        return tasks.poll();
    }

    /**
     * Shuts the indexer down: asks every thread in {@code indexThreads} to exit and
     * joins it, then indexes whatever is still waiting in the task queue on the
     * calling thread, and finally logs the ignored/indexed totals.
     *
     * <p>Does nothing when the indexer is disabled. A failure while draining the
     * queue is logged (with its stack trace) and does not propagate to the caller.
     */
    @Override
    public void close() {
        if (!this.isEnabled) {
            return;
        }

        LOG.info("[Destruction] Closing JITIndexer #" + this.id + " ...");

        // NOTE(review): nothing in this class adds to indexThreads, while running threads
        // register themselves in activeIndexThreads - confirm this loop joins the intended set.
        for (IndexThread indexThread : this.indexThreads) {
            indexThread.exitAndJoin();
        }

        try {
            // Drain the tasks that no index thread picked up before exiting.
            JobFetchTask pending = this.consume();
            while (pending != null) {
                this.index(pending);
                pending = this.consume();
            }
        } catch (Throwable e) {
            // Hand the throwable to the logger so the stack trace is not lost.
            LOG.error("Failed to index pending tasks while closing JITIndexer #" + this.id, e);
        }

        LOG.info("There are " + this.getIgnoredPageCount() + " not indexed short pages out of total " + this.getIndexedPageCount() + " pages");
    }

    /**
     * Builds an index document for the task's page and writes it out.
     *
     * <p>Steps: reverse the task URL, build the document with the configured
     * builder, skip it if {@code shouldIndex} rejects it, otherwise write it while
     * holding the {@code indexWriters} monitor, stamp the page's index time and
     * bump the indexed-page counter.
     *
     * <p>Does nothing when the indexer is disabled. Any failure is logged and
     * swallowed, so this method never throws to its caller.
     *
     * <p>NOTE(review): this source was reconstructed by a decompiler, which reported
     * removing a self-catching try region from this method ("possible behaviour change").
     *
     * @param fetchTask the task to index; {@code null} is logged as an error and ignored
     */
    public final void index(@Nullable JobFetchTask fetchTask) {
        if (!this.isEnabled) {
            return;
        }
        try {
            if (fetchTask == null) {
                LOG.error("Failed to index, null fetchTask");
                return;
            }

            String url = fetchTask.getUrlString();
            String reverseUrl = UrlUtils.reverseUrl(url);
            WebPage page = fetchTask.getPage();

            IndexDocument.Builder builder = this.indexDocumentBuilder;
            if (builder == null) {
                // Mirrors Kotlin's lateinit check; this call always throws.
                Intrinsics.throwUninitializedPropertyAccessException("indexDocumentBuilder");
            }

            IndexDocument doc = builder.build(reverseUrl, page);
            if (!this.shouldIndex(doc)) {
                return;
            }

            // Writes are serialized on the shared writers instance.
            synchronized (this.indexWriters) {
                this.indexWriters.write(doc);
                page.putIndexTimeHistory(Instant.now());
            }
            this.indexedPages.incrementAndGet();
        } catch (Throwable e) {
            LOG.error("Failed to index a page " + ExceptionsKt.stringify$default((Throwable)e, null, null, (int)3, null));
        }
    }

    /**
     * Decides whether a built document carries enough text to be worth writing.
     *
     * <p>A {@code null} document is rejected silently. A document whose
     * {@code text_content} field is missing or shorter than {@code minTextLength}
     * is rejected, counted as ignored and reported with a warning.
     *
     * @param doc the document to check, may be {@code null}
     * @return {@code true} if the document should be written
     */
    private final boolean shouldIndex(IndexDocument doc) {
        if (doc == null) {
            return false;
        }

        String text = doc.getFieldValueAsString("text_content");
        boolean longEnough = text != null && text.length() >= this.minTextLength;
        if (longEnough) {
            return true;
        }

        this.ignoredPages.incrementAndGet();
        LOG.warn("Invalid text content to index, url : " + doc.getUrl());
        return false;
    }

    /**
     * Decides whether a fetched page should be queued for indexing.
     *
     * <p>Seed pages are rejected, as are pages whose parse status is not a success
     * or whose major code equals {@code 100}. Pages whose content text is shorter
     * than {@code minTextLength} are rejected and counted as ignored.
     *
     * @param page the fetched page
     * @return {@code true} if the page should be queued
     */
    private final boolean shouldProduce(WebPage page) {
        if (page.isSeed()) {
            return false;
        }

        ParseStatus status = page.getParseStatus();
        Intrinsics.checkNotNullExpressionValue((Object)status, "page.parseStatus");
        boolean parsedOk = status.isSuccess() && status.getMajorCode() != 100;
        if (!parsedOk) {
            return false;
        }

        if (page.getContentText().length() >= this.minTextLength) {
            return true;
        }
        this.ignoredPages.incrementAndGet();
        return false;
    }

    @Metadata(mv={1, 5, 1}, k=1, xi=48, d1={"\u0000\u001a\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0000\b\u0086\u0003\u0018\u00002\u00020\u0001B\u0007\b\u0002\u00a2\u0006\u0002\u0010\u0002R\u0019\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004\u00a2\u0006\b\n\u0000\u001a\u0004\b\u0006\u0010\u0007R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\n"}, d2={"Lai/platon/pulsar/crawl/index/jit/indexer/JITIndexer$Companion;", "", "()V", "LOG", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "getLOG", "()Lorg/slf4j/Logger;", "instanceSequence", "Ljava/util/concurrent/atomic/AtomicInteger;", "pulsar-skeleton"})
    /**
     * Holder class for the Kotlin companion object of {@code JITIndexer}.
     * Its single instance is stored in {@code JITIndexer.Companion}.
     */
    public static final class Companion {
        // Not instantiable from outside; the enclosing class goes through the synthetic constructor below.
        private Companion() {
        }

        /**
         * @return the logger shared by all {@code JITIndexer} instances
         */
        public final Logger getLOG() {
            return LOG;
        }

        // Compiler-generated bridge constructor; the marker argument is unused and only
        // distinguishes this signature from the private no-arg constructor.
        public /* synthetic */ Companion(DefaultConstructorMarker $constructor_marker) {
            this();
        }
    }
}

