package io.confluent.connect.jms.core.source;

import io.confluent.connect.jms.core.source.BaseJmsSourceConnectorConfig;
import io.confluent.connect.utils.retry.BackoffPolicies;
import io.confluent.connect.utils.retry.RetryCondition;
import io.confluent.connect.utils.retry.RetryPolicy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/jms/core/source/BaseJmsSourceTask.class */
public abstract class BaseJmsSourceTask<C extends BaseJmsSourceConnectorConfig> extends SourceTask {
    public static final String TASK_ID_CONFIG_NAME = "task.jms.id";
    private static final long JMS_RECV_LONG_DURATION_MS = 5000;
    private static final long INITIAL_BACKOFF_TIME_MS = 100;
    public JmsClientHelper<C> clientHelper;
    public RecordConverter converter;
    protected C config;
    private String connectorName;
    private String taskId;
    private String connectorNameAndTaskId;
    private ConnectionFactory connectionFactory;
    private int batchSize;
    private int maxPending;
    private long maxPollDuration;
    private RetryPolicy retryPolicy;
    private RetryCondition retryCondition;
    private long duration;
    private static final long JMS_RECV_SHORT_DURATION_MS = 10;
    private static final long MAX_BACKOFF_TIME_MS = TimeUnit.MINUTES.toMillis(JMS_RECV_SHORT_DURATION_MS);
    static final Logger LOG = LoggerFactory.getLogger(BaseJmsSourceTask.class);
    public final AtomicReference<JMSException> commitException = new AtomicReference<>(null);
    private final Lock pendingCommitsLock = new ReentrantLock();
    private final Condition pendingCommits = this.pendingCommitsLock.newCondition();
    protected final AtomicLong numProduced = new AtomicLong();
    protected final AtomicBoolean waitForAllPendingToCommit = new AtomicBoolean();
    protected ConcurrentMap<String, JmsSourceRecord> pendingMessages = new ConcurrentHashMap();

    protected abstract ConnectionFactory connectionFactory();

    protected abstract C config(Map<String, String> map);

    public void start(Map<String, String> map) {
        this.connectorName = map.get(RecordConverter.FIELD_NAME);
        this.taskId = map.get(TASK_ID_CONFIG_NAME);
        this.connectorNameAndTaskId = String.format("%s-%s", this.connectorName, this.taskId);
        LOG.trace("{} start()", this);
        this.config = config(map);
        this.converter = new RecordConverter(this.config);
        this.connectionFactory = connectionFactory();
        this.clientHelper = new JmsClientHelper<>(this.config, this.connectionFactory, this.connectorNameAndTaskId);
        this.batchSize = this.config.batchSize();
        this.maxPending = this.config.maxPending();
        this.maxPollDuration = this.config.maxPollDuration();
        this.retryCondition = RetryCondition.retryOnConnectRetriable();
        if (this.config.maxRetryTimeMs() < INITIAL_BACKOFF_TIME_MS) {
            this.retryPolicy = RetryPolicy.builder().withNoRetries().build();
        } else {
            this.retryPolicy = RetryPolicy.builder().maxAttempts(Integer.MAX_VALUE).maxRetryTimeout(Duration.ofMillis(this.config.maxRetryTimeMs())).backoffPolicy(BackoffPolicies.exponentialJitter(Duration.ofMillis(INITIAL_BACKOFF_TIME_MS), Duration.ofMillis(MAX_BACKOFF_TIME_MS))).when(this.retryCondition).build();
        }
    }

    protected boolean closeConnectionBeforeRetry(Throwable th) {
        return false;
    }

    protected boolean isRetriable(Throwable th) {
        return false;
    }

