package software.amazon.kinesis.connectors.flink.internals.publisher.polling;

import org.apache.flink.annotation.Internal;
import software.amazon.kinesis.connectors.flink.internals.publisher.RecordPublisher;
import software.amazon.kinesis.connectors.flink.metrics.PollingRecordPublisherMetricsReporter;
import software.amazon.kinesis.connectors.flink.model.SequenceNumber;
import software.amazon.kinesis.connectors.flink.model.StartingPosition;
import software.amazon.kinesis.connectors.flink.model.StreamShardHandle;
import software.amazon.kinesis.connectors.flink.proxy.KinesisProxyInterface;

/**
 * A {@link PollingRecordPublisher} that recomputes, after every run, the maximum number of
 * records to request on the next run.
 *
 * <p>The new limit is derived from the measured duration of the last run loop (including any
 * sleep inserted to honour the fetch interval), the number of de-aggregated records in the last
 * batch and that batch's total size in bytes, against a budget of
 * {@link #KINESIS_SHARD_BYTES_PER_SECOND_LIMIT} bytes per second.
 */
@Internal
public class AdaptivePollingRecordPublisher extends PollingRecordPublisher {
    /** Bytes-per-second budget used when sizing the next fetch (2 * 1024 * 1024). */
    private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2097152;

    /** Nanoseconds in one second; used to turn a loop duration into a frequency in Hz. */
    private static final double NANOS_PER_SECOND = 1.0E9d;

    /** Nanoseconds in one millisecond. */
    private static final long NANOS_PER_MILLISECOND = 1000000L;

    /** Lower bound applied to the adapted record limit. */
    private static final int MIN_RECORDS_PER_FETCH = 1;

    /** Upper bound applied to the adapted record limit. */
    private static final int MAX_RECORDS_PER_FETCH = 10000;

    /** De-aggregated record count of the most recently consumed batch. */
    private int lastRecordBatchSize = 0;

    /** Total size in bytes of the most recently consumed batch. */
    private long lastRecordBatchSizeInBytes = 0L;

    /** {@link System#nanoTime()} reading that marks the start of the current run loop. */
    private long processingStartTimeNanos;

    /** Record limit handed to the next {@code super.run(...)} call. */
    private int maxNumberOfRecordsPerFetch;

    /** Target minimum duration of one run loop in milliseconds; {@code 0} disables sleeping. */
    private final long fetchIntervalMillis;

    private final PollingRecordPublisherMetricsReporter metricsReporter;

    /**
     * Creates the publisher and starts timing the first run loop.
     *
     * <p>Decompiler note: the access modifier of this constructor was changed from
     * package-private.
     *
     * @param startingPosition forwarded unchanged to the superclass constructor
     * @param streamShardHandle forwarded unchanged to the superclass constructor
     * @param pollingRecordPublisherMetricsReporter forwarded to the superclass and also kept
     *     here to report loop timing, sleep time, loop frequency and bytes per read
     * @param kinesisProxyInterface forwarded unchanged to the superclass constructor
     * @param i initial maximum number of records per fetch
     * @param j fetch interval in milliseconds
     * @throws InterruptedException propagated from the superclass constructor
     */
    public AdaptivePollingRecordPublisher(StartingPosition startingPosition, StreamShardHandle streamShardHandle, PollingRecordPublisherMetricsReporter pollingRecordPublisherMetricsReporter, KinesisProxyInterface kinesisProxyInterface, int i, long j) throws InterruptedException {
        super(startingPosition, streamShardHandle, pollingRecordPublisherMetricsReporter, kinesisProxyInterface, i, j);
        processingStartTimeNanos = System.nanoTime();
        maxNumberOfRecordsPerFetch = i;
        fetchIntervalMillis = j;
        metricsReporter = pollingRecordPublisherMetricsReporter;
    }

