package top.fzqblog.ant.worker;

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import top.fzqblog.ant.exception.AntException;
import top.fzqblog.ant.handler.DefaultHandler;
import top.fzqblog.ant.handler.IHandler;
import top.fzqblog.ant.http.HttpClientKit;
import top.fzqblog.ant.http.IHttpKit;
import top.fzqblog.ant.listener.AntListener;
import top.fzqblog.ant.pipeline.ConsolePipeline;
import top.fzqblog.ant.pipeline.IPipeline;
import top.fzqblog.ant.proxy.ProxyProvider;
import top.fzqblog.ant.queue.AntQueue;
import top.fzqblog.ant.task.Task;
import top.fzqblog.ant.task.TaskErrorResponse;
import top.fzqblog.ant.task.TaskResponse;
import top.fzqblog.ant.thread.CountableThreadPool;
import top.fzqblog.ant.utils.Constants;
import top.fzqblog.ant.utils.DateUtils;

/* loaded from: input_file:top/fzqblog/ant/worker/Ant.class */
/**
 * Main worker loop of the crawler: pulls {@link Task}s from an {@link AntQueue},
 * runs each one on a {@link CountableThreadPool} (fetch via {@link IHttpKit},
 * then hand the response to an {@link IPipeline}), and routes failures to an
 * {@link IHandler}.
 *
 * <p>Instances are created with {@link #create()} and configured through the
 * fluent setters before {@link #run()} is invoked. Collaborators that were not
 * configured are replaced by defaults in {@code init()}.
 *
 * <p>The loop in {@link #run()} stops when the running thread is interrupted,
 * or when no task could be obtained and the pool reports zero alive threads.
 */
public class Ant implements Runnable {
    private CountableThreadPool threadPool;
    private Date startTime;
    private AntListener antListener;
    // Not referenced anywhere in this class; kept so the class layout is unchanged.
    private ExecutorService executorService;
    private AntQueue queue;
    private IPipeline pipeline;
    private IHandler handler;
    private IHttpKit httpKit;
    private ProxyProvider proxyProvider;
    private transient Logger logger = LoggerFactory.getLogger(getClass());
    // Lock/condition pair used by the main loop to wait until a worker finishes a task.
    private final ReentrantLock newUrlLock = new ReentrantLock();
    private final Condition newUrlCondition = this.newUrlLock.newCondition();
    private Integer threadNum = Constants.DEFAULT_THREAD_NUM;
    private boolean autoClose = true;
    private Long sleep = Constants.DEFAULT_SLEEP_TIME;

    private Ant() {
    }

    /**
     * Creates a new, unconfigured instance.
     *
     * @return a fresh {@code Ant}
     */
    public static Ant create() {
        return new Ant();
    }

    /**
     * Sets the queue tasks are taken from.
     *
     * @param antQueue the task queue
     * @return this instance, for chaining
     */
    public Ant startQueue(AntQueue antQueue) {
        this.queue = antQueue;
        return this;
    }

    /**
     * Sets the number of worker threads used when the thread pool is created.
     *
     * @param i the thread count; must be greater than zero
     * @return this instance, for chaining
     * @throws AntException if {@code i} is zero or negative
     */
    public Ant thread(int i) throws AntException {
        if (i <= 0) {
            throw new AntException("线程数小于等于0，这也太秀了吧");
        }
        this.threadNum = Integer.valueOf(i);
        return this;
    }

    /**
     * Sets the pipeline that receives the response of each successfully processed task.
     *
     * @param iPipeline the pipeline; if never set, a {@link ConsolePipeline} is used
     * @return this instance, for chaining
     */
    public Ant pipeline(IPipeline iPipeline) {
        this.pipeline = iPipeline;
        return this;
    }

    /**
     * Sets the handler that receives a {@link TaskErrorResponse} when a task fails.
     *
     * @param iHandler the handler; if never set, a {@link DefaultHandler} is used
     * @return this instance, for chaining
     */
    public Ant withHandler(IHandler iHandler) {
        this.handler = iHandler;
        return this;
    }