    protected JmsSourceRecord receive(long j) {
        LOG.trace("receive()");
        try {
            if (this.clientHelper.isClosed()) {
                LOG.debug("Connection is closed. Connecting...");
                this.clientHelper.connect();
            }
            Message receive = this.clientHelper.receive(j);
            if (receive == null) {
                LOG.trace("No message received.");
                return null;
            }
            LOG.trace("Received message with id='{}'", receive.getJMSMessageID());
            return this.converter.record(receive);
        } catch (JMSException e) {
            if (closeConnectionBeforeRetry(e)) {
                this.clientHelper.close();
            }
            if (isRetriable(e)) {
                throw new RetriableException(e);
            }
            throw new ConnectException(e);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        if (this.commitException.get() != null) {
            JMSException jMSException = this.commitException.get();
            this.commitException.set(null);
            throw new ConnectException("Encountered an unrecoverable exception while committing a record.", jMSException);
        }
        LOG.trace("{} poll()", this);
        if (this.waitForAllPendingToCommit.get()) {
            this.pendingCommitsLock.lock();
            try {
                if (this.numProduced.get() > 0) {
                    long currentTimeMillis = System.currentTimeMillis();
                    int size = this.pendingMessages.size();
                    LOG.debug("{} Reached {} pending records set by '{}', so waiting up to 1000ms for them to be committed to Kafka and acknowledged to JMS before consuming more", new Object[]{this, Integer.valueOf(this.maxPending), BaseJmsConnectorConfig.MAX_PENDING_CONFIG});
                    this.pendingCommits.await(1L, TimeUnit.SECONDS);
                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                    int size2 = this.pendingMessages.size();
                    if (size2 > 0) {
                        LOG.debug("{} Exceeded {}ms wait time for {} pending messages to be committed: {} committed and {} remain, so returning null from poll()", new Object[]{this, Long.valueOf(currentTimeMillis2), Integer.valueOf(size), Integer.valueOf(size - size2), Integer.valueOf(size2)});
                        this.pendingCommitsLock.unlock();
                        return null;
                    }
                    LOG.debug("{} All {} pending messages have been committed in {}ms. Consuming messages", new Object[]{this, Integer.valueOf(size), Long.valueOf(currentTimeMillis2)});
                }
            } finally {
                this.pendingCommitsLock.unlock();
            }
        }
        ArrayList arrayList = null;
        this.duration = JMS_RECV_LONG_DURATION_MS;
        long currentTimeMillis3 = System.currentTimeMillis() + this.maxPollDuration;
        for (int i = 0; i < this.batchSize; i++) {
            if (i != 0 && exceededPollDuration(currentTimeMillis3)) {
                LOG.debug("{} Returning {} records after exceeded max poll duration of {}ms", new Object[]{this, Integer.valueOf(recordCount(arrayList)), Long.valueOf(this.maxPollDuration)});
                this.waitForAllPendingToCommit.set(true);
                return arrayList;
            }
            JmsSourceRecord jmsSourceRecord = (JmsSourceRecord) this.retryPolicy.call("receive JMS message", () -> {
                return receive(this.duration);
            });
            if (jmsSourceRecord == null) {
                LOG.debug("{} Returning {} records after receiving no new messages within {}ms", new Object[]{this, Integer.valueOf(recordCount(arrayList)), Long.valueOf(this.duration)});
                this.waitForAllPendingToCommit.set(true);
                return arrayList;
            }
            if (arrayList == null) {
                arrayList = new ArrayList();
            }
            arrayList.add(jmsSourceRecord);
            this.pendingMessages.put(jmsSourceRecord.internalMessageId, jmsSourceRecord);
            this.duration = JMS_RECV_SHORT_DURATION_MS;
            if (this.numProduced.incrementAndGet() >= this.maxPending) {
                LOG.debug("{} Returning {} records after reaching max pending records {}", new Object[]{this, Integer.valueOf(recordCount(arrayList)), Integer.valueOf(this.maxPending)});
                this.waitForAllPendingToCommit.set(true);
                return arrayList;
            }
        }
        LOG.debug("Returning {} records (full batch), with total of {} pending", Integer.valueOf(recordCount(arrayList)), Integer.valueOf(this.pendingMessages.size()));
        return arrayList;
    }

    private int recordCount(List<SourceRecord> list) {
        if (list == null) {
            return 0;
        }
        return list.size();
    }

    private boolean exceededPollDuration(long j) {
        return System.currentTimeMillis() > j;
    }

    public void commitRecord(SourceRecord sourceRecord) {
        LOG.trace("{} commitRecord()", this);
        this.pendingMessages.remove(((JmsSourceRecord) sourceRecord).internalMessageId);
        if (this.waitForAllPendingToCommit.get() && this.pendingMessages.isEmpty()) {
            this.retryPolicy.call("Acknowledge Jms Message", () -> {
                return acknowledge(sourceRecord);
            });
            this.pendingCommitsLock.lock();
            try {
                this.numProduced.set(0L);
                this.waitForAllPendingToCommit.set(false);
                this.pendingCommits.signalAll();
            } finally {
                this.pendingCommitsLock.unlock();
            }
        }
    }

    public Message acknowledge(SourceRecord sourceRecord) {
        try {
            if (this.clientHelper.isClosed()) {
                LOG.debug("Connection is closed. Connecting...");
                this.clientHelper.connect();
            }
            Message message = ((JmsSourceRecord) sourceRecord).message;
            this.clientHelper.acknowledge(message);
            LOG.trace("{} Acknowledged message with id='{}'", this, message.getJMSMessageID());
            return message;
        } catch (JMSException e) {
            if (closeConnectionBeforeRetry(e)) {
                this.clientHelper.close();
            }
            if (isRetriable(e)) {
                throw new RetriableException(e);
            }
            this.commitException.set(e);
            throw new ConnectException(e);
        }
    }

    public void stop() {
        this.clientHelper.deactivate();
        LOG.trace("{} stop()", this);
        if (this.clientHelper == null) {
            LOG.info("{} JMS connection is null, skipping closing", this);
        } else {
            LOG.info("Closing JMS connection");
            this.clientHelper.close();
        }
    }

    public String connectorName() {
        return this.connectorName;
    }

    public int taskId() {
        return Integer.parseInt(this.taskId);
    }

    public String clientId() {
        return this.clientHelper.clientId();
    }

    public String toString() {
        return this.connectorNameAndTaskId;
    }
}
