/*
 * Decompiled with CFR 0.152.
 */
package xin.manong.weapon.aliyun.ots;

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.TunnelClientInterface;
import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.DescribeTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelInfo;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.manong.weapon.aliyun.ots.OTSTunnelWorkerConfig;

/**
 * Wraps a Tablestore {@link TunnelWorker}: validates the supplied configuration,
 * builds the SDK worker configuration (including its two thread pools) and
 * starts / stops the worker.
 *
 * <p>The {@link TunnelClient} is supplied by the caller and is never shut down
 * by this class.
 */
public class OTSTunnelWorker {
    private static final Logger logger = LoggerFactory.getLogger(OTSTunnelWorker.class);
    /** Keep-alive time, in seconds, passed to every thread pool created here. */
    private static final long KEEP_ALIVE_SECONDS = 60L;
    /** Capacity of the bounded task queue of every thread pool created here. */
    private static final int TASK_QUEUE_CAPACITY = 16;

    private final OTSTunnelWorkerConfig config;
    private final TunnelClient tunnelClient;
    private TunnelWorkerConfig workerConfig;
    private TunnelWorker worker;

    /**
     * Creates a worker wrapper; nothing is connected until {@link #start()} is called.
     *
     * @param config       tunnel worker configuration, must be non-null and pass its own {@code check()}
     * @param tunnelClient client used to describe the tunnel and to drive the worker
     * @throws RuntimeException if {@code config} is null or {@code config.check()} returns false
     */
    public OTSTunnelWorker(OTSTunnelWorkerConfig config, TunnelClient tunnelClient) {
        this.config = config;
        this.tunnelClient = tunnelClient;
        if (!this.check()) {
            throw new RuntimeException("invalid OTS tunnel worker config");
        }
    }

    /**
     * Validates the configuration handed to the constructor.
     *
     * @return false when the config is null, otherwise the result of {@code config.check()}
     */
    private boolean check() {
        if (this.config == null) {
            logger.error("OTS tunnel worker config is null");
            return false;
        }
        return this.config.check();
    }

    /**
     * Looks up the tunnel, builds the SDK worker configuration and connects the worker.
     *
     * <p>If any step fails, everything allocated by this call (worker and thread pools)
     * is released before returning, so a failed start leaves nothing behind.
     *
     * @return true if the worker connected, false if any step threw an exception
     */
    public boolean start() {
        logger.info("OTS tunnel worker[{}/{}] is starting ...", this.config.table, this.config.tunnel);
        DescribeTunnelRequest request = new DescribeTunnelRequest(this.config.table, this.config.tunnel);
        try {
            DescribeTunnelResponse response = this.tunnelClient.describeTunnel(request);
            TunnelInfo tunnelInfo = response.getTunnelInfo();
            int threadNum = this.config.consumeThreadNum;
            this.workerConfig = new TunnelWorkerConfig(
                    this.createThreadPoolExecutor("tunnel_reader", threadNum),
                    this.createThreadPoolExecutor("tunnel_processor", threadNum),
                    this.config.channelProcessor);
            this.workerConfig.setMaxRetryIntervalInMillis(this.config.maxRetryIntervalMs);
            this.workerConfig.setHeartbeatIntervalInSec((long)this.config.heartBeatIntervalSec);
            // A non-positive value leaves the SDK setting untouched.
            if (this.config.maxChannelParallel > 0) {
                this.workerConfig.setMaxChannelParallel(this.config.maxChannelParallel);
            }
            this.worker = new TunnelWorker(tunnelInfo.getTunnelId(), (TunnelClientInterface)this.tunnelClient, this.workerConfig);
            this.worker.connectAndWorking();
        }
        catch (Exception e) {
            // One record carrying both the context and the stack trace.
            logger.error("start OTS tunnel worker[{}/{}] failed", this.config.table, this.config.tunnel, e);
            // Free whatever was allocated before the failure; the caller may never call stop().
            try {
                this.release();
            }
            catch (RuntimeException releaseError) {
                logger.warn("release resources of OTS tunnel worker[{}/{}] failed",
                        this.config.table, this.config.tunnel, releaseError);
            }
            return false;
        }
        logger.info("OTS tunnel worker[{}/{}] has been started", this.config.table, this.config.tunnel);
        return true;
    }

    /**
     * Shuts down the worker and its worker configuration, if present.
     * Calling this when nothing is running (never started, failed start, or already stopped)
     * only writes the two log lines.
     */
    public void stop() {
        logger.info("OTS tunnel worker[{}/{}] is stopping ...", this.config.table, this.config.tunnel);
        this.release();
        logger.info("OTS tunnel worker[{}/{}] has been stopped", this.config.table, this.config.tunnel);
    }

    /**
     * Shuts down the current worker and worker configuration and clears both references.
     * The worker configuration is shut down even if shutting down the worker throws.
     */
    private void release() {
        TunnelWorker currentWorker = this.worker;
        TunnelWorkerConfig currentWorkerConfig = this.workerConfig;
        // Clear first so a later stop()/release() never shuts the same objects down twice.
        this.worker = null;
        this.workerConfig = null;
        try {
            if (currentWorker != null) {
                currentWorker.shutdown();
            }
        }
        finally {
            if (currentWorkerConfig != null) {
                currentWorkerConfig.shutdown();
            }
        }
    }

    /**
     * Builds a fixed-size thread pool with a bounded queue whose threads are named
     * {@code <name>-<sequence>}. When the queue is full, the submitting thread runs
     * the task itself ({@link ThreadPoolExecutor.CallerRunsPolicy}).
     *
     * @param name      prefix for the names of the pool's threads
     * @param threadNum core and maximum pool size
     * @return the new executor
     */
    private ThreadPoolExecutor createThreadPoolExecutor(final String name, int threadNum) {
        logger.info("create thread pool executor[{}:{}]", name, threadNum);
        return new ThreadPoolExecutor(threadNum, threadNum, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(TASK_QUEUE_CAPACITY), new ThreadFactory(){
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public Thread newThread(Runnable task) {
                String threadName = String.format("%s-%d", name, this.counter.getAndIncrement());
                logger.info("create channel receiver thread[{}] success", threadName);
                return new Thread(task, threadName);
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