    /**
     * Sets the HTTP kit used to execute tasks.
     *
     * @param iHttpKit the HTTP kit; if never set, an {@link HttpClientKit} is used
     * @return this instance, for chaining
     */
    public Ant httpKit(IHttpKit iHttpKit) {
        this.httpKit = iHttpKit;
        return this;
    }

    /**
     * Controls how tasks are obtained and whether the pool is shut down at the end.
     * When {@code true}, {@link #run()} uses {@code queue.poll()} and shuts the thread
     * pool down once the loop ends; when {@code false}, it uses {@code queue.take()}
     * and leaves the pool running.
     *
     * @param z the auto-close flag
     * @return this instance, for chaining
     */
    public Ant autoClose(boolean z) {
        this.autoClose = z;
        return this;
    }

    /**
     * Sets the pause, in milliseconds, applied by the main loop after each task submission.
     *
     * @param l the pause in milliseconds
     * @return this instance, for chaining
     */
    public Ant sleep(Long l) {
        this.sleep = l;
        return this;
    }

    /**
     * Sets the proxy provider that is handed to the HTTP kit during initialisation.
     *
     * @param proxyProvider the proxy provider, or {@code null} for none
     * @return this instance, for chaining
     */
    public Ant proxy(ProxyProvider proxyProvider) {
        this.proxyProvider = proxyProvider;
        return this;
    }

    /**
     * Fills in default collaborators, creates the thread pool on first use,
     * forwards the proxy provider to the HTTP kit, and records the start time.
     */
    private void init() {
        if (this.pipeline == null) {
            this.pipeline = new ConsolePipeline();
        }
        if (this.handler == null) {
            this.handler = new DefaultHandler();
        }
        if (this.httpKit == null) {
            this.httpKit = new HttpClientKit();
        }
        if (this.threadPool == null) {
            this.threadPool = new CountableThreadPool(this.threadNum.intValue());
            // The HTTP kit's pool size is set to the same value as the worker thread count.
            this.httpKit.setPoolSize(this.threadNum.intValue());
        }
        if (this.proxyProvider != null) {
            this.httpKit.setProxyProvider(this.proxyProvider);
        }
        this.startTime = new Date();
        this.logger.info("------------------------------ant启动------------------------------于" + DateUtils.format(this.startTime, DateUtils.FORMAT_FULL_CN));
    }

    /**
     * Returns the time recorded when {@link #run()} initialised this instance.
     *
     * @return the start time, or {@code null} if {@link #run()} has not started yet
     */
    public Date getStartTime() {
        return this.startTime;
    }

    /**
     * Pauses the calling thread for the configured interval. If the thread is
     * interrupted while sleeping, the interrupt flag is restored so that the
     * loop in {@link #run()} can observe it and stop.
     */
    private void sleep() {
        try {
            TimeUnit.MILLISECONDS.sleep(this.sleep.longValue());
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again for the run loop.
            Thread.currentThread().interrupt();
            this.logger.warn("sleep - interrupted", e);
        }
    }

    /**
     * Sets the listener notified after each task has been streamed to the pipeline.
     *
     * @param antListener the listener, or {@code null} for none
     */
    public void setAntListener(AntListener antListener) {
        this.antListener = antListener;
    }

    /**
     * Executes the task through the HTTP kit and links the response, the queue
     * and the task to each other.
     *
     * @param task the task to execute
     * @throws Exception whatever the HTTP kit throws
     */
    private void process(Task task) throws Exception {
        TaskResponse response = this.httpKit.gain(task);
        response.setQueue(this.queue);
        response.setTask(task);
        task.setTaskResponse(response);
    }

    /**
     * Streams the task's response to the pipeline and notifies the listener, if any.
     *
     * @param task the task whose response is ready
     * @throws Exception whatever the pipeline or listener throws
     */
    private void onSuccess(Task task) throws Exception {
        this.pipeline.stream(task.getTaskResponse());
        if (this.antListener != null) {
            this.antListener.onSuccess();
        }
    }

    /**
     * Passes the task's response together with the failure cause to the handler.
     *
     * @param task the task that failed
     * @param exc  the failure cause
     */
    private void onFailed(Task task, Exception exc) {
        this.handler.handle(new TaskErrorResponse(task.getTaskResponse(), exc));
    }

