package cz.o2.proxima.direct.time;

import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.time.AbstractWatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimatorFactory;
import cz.o2.proxima.time.WatermarkIdlePolicy;
import cz.o2.proxima.time.WatermarkIdlePolicyFactory;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.shell.CopyCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/time/UnboundedOutOfOrdernessWatermarkEstimator.class */
/**
 * Watermark estimator that keeps a sliding window of "delay" statistics.
 *
 * <p>The window consists of {@code durationMs / stepMs + 1} slots. Slot {@code 0} collects the
 * largest observed difference between the current time (as reported by the configured {@link
 * TimestampSupplier}) and the stamp of every added element. Every time more than {@code stepMs}
 * elapses the slots are shifted towards the end of the window and the freed slots are zeroed.
 *
 * <p>The watermark is moved to {@code now - allowedTimestampSkew} only when the window has been
 * rotated enough times since construction and no slot holds a delay greater than the allowed
 * skew. The watermark never moves backwards.
 */
public class UnboundedOutOfOrdernessWatermarkEstimator extends AbstractWatermarkEstimator {
    private static final Logger log = LoggerFactory.getLogger(UnboundedOutOfOrdernessWatermarkEstimator.class);
    private static final long serialVersionUID = 1L;

    /** Minimal interval between two debug dumps of the delay statistics. */
    private static final long STAT_LOG_INTERVAL_MS = 10000L;

    /** Default initial watermark; numerically {@code Long.MIN_VALUE + 365 * 86_400_000}. */
    static final long DEFAULT_MIN_WATERMARK = -9223372005318775808L;

    /** Configuration key (before prefixing) of the initial watermark. */
    public static final String MIN_WATERMARK = "min-watermark";

    /** Configuration key (before prefixing) of the length of the estimation window. */
    public static final String ESTIMATE_DURATION_MS = "estimate-duration";

    /** Configuration key (before prefixing) of the length of a single window slot. */
    public static final String STEP_MS = "step";

    /** Configuration key (before prefixing) of the allowed timestamp skew. */
    public static final String ALLOWED_TIMESTAMP_SKEW = "allowed-timestamp-skew";

    public static final long DEFAULT_ESTIMATE_DURATION_MS = 10000;
    public static final long DEFAULT_STEP_MS = 200;
    public static final long DEFAULT_ALLOWED_TIMESTAMP_SKEW = 200;

    private final long stepMs;
    private final long estimateDurationMs;
    private final long allowedTimestampSkew;
    private final TimestampSupplier timestampSupplier;

    /** Per-slot maximal observed delay; index 0 is the slot currently being filled. */
    private final long[] stepDiffs;

    /** Value of the timestamp supplier at the time of the last rotation. */
    private final AtomicLong lastRotate;

    /** Number of slot shifts still required before the watermark may be advanced. */
    private final AtomicInteger rotatesToInitialize;

    private final AtomicLong watermark;

    /** Value of the timestamp supplier at the time of the last debug dump. */
    private final AtomicLong lastStatLogged;

    /**
     * Immutable builder of {@link UnboundedOutOfOrdernessWatermarkEstimator}. Every {@code with*}
     * method returns a new builder and leaves the receiver untouched.
     */
    public static class Builder {
        private final long durationMs;
        private final long stepMs;
        private final long allowedTimestampSkew;
        private final long minWatermark;
        private final TimestampSupplier timestampSupplier;
        private final WatermarkIdlePolicy watermarkIdlePolicy;

        /** Creates a builder pre-populated with the {@code DEFAULT_*} values. */
        Builder() {
            this(
                    DEFAULT_ESTIMATE_DURATION_MS,
                    DEFAULT_STEP_MS,
                    DEFAULT_ALLOWED_TIMESTAMP_SKEW,
                    DEFAULT_MIN_WATERMARK,
                    System::currentTimeMillis,
                    new NotProgressingWatermarkIdlePolicy());
        }

        private Builder(
                long durationMs,
                long stepMs,
                long allowedTimestampSkew,
                long minWatermark,
                TimestampSupplier timestampSupplier,
                WatermarkIdlePolicy watermarkIdlePolicy) {
            this.durationMs = durationMs;
            this.stepMs = stepMs;
            this.allowedTimestampSkew = allowedTimestampSkew;
            this.minWatermark = minWatermark;
            this.timestampSupplier = timestampSupplier;
            this.watermarkIdlePolicy = watermarkIdlePolicy;
        }

