package software.amazon.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.leases;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.impl.Lease;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker;

/* loaded from: input_file:software/amazon/kinesis/shaded/com/amazonaws/services/dynamodbv2/streamsadapter/leases/StreamsLeaseTaker.class */
public final class StreamsLeaseTaker<T extends Lease> implements ILeaseTaker<T> {
    private static final int SCAN_RETRIES = 1;
    private static final int TAKE_RETRIES = 3;
    private final ILeaseManager<T> leaseManager;
    private final String workerIdentifier;
    private final long leaseDurationNanos;
    private int maxLeasesForWorker = Integer.MAX_VALUE;
    private final Map<String, T> allLeases = new HashMap();
    private long lastScanTimeNanos = 0;
    private static final Log LOG = LogFactory.getLog(StreamsLeaseTaker.class);
    private static final Callable<Long> SYSTEM_CLOCK_CALLABLE = System::nanoTime;
    private static String SHARD_END = SentinelCheckpoint.SHARD_END.toString();

    public StreamsLeaseTaker(ILeaseManager<T> iLeaseManager, String str, long j) {
        this.leaseManager = iLeaseManager;
        this.workerIdentifier = str;
        this.leaseDurationNanos = TimeUnit.MILLISECONDS.toNanos(j);
    }

    public StreamsLeaseTaker<T> maxLeasesForWorker(int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("maxLeasesForWorker should be >= 1");
        }
        this.maxLeasesForWorker = i;
        return this;
    }

    @Override // software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker
    public synchronized Map<String, T> takeLeases() throws DependencyException, InvalidStateException {
        long currentTimeMillis = System.currentTimeMillis();
        refreshAllLeases(SYSTEM_CLOCK_CALLABLE);
        Set<T> expiredLeases = getExpiredLeases();
        Map<String, Integer> unfinishedLeaseCountsByHost = getUnfinishedLeaseCountsByHost(expiredLeases);
        int size = unfinishedLeaseCountsByHost.size();
        List<T> unfinishedLeases = getUnfinishedLeases(this.allLeases.values());
        List<T> finishedLeases = getFinishedLeases(this.allLeases.values());
        int myLeaseCount = getMyLeaseCount(unfinishedLeases, expiredLeases);
        int myLeaseCount2 = getMyLeaseCount(finishedLeases, expiredLeases);
        int targetUnfinishedLeasesCount = getTargetUnfinishedLeasesCount(unfinishedLeases.size(), size);
        int targetFinishedLeasesCount = getTargetFinishedLeasesCount(finishedLeases.size(), size);
        int i = targetUnfinishedLeasesCount - myLeaseCount;
        int i2 = targetFinishedLeasesCount - myLeaseCount2;
        List<T> unfinishedLeases2 = getUnfinishedLeases(expiredLeases);
        List<T> leasesToTakeFromExpiredLeases = getLeasesToTakeFromExpiredLeases(unfinishedLeases2, i);
        leasesToTakeFromExpiredLeases.addAll(getLeasesToTakeFromExpiredLeases(getFinishedLeases(expiredLeases), i2));
        leasesToTakeFromExpiredLeases.addAll(getLeasesToSteal(unfinishedLeaseCountsByHost, i - unfinishedLeases2.size(), targetUnfinishedLeasesCount, unfinishedLeases));
        LOG.info(String.format("Worker %s saw %d total leases, %d expired leases, %d workers.Unfinished lease target: %d leases, I have %d unfinished leases. Finished leases target is %d and I have %d finished leases. I will take %d leases in total.", this.workerIdentifier, Integer.valueOf(this.allLeases.size()), Integer.valueOf(expiredLeases.size()), Integer.valueOf(size), Integer.valueOf(targetUnfinishedLeasesCount), Integer.valueOf(myLeaseCount), Integer.valueOf(targetFinishedLeasesCount), Integer.valueOf(myLeaseCount2), Integer.valueOf(leasesToTakeFromExpiredLeases.size())));
        Map<String, T> takeLeases = takeLeases(leasesToTakeFromExpiredLeases);
        if (i2 < 0) {
            evictLeases(getLeasesToEvict(Math.abs(i2)));
        }
        LOG.info(String.format("TakeLeases took %d seconds.", Long.valueOf((System.currentTimeMillis() - currentTimeMillis) / 1000)));
        return takeLeases;
    }

    private void refreshAllLeases(Callable<Long> callable) throws InvalidStateException, DependencyException {
        ProvisionedThroughputException provisionedThroughputException = null;
        for (int i = 1; i <= 1; i++) {
            try {
                updateAllLeases(callable);
                provisionedThroughputException = null;
            } catch (ProvisionedThroughputException e) {
                LOG.info(String.format("Worker %s could not find expired leases on try %d out of %d", this.workerIdentifier, Integer.valueOf(i), 1));
                provisionedThroughputException = e;
            }
        }
        if (provisionedThroughputException != null) {
            LOG.error("Worker " + this.workerIdentifier + " could not scan leases table, aborting takeLeases. Exception caught by last retry:", provisionedThroughputException);
        }
    }

    private void updateAllLeases(Callable<Long> callable) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        List<T> listLeases = this.leaseManager.listLeases();
        try {
            this.lastScanTimeNanos = callable.call().longValue();
            HashSet hashSet = new HashSet(this.allLeases.keySet());
            for (T t : listLeases) {
                String leaseKey = t.getLeaseKey();
                T t2 = this.allLeases.get(leaseKey);
                this.allLeases.put(leaseKey, t);
                hashSet.remove(leaseKey);
                if (t2 != null) {
                    if (t2.getLeaseCounter().equals(t.getLeaseCounter())) {
                        t.setLastCounterIncrementNanos(t2.getLastCounterIncrementNanos());
                    } else {
                        t.setLastCounterIncrementNanos(Long.valueOf(this.lastScanTimeNanos));
                    }
                } else if (t.getLeaseOwner() == null) {
                    t.setLastCounterIncrementNanos(0L);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Treating new lease with key " + leaseKey + " as never renewed because it is new and unowned.");
                    }
                } else {
                    t.setLastCounterIncrementNanos(Long.valueOf(this.lastScanTimeNanos));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Treating new lease with key " + leaseKey + " as recently renewed because it is new and owned.");
                    }
                }
            }
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                this.allLeases.remove((String) it.next());
            }
        } catch (Exception e) {
            throw new DependencyException("Exception caught from timeProvider", e);
        }
    }

    private Set<T> getExpiredLeases() {
        return (Set) this.allLeases.values().stream().filter(lease -> {
            return lease.isExpired(this.leaseDurationNanos, this.lastScanTimeNanos);
        }).collect(Collectors.toSet());
    }

    private Map<String, Integer> getUnfinishedLeaseCountsByHost(Set<T> set) {
        HashMap hashMap = new HashMap();
        Stream<T> filter = this.allLeases.values().stream().filter(lease -> {
            return (set.contains(lease) || lease.getLeaseOwner() == null) ? false : true;
        });
        Class<KinesisClientLease> cls = KinesisClientLease.class;
        KinesisClientLease.class.getClass();
        Stream<T> filter2 = filter.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Class<KinesisClientLease> cls2 = KinesisClientLease.class;
        KinesisClientLease.class.getClass();
        filter2.map((v1) -> {
            return r1.cast(v1);
        }).forEach(kinesisClientLease -> {
            int i = 0;
            if (kinesisClientLease.getCheckpoint() != null && !SHARD_END.equals(kinesisClientLease.getCheckpoint().getSequenceNumber())) {
                i = 1;
            }
            hashMap.merge(kinesisClientLease.getLeaseOwner(), Integer.valueOf(i), (v0, v1) -> {
                return Integer.sum(v0, v1);
            });
        });
        hashMap.putIfAbsent(this.workerIdentifier, 0);
        return hashMap;
    }

    private List<T> getUnfinishedLeases(Collection<T> collection) {
        return (List) collection.stream().filter(lease -> {
            return lease instanceof KinesisClientLease;
        }).filter(lease2 -> {
            return ((KinesisClientLease) lease2).getCheckpoint() != null;
        }).filter(lease3 -> {
            return !SHARD_END.equals(((KinesisClientLease) lease3).getCheckpoint().getSequenceNumber());
        }).collect(Collectors.toList());
    }

    private List<T> getFinishedLeases(Collection<T> collection) {
        return (List) collection.stream().filter(lease -> {
            return lease instanceof KinesisClientLease;
        }).filter(lease2 -> {
            return ((KinesisClientLease) lease2).getCheckpoint() != null;
        }).filter(lease3 -> {
            return SHARD_END.equals(((KinesisClientLease) lease3).getCheckpoint().getSequenceNumber());
        }).collect(Collectors.toList());
    }

    private int getTargetUnfinishedLeasesCount(int i, int i2) {
        int i3;
        if (i2 >= i) {
            i3 = 1;
        } else {
            i3 = (i / i2) + (i % i2 == 0 ? 0 : 1);
        }
        int max = Math.max(0, i3 - this.maxLeasesForWorker);
        if (max > 0) {
            LOG.warn(String.format("Worker %s : target is %d unfinished shard leases and maxLeasesForWorker is %d. Resetting target to %d, lease spillover is %d.  Note that some shards may not be processed if no other workers are able to pick them up resulting in a possible stall.", this.workerIdentifier, Integer.valueOf(i3), Integer.valueOf(this.maxLeasesForWorker), Integer.valueOf(this.maxLeasesForWorker), Integer.valueOf(max)));
            i3 = this.maxLeasesForWorker;
        }
        return i3;
    }

    private int getTargetFinishedLeasesCount(int i, int i2) {
        int i3;
        if (i2 >= i) {
            i3 = 1;
        } else {
            i3 = (i / i2) + (i % i2 == 0 ? 0 : 1);
        }
        return i3;
    }

    private int getMyLeaseCount(List<T> list, Set<T> set) {
        return Math.toIntExact(list.stream().filter(lease -> {
            return !set.contains(lease);
        }).filter(lease2 -> {
            return this.workerIdentifier.equals(lease2.getLeaseOwner());
        }).count());
    }

    private List<T> getLeasesToTakeFromExpiredLeases(List<T> list, int i) {
        if (i <= 0) {
            return new ArrayList();
        }
        Collections.shuffle(list);
        return (List) list.stream().limit(i).collect(Collectors.toList());
    }

    private List<T> getLeasesToSteal(Map<String, Integer> map, int i, int i2, List<T> list) {
        ArrayList<Lease> arrayList = new ArrayList();
        if (i <= 0) {
            return arrayList;
        }
        HashMap hashMap = new HashMap();
        Map map2 = (Map) map.entrySet().stream().filter(entry -> {
            return !((String) entry.getKey()).equals(this.workerIdentifier);
        }).filter(entry2 -> {
            return ((Integer) entry2.getValue()).intValue() > i2;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry3 -> {
            return Integer.valueOf(((Integer) entry3.getValue()).intValue() - i2);
        }));
        int size = map2.size();
        if (size <= 0) {
            return arrayList;
        }
        int i3 = (i / size) + (i % size > 0 ? 1 : 0);
        Collections.shuffle(list);
        for (T t : list) {
            String leaseOwner = t.getLeaseOwner();
            if (leaseOwner != null) {
                int intValue = ((Integer) map2.getOrDefault(leaseOwner, 0)).intValue();
                int intValue2 = ((Integer) hashMap.getOrDefault(leaseOwner, 0)).intValue();
                if (intValue > 0 && intValue2 < i3) {
                    map2.put(leaseOwner, Integer.valueOf(intValue - 1));
                    hashMap.put(leaseOwner, Integer.valueOf(intValue2 + 1));
                    arrayList.add(t);
                }
                if (arrayList.size() >= i) {
                    break;
                }
            }
        }
        for (Lease lease : arrayList) {
            LOG.info(String.format("Worker %s needs %d leases. It will steal lease %s from %s", this.workerIdentifier, Integer.valueOf(i), lease.getLeaseKey(), lease.getLeaseOwner()));
        }
        LOG.info(String.format("Worker %s will try to steal total %d leases", this.workerIdentifier, Integer.valueOf(arrayList.size())));
        return arrayList;
    }

    private Map<String, T> takeLeases(List<T> list) throws DependencyException, InvalidStateException {
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        for (T t : list) {
            String leaseKey = t.getLeaseKey();
            int i = 0;
            while (true) {
                if (i < 3) {
                    try {
                        if (this.leaseManager.takeLease(t, this.workerIdentifier)) {
                            t.setLastCounterIncrementNanos(Long.valueOf(System.nanoTime()));
                            hashMap.put(leaseKey, t);
                        } else {
                            hashSet.add(leaseKey);
                        }
                    } catch (ProvisionedThroughputException e) {
                        LOG.info(String.format("Could not take lease with key %s for worker %s on try %d out of %d due to capacity", leaseKey, this.workerIdentifier, Integer.valueOf(i), 3));
                        i++;
                    }
                }
            }
        }
        if (hashMap.size() > 0) {
            LOG.info(String.format("Worker %s successfully took %d leases: %s", this.workerIdentifier, Integer.valueOf(hashMap.size()), stringJoin(hashMap.keySet(), ", ")));
        }
        if (hashSet.size() > 0) {
            LOG.info(String.format("Worker %s failed to take %d leases: %s", this.workerIdentifier, Integer.valueOf(hashSet.size()), stringJoin(hashSet, ", ")));
        }
        return hashMap;
    }

    private List<T> getLeasesToEvict(int i) {
        ArrayList arrayList = new ArrayList();
        for (T t : this.allLeases.values()) {
            if (i <= 0) {
                return arrayList;
            }
            if (this.workerIdentifier.equals(t.getLeaseOwner()) && (t instanceof KinesisClientLease)) {
                KinesisClientLease kinesisClientLease = (KinesisClientLease) t;
                if (kinesisClientLease.getCheckpoint() != null) {
                    if (SHARD_END.equals(kinesisClientLease.getCheckpoint().getSequenceNumber())) {
                        arrayList.add(t);
                        i--;
                    }
                }
            }
        }
        return arrayList;
    }

    private void evictLeases(List<T> list) throws DependencyException, InvalidStateException {
        for (T t : list) {
            LOG.info(String.format("Worker %s : LeaseTaker will try to evict lease %s", this.workerIdentifier, t.getLeaseKey()));
            try {
                this.leaseManager.evictLease(t);
            } catch (ProvisionedThroughputException e) {
                LOG.info(String.format("Worker %s could not evict leases to take due to capacity", this.workerIdentifier));
                return;
            }
        }
        LOG.info(String.format("Worker %s : LeaseTaker evicted %d leases", this.workerIdentifier, Integer.valueOf(list.size())));
    }

    static String stringJoin(Collection<String> collection, String str) {
        StringBuilder sb = new StringBuilder();
        boolean z = false;
        for (String str2 : collection) {
            if (z) {
                sb.append(str);
            }
            sb.append(str2);
            z = true;
        }
        return sb.toString();
    }

    @Override // software.amazon.kinesis.shaded.com.amazonaws.services.kinesis.leases.interfaces.ILeaseTaker
    public String getWorkerIdentifier() {
        return this.workerIdentifier;
    }
}
