package org.springframework.cloud.stream.binder.kafka;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.util.Assert;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.GroupedObservable;
import rx.observables.MathObservable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/WindowingOffsetManager.class */
/**
 * An {@link OffsetManager} decorator that reduces the number of offset updates
 * sent to the wrapped manager.
 *
 * <p>Offsets passed to {@link #updateOffset(Partition, long)} are pushed into an
 * RxJava subject, grouped into windows (either by elapsed time or by number of
 * updates), and for every window only the highest offset seen per
 * {@link Partition} is forwarded to the delegate. All other operations are
 * forwarded to the delegate directly.
 *
 * <p>Exactly one of {@code timespan} and {@code count} must be greater than zero
 * when {@link #afterPropertiesSet()} runs. With the defaults (timespan 10000 ms,
 * count 0) time-based windowing is used. A count of exactly 1 (with timespan 0)
 * disables windowing, and every update goes straight to the delegate.
 */
public class WindowingOffsetManager implements OffsetManager, InitializingBean, DisposableBean {
    /** The wrapped manager that ultimately stores offsets. */
    private final OffsetManager delegate;

    /** Entry point of the windowing pipeline; {@code null} when windowing is disabled or not yet initialized. */
    private Subject<PartitionAndOffset, PartitionAndOffset> offsets;

    /** Subscription of the delegate-updating action to the windowing pipeline; set together with {@link #offsets}. */
    private Subscription subscription;

    /**
     * Latch released when the pipeline completes. It is written by {@link #close()} and read by the
     * completion callback, which may be invoked on a different thread, hence {@code volatile}.
     */
    private volatile CountDownLatch shutdownLatch;

    private final CreatePartitionAndOffsetFunction createPartitionAndOffsetFunction = new CreatePartitionAndOffsetFunction();
    private final GetOffsetFunction getOffsetFunction = new GetOffsetFunction();
    private final ComputeMaximumOffsetByPartitionFunction findHighestOffsetInPartitionGroup = new ComputeMaximumOffsetByPartitionFunction();
    private final GetPartitionFunction getPartition = new GetPartitionFunction();
    private final FindHighestOffsetsByPartitionFunction findHighestOffsetsByPartition = new FindHighestOffsetsByPartitionFunction();
    private final DelegateUpdateOffsetAction delegateUpdateOffsetAction = new DelegateUpdateOffsetAction();
    private final NotifyObservableClosedAction notifyObservableClosed = new NotifyObservableClosedAction();

    /** Window length in milliseconds; 0 disables time-based windowing. */
    private long timespan = 10000;

    /** Number of updates per window; 0 disables count-based windowing. */
    private int count = 0;

    /** Maximum time in milliseconds that {@link #close()} waits for the pipeline to complete. */
    private int shutdownTimeout = 2000;

    /**
     * Reduces one per-partition group of a window to a single element carrying the
     * group's partition key and the maximum offset observed in that group.
     */
    public class ComputeMaximumOffsetByPartitionFunction implements Func1<GroupedObservable<Partition, PartitionAndOffset>, Observable<PartitionAndOffset>> {
        private ComputeMaximumOffsetByPartitionFunction() {
        }

        @Override
        public Observable<PartitionAndOffset> call(GroupedObservable<Partition, PartitionAndOffset> groupedObservable) {
            // Pair the group's key with the maximum of the group's offsets.
            return Observable.zip(Observable.just(groupedObservable.getKey()), MathObservable.max(groupedObservable.map(WindowingOffsetManager.this.getOffsetFunction)), WindowingOffsetManager.this.createPartitionAndOffsetFunction);
        }
    }

    /** Combines a partition and an offset into a {@link PartitionAndOffset}. */
    public class CreatePartitionAndOffsetFunction implements Func2<Partition, Long, PartitionAndOffset> {
        private CreatePartitionAndOffsetFunction() {
        }

        @Override
        public PartitionAndOffset call(Partition partition, Long l) {
            return new PartitionAndOffset(partition, l);
        }
    }

    /** Forwards a windowed result to the delegate's {@code updateOffset}. */
    private class DelegateUpdateOffsetAction implements Action1<PartitionAndOffset> {
        private DelegateUpdateOffsetAction() {
        }

        @Override
        public void call(PartitionAndOffset partitionAndOffset) {
            WindowingOffsetManager.this.delegate.updateOffset(partitionAndOffset.getPartition(), partitionAndOffset.getOffset().longValue());
        }
    }

    /** Transforms one window of updates into the highest offset per partition. */
    private class FindHighestOffsetsByPartitionFunction implements Func1<Observable<PartitionAndOffset>, Observable<PartitionAndOffset>> {
        private FindHighestOffsetsByPartitionFunction() {
        }

        @Override
        public Observable<PartitionAndOffset> call(Observable<PartitionAndOffset> observable) {
            return observable.groupBy(WindowingOffsetManager.this.getPartition).flatMap(WindowingOffsetManager.this.findHighestOffsetInPartitionGroup);
        }
    }

    /** Extracts the offset from a {@link PartitionAndOffset}. */
    public class GetOffsetFunction implements Func1<PartitionAndOffset, Long> {
        private GetOffsetFunction() {
        }

        @Override
        public Long call(PartitionAndOffset partitionAndOffset) {
            return partitionAndOffset.getOffset();
        }
    }

    /** Extracts the partition from a {@link PartitionAndOffset}. */
    public class GetPartitionFunction implements Func1<PartitionAndOffset, Partition> {
        private GetPartitionFunction() {
        }

        @Override
        public Partition call(PartitionAndOffset partitionAndOffset) {
            return partitionAndOffset.getPartition();
        }
    }