        /** Returns a copy of this builder with the estimation window length replaced. */
        public Builder withDurationMs(long durationMs) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /** Returns a copy of this builder with the slot length replaced. */
        public Builder withStepMs(long stepMs) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /** Returns a copy of this builder with the allowed timestamp skew replaced. */
        public Builder withAllowedTimestampSkew(long allowedTimestampSkew) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /** Returns a copy of this builder with the initial watermark replaced. */
        public Builder withMinWatermark(long minWatermark) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /** Returns a copy of this builder with the source of "current time" replaced. */
        public Builder withTimestampSupplier(TimestampSupplier timestampSupplier) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /** Returns a copy of this builder with the idle policy replaced. */
        public Builder withWatermarkIdlePolicy(WatermarkIdlePolicy watermarkIdlePolicy) {
            return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        /**
         * Creates the estimator from the values held by this builder.
         *
         * @throws IllegalArgumentException when the duration or step is not positive, or the
         *     duration is not a multiple of the step
         * @throws NullPointerException when the timestamp supplier is {@code null}
         */
        public UnboundedOutOfOrdernessWatermarkEstimator build() {
            return new UnboundedOutOfOrdernessWatermarkEstimator(
                    durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier, watermarkIdlePolicy);
        }

        // NOTE: the synthetic $deserializeLambda$ method is intentionally not declared here.
        // The compiler generates it for serializable method references such as
        // System::currentTimeMillis; a hand-written copy is neither needed nor valid source.
    }

    /** Factory creating the estimator from a configuration map. */
    public static class Factory implements WatermarkEstimatorFactory {
        /**
         * Creates an estimator configured from {@code map}; options missing in the map fall back
         * to the {@code DEFAULT_*} constants of the estimator.
         *
         * @param map configuration, looked up under keys produced by {@code
         *     WatermarkConfiguration.prefixedKey}
         * @param watermarkIdlePolicyFactory factory of the idle policy handed to the estimator
         * @return the configured estimator
         * @throws NumberFormatException when a present option cannot be parsed as {@code long}
         */
        @Override
        public WatermarkEstimator create(Map<String, Object> map, WatermarkIdlePolicyFactory watermarkIdlePolicyFactory) {
            long durationMs = getConfiguration(ESTIMATE_DURATION_MS, map, DEFAULT_ESTIMATE_DURATION_MS);
            long stepMs = getConfiguration(STEP_MS, map, DEFAULT_STEP_MS);
            long allowedSkew = getConfiguration(ALLOWED_TIMESTAMP_SKEW, map, DEFAULT_ALLOWED_TIMESTAMP_SKEW);
            long minWatermark = getConfiguration(MIN_WATERMARK, map, DEFAULT_MIN_WATERMARK);
            return UnboundedOutOfOrdernessWatermarkEstimator.newBuilder()
                    .withAllowedTimestampSkew(allowedSkew)
                    .withDurationMs(durationMs)
                    .withStepMs(stepMs)
                    .withMinWatermark(minWatermark)
                    .withWatermarkIdlePolicy(watermarkIdlePolicyFactory.create(map))
                    .build();
        }

        /** Reads a {@code long} option from the map, parsing its string form, or returns the default. */
        private long getConfiguration(String name, Map<String, Object> map, long defaultValue) {
            return Optional.ofNullable(map.get(WatermarkConfiguration.prefixedKey(name)))
                    .map(value -> Long.valueOf(value.toString()))
                    .orElse(defaultValue);
        }
    }

