package kz.greetgo.kafka.consumer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.model.Box;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

/* loaded from: input_file:kz/greetgo/kafka/consumer/InvokeSessionFactoryImpl.class */
public class InvokeSessionFactoryImpl implements InvokeSessionFactory {
    private final Invoker invoker;
    private final Logger logger;

    public InvokeSessionFactoryImpl(Logger logger, Invoker invoker) {
        this.invoker = invoker;
        this.logger = logger;
    }

    @Override // kz.greetgo.kafka.consumer.InvokeSessionFactory
    public InvokeSession createSession() {
        return new InvokeSession() { // from class: kz.greetgo.kafka.consumer.InvokeSessionFactoryImpl.1
            private final InvokeSessionContext context = new InvokeSessionContext();

            @Override // kz.greetgo.kafka.consumer.InvokeSession
            public InvokeResult invoke(ConsumerRecords<byte[], Box> consumerRecords) {
                boolean z = true;
                Throwable th = null;
                ArrayList arrayList = new ArrayList();
                Iterator it = consumerRecords.iterator();
                while (it.hasNext()) {
                    ConsumerRecord<byte[], Box> consumerRecord = (ConsumerRecord) it.next();
                    if (InvokeSessionFactoryImpl.this.invoker.isInFilter(consumerRecord)) {
                        this.context.kafkaFutures.clear();
                        InvokeResult invoke = InvokeSessionFactoryImpl.this.invoker.invoke(consumerRecord, this.context);
                        if (!invoke.needToCommit()) {
                            z = false;
                        }
                        Throwable invocationError = invoke.invocationError();
                        if (invocationError != null) {
                            th = invocationError;
                            if (InvokeSessionFactoryImpl.this.logger.isShow(LoggerType.CONSUMER_LOG_DIRECT_INVOKE_ERROR)) {
                                InvokeSessionFactoryImpl.this.logger.logConsumerLogDirectInvokeError(invocationError);
                            }
                        }
                        arrayList.addAll(this.context.kafkaFutures);
                        this.context.kafkaFutures.clear();
                    }
                }
                arrayList.stream().filter((v0) -> {
                    return Objects.nonNull(v0);
                }).forEach((v0) -> {
                    v0.awaitAndGet();
                });
                return InvokeResult.of(z, th);
            }

            @Override // kz.greetgo.kafka.consumer.InvokeSession, java.lang.AutoCloseable
            public void close() {
                this.context.close();
            }
        };
    }
}