    /** Completion callback of the pipeline: releases the shutdown latch, if one has been installed. */
    private class NotifyObservableClosedAction implements Action0 {
        private NotifyObservableClosedAction() {
        }

        @Override
        public void call() {
            // Read the field once so the null check and the countDown see the same latch.
            CountDownLatch latch = WindowingOffsetManager.this.shutdownLatch;
            if (latch != null) {
                latch.countDown();
            }
        }
    }

    /** Immutable pair of a partition and an offset flowing through the windowing pipeline. */
    public class PartitionAndOffset {
        private final Partition partition;
        private final Long offset;

        public PartitionAndOffset(Partition partition, Long l) {
            this.partition = partition;
            this.offset = l;
        }

        public Partition getPartition() {
            return this.partition;
        }

        public Long getOffset() {
            return this.offset;
        }
    }

    /**
     * Creates a windowing decorator around the given manager.
     *
     * @param offsetManager the manager that receives the reduced offset updates; must not be {@code null}
     * @throws IllegalArgumentException if {@code offsetManager} is {@code null}
     */
    public WindowingOffsetManager(OffsetManager offsetManager) {
        Assert.notNull(offsetManager, "Delegate OffsetManager must not be null");
        this.delegate = offsetManager;
    }

    /**
     * Sets the length of a time-based window.
     *
     * @param j window length in milliseconds; 0 disables time-based windowing
     * @throws IllegalArgumentException if {@code j} is negative
     */
    public void setTimespan(long j) {
        Assert.isTrue(j >= 0, "Timespan must be a non-negative value");
        this.timespan = j;
    }

    /**
     * Sets the size of a count-based window.
     *
     * @param i number of updates per window; 0 disables count-based windowing
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public void setCount(int i) {
        Assert.isTrue(i >= 0, "Count must be a non-negative value");
        this.count = i;
    }

    /**
     * Sets how long {@link #close()} waits for the pipeline to complete.
     *
     * @param i timeout in milliseconds
     */
    public void setShutdownTimeout(int i) {
        this.shutdownTimeout = i;
    }

    /**
     * Validates the configuration and, unless windowing is disabled, builds the
     * pipeline: subject -> windows -> highest offset per partition -> delegate.
     *
     * @throws IllegalArgumentException if both or neither of timespan and count are greater than zero
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.isTrue((this.timespan > 0) ^ (this.count > 0), "Only one of the timespan or count must be set");
        if (this.timespan <= 0 && this.count <= 1) {
            // A window of a single element is pointless: updates go straight to the delegate.
            this.offsets = null;
            return;
        }
        this.offsets = new SerializedSubject<PartitionAndOffset, PartitionAndOffset>(PublishSubject.<PartitionAndOffset>create());
        Observable<Observable<PartitionAndOffset>> windows = this.timespan > 0
                ? this.offsets.window(this.timespan, TimeUnit.MILLISECONDS)
                : this.offsets.window(this.count);
        this.subscription = windows
                .flatMap(this.findHighestOffsetsByPartition)
                .doOnCompleted(this.notifyObservableClosed)
                .subscribe(this.delegateUpdateOffsetAction);
    }

    /**
     * Flushes and closes this manager, then destroys the delegate if it is a
     * {@link DisposableBean}.
     */
    @Override
    public void destroy() throws Exception {
        flush();
        close();
        if (this.delegate instanceof DisposableBean) {
            // The delegate is typed as OffsetManager, so the cast is required to reach destroy().
            ((DisposableBean) this.delegate).destroy();
        }
    }

    /**
     * Records an offset. When windowing is active the update is queued into the
     * pipeline; otherwise it is passed to the delegate immediately.
     */
    @Override
    public void updateOffset(Partition partition, long j) {
        if (this.offsets != null) {
            this.offsets.onNext(new PartitionAndOffset(partition, Long.valueOf(j)));
        } else {
            this.delegate.updateOffset(partition, j);
        }
    }

    /** Returns the offset the delegate reports for the partition. */
    @Override
    public long getOffset(Partition partition) {
        return this.delegate.getOffset(partition);
    }

    /** Deletes the partition's offset in the delegate. */
    @Override
    public void deleteOffset(Partition partition) {
        this.delegate.deleteOffset(partition);
    }

    /** Resets the offsets of the given partitions in the delegate. */
    @Override
    public void resetOffsets(Collection<Partition> collection) {
        this.delegate.resetOffsets(collection);
    }

    /**
     * Completes the windowing pipeline (if active), waits up to the shutdown
     * timeout for its completion callback, unsubscribes, and closes the delegate.
     *
     * <p>If the pipeline's subscription is already unsubscribed (for example after
     * a previous {@code close()}), the pipeline shutdown is skipped: the completion
     * callback cannot fire again, so waiting would only block for the full timeout.
     * The delegate is closed on every call.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned
     * and the interrupt status is restored once the delegate has been closed.
     *
     * @throws IOException if the delegate fails to close
     */
    @Override
    public void close() throws IOException {
        boolean interrupted = false;
        try {
            if (this.offsets != null && !this.subscription.isUnsubscribed()) {
                // Install the latch before completing so the completion callback can release it.
                this.shutdownLatch = new CountDownLatch(1);
                this.offsets.onCompleted();
                try {
                    this.shutdownLatch.await(this.shutdownTimeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Stop waiting but still release resources below; the flag is restored in finally.
                    interrupted = true;
                }
                this.subscription.unsubscribe();
            }
            this.delegate.close();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Flushes the delegate. */
    @Override
    public void flush() throws IOException {
        this.delegate.flush();
    }
}