    /** Returns a builder pre-populated with default values. */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Creates the estimator.
     *
     * @param durationMs length of the estimation window; must be a positive multiple of {@code stepMs}
     * @param stepMs length of a single slot of the window; must be positive
     * @param allowedTimestampSkew maximal delay tolerated in every slot for the watermark to advance
     * @param minWatermark initial value of the watermark
     * @param timestampSupplier source of the current time; must not be {@code null}
     * @param watermarkIdlePolicy idle policy passed to the superclass
     */
    @VisibleForTesting
    UnboundedOutOfOrdernessWatermarkEstimator(
            long durationMs,
            long stepMs,
            long allowedTimestampSkew,
            long minWatermark,
            TimestampSupplier timestampSupplier,
            WatermarkIdlePolicy watermarkIdlePolicy) {
        super(watermarkIdlePolicy);
        Objects.requireNonNull(timestampSupplier);
        Preconditions.checkArgument(durationMs > 0, "durationMs must be positive");
        Preconditions.checkArgument(stepMs > 0, "stepMs must be positive");
        Preconditions.checkArgument(durationMs % stepMs == 0, "durationMs must be divisible by stepMs");
        long slots = durationMs / stepMs;
        // The window is an int-indexed array with one extra slot, so the ratio must leave room for it.
        Preconditions.checkArgument(slots < Integer.MAX_VALUE, "durationMs / stepMs is too large");

        this.estimateDurationMs = durationMs;
        this.stepMs = stepMs;
        this.allowedTimestampSkew = allowedTimestampSkew;
        this.timestampSupplier = timestampSupplier;
        this.watermark = new AtomicLong(minWatermark);
        this.lastStatLogged = new AtomicLong();
        // A freshly allocated long[] is already zero-filled.
        this.stepDiffs = new long[(int) slots + 1];
        this.rotatesToInitialize = new AtomicInteger(this.stepDiffs.length - 1);
        this.lastRotate = new AtomicLong(timestampSupplier.get() - stepMs);
    }

    /**
     * Rotates the window if needed and, when the window is initialized and no slot exceeds the
     * allowed skew, raises the watermark to {@code now - allowedTimestampSkew}.
     *
     * @return the current watermark
     */
    @Override
    protected long estimateWatermark() {
        rotateIfNeeded();
        boolean initialized = rotatesToInitialize.get() <= 0;
        if (initialized && Arrays.stream(stepDiffs).noneMatch(diff -> diff > allowedTimestampSkew)) {
            // Math::max keeps the watermark monotonic.
            watermark.accumulateAndGet(timestampSupplier.get() - allowedTimestampSkew, Math::max);
        }
        return watermark.get();
    }

    /** Records the stamp of the given element into the delay statistics. */
    @Override
    public void updateWatermark(StreamElement streamElement) {
        add(streamElement.getStamp());
    }

    /** Raises the watermark to {@code minWatermark} if it is currently lower. */
    @Override
    public void setMinWatermark(long minWatermark) {
        watermark.accumulateAndGet(minWatermark, Math::max);
    }

    /**
     * Records a single element stamp: the current slot keeps the largest difference between the
     * current time and the stamps added to it.
     */
    @VisibleForTesting
    void add(long stamp) {
        rotateIfNeeded();
        long delay = timestampSupplier.get() - stamp;
        if (stepDiffs[0] < delay) {
            stepDiffs[0] = delay;
        }
    }

    /**
     * Shifts the window when more than {@code stepMs} elapsed since the last rotation and, with
     * debug logging enabled, periodically dumps the delay statistics.
     */
    private void rotateIfNeeded() {
        long now = timestampSupplier.get();
        long last = lastRotate.get();
        if (now > last + stepMs) {
            // Clamp before narrowing: after a very long pause the raw quotient does not fit
            // into int and a plain cast could produce a negative shift.
            int steps = (int) Math.min(stepDiffs.length - 1L, (now - last) / stepMs);
            rotate(now, steps);
        }
        if (log.isDebugEnabled() && now - lastStatLogged.get() > STAT_LOG_INTERVAL_MS) {
            log.debug("Watermark delay stats: {} with allowedTimestampSkew {}", Arrays.toString(stepDiffs), Long.valueOf(allowedTimestampSkew));
            lastStatLogged.set(now);
        }
    }

    /**
     * Shifts the slots by {@code moveCnt} positions (at most the whole window minus one slot),
     * zeroes the freed leading slots and remembers {@code now} as the time of the rotation.
     */
    private void rotate(long now, int moveCnt) {
        int shift = Math.min(stepDiffs.length - 1, moveCnt);
        System.arraycopy(stepDiffs, 0, stepDiffs, shift, stepDiffs.length - shift);
        if (rotatesToInitialize.get() > 0) {
            rotatesToInitialize.addAndGet(-shift);
        }
        Arrays.fill(stepDiffs, 0, shift, 0L);
        lastRotate.set(now);
    }

    /** Returns the length of a single window slot. */
    public long getStepMs() {
        return stepMs;
    }

    /** Returns the length of the estimation window. */
    public long getEstimateDurationMs() {
        return estimateDurationMs;
    }

    /** Returns the allowed timestamp skew. */
    public long getAllowedTimestampSkew() {
        return allowedTimestampSkew;
    }
}
