package com.wu.framework.easy.upsert;

import com.wu.framework.easy.upsert.analyze.ElasticsearchEasyDataProcessAnalyze;
import com.wu.framework.easy.upsert.autoconfigure.config.SpringBootElasticsearchConfigProperties;
import com.wu.framework.easy.upsert.autoconfigure.config.SpringUpsertAutoConfigure;
import com.wu.framework.easy.upsert.autoconfigure.dynamic.EasyUpsertStrategy;
import com.wu.framework.easy.upsert.autoconfigure.enums.EasyUpsertType;
import com.wu.framework.easy.upsert.core.dynamic.IEasyUpsert;
import com.wu.framework.easy.upsert.core.dynamic.exception.UpsertException;
import com.wu.framework.inner.layer.data.ClassSchema;
import java.io.File;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;

@EasyUpsertStrategy(EasyUpsertType.ES)
@ConditionalOnProperty(prefix = "spring.elasticsearch.rest", value = {"uris"})
/* loaded from: input_file:com/wu/framework/easy/upsert/ElasticsearchEasyUpsert.class */
public class ElasticsearchEasyUpsert implements IEasyUpsert, ElasticsearchEasyDataProcessAnalyze, InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchEasyUpsert.class);
    protected final WebClient webClient = WebClient.builder().exchangeStrategies(ExchangeStrategies.builder().codecs(clientCodecConfigurer -> {
        clientCodecConfigurer.defaultCodecs().maxInMemorySize(16777216);
    }).build()).build();
    private final SpringBootElasticsearchConfigProperties elasticsearchProperties;
    private final SpringUpsertAutoConfigure springUpsertAutoConfigure;

    ElasticsearchEasyUpsert(SpringBootElasticsearchConfigProperties springBootElasticsearchConfigProperties, SpringUpsertAutoConfigure springUpsertAutoConfigure) {
        this.elasticsearchProperties = springBootElasticsearchConfigProperties;
        this.springUpsertAutoConfigure = springUpsertAutoConfigure;
    }

    public static String fileSize(long j) {
        if (j < 1024) {
            return j + "B";
        }
        long j2 = j / 1024;
        if (j2 < 1024) {
            return j2 + "KB";
        }
        long j3 = j2 / 1024;
        if (j3 < 1024) {
            long j4 = j3 * 100;
            return (j4 / 100) + "." + (j4 % 100) + "MB";
        }
        long j5 = (j3 * 100) / 1024;
        return (j5 / 100) + "." + (j5 % 100) + "GB";
    }

    protected void asySend(String str, File file) {
        this.webClient.post().uri(str + "/_bulk", new Object[0]).contentType(MediaType.APPLICATION_JSON).bodyValue(new FileSystemResource(file)).exchange().doOnSuccess(clientResponse -> {
            System.out.println("clientResponse.statusCode() = " + clientResponse.statusCode());
        }).flatMap(clientResponse2 -> {
            return clientResponse2.bodyToMono(String.class);
        }).block();
        file.delete();
    }

    public <T> Object upsert(List<T> list, ClassSchema classSchema) throws UpsertException, ExecutionException, InterruptedException {
        Integer valueOf = Integer.valueOf(((list.size() + this.springUpsertAutoConfigure.getBatchLimit().intValue()) - 1) / this.springUpsertAutoConfigure.getBatchLimit().intValue());
        log.info("计划处理写入文件 【{}】 个", valueOf);
        int i = 1;
        for (List<T> list2 : splitList(list, this.springUpsertAutoConfigure.getBatchLimit().intValue())) {
            log.info("处理步写入文件 【{}】 步 ,总文件 【{}】", Integer.valueOf(i), valueOf);
            writeFileToLocal(list2, this.springUpsertAutoConfigure.getCacheFileAddress());
            i++;
        }
        log.info("分步写入本地文件完成✅");
        EASY_UPSERT_EXECUTOR.execute(() -> {
            send();
        });
        return true;
    }

    public void afterPropertiesSet() throws Exception {
        try {
            send();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void send() {
        this.elasticsearchProperties.getUris().forEach(str -> {
            File[] listFiles = new File(this.springUpsertAutoConfigure.getCacheFileAddress()).listFiles((file, str) -> {
                return str.toLowerCase().endsWith(ElasticsearchEasyDataProcessAnalyze.ES_UPSERT_FILE_SUFFIX);
            });
            int i = 1;
            for (File file2 : listFiles) {
                log.info("分步发送ES数据 【{}】 大小【{}】M  总步数:【{}】当前步数:【{}】", new Object[]{file2, fileSize(file2.length()), Integer.valueOf(listFiles.length), Integer.valueOf(i)});
                asySend(str, file2);
                i++;
            }
        });
    }
}
