/*
 * Decompiled with CFR 0.152.
 */
package io.axual.client.consumer.base;

import io.axual.client.config.BaseConsumerConfig;
import io.axual.client.consumer.Consumer;
import io.axual.client.consumer.Processor;
import io.axual.client.consumer.base.BaseMessage;
import io.axual.client.consumer.base.BaseMessageSource;
import io.axual.client.exception.ConsumeFailedException;
import io.axual.client.exception.RetriableException;
import io.axual.client.exception.SkippableException;
import io.axual.client.janitor.Janitor;
import io.axual.common.tools.ExecutorUtil;
import io.axual.common.tools.SleepUtil;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of a {@link Consumer} that pulls batches of messages from a
 * {@link BaseMessageSource} and hands them one by one to a {@link Processor}.
 *
 * <p>The consume loop runs on a dedicated single-thread executor owned by this instance.
 * At most one loop runs at a time; it is started with {@link #startConsuming()} and asked
 * to stop with {@link #stopConsuming()} or {@link #close()}.
 *
 * <p>NOTE(review): the stop flag is never reset, so a loop started after
 * {@link #stopConsuming()} has been called exits immediately - confirm this is intended.
 *
 * @param <K> message key type
 * @param <V> message value type
 */
public abstract class BaseConsumer<K, V>
extends Janitor.ManagedCloseable
implements Consumer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseConsumer.class);
    private static final String NEWLINE = "\n";
    private static final String LOG_ID = "ID: {}\n";
    private static final String LOG_KEY = "KEY: {}\n";
    private static final String LOG_VALUE = "VALUE: {}\n";
    /** Delay between checks while the consume thread waits for its own future to be published. */
    private static final Duration RESULT_POLL_INTERVAL = Duration.ofMillis(50L);
    /** Time passed to {@code ExecutorUtil.terminateExecutor} when closing. */
    private static final Duration EXECUTOR_TERMINATION_TIMEOUT = Duration.ofSeconds(10L);

    private final BaseMessageSource<K, V> messageSource;
    private final Processor<K, V> processor;
    /** True from the moment a consume task is accepted until that task finishes. */
    private final AtomicBoolean consumeThreadRunning = new AtomicBoolean(false);
    /** Set once a stop has been requested; checked between messages and between batches. */
    private final AtomicBoolean stopConsumer = new AtomicBoolean(false);
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    /**
     * Future of the most recently submitted consume task. Written by the thread calling
     * {@link #startConsuming()} and polled by the consume thread in {@link #consumeLoop()},
     * hence volatile so the write is guaranteed to become visible to the polling thread.
     */
    private volatile Future<ConsumeFailedException> consumeResult = null;

    /**
     * Creates a consumer that reads from {@code messageSource} and delegates every message
     * to {@code processor}. Consumption does not start until {@link #startConsuming()}.
     *
     * @param messageSource source of message batches; must not be null
     * @param processor     callback invoked for every message
     */
    public BaseConsumer(BaseMessageSource<K, V> messageSource, Processor<K, V> processor) {
        this.messageSource = messageSource;
        this.processor = processor;
        LOG.info("Created consumer with source of class: {}.", messageSource.getClass().getName());
    }

    /**
     * Returns the consumer configuration as reported by the message source.
     */
    @Override
    public BaseConsumerConfig<K, V> getConfig() {
        return this.messageSource.getConsumerConfig();
    }

    /**
     * Starts the consume loop on the internal executor, unless one is already running.
     *
     * @return the future of the started loop; it completes with {@code null} on a clean
     *         stop or with the failure that ended the loop. Returns {@code null} when a
     *         loop is already running and no new one was started.
     * @throws RuntimeException whatever the executor throws when it refuses the task
     *         (for example after {@link #close()}); the running flag is reset first
     */
    @Override
    public Future<ConsumeFailedException> startConsuming() {
        if (!this.consumeThreadRunning.compareAndSet(false, true)) {
            LOG.warn("There is already a process running, no new process is started.");
            return null;
        }
        final Future<ConsumeFailedException> submitted;
        try {
            submitted = this.executorService.submit(() -> {
                try {
                    return this.consumeLoop();
                }
                catch (ConsumeFailedException e) {
                    return e;
                }
                finally {
                    this.consumeThreadRunning.set(false);
                }
            });
        }
        catch (RuntimeException e) {
            // The task never started, so its finally block will never clear the flag.
            this.consumeThreadRunning.set(false);
            throw e;
        }
        this.consumeResult = submitted;
        return submitted;
    }

    /**
     * Requests the consume loop to stop and, if it is still running, waits for it to end.
     *
     * @return the failure that ended the loop, or {@code null} if it stopped cleanly, was
     *         never started, or had already finished when this method was called
     */
    @Override
    public ConsumeFailedException stopConsuming() {
        this.stopConsumer.set(true);
        if (this.consumeResult != null && this.consumeThreadRunning.get()) {
            return this.getConsumeResult();
        }
        return null;
    }

    /**
     * Tells whether a consume task is currently accepted or running.
     */
    @Override
    public boolean isConsuming() {
        return this.consumeThreadRunning.get();
    }

    /**
     * Blocks until the most recently started consume loop has finished and returns its outcome.
     *
     * @return {@code null} if no loop was ever started, the loop ended cleanly or its future
     *         was cancelled; otherwise the failure that ended the loop, or a new
     *         {@link ConsumeFailedException} wrapping the error raised while waiting
     */
    @Override
    public ConsumeFailedException getConsumeResult() {
        final Future<ConsumeFailedException> result = this.consumeResult;
        if (result == null) {
            return null;
        }
        try {
            return result.get();
        }
        catch (CancellationException e) {
            return null;
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            return new ConsumeFailedException(e);
        }
        catch (Exception e) {
            return new ConsumeFailedException(e);
        }
    }

    /**
     * Fetches and processes message batches until a stop is requested or a batch fails.
     *
     * <p>The loop first waits until the future created by {@link #startConsuming()} has been
     * stored in this instance. Calling this method directly, without
     * {@link #startConsuming()}, therefore blocks until some other thread starts the consumer.
     *
     * <p>After every batch, including a failed or interrupted one, the message source is
     * notified through {@code onAfterProcessBatch()}.
     *
     * @return {@code null} when the loop ended because a stop was requested, otherwise a
     *         {@link ConsumeFailedException} wrapping the error that ended it
     */
    public ConsumeFailedException consumeLoop() {
        while (this.consumeResult == null) {
            SleepUtil.sleep(RESULT_POLL_INTERVAL);
        }
        while (!this.stopConsumer.get()) {
            try {
                List<BaseMessage<K, V>> messages = this.messageSource.getMessages();
                LOG.trace("Fetched {} messages", messages.size());
                try {
                    for (BaseMessage<K, V> message : messages) {
                        this.handleMessage(message);
                        if (this.stopConsumer.get()) {
                            LOG.warn("Consumer requested to stop, exiting batch processing loop");
                            // The outer loop re-checks the stop flag and terminates.
                            break;
                        }
                    }
                }
                finally {
                    this.messageSource.onAfterProcessBatch();
                    LOG.trace("Processed message batch");
                }
            }
            catch (Exception e) {
                LOG.error("Error processing message batch", e);
                return new ConsumeFailedException(e, this.messageSource.getInfo());
            }
        }
        return null;
    }

    /**
     * Hands a single message to the processor, retrying for as long as the processor
     * signals a {@link RetriableException} and no stop has been requested.
     *
     * <p>The message source is informed of every attempt through
     * {@code onAfterProcessMessage}, with {@code null} on success or the exception otherwise.
     * Message key and value are only logged when trace logging is enabled.
     *
     * @param message the message to process
     * @throws ConsumeFailedException when the processor fails with anything other than a
     *         retriable or skippable exception
     */
    private void handleMessage(BaseMessage<K, V> message) {
        int tryCount = 0;
        boolean done = false;
        while (!done && !this.stopConsumer.get()) {
            tryCount++;
            try {
                LOG.trace("Processing message (try #{}):" + NEWLINE + LOG_ID + LOG_KEY + LOG_VALUE,
                        tryCount, message.getId(), message.getKey(), message.getValue());
                this.processor.processMessage(message);
                LOG.debug("Message successfully processed");
                this.messageSource.onAfterProcessMessage(message, null);
                done = true;
            }
            catch (RetriableException e) {
                if (LOG.isTraceEnabled()) {
                    LOG.warn("Message could not be processed, retrying in {} ms." + NEWLINE + LOG_ID + LOG_KEY + LOG_VALUE,
                            e.getSleepTime(), message.getId(), message.getKey(), message.getValue());
                } else {
                    LOG.warn("Message could not be processed, retrying in {} ms." + NEWLINE + LOG_ID,
                            e.getSleepTime(), message.getId());
                }
                this.messageSource.onAfterProcessMessage(message, e);
                SleepUtil.sleep(e.getSleepTime());
            }
            catch (SkippableException e) {
                if (LOG.isTraceEnabled()) {
                    LOG.warn("Skipping unprocessable message:" + NEWLINE + LOG_ID + LOG_KEY + LOG_VALUE,
                            message.getId(), message.getKey(), message.getValue());
                } else {
                    LOG.warn("Skipping unprocessable message:" + NEWLINE + LOG_ID, message.getId());
                }
                this.messageSource.onAfterProcessMessage(message, e);
                // A skipped message counts as handled; move on to the next one.
                done = true;
            }
            catch (Exception e) {
                if (LOG.isTraceEnabled()) {
                    LOG.error("Message could not be processed:" + NEWLINE + LOG_ID + LOG_KEY + LOG_VALUE + "EXCEPTION: {}",
                            message.getId(), message.getKey(), message.getValue(), e);
                } else {
                    LOG.error("Message could not be processed:" + NEWLINE + LOG_ID + "EXCEPTION: {}",
                            message.getId(), e);
                }
                this.messageSource.onAfterProcessMessage(message, e);
                throw new ConsumeFailedException(e);
            }
        }
    }

    /**
     * Stops consuming, shuts down the executor, closes the message source and finally
     * delegates to the superclass.
     *
     * <p>Errors from closing the message source are logged and not propagated.
     *
     * @throws ConsumeFailedException the failure returned by {@link #stopConsuming()}, if
     *         any, rethrown after all resources have been released
     */
    @Override
    public void close() {
        ConsumeFailedException consumeException = this.stopConsuming();
        ExecutorUtil.terminateExecutor(this.executorService, EXECUTOR_TERMINATION_TIMEOUT);
        try {
            this.messageSource.close();
        }
        catch (Exception e) {
            LOG.error("Error during closing of the message source", e);
        }
        super.close();
        if (consumeException != null) {
            throw consumeException;
        }
    }
}