    /**
     * Runs one poll through the superclass using the current record limit, then sleeps if the
     * loop finished faster than the fetch interval and recomputes the limit for the next call.
     *
     * @param recordBatchConsumer receives the fetched batch; its return value is passed back to
     *     the superclass unchanged
     * @return the result produced by {@code super.run(...)}
     * @throws InterruptedException if the superclass run or the pacing sleep is interrupted
     */
    @Override
    public RecordPublisher.RecordPublisherRunResult run(RecordPublisher.RecordBatchConsumer recordBatchConsumer) throws InterruptedException {
        final RecordPublisher.RecordPublisherRunResult result = super.run(batch -> {
            // Hand the batch downstream first, then remember its dimensions for the adaptation step.
            final SequenceNumber consumedUpTo = recordBatchConsumer.accept(batch);
            lastRecordBatchSize = batch.getDeaggregatedRecordSize();
            lastRecordBatchSizeInBytes = batch.getTotalSizeInBytes();
            return consumedUpTo;
        }, maxNumberOfRecordsPerFetch);

        final long loopStartNanos = processingStartTimeNanos;
        final long nextLoopStartNanos = adjustRunLoopFrequency(loopStartNanos, System.nanoTime());
        // Loop time deliberately includes any sleep performed above.
        final long runLoopTimeNanos = nextLoopStartNanos - loopStartNanos;

        maxNumberOfRecordsPerFetch = adaptRecordsToRead(
                runLoopTimeNanos,
                lastRecordBatchSize,
                lastRecordBatchSizeInBytes,
                maxNumberOfRecordsPerFetch);
        processingStartTimeNanos = nextLoopStartNanos;
        metricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
        return result;
    }

    /**
     * Sleeps for whatever remains of the fetch interval after the work done in this loop.
     *
     * @param loopStartNanos {@link System#nanoTime()} reading taken when the loop began
     * @param loopEndNanos {@link System#nanoTime()} reading taken when the work finished
     * @return {@code loopEndNanos} when no sleep happened, otherwise a fresh
     *     {@link System#nanoTime()} reading taken right after waking up
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private long adjustRunLoopFrequency(long loopStartNanos, long loopEndNanos) throws InterruptedException {
        if (fetchIntervalMillis == 0) {
            return loopEndNanos;
        }

        final long elapsedMillis = (loopEndNanos - loopStartNanos) / NANOS_PER_MILLISECOND;
        final long sleepTimeMillis = fetchIntervalMillis - elapsedMillis;
        if (sleepTimeMillis <= 0) {
            return loopEndNanos;
        }

        Thread.sleep(sleepTimeMillis);
        final long wakeUpNanos = System.nanoTime();
        metricsReporter.setSleepTimeMillis(sleepTimeMillis);
        return wakeUpNanos;
    }

    /**
     * Computes the record limit for the next fetch from the last loop's duration and batch size.
     *
     * @param runLoopTimeNanos duration of the last run loop in nanoseconds
     * @param numberOfRecords number of records in the last batch
     * @param batchSizeInBytes total size of the last batch in bytes
     * @param currentLimit limit to keep when there is nothing to adapt from
     * @return {@code currentLimit} if either the record count or the loop time is zero; otherwise
     *     the adapted limit clamped to the range 1 to 10000
     */
    private int adaptRecordsToRead(long runLoopTimeNanos, int numberOfRecords, long batchSizeInBytes, int currentLimit) {
        if (numberOfRecords == 0 || runLoopTimeNanos == 0) {
            return currentLimit;
        }

        final double loopFrequencyHz = NANOS_PER_SECOND / runLoopTimeNanos;
        final double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
        // Integer division: an average below one byte becomes 0, which makes the double division
        // below infinite; the int cast then saturates and the clamp brings it back to the maximum.
        final long averageRecordSizeInBytes = batchSizeInBytes / numberOfRecords;
        final int unclampedLimit = (int) (bytesPerRead / averageRecordSizeInBytes);
        final int adaptedLimit = Math.max(MIN_RECORDS_PER_FETCH, Math.min(unclampedLimit, MAX_RECORDS_PER_FETCH));

        metricsReporter.setLoopFrequencyHz(loopFrequencyHz);
        metricsReporter.setBytesPerRead(bytesPerRead);
        return adaptedLimit;
    }
}