    /**
     * Returns the number of alive threads reported by the thread pool.
     *
     * @return the pool's alive-thread count, or {@code 0} if the pool has not
     *         been created yet (that is, {@link #run()} has not initialised it)
     */
    public int getThreadAlive() {
        CountableThreadPool pool = this.threadPool;
        return pool == null ? 0 : pool.getThreadAlive();
    }

    /**
     * Runs the worker loop until the thread is interrupted or there is nothing
     * left to do (no task obtained and no alive pool thread). Each obtained task
     * is submitted to the pool, after which the loop pauses for the configured
     * sleep interval. If auto-close is enabled, the pool is shut down on exit.
     */
    @Override // java.lang.Runnable
    public void run() {
        init();
        while (!Thread.currentThread().isInterrupted()) {
            Task task = null;
            try {
                task = this.autoClose ? this.queue.poll() : this.queue.take();
            } catch (Exception e) {
                this.logger.error("获取任务出错， {}", e);
                if (e instanceof InterruptedException) {
                    // Restore the flag cleared by the catch; the loop condition then ends the loop.
                    Thread.currentThread().interrupt();
                    continue;
                }
            }
            if (task != null) {
                final Task current = task;
                this.threadPool.execute(() -> runTask(current));
                sleep();
                continue;
            }
            if (this.threadPool.getThreadAlive() == 0) {
                this.logger.info("★★★★★★★★★★★★★★★★★★★线程池中没有执行的任务★★★★★★★★★★★★★★★★★★★");
                this.logger.info("★★★★★★★★★★★★★★★★★★★我要下车了---^_^告辞★★★★★★★★★★★★★★★★★★★");
                Date endTime = new Date();
                this.logger.info("------------------------------ant执行结束------------------------------于" + DateUtils.format(endTime, DateUtils.FORMAT_FULL_CN));
                this.logger.info("------------------------------总共耗时------------------------------" + DateUtils.getDatePoor(this.startTime, endTime));
                break;
            }
            this.logger.info("当前还有{}个线程活着", Integer.valueOf(this.threadPool.getThreadAlive()));
            waitNewUrl();
        }
        if (this.autoClose) {
            destroy();
        }
    }

    /**
     * Processes one task on a pool thread: on success the response goes to the
     * pipeline; on any {@link Exception} a fresh response is attached to the task
     * and the failure is passed to the handler. The main loop is always signalled
     * afterwards, including when an error propagates out of this method.
     *
     * @param task the task to process
     */
    private void runTask(Task task) {
        try {
            process(task);
            onSuccess(task);
        } catch (Exception e) {
            // Attach a new, empty response so the handler always receives a non-null one.
            TaskResponse failedResponse = new TaskResponse();
            failedResponse.setQueue(this.queue);
            failedResponse.setTask(task);
            task.setTaskResponse(failedResponse);
            onFailed(task, e);
        } finally {
            signalNewUrl();
        }
    }

    /** Shuts down the thread pool. */
    private void destroy() {
        this.threadPool.shutdown();
    }

    /**
     * Blocks the main loop until a worker signals completion or the configured
     * empty-queue waiting time elapses. Returns immediately when auto-close is
     * enabled and no pool thread is alive. If interrupted while waiting, the
     * interrupt flag is restored so the loop in {@link #run()} can stop.
     */
    private void waitNewUrl() {
        this.newUrlLock.lock();
        try {
            if (this.threadPool.getThreadAlive() == 0 && this.autoClose) {
                return;
            }
            this.newUrlCondition.await(Constants.DEFAULT_EMPTY_WAITING_TIME.intValue(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            this.logger.warn("waitNewUrl - interrupted, error {}", e);
            // Catching InterruptedException clears the flag; set it again for the run loop.
            Thread.currentThread().interrupt();
        } finally {
            this.newUrlLock.unlock();
        }
    }

    /** Wakes every thread waiting in {@link #waitNewUrl()}. */
    private void signalNewUrl() {
        // Acquire outside the try so that unlock() only runs if lock() succeeded.
        this.newUrlLock.lock();
        try {
            this.newUrlCondition.signalAll();
        } finally {
            this.newUrlLock.unlock();
        }
    }
}
