package com.wu.framework.easy.upsert.sink;

import com.wu.framework.easy.upsert.PulsarSchema;
import com.wu.framework.easy.upsert.autoconfigure.config.SpringBootPulsarConfigProperties;
import com.wu.framework.easy.upsert.autoconfigure.config.SpringUpsertAutoConfigure;
import com.wu.framework.easy.upsert.autoconfigure.dynamic.EasyUpsertStrategy;
import com.wu.framework.easy.upsert.autoconfigure.enums.EasyUpsertType;
import com.wu.framework.easy.upsert.core.dynamic.IEasyUpsert;
import com.wu.framework.easy.upsert.core.dynamic.exception.UpsertException;
import com.wu.framework.easy.upsert.core.dynamic.function.EasyUpsertFunction;
import com.wu.framework.inner.layer.CamelAndUnderLineConverter;
import com.wu.framework.inner.layer.data.ClassSchema;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.util.ObjectUtils;

@EasyUpsertStrategy(EasyUpsertType.PULSAR)
@ConditionalOnBean({SpringBootPulsarConfigProperties.class})
/* loaded from: input_file:com/wu/framework/easy/upsert/sink/PulsarEasyUpsertSink.class */
public class PulsarEasyUpsertSink implements IEasyUpsert, InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(PulsarEasyUpsertSink.class);
    private final PulsarClient pulsarClient;
    private final SpringUpsertAutoConfigure configure;

    public PulsarEasyUpsertSink(PulsarClient pulsarClient, SpringUpsertAutoConfigure springUpsertAutoConfigure) {
        this.pulsarClient = pulsarClient;
        this.configure = springUpsertAutoConfigure;
    }

    public <T> Object upsert(List<T> list, final ClassSchema classSchema) throws UpsertException, ExecutionException, InterruptedException {
        final ArrayList arrayList = new ArrayList();
        splitListThen(list, this.configure.getBatchLimit().intValue(), new EasyUpsertFunction() { // from class: com.wu.framework.easy.upsert.sink.PulsarEasyUpsertSink.1
            public <t> void handle(List<t> list2) {
                PulsarSchema pulsarSchema = (PulsarSchema) classSchema.classAnnotation(PulsarSchema.class);
                String humpToLine2 = CamelAndUnderLineConverter.humpToLine2(classSchema.clazz().getSimpleName());
                if (!ObjectUtils.isEmpty(pulsarSchema) && !ObjectUtils.isEmpty(pulsarSchema.topic())) {
                    humpToLine2 = pulsarSchema.topic();
                }
                Iterator<t> it = list2.iterator();
                while (it.hasNext()) {
                    try {
                        arrayList.add(PulsarEasyUpsertSink.this.pulsarClient.newProducer(JSONSchema.of(classSchema.clazz())).topic(humpToLine2).create().send(it.next()));
                    } catch (Exception e) {
                        e.printStackTrace();
                        throw new UpsertException(e);
                    }
                }
            }
        });
        return arrayList;
    }
}
